You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

846 lines
30 KiB

#!/usr/bin/env python3
"""
Retargeting bridge: Quest 3 body tracking -> G1 robot joint commands via DDS.
Reads body tracking data from the teleop server's shared memory, retargets
body joint orientations to G1 robot joint angles, and publishes commands
over CycloneDDS for Isaac Sim (or real robot).
Usage:
python retarget_bridge.py [--port 8765] [--dds-domain 1] [--hz 50]
Architecture:
Quest 3 -> WebSocket -> TeleopServer (shared memory)
|
retarget_bridge.py
|
CycloneDDS -> Isaac Sim
"""
import sys
import os
import time
import argparse
import logging
import signal
import numpy as np
from multiprocessing import Array
# Local imports
from teleop_server import TeleopServer
logger = logging.getLogger("retarget_bridge")
# ---------------------------------------------------------------------------
# Coordinate transform constants
# ---------------------------------------------------------------------------
# OpenXR (Y-up, -Z forward, X-right) -> Robot (X-forward, Y-left, Z-up)
R_ROBOT_OPENXR = np.array([[ 0, 0,-1],
[-1, 0, 0],
[ 0, 1, 0]], dtype=np.float64)
R_OPENXR_ROBOT = R_ROBOT_OPENXR.T
# ---------------------------------------------------------------------------
# G1 joint indices (29 DOF)
# ---------------------------------------------------------------------------
NUM_JOINTS = 29
# Waist
WAIST_YAW = 12
WAIST_ROLL = 13
WAIST_PITCH = 14
# Left arm
L_SHOULDER_PITCH = 15
L_SHOULDER_ROLL = 16
L_SHOULDER_YAW = 17
L_ELBOW = 18
L_WRIST_ROLL = 19
L_WRIST_PITCH = 20
L_WRIST_YAW = 21
# Right arm
R_SHOULDER_PITCH = 22
R_SHOULDER_ROLL = 23
R_SHOULDER_YAW = 24
R_ELBOW = 25
R_WRIST_ROLL = 26
R_WRIST_PITCH = 27
R_WRIST_YAW = 28
# ---------------------------------------------------------------------------
# Body joints shared memory layout (8 joints x 7 floats = 56)
# Each joint: [x, y, z, qx, qy, qz, qw]
# ---------------------------------------------------------------------------
BJ_HIPS = 0 # offset 0
BJ_CHEST = 1 # offset 7
BJ_L_UPPER_ARM = 2 # offset 14
BJ_L_LOWER_ARM = 3 # offset 21
BJ_L_WRIST = 4 # offset 28
BJ_R_UPPER_ARM = 5 # offset 35
BJ_R_LOWER_ARM = 6 # offset 42
BJ_R_WRIST = 7 # offset 49
NUM_BODY_JOINTS = 8
# ---------------------------------------------------------------------------
# URDF joint limits (from g1_custom_collision_29dof.urdf)
# ---------------------------------------------------------------------------
JOINT_LIMITS = {
12: (-2.618, 2.618), # waist_yaw
13: (-0.52, 0.52), # waist_roll
14: (-0.52, 0.52), # waist_pitch
15: (-3.0892, 2.6704), # left_shoulder_pitch
16: (-1.5882, 2.2515), # left_shoulder_roll
17: (-2.618, 2.618), # left_shoulder_yaw
18: (-1.0472, 2.0944), # left_elbow
19: (-1.972, 1.972), # left_wrist_roll
20: (-1.614, 1.614), # left_wrist_pitch
21: (-1.614, 1.614), # left_wrist_yaw
22: (-3.0892, 2.6704), # right_shoulder_pitch
23: (-2.2515, 1.5882), # right_shoulder_roll
24: (-2.618, 2.618), # right_shoulder_yaw
25: (-1.0472, 2.0944), # right_elbow
26: (-1.972, 1.972), # right_wrist_roll
27: (-1.614, 1.614), # right_wrist_pitch
28: (-1.614, 1.614), # right_wrist_yaw
}
SOFT_LIMIT_FACTOR = 0.9
# Default Kp/Kd from Unitree SDK example
KP = [
60, 60, 60, 100, 40, 40, # Left leg
60, 60, 60, 100, 40, 40, # Right leg
60, 40, 40, # Waist
40, 40, 40, 40, 40, 40, 40, # Left arm
40, 40, 40, 40, 40, 40, 40, # Right arm
]
KD = [
1, 1, 1, 2, 1, 1,
1, 1, 1, 2, 1, 1,
1, 1, 1,
1, 1, 1, 1, 1, 1, 1,
1, 1, 1, 1, 1, 1, 1,
]
# Inspire hand joint ordering in DDS MotorCmds_ (12 total)
# Indices 0-5: right hand, 6-11: left hand
INSPIRE_R_PINKY = 0
INSPIRE_R_RING = 1
INSPIRE_R_MIDDLE = 2
INSPIRE_R_INDEX = 3
INSPIRE_R_THUMB_PITCH = 4
INSPIRE_R_THUMB_YAW = 5
INSPIRE_L_PINKY = 6
INSPIRE_L_RING = 7
INSPIRE_L_MIDDLE = 8
INSPIRE_L_INDEX = 9
INSPIRE_L_THUMB_PITCH = 10
INSPIRE_L_THUMB_YAW = 11
# Quest hand joint indices (XR_EXT_hand_tracking, 25 per hand)
QH_PALM = 0
QH_WRIST = 1
QH_THUMB_METACARPAL = 2
QH_THUMB_PROXIMAL = 3
QH_THUMB_TIP = 4
QH_INDEX_METACARPAL = 5
QH_INDEX_PROXIMAL = 6
QH_INDEX_INTERMEDIATE = 7
QH_INDEX_TIP = 8
QH_MIDDLE_METACARPAL = 9
QH_MIDDLE_PROXIMAL = 10
QH_MIDDLE_INTERMEDIATE = 11
QH_MIDDLE_TIP = 12
QH_RING_METACARPAL = 13
QH_RING_PROXIMAL = 14
QH_RING_INTERMEDIATE = 15
QH_RING_TIP = 16
QH_PINKY_METACARPAL = 17
QH_PINKY_PROXIMAL = 18
QH_PINKY_INTERMEDIATE = 19
QH_PINKY_TIP = 20
# ---------------------------------------------------------------------------
# Rotation math utilities
# ---------------------------------------------------------------------------
def _rx(angle):
"""Rotation matrix about X axis."""
c, s = np.cos(angle), np.sin(angle)
return np.array([[1, 0, 0], [0, c, -s], [0, s, c]], dtype=np.float64)
def _ry(angle):
"""Rotation matrix about Y axis."""
c, s = np.cos(angle), np.sin(angle)
return np.array([[c, 0, s], [0, 1, 0], [-s, 0, c]], dtype=np.float64)
def _rz(angle):
"""Rotation matrix about Z axis."""
c, s = np.cos(angle), np.sin(angle)
return np.array([[c, -s, 0], [s, c, 0], [0, 0, 1]], dtype=np.float64)
def quat_to_rotmat(qx, qy, qz, qw):
"""Quaternion to 3x3 rotation matrix."""
xx = qx * qx; yy = qy * qy; zz = qz * qz
xy = qx * qy; xz = qx * qz; yz = qy * qz
wx = qw * qx; wy = qw * qy; wz = qw * qz
return np.array([
[1 - 2*(yy + zz), 2*(xy - wz), 2*(xz + wy)],
[2*(xy + wz), 1 - 2*(xx + zz), 2*(yz - wx)],
[2*(xz - wy), 2*(yz + wx), 1 - 2*(xx + yy)],
], dtype=np.float64)
def euler_zxy(R):
"""Decompose R = Rz(yaw) * Rx(roll) * Ry(pitch). Returns (yaw, roll, pitch).
Used for waist joints: yaw(Z) -> roll(X) -> pitch(Y).
"""
# R[2,1] = sin(roll)
roll = np.arcsin(np.clip(R[2, 1], -1.0, 1.0))
if np.abs(R[2, 1]) < 0.9999:
yaw = np.arctan2(-R[0, 1], R[1, 1])
pitch = np.arctan2(-R[2, 0], R[2, 2])
else:
yaw = np.arctan2(R[1, 0], R[0, 0])
pitch = 0.0
return yaw, roll, pitch
def euler_yxz(R):
"""Decompose R = Ry(pitch) * Rx(roll) * Rz(yaw). Returns (pitch, roll, yaw).
Used for shoulder joints: pitch(Y) -> roll(X) -> yaw(Z).
"""
# R[1,2] = -sin(roll)
roll = np.arcsin(np.clip(-R[1, 2], -1.0, 1.0))
if np.abs(R[1, 2]) < 0.9999:
pitch = np.arctan2(R[0, 2], R[2, 2])
yaw = np.arctan2(R[1, 0], R[1, 1])
else:
pitch = np.arctan2(-R[2, 0], R[0, 0])
yaw = 0.0
return pitch, roll, yaw
def euler_xyz(R):
"""Decompose R = Rx(roll) * Ry(pitch) * Rz(yaw). Returns (roll, pitch, yaw).
Used for wrist joints: roll(X) -> pitch(Y) -> yaw(Z).
"""
# R[0,2] = sin(pitch)
pitch = np.arcsin(np.clip(R[0, 2], -1.0, 1.0))
if np.abs(R[0, 2]) < 0.9999:
roll = np.arctan2(-R[1, 2], R[2, 2])
yaw = np.arctan2(-R[0, 1], R[0, 0])
else:
roll = np.arctan2(R[2, 1], R[1, 1])
yaw = 0.0
return roll, pitch, yaw
def extract_y_rotation(R):
"""Extract the Y-axis rotation angle from a rotation matrix.
Used for elbow (single-axis pitch).
"""
return np.arctan2(R[0, 2], R[0, 0])
# Pre-compute shoulder rest pose rotations
L_SHOULDER_REST = _ry(0.27931) @ _rx(-0.27925)
R_SHOULDER_REST = _ry(-0.27931) @ _rx(0.27925)
L_SHOULDER_REST_INV = L_SHOULDER_REST.T
R_SHOULDER_REST_INV = R_SHOULDER_REST.T
# ---------------------------------------------------------------------------
# Joint smoother
# ---------------------------------------------------------------------------
class JointSmoother:
"""EMA smoothing with rate limiter for joint angles."""
def __init__(self, n_joints, alpha=0.3, max_delta=0.15):
"""
Args:
n_joints: Number of joints to smooth.
alpha: EMA factor (0=no change, 1=instant). Default 0.3.
max_delta: Max angle change per step in radians. At 50Hz,
0.15 rad/step = 7.5 rad/s.
"""
self.prev = np.zeros(n_joints)
self.alpha = alpha
self.max_delta = max_delta
def smooth(self, target):
"""Apply EMA + rate limiting. Returns smoothed values."""
# EMA
smoothed = self.alpha * target + (1.0 - self.alpha) * self.prev
# Rate limiter
delta = np.clip(smoothed - self.prev, -self.max_delta, self.max_delta)
result = self.prev + delta
self.prev = result.copy()
return result
# ---------------------------------------------------------------------------
# Body retargeter
# ---------------------------------------------------------------------------
class BodyRetargeter:
"""Converts body tracking poses to G1 joint angles via orientation decomposition."""
def __init__(self):
# Pre-compute soft joint limits
self.soft_lo = np.zeros(NUM_JOINTS)
self.soft_hi = np.zeros(NUM_JOINTS)
for idx, (lo, hi) in JOINT_LIMITS.items():
mid = (lo + hi) / 2.0
half = (hi - lo) / 2.0
self.soft_lo[idx] = mid - SOFT_LIMIT_FACTOR * half
self.soft_hi[idx] = mid + SOFT_LIMIT_FACTOR * half
# Calibration: captured at first valid frame
self._calibrated = False
self._ref_rotations = {} # joint_group -> R_rel at calibration
def retarget(self, body_rotmats):
"""Compute 29 joint angles from body tracking rotation matrices.
Args:
body_rotmats: dict mapping joint index (BJ_*) to 3x3 rotation matrix
in OpenXR world frame.
Returns:
np.ndarray of shape (29,) with joint angles in radians.
"""
angles = np.zeros(NUM_JOINTS)
# --- Waist: HIPS -> CHEST ---
R_hips = body_rotmats[BJ_HIPS]
R_chest = body_rotmats[BJ_CHEST]
R_waist_rel = self._to_robot_rel(R_hips, R_chest)
if not self._calibrated:
self._ref_rotations['waist'] = R_waist_rel.copy()
R_waist_delta = self._ref_rotations.get('waist', np.eye(3)).T @ R_waist_rel
yaw, roll, pitch = euler_zxy(R_waist_delta)
angles[WAIST_YAW] = yaw
angles[WAIST_ROLL] = roll
angles[WAIST_PITCH] = pitch
# --- Left shoulder: CHEST -> LEFT_UPPER_ARM ---
R_l_upper = body_rotmats[BJ_L_UPPER_ARM]
R_l_shoulder_rel = self._to_robot_rel(R_chest, R_l_upper)
if not self._calibrated:
self._ref_rotations['l_shoulder'] = R_l_shoulder_rel.copy()
R_l_shoulder_delta = self._ref_rotations.get('l_shoulder', np.eye(3)).T @ R_l_shoulder_rel
l_sp, l_sr, l_sy = euler_yxz(R_l_shoulder_delta)
angles[L_SHOULDER_PITCH] = l_sp
angles[L_SHOULDER_ROLL] = l_sr
angles[L_SHOULDER_YAW] = l_sy
# --- Left elbow: LEFT_UPPER_ARM -> LEFT_LOWER_ARM ---
R_l_lower = body_rotmats[BJ_L_LOWER_ARM]
R_l_elbow_rel = self._to_robot_rel(R_l_upper, R_l_lower)
if not self._calibrated:
self._ref_rotations['l_elbow'] = R_l_elbow_rel.copy()
R_l_elbow_delta = self._ref_rotations.get('l_elbow', np.eye(3)).T @ R_l_elbow_rel
angles[L_ELBOW] = extract_y_rotation(R_l_elbow_delta)
# --- Left wrist: LEFT_LOWER_ARM -> LEFT_WRIST ---
R_l_wrist = body_rotmats[BJ_L_WRIST]
R_l_wrist_rel = self._to_robot_rel(R_l_lower, R_l_wrist)
if not self._calibrated:
self._ref_rotations['l_wrist'] = R_l_wrist_rel.copy()
R_l_wrist_delta = self._ref_rotations.get('l_wrist', np.eye(3)).T @ R_l_wrist_rel
l_wr, l_wp, l_wy = euler_xyz(R_l_wrist_delta)
angles[L_WRIST_ROLL] = l_wr
angles[L_WRIST_PITCH] = l_wp
angles[L_WRIST_YAW] = l_wy
# --- Right shoulder: CHEST -> RIGHT_UPPER_ARM ---
R_r_upper = body_rotmats[BJ_R_UPPER_ARM]
R_r_shoulder_rel = self._to_robot_rel(R_chest, R_r_upper)
if not self._calibrated:
self._ref_rotations['r_shoulder'] = R_r_shoulder_rel.copy()
R_r_shoulder_delta = self._ref_rotations.get('r_shoulder', np.eye(3)).T @ R_r_shoulder_rel
r_sp, r_sr, r_sy = euler_yxz(R_r_shoulder_delta)
angles[R_SHOULDER_PITCH] = r_sp
angles[R_SHOULDER_ROLL] = r_sr
angles[R_SHOULDER_YAW] = r_sy
# --- Right elbow: RIGHT_UPPER_ARM -> RIGHT_LOWER_ARM ---
R_r_lower = body_rotmats[BJ_R_LOWER_ARM]
R_r_elbow_rel = self._to_robot_rel(R_r_upper, R_r_lower)
if not self._calibrated:
self._ref_rotations['r_elbow'] = R_r_elbow_rel.copy()
R_r_elbow_delta = self._ref_rotations.get('r_elbow', np.eye(3)).T @ R_r_elbow_rel
angles[R_ELBOW] = extract_y_rotation(R_r_elbow_delta)
# --- Right wrist: RIGHT_LOWER_ARM -> RIGHT_WRIST ---
R_r_wrist = body_rotmats[BJ_R_WRIST]
R_r_wrist_rel = self._to_robot_rel(R_r_lower, R_r_wrist)
if not self._calibrated:
self._ref_rotations['r_wrist'] = R_r_wrist_rel.copy()
R_r_wrist_delta = self._ref_rotations.get('r_wrist', np.eye(3)).T @ R_r_wrist_rel
r_wr, r_wp, r_wy = euler_xyz(R_r_wrist_delta)
angles[R_WRIST_ROLL] = r_wr
angles[R_WRIST_PITCH] = r_wp
angles[R_WRIST_YAW] = r_wy
# Mark calibration complete
if not self._calibrated:
self._calibrated = True
logger.info("Calibration captured (neutral pose reference)")
# Clamp to soft joint limits
for idx in JOINT_LIMITS:
angles[idx] = np.clip(angles[idx], self.soft_lo[idx], self.soft_hi[idx])
return angles
def _to_robot_rel(self, R_parent_openxr, R_child_openxr):
"""Compute parent-relative rotation in robot frame.
R_rel_openxr = R_parent^T @ R_child
R_rel_robot = R_ROBOT_OPENXR @ R_rel_openxr @ R_OPENXR_ROBOT
"""
R_rel = R_parent_openxr.T @ R_child_openxr
return R_ROBOT_OPENXR @ R_rel @ R_OPENXR_ROBOT
def reset_calibration(self):
"""Reset calibration so next frame becomes the new reference."""
self._calibrated = False
self._ref_rotations.clear()
logger.info("Calibration reset -- next valid frame will be captured")
# ---------------------------------------------------------------------------
# Hand retargeter
# ---------------------------------------------------------------------------
class HandRetargeter:
"""Maps Quest hand tracking to Inspire hand joint values."""
def retarget(self, left_hand_pos, right_hand_pos):
"""Compute 12 Inspire hand joint values from Quest hand positions.
Args:
left_hand_pos: (25, 3) wrist-relative hand positions (robot frame)
right_hand_pos: (25, 3) wrist-relative hand positions (robot frame)
Returns:
np.ndarray of shape (12,) with normalized hand values.
Convention: 1.0 = closed, 0.0 = open (matching Inspire DDS).
"""
hand_cmd = np.zeros(12)
if np.any(left_hand_pos != 0):
hand_cmd[INSPIRE_L_INDEX] = self._finger_curl(
left_hand_pos, QH_INDEX_METACARPAL, QH_INDEX_PROXIMAL,
QH_INDEX_INTERMEDIATE, QH_INDEX_TIP)
hand_cmd[INSPIRE_L_MIDDLE] = self._finger_curl(
left_hand_pos, QH_MIDDLE_METACARPAL, QH_MIDDLE_PROXIMAL,
QH_MIDDLE_INTERMEDIATE, QH_MIDDLE_TIP)
hand_cmd[INSPIRE_L_RING] = self._finger_curl(
left_hand_pos, QH_RING_METACARPAL, QH_RING_PROXIMAL,
QH_RING_INTERMEDIATE, QH_RING_TIP)
hand_cmd[INSPIRE_L_PINKY] = self._finger_curl(
left_hand_pos, QH_PINKY_METACARPAL, QH_PINKY_PROXIMAL,
QH_PINKY_INTERMEDIATE, QH_PINKY_TIP)
t_pitch, t_yaw = self._thumb_angles(left_hand_pos)
hand_cmd[INSPIRE_L_THUMB_PITCH] = t_pitch
hand_cmd[INSPIRE_L_THUMB_YAW] = t_yaw
if np.any(right_hand_pos != 0):
hand_cmd[INSPIRE_R_INDEX] = self._finger_curl(
right_hand_pos, QH_INDEX_METACARPAL, QH_INDEX_PROXIMAL,
QH_INDEX_INTERMEDIATE, QH_INDEX_TIP)
hand_cmd[INSPIRE_R_MIDDLE] = self._finger_curl(
right_hand_pos, QH_MIDDLE_METACARPAL, QH_MIDDLE_PROXIMAL,
QH_MIDDLE_INTERMEDIATE, QH_MIDDLE_TIP)
hand_cmd[INSPIRE_R_RING] = self._finger_curl(
right_hand_pos, QH_RING_METACARPAL, QH_RING_PROXIMAL,
QH_RING_INTERMEDIATE, QH_RING_TIP)
hand_cmd[INSPIRE_R_PINKY] = self._finger_curl(
right_hand_pos, QH_PINKY_METACARPAL, QH_PINKY_PROXIMAL,
QH_PINKY_INTERMEDIATE, QH_PINKY_TIP)
t_pitch, t_yaw = self._thumb_angles(right_hand_pos)
hand_cmd[INSPIRE_R_THUMB_PITCH] = t_pitch
hand_cmd[INSPIRE_R_THUMB_YAW] = t_yaw
return hand_cmd
def _finger_curl(self, positions, meta_idx, prox_idx, inter_idx, tip_idx):
"""Compute normalized finger curl value.
Returns: 0.0 (open) to 1.0 (closed) matching Inspire DDS convention.
"""
v1 = positions[prox_idx] - positions[meta_idx]
v2 = positions[inter_idx] - positions[prox_idx]
v3 = positions[tip_idx] - positions[inter_idx]
n1 = np.linalg.norm(v1)
n2 = np.linalg.norm(v2)
n3 = np.linalg.norm(v3)
if n1 < 1e-6 or n2 < 1e-6 or n3 < 1e-6:
return 0.0
# Angle between consecutive bone segments
cos1 = np.clip(np.dot(v1 / n1, v2 / n2), -1.0, 1.0)
cos2 = np.clip(np.dot(v2 / n2, v3 / n3), -1.0, 1.0)
angle1 = np.arccos(cos1)
angle2 = np.arccos(cos2)
# Average curl angle: 0 (straight) to ~pi/2 (curled)
avg_curl = (angle1 + angle2) / 2.0
# Normalize to [0, 1] where 1 = fully curled
# Max curl is approximately 1.5 rad
normalized = np.clip(avg_curl / 1.5, 0.0, 1.0)
return normalized
def _thumb_angles(self, positions):
"""Compute thumb pitch and yaw from hand positions.
Returns: (pitch_normalized, yaw_normalized) both in [0, 1].
"""
meta = positions[QH_THUMB_METACARPAL]
prox = positions[QH_THUMB_PROXIMAL]
tip = positions[QH_THUMB_TIP]
v1 = prox - meta
v2 = tip - prox
n1 = np.linalg.norm(v1)
n2 = np.linalg.norm(v2)
if n1 < 1e-6 or n2 < 1e-6:
return 0.0, 0.5
v1n = v1 / n1
v2n = v2 / n2
# Pitch: flexion angle between thumb bones
cos_pitch = np.clip(np.dot(v1n, v2n), -1.0, 1.0)
pitch_rad = np.arccos(cos_pitch)
# Inspire thumb pitch range: [0.0, 0.5] rad
# Normalize: 1.0 = max flexion (0.5 rad), 0.0 = straight (0 rad)
pitch_norm = np.clip(pitch_rad / 0.5, 0.0, 1.0)
# Yaw: abduction (spread) of thumb
# Use the lateral component of the thumb direction relative to palm
palm = positions[QH_PALM]
index_meta = positions[QH_INDEX_METACARPAL]
palm_to_index = index_meta - palm
palm_to_index_n = np.linalg.norm(palm_to_index)
if palm_to_index_n < 1e-6:
return pitch_norm, 0.5
# Project thumb direction onto palm plane
palm_dir = palm_to_index / palm_to_index_n
thumb_dir = (tip - meta)
thumb_dir_n = np.linalg.norm(thumb_dir)
if thumb_dir_n < 1e-6:
return pitch_norm, 0.5
thumb_dir = thumb_dir / thumb_dir_n
# Yaw angle: how far the thumb points away from the index finger
cos_yaw = np.clip(np.dot(thumb_dir, palm_dir), -1.0, 1.0)
yaw_rad = np.arccos(cos_yaw)
# Inspire thumb yaw range: [-0.1, 1.3] rad. Normalize to [0, 1].
yaw_norm = np.clip((yaw_rad - (-0.1)) / (1.3 - (-0.1)), 0.0, 1.0)
return pitch_norm, yaw_norm
# ---------------------------------------------------------------------------
# DDS publisher
# ---------------------------------------------------------------------------
class DDSPublisher:
"""Publishes G1 joint commands and Inspire hand commands via CycloneDDS."""
def __init__(self, domain_id=1):
from unitree_sdk2py.core.channel import (
ChannelPublisher, ChannelFactoryInitialize)
from unitree_sdk2py.idl.unitree_hg.msg.dds_ import LowCmd_
from unitree_sdk2py.idl.default import unitree_hg_msg_dds__LowCmd_
from unitree_sdk2py.utils.crc import CRC
from unitree_sdk2py.idl.unitree_go.msg.dds_ import MotorCmds_
from unitree_sdk2py.idl.default import unitree_go_msg_dds__MotorCmd_
logger.info(f"Initializing DDS (domain {domain_id})...")
ChannelFactoryInitialize(domain_id)
# Body joint publisher
self._body_pub = ChannelPublisher("rt/lowcmd", LowCmd_)
self._body_pub.Init()
self._low_cmd = unitree_hg_msg_dds__LowCmd_()
self._crc = CRC()
self._MotorCmd_ = unitree_go_msg_dds__MotorCmd_
# Inspire hand publisher
self._hand_pub = ChannelPublisher("rt/inspire/cmd", MotorCmds_)
self._hand_pub.Init()
self._MotorCmds_ = MotorCmds_
logger.info("DDS publishers initialized (rt/lowcmd, rt/inspire/cmd)")
def publish_body(self, joint_angles):
"""Publish 29 joint angle commands via LowCmd_.
Args:
joint_angles: np.ndarray of shape (29,) in radians.
"""
cmd = self._low_cmd
cmd.mode_pr = 0
cmd.mode_machine = 0
for i in range(NUM_JOINTS):
cmd.motor_cmd[i].mode = 1
cmd.motor_cmd[i].q = float(joint_angles[i])
cmd.motor_cmd[i].dq = 0.0
cmd.motor_cmd[i].tau = 0.0
cmd.motor_cmd[i].kp = float(KP[i])
cmd.motor_cmd[i].kd = float(KD[i])
# Zero out legs (not retargeted -- leave to standing controller)
for i in range(12):
cmd.motor_cmd[i].mode = 0
cmd.motor_cmd[i].kp = 0.0
cmd.motor_cmd[i].kd = 0.0
cmd.crc = self._crc.Crc(cmd)
self._body_pub.Write(cmd)
def publish_hands(self, hand_values):
"""Publish 12 Inspire hand joint commands via MotorCmds_.
Args:
hand_values: np.ndarray of shape (12,) with normalized [0,1] values.
"""
hand_cmd = self._MotorCmds_()
hand_cmd.cmds = []
for i in range(12):
mc = self._MotorCmd_()
mc.q = float(np.clip(hand_values[i], 0.0, 1.0))
mc.dq = 0.0
mc.tau = 0.0
mc.kp = 5.0
mc.kd = 0.5
hand_cmd.cmds.append(mc)
self._hand_pub.Write(hand_cmd)
# ---------------------------------------------------------------------------
# Shared memory reader
# ---------------------------------------------------------------------------
def read_body_joints(server):
"""Read body joint rotation matrices from shared memory.
Returns:
dict mapping BJ_* index to 3x3 rotation matrix, or None if no data.
"""
with server.body_joints_shared.get_lock():
raw = np.array(server.body_joints_shared[:])
# Check if any data has arrived (all zeros = no data yet)
if np.all(raw == 0):
return None
rotmats = {}
for i in range(NUM_BODY_JOINTS):
offset = i * 7
qx, qy, qz, qw = raw[offset+3], raw[offset+4], raw[offset+5], raw[offset+6]
# Check for valid quaternion
qnorm = qx*qx + qy*qy + qz*qz + qw*qw
if qnorm < 0.5:
return None # Invalid data
rotmats[i] = quat_to_rotmat(qx, qy, qz, qw)
return rotmats
def read_hand_positions(server):
"""Read hand joint positions from shared memory.
Returns:
(left_25x3, right_25x3) numpy arrays in robot frame.
"""
with server.left_hand_position_shared.get_lock():
raw_left = np.array(server.left_hand_position_shared[:])
with server.right_hand_position_shared.get_lock():
raw_right = np.array(server.right_hand_position_shared[:])
left = np.zeros((25, 3))
right = np.zeros((25, 3))
if np.any(raw_left != 0):
left_openxr = raw_left.reshape(25, 3)
# Transform each position from OpenXR to robot frame
left = (R_ROBOT_OPENXR @ left_openxr.T).T
if np.any(raw_right != 0):
right_openxr = raw_right.reshape(25, 3)
right = (R_ROBOT_OPENXR @ right_openxr.T).T
return left, right
# ---------------------------------------------------------------------------
# Main loop
# ---------------------------------------------------------------------------
def main():
parser = argparse.ArgumentParser(
description="Retargeting bridge: Quest 3 body tracking -> G1 DDS commands")
parser.add_argument("--port", type=int, default=8765,
help="WebSocket server port (default: 8765)")
parser.add_argument("--host", default="0.0.0.0",
help="WebSocket bind address (default: 0.0.0.0)")
parser.add_argument("--dds-domain", type=int, default=1,
help="DDS domain ID (default: 1 for sim)")
parser.add_argument("--hz", type=float, default=50.0,
help="Publishing frequency in Hz (default: 50)")
parser.add_argument("--alpha", type=float, default=0.3,
help="Smoothing factor, 0=slow 1=instant (default: 0.3)")
parser.add_argument("--no-hands", action="store_true",
help="Disable hand retargeting")
parser.add_argument("--verbose", action="store_true",
help="Enable debug logging")
args = parser.parse_args()
logging.basicConfig(
level=logging.DEBUG if args.verbose else logging.INFO,
format="%(asctime)s [%(name)s] %(levelname)s: %(message)s"
)
# --- Start teleop server ---
logger.info(f"Starting teleop server on ws://{args.host}:{args.port}")
server = TeleopServer(host=args.host, port=args.port)
server_thread = server.start_in_thread()
# --- Initialize DDS ---
dds = DDSPublisher(domain_id=args.dds_domain)
# --- Initialize retargeters ---
body_retargeter = BodyRetargeter()
hand_retargeter = HandRetargeter()
body_smoother = JointSmoother(NUM_JOINTS, alpha=args.alpha, max_delta=0.15)
hand_smoother = JointSmoother(12, alpha=0.4, max_delta=0.2)
# --- Deadman / timeout state ---
tracking_timeout = 0.5 # seconds
return_to_zero_rate = 0.02 # rad/step
last_valid_time = 0.0
startup_ramp = 0.0 # 0.0 -> 1.0 over 2 seconds
startup_ramp_duration = 2.0
startup_time = None
# --- Control loop ---
interval = 1.0 / args.hz
running = True
def signal_handler(sig, frame):
nonlocal running
running = False
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
logger.info(f"Bridge running at {args.hz} Hz. Waiting for Quest 3 connection...")
step_count = 0
current_angles = np.zeros(NUM_JOINTS)
current_hands = np.zeros(12)
while running:
t_start = time.perf_counter()
# Read body tracking data
body_rotmats = read_body_joints(server)
if body_rotmats is not None:
now = time.time()
with server.last_data_time_shared.get_lock():
data_time = server.last_data_time_shared.value
if data_time > 0:
last_valid_time = now
# Retarget body
raw_angles = body_retargeter.retarget(body_rotmats)
smoothed_angles = body_smoother.smooth(raw_angles)
# Startup ramp
if startup_time is None:
startup_time = now
logger.info("First tracking data received -- starting ramp-up")
elapsed = now - startup_time
startup_ramp = min(elapsed / startup_ramp_duration, 1.0)
current_angles = smoothed_angles * startup_ramp
# Retarget hands
if not args.no_hands:
left_hand, right_hand = read_hand_positions(server)
raw_hands = hand_retargeter.retarget(left_hand, right_hand)
current_hands = hand_smoother.smooth(raw_hands) * startup_ramp
else:
# No data -- check timeout
now = time.time()
if last_valid_time > 0 and (now - last_valid_time) > tracking_timeout:
# Gradually return to zero
for i in range(12, NUM_JOINTS):
if abs(current_angles[i]) > return_to_zero_rate:
current_angles[i] -= np.sign(current_angles[i]) * return_to_zero_rate
else:
current_angles[i] = 0.0
# Return hands to open
current_hands = np.maximum(current_hands - 0.02, 0.0)
# Publish
dds.publish_body(current_angles)
if not args.no_hands:
dds.publish_hands(current_hands)
# Status logging
step_count += 1
if step_count % int(args.hz * 5) == 0: # Every 5 seconds
status = "TRACKING" if body_rotmats is not None else "WAITING"
ramp_pct = int(startup_ramp * 100)
logger.info(f"[{status}] step={step_count} ramp={ramp_pct}% "
f"waist=[{current_angles[12]:.2f},{current_angles[13]:.2f},{current_angles[14]:.2f}] "
f"L_shoulder=[{current_angles[15]:.2f},{current_angles[16]:.2f},{current_angles[17]:.2f}] "
f"R_shoulder=[{current_angles[22]:.2f},{current_angles[23]:.2f},{current_angles[24]:.2f}]")
# Sleep to maintain target frequency
elapsed = time.perf_counter() - t_start
sleep_time = interval - elapsed
if sleep_time > 0:
time.sleep(sleep_time)
logger.info("Bridge shutting down")
if __name__ == "__main__":
main()