You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
955 lines
40 KiB
955 lines
40 KiB
#!/usr/bin/env python3
|
|
"""
|
|
Retargeting bridge v4: Quest 3 full body tracking -> G1 robot joints via DIRECT mapping + DDS.
|
|
|
|
Maps ALL tracked body joints directly to robot joint angles (no IK solver):
|
|
- Waist (3 DOF): hips->chest relative rotation -> ZXY Euler
|
|
- Shoulders (3 DOF x 2): chest->upper_arm -> YXZ Euler
|
|
- Elbows (1 DOF x 2): upper_arm->lower_arm -> Y rotation
|
|
- Wrists (3 DOF x 2): lower_arm->hand -> XYZ Euler
|
|
- Hands (12 DOF): dex-retargeting for Inspire hands
|
|
- Legs: zeroed (not tracked)
|
|
|
|
Calibration:
|
|
On first tracking frame, captures the user's neutral pose as reference.
|
|
All subsequent angles are deltas from this reference, ensuring:
|
|
- User arms at sides -> robot arms at sides (zero angles)
|
|
- User arm movements -> proportional robot joint movements
|
|
|
|
Architecture:
|
|
Quest 3 -> WebSocket -> TeleopServer (shared memory)
|
|
|
|
|
retarget_bridge.py
|
|
- Direct joint retargeting (17 DOF)
|
|
- Inspire hand retargeting (12 DOF)
|
|
|
|
|
CycloneDDS -> Isaac Sim
|
|
|
|
Usage:
|
|
python retarget_bridge.py [--port 8765] [--dds-domain 1] [--hz 50] [--verbose]
|
|
"""
|
|
|
|
import sys
|
|
import os
|
|
import time
|
|
import argparse
|
|
import logging
|
|
import signal
|
|
import numpy as np
|
|
from scipy.spatial.transform import Rotation
|
|
|
|
# Add xr_teleoperate to Python path for hand retargeting
|
|
XR_TELEOP_DIR = "/home/sparky/git/xr_teleoperate"
|
|
sys.path.insert(0, XR_TELEOP_DIR)
|
|
|
|
# Monkey-patch logging_mp API compatibility
|
|
import logging_mp
|
|
if not hasattr(logging_mp, 'getLogger'):
|
|
logging_mp.getLogger = logging_mp.get_logger
|
|
|
|
# Monkey-patch dex_retargeting
|
|
import dex_retargeting
|
|
if not hasattr(dex_retargeting, 'RetargetingConfig'):
|
|
from dex_retargeting.retargeting_config import RetargetingConfig
|
|
dex_retargeting.RetargetingConfig = RetargetingConfig
|
|
|
|
# IK solver from upstream xr_teleoperate
|
|
sys.path.insert(0, os.path.join(XR_TELEOP_DIR, "teleop", "robot_control"))
|
|
from arm_ik_elbow import G1_29_ArmIK_Elbow
|
|
|
|
# Local imports
|
|
from teleop_server import TeleopServer
|
|
from native_tv_wrapper import (T_ROBOT_OPENXR, T_OPENXR_ROBOT,
|
|
T_TO_UNITREE_HAND,
|
|
T_TO_UNITREE_HUMANOID_LEFT_ARM,
|
|
T_TO_UNITREE_HUMANOID_RIGHT_ARM,
|
|
NativeTeleWrapper)
|
|
|
|
logger = logging.getLogger("retarget_bridge")
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# G1 joint constants
|
|
# ---------------------------------------------------------------------------
|
|
NUM_JOINTS = 29
|
|
|
|
# Joint index ranges in LowCmd
|
|
WAIST_START = 12 # indices 12-14: waist yaw, roll, pitch
|
|
LEFT_ARM_START = 15 # indices 15-21: 7 left arm joints
|
|
RIGHT_ARM_START = 22 # indices 22-28: 7 right arm joints
|
|
|
|
# Joint limits from URDF with 90% safety factor
|
|
JOINT_LIMITS = {
|
|
12: (-2.356, 2.356), # waist_yaw
|
|
13: (-0.468, 0.468), # waist_roll
|
|
14: (-0.468, 0.468), # waist_pitch
|
|
15: (-2.780, 2.403), # L shoulder_pitch
|
|
16: (-1.429, 2.026), # L shoulder_roll
|
|
17: (-2.356, 2.356), # L shoulder_yaw
|
|
18: (-0.942, 1.885), # L elbow
|
|
19: (-1.775, 1.775), # L wrist_roll
|
|
20: (-1.453, 1.453), # L wrist_pitch
|
|
21: (-1.453, 1.453), # L wrist_yaw
|
|
22: (-2.780, 2.403), # R shoulder_pitch
|
|
23: (-2.026, 1.429), # R shoulder_roll (mirrored)
|
|
24: (-2.356, 2.356), # R shoulder_yaw
|
|
25: (-0.942, 1.885), # R elbow
|
|
26: (-1.775, 1.775), # R wrist_roll
|
|
27: (-1.453, 1.453), # R wrist_pitch
|
|
28: (-1.453, 1.453), # R wrist_yaw
|
|
}
|
|
|
|
# Default Kp/Kd gains
|
|
KP = [
|
|
60, 60, 60, 100, 40, 40, # Left leg (0-5)
|
|
60, 60, 60, 100, 40, 40, # Right leg (6-11)
|
|
60, 40, 40, # Waist (12-14)
|
|
40, 40, 40, 40, 40, 40, 40, # Left arm (15-21)
|
|
40, 40, 40, 40, 40, 40, 40, # Right arm (22-28)
|
|
]
|
|
KD = [
|
|
1, 1, 1, 2, 1, 1,
|
|
1, 1, 1, 2, 1, 1,
|
|
1, 1, 1,
|
|
1, 1, 1, 1, 1, 1, 1,
|
|
1, 1, 1, 1, 1, 1, 1,
|
|
]
|
|
|
|
# Inspire hand normalization ranges
|
|
INSPIRE_RANGES = [
|
|
(0.0, 1.7), # finger flexion
|
|
(0.0, 1.7),
|
|
(0.0, 1.7),
|
|
(0.0, 1.7),
|
|
(0.0, 0.5), # thumb pitch
|
|
(-0.1, 1.3), # thumb yaw
|
|
]
|
|
|
|
# Body joint indices in shared memory (8 joints x 7 floats = 56)
|
|
BJ_HIPS = 0
|
|
BJ_CHEST = 1
|
|
BJ_L_UPPER = 2
|
|
BJ_L_LOWER = 3
|
|
BJ_L_WRIST = 4
|
|
BJ_R_UPPER = 5
|
|
BJ_R_LOWER = 6
|
|
BJ_R_WRIST = 7
|
|
|
|
# 3x3 rotation submatrices of the basis change
|
|
R3_ROBOT_OPENXR = T_ROBOT_OPENXR[:3, :3]
|
|
R3_OPENXR_ROBOT = T_OPENXR_ROBOT[:3, :3]
|
|
|
|
# Signs are now baked into the axis mapping in compute_all_angles().
|
|
# Determined empirically via joint_calibrate.py (see calibration_log.md).
|
|
|
|
# Per-joint linear mapping: robot_angle = scale * xr_angle + offset
|
|
# Determined empirically by comparing robot animation range with XR tracking range.
|
|
# Indices 0-16 map to joints 12-28 (waist 0-2, L arm 3-9, R arm 10-16)
|
|
# Default: scale=1.0, offset=0.0 (passthrough)
|
|
JOINT_SCALE = np.ones(17)
|
|
JOINT_OFFSET = np.zeros(17)
|
|
|
|
# All arm joints use IK solver output directly — no scaling needed
|
|
# JOINT_SCALE stays at 1.0 for all joints
|
|
|
|
# Smoothing parameters
|
|
EMA_ALPHA = 0.4 # Exponential moving average (0=no update, 1=no smoothing)
|
|
MAX_RATE = 3.0 # Max joint velocity in rad/s
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Body joint utilities
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def pose7_to_mat4(pose7):
|
|
"""Convert [x,y,z,qx,qy,qz,qw] to 4x4 SE(3) matrix."""
|
|
mat = np.eye(4)
|
|
r = Rotation.from_quat(pose7[3:7]) # scipy uses [qx,qy,qz,qw]
|
|
mat[:3, :3] = r.as_matrix()
|
|
mat[:3, 3] = pose7[:3]
|
|
return mat
|
|
|
|
|
|
def parse_body_joints(body_joints_flat):
|
|
"""Parse the 56-float body joints array into 8 x 4x4 matrices.
|
|
|
|
Returns dict with keys: hips, chest, l_upper, l_lower, l_wrist, r_upper, r_lower, r_wrist.
|
|
All in OpenXR world space.
|
|
"""
|
|
joints = {}
|
|
names = ['hips', 'chest', 'l_upper', 'l_lower', 'l_wrist',
|
|
'r_upper', 'r_lower', 'r_wrist']
|
|
for i, name in enumerate(names):
|
|
p7 = body_joints_flat[i*7:(i+1)*7]
|
|
if np.all(p7 == 0):
|
|
joints[name] = None
|
|
else:
|
|
joints[name] = pose7_to_mat4(p7)
|
|
return joints
|
|
|
|
|
|
def all_joints_valid(bj):
|
|
"""Check if all 8 body joints are valid (non-None)."""
|
|
return all(bj[k] is not None for k in
|
|
['hips', 'chest', 'l_upper', 'l_lower', 'l_wrist',
|
|
'r_upper', 'r_lower', 'r_wrist'])
|
|
|
|
|
|
def build_ik_targets(bj):
|
|
"""Build IK-ready wrist SE(3) poses + elbow positions from body tracking.
|
|
|
|
Uses hips as waist reference. Wrist rotation is preserved (from XRHandTracker)
|
|
to help the IK solver resolve elbow ambiguity. Elbow positions come from
|
|
XRBodyTracker l_lower/r_lower joints.
|
|
|
|
Args:
|
|
bj: dict from parse_body_joints() — world-space 4x4 matrices
|
|
|
|
Returns:
|
|
(left_wrist_4x4, right_wrist_4x4, left_elbow_pos, right_elbow_pos)
|
|
Wrists are 4x4 SE(3) in robot waist frame.
|
|
Elbows are 3-vectors (xyz) in robot waist frame.
|
|
"""
|
|
l_wrist_world = bj['l_wrist']
|
|
r_wrist_world = bj['r_wrist']
|
|
hips_world = bj['hips']
|
|
|
|
# Basis change: OpenXR → Robot convention
|
|
hips_robot = T_ROBOT_OPENXR @ hips_world @ T_OPENXR_ROBOT
|
|
left_Brobot_world = T_ROBOT_OPENXR @ l_wrist_world @ T_OPENXR_ROBOT
|
|
right_Brobot_world = T_ROBOT_OPENXR @ r_wrist_world @ T_OPENXR_ROBOT
|
|
|
|
# Arm URDF convention transform (for wrists only — adjusts end-effector frame)
|
|
left_IPunitree = left_Brobot_world @ T_TO_UNITREE_HUMANOID_LEFT_ARM
|
|
right_IPunitree = right_Brobot_world @ T_TO_UNITREE_HUMANOID_RIGHT_ARM
|
|
|
|
# Hips subtraction (translation only — makes wrist relative to waist)
|
|
hips_pos = hips_robot[0:3, 3]
|
|
|
|
left_wrist_final = left_IPunitree.copy()
|
|
right_wrist_final = right_IPunitree.copy()
|
|
left_wrist_final[0:3, 3] -= hips_pos
|
|
right_wrist_final[0:3, 3] -= hips_pos
|
|
|
|
# Correction rotation: body tracking wrist frame → URDF end-effector frame
|
|
# Computed empirically from measured rotation matrices during palms-down pose.
|
|
# R_CORR_L = Rz(+90°) @ Rx(180°), R_CORR_R = Rz(-90°) @ Rx(180°)
|
|
R_CORR_L = np.array([[ 0, 1, 0],
|
|
[ 1, 0, 0],
|
|
[ 0, 0, -1]], dtype=np.float64)
|
|
R_CORR_R = np.array([[ 0, -1, 0],
|
|
[-1, 0, 0],
|
|
[ 0, 0, -1]], dtype=np.float64)
|
|
left_wrist_final[0:3, 0:3] = left_wrist_final[0:3, 0:3] @ R_CORR_L
|
|
right_wrist_final[0:3, 0:3] = right_wrist_final[0:3, 0:3] @ R_CORR_R
|
|
|
|
# Elbow positions: basis change + hips subtraction (no arm convention transform)
|
|
l_elbow_pos = None
|
|
r_elbow_pos = None
|
|
if bj['l_lower'] is not None:
|
|
l_lower_robot = T_ROBOT_OPENXR @ bj['l_lower'] @ T_OPENXR_ROBOT
|
|
l_elbow_pos = l_lower_robot[0:3, 3] - hips_pos
|
|
if bj['r_lower'] is not None:
|
|
r_lower_robot = T_ROBOT_OPENXR @ bj['r_lower'] @ T_OPENXR_ROBOT
|
|
r_elbow_pos = r_lower_robot[0:3, 3] - hips_pos
|
|
|
|
return left_wrist_final, right_wrist_final, l_elbow_pos, r_elbow_pos
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Calibration: captures neutral pose reference rotations
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class CalibrationData:
|
|
"""Stores reference relative rotations for calibration.
|
|
|
|
When the user stands with arms at sides (robot's zero pose), we capture
|
|
the relative rotation between each parent-child joint pair (R_ref).
|
|
All subsequent tracking computes deltas from these references and
|
|
transforms them to robot frame via a simple basis change.
|
|
"""
|
|
|
|
def __init__(self):
|
|
self.calibrated = False
|
|
self.ref_hips_rot = None # frozen hips rotation
|
|
self.ref_chest_rot = None # calibration-time chest rotation
|
|
self.ref_shoulder_L = None # chest.T @ l_upper (chest-relative)
|
|
self.ref_shoulder_R = None # chest.T @ r_upper (chest-relative)
|
|
self.ref_elbow_L = None # l_upper.T @ l_lower
|
|
self.ref_elbow_R = None # r_upper.T @ r_lower
|
|
self.ref_wrist_L = None # l_lower.T @ l_wrist
|
|
self.ref_wrist_R = None # r_lower.T @ r_wrist
|
|
|
|
def capture(self, bj):
|
|
"""Capture reference rotations from current body joint data.
|
|
|
|
Args:
|
|
bj: dict from parse_body_joints(), all entries must be non-None.
|
|
|
|
Returns:
|
|
True if calibration succeeded.
|
|
"""
|
|
if not all_joints_valid(bj):
|
|
return False
|
|
|
|
self.ref_hips_rot = bj['hips'][:3, :3].copy()
|
|
self.ref_chest_rot = bj['chest'][:3, :3].copy()
|
|
self.ref_shoulder_L = self.ref_hips_rot.T @ bj['l_upper'][:3, :3]
|
|
self.ref_shoulder_R = self.ref_hips_rot.T @ bj['r_upper'][:3, :3]
|
|
self.ref_elbow_L = bj['l_upper'][:3, :3].T @ bj['l_lower'][:3, :3]
|
|
self.ref_elbow_R = bj['r_upper'][:3, :3].T @ bj['r_lower'][:3, :3]
|
|
self.ref_wrist_L = bj['l_lower'][:3, :3].T @ bj['l_wrist'][:3, :3]
|
|
self.ref_wrist_R = bj['r_lower'][:3, :3].T @ bj['r_wrist'][:3, :3]
|
|
|
|
self.calibrated = True
|
|
return True
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Direct joint retargeting
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _joint_delta_robot(R_ref, R_cur):
|
|
"""Compute rotation delta from reference to current, in robot frame.
|
|
|
|
1. R_delta = R_ref.T @ R_cur (delta in parent-local XR frame)
|
|
2. R_delta_robot = R_basis @ R_delta @ R_basis.T (basis change to robot axes)
|
|
|
|
The basis change transforms the rotation's axes from OpenXR (X-right, Y-up,
|
|
Z-back) to Robot (X-forward, Y-left, Z-up) without needing the parent's
|
|
world orientation.
|
|
|
|
Args:
|
|
R_ref: 3x3 reference relative rotation (from calibration)
|
|
R_cur: 3x3 current relative rotation
|
|
|
|
Returns:
|
|
3x3 delta rotation matrix in robot coordinate frame
|
|
"""
|
|
R_delta = R_ref.T @ R_cur
|
|
return R3_ROBOT_OPENXR @ R_delta @ R3_ROBOT_OPENXR.T
|
|
|
|
|
|
_last_dbg = {} # Raw euler values for debug logging
|
|
SOLO_JOINT = None # Set by --solo-joint arg; None = all joints active
|
|
|
|
# Map solo-joint names to indices within the 17-element angles array
|
|
SOLO_JOINT_MAP = {
|
|
'l_sh_pitch': 3, # joint 15
|
|
'l_sh_roll': 4, # joint 16
|
|
'l_sh_yaw': 5, # joint 17
|
|
'l_elbow': 6, # joint 18
|
|
'l_wr_roll': 7, # joint 19
|
|
'l_wr_pitch': 8, # joint 20
|
|
'l_wr_yaw': 9, # joint 21
|
|
}
|
|
|
|
def compute_all_angles(bj, cal):
|
|
"""Compute all 17 upper body joint angles from body tracking data.
|
|
|
|
Uses calibrated references to compute delta rotations, then decomposes
|
|
into Euler angles matching the URDF joint chain axes.
|
|
|
|
URDF joint chains:
|
|
Waist: yaw(Z) -> roll(X) -> pitch(Y) = ZXY Euler
|
|
Shoulder: pitch(Y) -> roll(X) -> yaw(Z) = YXZ Euler
|
|
Elbow: pitch(Y) = Y component
|
|
Wrist: roll(X) -> pitch(Y) -> yaw(Z) = XYZ Euler
|
|
|
|
Args:
|
|
bj: dict from parse_body_joints()
|
|
cal: CalibrationData (must be calibrated)
|
|
|
|
Returns:
|
|
np.ndarray of shape (17,) for joint indices 12-28
|
|
"""
|
|
angles = np.zeros(17) # indices 0-16 map to joints 12-28
|
|
|
|
# --- Waist (0-2): DISABLED (not calibrated) ---
|
|
# angles[0:3] = [0, 0, 0]
|
|
|
|
# --- Left shoulder (3-5): frozen_hips -> l_upper ---
|
|
R_sh_L_cur = cal.ref_hips_rot.T @ bj['l_upper'][:3, :3]
|
|
R_sd_L = _joint_delta_robot(cal.ref_shoulder_L, R_sh_L_cur)
|
|
sy, sx, sz = Rotation.from_matrix(R_sd_L).as_euler('YXZ')
|
|
|
|
# --- Left elbow (6): l_upper -> l_lower ---
|
|
R_el_L_cur = bj['l_upper'][:3, :3].T @ bj['l_lower'][:3, :3]
|
|
R_ed_L = _joint_delta_robot(cal.ref_elbow_L, R_el_L_cur)
|
|
ey_L, ex_L, ez_L = Rotation.from_matrix(R_ed_L).as_euler('YXZ')
|
|
|
|
# Full matrix mapping (from pose_calibrate.py 2026-02-28)
|
|
# Solves cross-axis coupling + scales in one step
|
|
# robot = M @ [sy, sx, sz, ez]
|
|
M_LEFT = np.array([
|
|
[ +5.4026, -1.7709, +1.0419, -6.7338], # pitch
|
|
[ +4.4651, -0.0869, +2.8320, -5.4197], # roll
|
|
[ +3.1454, -1.6180, +2.3173, -3.9653], # yaw
|
|
[ +0.6962, +1.7176, +1.5304, +1.0564], # elbow
|
|
])
|
|
xr_vec = np.array([sy, sx, sz, ez_L])
|
|
pitch, roll, yaw, elbow = M_LEFT @ xr_vec
|
|
angles[3:6] = [pitch, roll, yaw]
|
|
angles[6] = elbow
|
|
|
|
_last_dbg['sh_YXZ'] = (sy, sx, sz)
|
|
_last_dbg['el_YXZ'] = (ey_L, ex_L, ez_L)
|
|
|
|
# --- Left wrist (7-9): l_lower -> l_wrist ---
|
|
# Parent (lower arm) hangs down: same pattern as elbow (Z-dominant)
|
|
R_wr_L_cur = bj['l_lower'][:3, :3].T @ bj['l_wrist'][:3, :3]
|
|
R_wrd_L = _joint_delta_robot(cal.ref_wrist_L, R_wr_L_cur)
|
|
wy_wl, wx_wl, wz_wl = Rotation.from_matrix(R_wrd_L).as_euler('YXZ')
|
|
angles[7:10] = [0.0, 0.0, 0.0] # all wrist joints disabled for now (noisy)
|
|
|
|
# --- Right arm (10-16): DISABLED (not calibrated, Quest tracking weak) ---
|
|
# angles[10:17] = [0, 0, 0, 0, 0, 0, 0]
|
|
|
|
# Apply per-joint linear mapping: robot_angle = scale * xr_angle + offset
|
|
angles = JOINT_SCALE * angles + JOINT_OFFSET
|
|
|
|
# Solo-joint mode: zero everything except the isolated joint
|
|
if SOLO_JOINT is not None and SOLO_JOINT in SOLO_JOINT_MAP:
|
|
solo_idx = SOLO_JOINT_MAP[SOLO_JOINT]
|
|
solo_val = angles[solo_idx]
|
|
angles[:] = 0.0
|
|
angles[solo_idx] = solo_val
|
|
|
|
return angles
|
|
|
|
|
|
def apply_limits(angles_17):
|
|
"""Clamp 17 upper body angles to URDF joint limits."""
|
|
for i in range(17):
|
|
joint_idx = 12 + i
|
|
lo, hi = JOINT_LIMITS[joint_idx]
|
|
angles_17[i] = np.clip(angles_17[i], lo, hi)
|
|
return angles_17
|
|
|
|
|
|
def smooth_angles(current, target, alpha, max_step):
|
|
"""Apply EMA smoothing + rate limiting.
|
|
|
|
Args:
|
|
current: current angles (modified in place)
|
|
target: target angles
|
|
alpha: EMA alpha (higher = less smooth)
|
|
max_step: max change per step (rad)
|
|
|
|
Returns:
|
|
smoothed angles
|
|
"""
|
|
blended = alpha * target + (1.0 - alpha) * current
|
|
delta = blended - current
|
|
clipped = np.clip(delta, -max_step, max_step)
|
|
return current + clipped
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# DDS publisher
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class DDSPublisher:
|
|
"""Publishes G1 joint commands and Inspire hand commands via CycloneDDS."""
|
|
|
|
def __init__(self, domain_id=1):
|
|
from unitree_sdk2py.core.channel import (
|
|
ChannelPublisher, ChannelFactoryInitialize)
|
|
from unitree_sdk2py.idl.unitree_hg.msg.dds_ import LowCmd_
|
|
from unitree_sdk2py.idl.default import unitree_hg_msg_dds__LowCmd_
|
|
from unitree_sdk2py.utils.crc import CRC
|
|
from unitree_sdk2py.idl.unitree_go.msg.dds_ import MotorCmds_
|
|
from unitree_sdk2py.idl.default import unitree_go_msg_dds__MotorCmd_
|
|
|
|
logger.info(f"Initializing DDS (domain {domain_id})...")
|
|
ChannelFactoryInitialize(domain_id)
|
|
|
|
self._body_pub = ChannelPublisher("rt/lowcmd", LowCmd_)
|
|
self._body_pub.Init()
|
|
self._low_cmd = unitree_hg_msg_dds__LowCmd_()
|
|
self._crc = CRC()
|
|
self._MotorCmd_ = unitree_go_msg_dds__MotorCmd_
|
|
|
|
self._hand_pub = ChannelPublisher("rt/inspire/cmd", MotorCmds_)
|
|
self._hand_pub.Init()
|
|
self._MotorCmds_ = MotorCmds_
|
|
|
|
logger.info("DDS publishers initialized (rt/lowcmd, rt/inspire/cmd)")
|
|
|
|
def publish_body(self, joint_angles):
|
|
cmd = self._low_cmd
|
|
cmd.mode_pr = 0
|
|
cmd.mode_machine = 0
|
|
|
|
for i in range(NUM_JOINTS):
|
|
cmd.motor_cmd[i].mode = 1
|
|
cmd.motor_cmd[i].q = float(joint_angles[i])
|
|
cmd.motor_cmd[i].dq = 0.0
|
|
cmd.motor_cmd[i].tau = 0.0
|
|
cmd.motor_cmd[i].kp = float(KP[i])
|
|
cmd.motor_cmd[i].kd = float(KD[i])
|
|
|
|
# Zero out legs (not retargeted)
|
|
for i in range(12):
|
|
cmd.motor_cmd[i].mode = 0
|
|
cmd.motor_cmd[i].kp = 0.0
|
|
cmd.motor_cmd[i].kd = 0.0
|
|
|
|
cmd.crc = self._crc.Crc(cmd)
|
|
self._body_pub.Write(cmd)
|
|
|
|
def publish_hands(self, hand_values):
|
|
hand_cmd = self._MotorCmds_()
|
|
hand_cmd.cmds = []
|
|
for i in range(12):
|
|
mc = self._MotorCmd_()
|
|
mc.q = float(np.clip(hand_values[i], 0.0, 1.0))
|
|
mc.dq = 0.0
|
|
mc.tau = 0.0
|
|
mc.kp = 5.0
|
|
mc.kd = 0.5
|
|
hand_cmd.cmds.append(mc)
|
|
self._hand_pub.Write(hand_cmd)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Hand retargeting (dex-retargeting, unchanged)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def init_hand_retargeting():
|
|
from dex_retargeting.retargeting_config import RetargetingConfig
|
|
import yaml
|
|
|
|
urdf_dir = os.path.join(XR_TELEOP_DIR, "assets")
|
|
RetargetingConfig.set_default_urdf_dir(urdf_dir)
|
|
|
|
yml_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "inspire_hand.yml")
|
|
logger.info(f"Loading hand retargeting config from {yml_path}")
|
|
with open(yml_path) as f:
|
|
cfg = yaml.safe_load(f)
|
|
|
|
left_config = RetargetingConfig.from_dict(cfg['left'])
|
|
right_config = RetargetingConfig.from_dict(cfg['right'])
|
|
left_retargeting = left_config.build()
|
|
right_retargeting = right_config.build()
|
|
|
|
left_joint_names = left_retargeting.joint_names
|
|
right_joint_names = right_retargeting.joint_names
|
|
|
|
left_hw_names = ['L_pinky_proximal_joint', 'L_ring_proximal_joint', 'L_middle_proximal_joint',
|
|
'L_index_proximal_joint', 'L_thumb_proximal_pitch_joint', 'L_thumb_proximal_yaw_joint']
|
|
right_hw_names = ['R_pinky_proximal_joint', 'R_ring_proximal_joint', 'R_middle_proximal_joint',
|
|
'R_index_proximal_joint', 'R_thumb_proximal_pitch_joint', 'R_thumb_proximal_yaw_joint']
|
|
|
|
left_hw_map = [left_joint_names.index(n) for n in left_hw_names]
|
|
right_hw_map = [right_joint_names.index(n) for n in right_hw_names]
|
|
|
|
left_indices = left_retargeting.optimizer.target_link_human_indices
|
|
right_indices = right_retargeting.optimizer.target_link_human_indices
|
|
|
|
logger.info("Hand retargeting initialized (dex-retargeting)")
|
|
|
|
return (left_retargeting, right_retargeting,
|
|
left_indices, right_indices,
|
|
left_hw_map, right_hw_map)
|
|
|
|
|
|
def retarget_hands(left_retargeting, right_retargeting,
|
|
left_indices, right_indices,
|
|
left_hw_map, right_hw_map,
|
|
left_hand_pos, right_hand_pos):
|
|
"""Compute 12 normalized Inspire hand values from hand positions."""
|
|
hand_cmd = np.ones(12)
|
|
|
|
left_valid = not np.all(left_hand_pos == 0.0)
|
|
right_valid = not np.all(right_hand_pos == 0.0)
|
|
|
|
if left_valid:
|
|
ref_value = left_hand_pos[left_indices[1, :]] - left_hand_pos[left_indices[0, :]]
|
|
q_rad = left_retargeting.retarget(ref_value)
|
|
q_hw = q_rad[left_hw_map]
|
|
for idx in range(6):
|
|
min_val, max_val = INSPIRE_RANGES[idx]
|
|
hand_cmd[6 + idx] = np.clip((max_val - q_hw[idx]) / (max_val - min_val), 0.0, 1.0)
|
|
|
|
if right_valid:
|
|
ref_value = right_hand_pos[right_indices[1, :]] - right_hand_pos[right_indices[0, :]]
|
|
q_rad = right_retargeting.retarget(ref_value)
|
|
q_hw = q_rad[right_hw_map]
|
|
for idx in range(6):
|
|
min_val, max_val = INSPIRE_RANGES[idx]
|
|
hand_cmd[idx] = np.clip((max_val - q_hw[idx]) / (max_val - min_val), 0.0, 1.0)
|
|
|
|
return hand_cmd
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Main loop
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(
|
|
description="Retargeting bridge v4: Quest 3 full body -> G1 direct joint mapping -> DDS")
|
|
parser.add_argument("--port", type=int, default=8765)
|
|
parser.add_argument("--host", default="0.0.0.0")
|
|
parser.add_argument("--dds-domain", type=int, default=1)
|
|
parser.add_argument("--hz", type=float, default=50.0)
|
|
parser.add_argument("--no-hands", action="store_true")
|
|
parser.add_argument("--no-waist", action="store_true", help="Disable waist tracking")
|
|
parser.add_argument("--no-arms", action="store_true", help="Disable arm tracking")
|
|
parser.add_argument("--mirror", action="store_true")
|
|
parser.add_argument("--verbose", action="store_true")
|
|
parser.add_argument("--cal-delay", type=float, default=10.0,
|
|
help="Seconds to wait before auto-calibrating (default: 10.0)")
|
|
parser.add_argument("--gain", type=float, default=None,
|
|
help="Joint angle gain multiplier (default: 1.4)")
|
|
parser.add_argument("--test", choices=["left", "right", "both", "wave"])
|
|
parser.add_argument("--test-hands", action="store_true")
|
|
parser.add_argument("--solo-joint", type=str, default=None,
|
|
help="Isolate a single joint for testing. Options: "
|
|
"l_sh_pitch, l_sh_roll, l_sh_yaw, l_elbow, "
|
|
"l_wr_roll, l_wr_pitch, l_wr_yaw")
|
|
args = parser.parse_args()
|
|
|
|
if args.solo_joint is not None:
|
|
global SOLO_JOINT
|
|
SOLO_JOINT = args.solo_joint
|
|
|
|
logging.basicConfig(
|
|
level=logging.DEBUG if args.verbose else logging.INFO,
|
|
format="%(asctime)s [%(name)s] %(levelname)s: %(message)s"
|
|
)
|
|
|
|
# --- Start teleop server (WebSocket + shared memory) ---
|
|
logger.info(f"Starting teleop server on ws://{args.host}:{args.port}")
|
|
tv_wrapper = NativeTeleWrapper(port=args.port, host=args.host)
|
|
tv_wrapper.start()
|
|
|
|
# --- Initialize DDS ---
|
|
dds = None
|
|
for attempt in range(60):
|
|
try:
|
|
dds = DDSPublisher(domain_id=args.dds_domain)
|
|
break
|
|
except Exception as e:
|
|
logger.warning(f"DDS init attempt {attempt+1} failed ({e}), retrying in 2s...")
|
|
time.sleep(2.0)
|
|
if dds is None:
|
|
logger.error("Failed to initialize DDS after 60 attempts, exiting.")
|
|
sys.exit(1)
|
|
|
|
# --- Test mode ---
|
|
if args.test:
|
|
_run_test_mode(args, dds)
|
|
return
|
|
|
|
# --- Initialize hand retargeting ---
|
|
hand_data = init_hand_retargeting() if not args.no_hands else None
|
|
if hand_data:
|
|
l_retarget, r_retarget, l_indices, r_indices, l_hw_map, r_hw_map = hand_data
|
|
|
|
# --- Initialize IK solver ---
|
|
logger.info("Initializing IK solver (Pinocchio + CasADi)...")
|
|
orig_cwd = os.getcwd()
|
|
os.chdir(os.path.join(XR_TELEOP_DIR, "teleop"))
|
|
arm_ik = G1_29_ArmIK_Elbow(elbow_weight=5.0, Unit_Test=False, Visualization=False)
|
|
os.chdir(orig_cwd)
|
|
logger.info("IK solver ready (14 DOF: 7 left arm + 7 right arm, elbow constraints)")
|
|
|
|
# --- Control loop state ---
|
|
interval = 1.0 / args.hz
|
|
max_step = MAX_RATE * interval # rad per step
|
|
running = True
|
|
current_angles = np.zeros(NUM_JOINTS)
|
|
smoothed_upper = np.zeros(17) # joints 12-28
|
|
current_hands = np.ones(12)
|
|
tracking_timeout = 2.0 # seconds without data before considering tracking lost
|
|
return_to_zero_rate = 0.02
|
|
last_valid_time = 0.0
|
|
startup_ramp = 0.0
|
|
startup_ramp_duration = 2.0
|
|
startup_time = None
|
|
step_count = 0
|
|
retarget_error_count = 0
|
|
|
|
# Calibration state
|
|
cal = CalibrationData()
|
|
first_data_time = None
|
|
was_tracking = False # track connection state for re-calibration
|
|
|
|
def signal_handler(sig, frame):
|
|
nonlocal running
|
|
running = False
|
|
|
|
signal.signal(signal.SIGINT, signal_handler)
|
|
signal.signal(signal.SIGTERM, signal_handler)
|
|
|
|
logger.info(f"Bridge v5 running at {args.hz} Hz (IK retargeting via Pinocchio)")
|
|
logger.info(f" Waist={'ON' if not args.no_waist else 'OFF'}, "
|
|
f"Arms={'ON' if not args.no_arms else 'OFF'}, "
|
|
f"Hands={'ON' if hand_data else 'OFF'}")
|
|
logger.info(f" Startup delay: {args.cal_delay}s after first tracking frame")
|
|
logger.info("Waiting for Quest 3...")
|
|
|
|
while running:
|
|
t_start = time.perf_counter()
|
|
|
|
# Read tracking data (for hand positions)
|
|
tele_data = tv_wrapper.get_tele_data()
|
|
data_time = tv_wrapper.last_data_time
|
|
now = time.time()
|
|
|
|
# Read body joints directly from shared memory (world-space pose7)
|
|
with tv_wrapper.server.body_joints_shared.get_lock():
|
|
bj_raw = np.array(tv_wrapper.server.body_joints_shared[:])
|
|
has_body_joints = np.any(bj_raw != 0)
|
|
|
|
if data_time > 0 and (now - data_time) < tracking_timeout:
|
|
last_valid_time = now
|
|
is_tracking = True
|
|
|
|
# Track when first data arrives
|
|
if first_data_time is None:
|
|
first_data_time = now
|
|
logger.info("First tracking data received")
|
|
|
|
# Detect reconnection: tracking resumed after being lost
|
|
if not was_tracking and cal.calibrated:
|
|
logger.info("Tracking resumed — keeping existing calibration")
|
|
|
|
was_tracking = True
|
|
|
|
# Startup ramp (starts after calibration)
|
|
if startup_time is not None:
|
|
elapsed_startup = now - startup_time
|
|
startup_ramp = min(elapsed_startup / startup_ramp_duration, 1.0)
|
|
|
|
# --- Start IK retargeting after delay ---
|
|
if not cal.calibrated and first_data_time is not None:
|
|
if (now - first_data_time) >= args.cal_delay:
|
|
cal.calibrated = True # IK doesn't need body joint calibration
|
|
logger.info("Startup delay complete — IK retargeting active.")
|
|
startup_time = now
|
|
|
|
# --- IK retarget: wrist poses -> joint angles ---
|
|
if cal.calibrated and has_body_joints:
|
|
try:
|
|
bj = parse_body_joints(bj_raw)
|
|
if (bj['l_wrist'] is not None and bj['r_wrist'] is not None
|
|
and bj['hips'] is not None):
|
|
# Build IK targets: wrist SE(3) + elbow positions
|
|
lw, rw, l_elbow, r_elbow = build_ik_targets(bj)
|
|
|
|
# Debug: log IK targets every 250 steps
|
|
if step_count % 250 == 0:
|
|
l_elb_str = f"[{l_elbow[0]:+.3f},{l_elbow[1]:+.3f},{l_elbow[2]:+.3f}]" if l_elbow is not None else "None"
|
|
r_elb_str = f"[{r_elbow[0]:+.3f},{r_elbow[1]:+.3f},{r_elbow[2]:+.3f}]" if r_elbow is not None else "None"
|
|
# Raw OpenXR positions (Y-up, -Z forward)
|
|
lw_raw = bj['l_wrist'][0:3, 3] if bj['l_wrist'] is not None else np.zeros(3)
|
|
rw_raw = bj['r_wrist'][0:3, 3] if bj['r_wrist'] is not None else np.zeros(3)
|
|
le_raw = bj['l_lower'][0:3, 3] if bj['l_lower'] is not None else np.zeros(3)
|
|
re_raw = bj['r_lower'][0:3, 3] if bj['r_lower'] is not None else np.zeros(3)
|
|
hips_raw = bj['hips'][0:3, 3] if bj['hips'] is not None else np.zeros(3)
|
|
logger.info(f" [RAW OpenXR] hips=[{hips_raw[0]:+.3f},{hips_raw[1]:+.3f},{hips_raw[2]:+.3f}]")
|
|
logger.info(f" [RAW OpenXR] L_wrist=[{lw_raw[0]:+.3f},{lw_raw[1]:+.3f},{lw_raw[2]:+.3f}] "
|
|
f"L_elbow=[{le_raw[0]:+.3f},{le_raw[1]:+.3f},{le_raw[2]:+.3f}]")
|
|
logger.info(f" [RAW OpenXR] R_wrist=[{rw_raw[0]:+.3f},{rw_raw[1]:+.3f},{rw_raw[2]:+.3f}] "
|
|
f"R_elbow=[{re_raw[0]:+.3f},{re_raw[1]:+.3f},{re_raw[2]:+.3f}]")
|
|
logger.info(f" [IK INPUT] L_wrist pos=[{lw[0,3]:+.3f},{lw[1,3]:+.3f},{lw[2,3]:+.3f}] "
|
|
f"R_wrist pos=[{rw[0,3]:+.3f},{rw[1,3]:+.3f},{rw[2,3]:+.3f}]")
|
|
logger.info(f" [IK INPUT] L_elbow={l_elb_str} R_elbow={r_elb_str}")
|
|
|
|
# Get current arm joint angles for warm-starting the IK solver
|
|
current_arm_q = np.concatenate([
|
|
current_angles[15:22], # left arm (joints 15-21)
|
|
current_angles[22:29], # right arm (joints 22-28)
|
|
])
|
|
current_arm_dq = np.zeros(14)
|
|
|
|
sol_q, sol_tauff = arm_ik.solve_ik(
|
|
lw, rw,
|
|
current_arm_q,
|
|
current_arm_dq,
|
|
left_elbow_pos=l_elbow,
|
|
right_elbow_pos=r_elbow,
|
|
)
|
|
|
|
# Map 14-DOF IK solution to 17-element upper body array
|
|
raw_angles = np.zeros(17)
|
|
raw_angles[3:10] = sol_q[:7] # left arm
|
|
raw_angles[10:17] = sol_q[7:] # right arm
|
|
|
|
raw_angles = apply_limits(raw_angles)
|
|
|
|
# Zero out disabled groups
|
|
if args.no_waist:
|
|
raw_angles[0:3] = 0.0
|
|
if args.no_arms:
|
|
raw_angles[3:17] = 0.0
|
|
|
|
# Smooth
|
|
smoothed_upper = smooth_angles(smoothed_upper, raw_angles, EMA_ALPHA, max_step)
|
|
|
|
# Apply startup ramp and write to output
|
|
current_angles[12:29] = smoothed_upper * startup_ramp
|
|
|
|
except Exception as e:
|
|
retarget_error_count += 1
|
|
if retarget_error_count <= 5 or retarget_error_count % 100 == 0:
|
|
logger.warning(f"IK solve failed ({retarget_error_count}x): {e}")
|
|
|
|
# --- Hand retargeting ---
|
|
if hand_data and cal.calibrated:
|
|
left_hand_pos = tele_data.left_hand_pos
|
|
right_hand_pos = tele_data.right_hand_pos
|
|
|
|
# Debug: log hand data validity periodically
|
|
if step_count % 250 == 0:
|
|
l_valid = left_hand_pos is not None and not np.all(left_hand_pos == 0.0)
|
|
r_valid = right_hand_pos is not None and not np.all(right_hand_pos == 0.0)
|
|
if left_hand_pos is not None:
|
|
l_range = np.ptp(left_hand_pos, axis=0) if left_hand_pos.ndim == 2 else np.ptp(left_hand_pos)
|
|
else:
|
|
l_range = 0
|
|
logger.info(f" [HAND DEBUG] L_valid={l_valid} R_valid={r_valid} L_shape={left_hand_pos.shape if left_hand_pos is not None else None} L_range={l_range}")
|
|
|
|
if args.mirror:
|
|
left_hand_pos, right_hand_pos = right_hand_pos, left_hand_pos
|
|
|
|
try:
|
|
raw_hands = retarget_hands(
|
|
l_retarget, r_retarget,
|
|
l_indices, r_indices,
|
|
l_hw_map, r_hw_map,
|
|
left_hand_pos, right_hand_pos)
|
|
current_hands = raw_hands * startup_ramp + (1.0 - startup_ramp) * 1.0
|
|
if step_count % 250 == 0:
|
|
np.set_printoptions(precision=2, suppress=True)
|
|
logger.info(f" [HAND OUT] R=[{current_hands[0]:.2f},{current_hands[1]:.2f},{current_hands[2]:.2f},{current_hands[3]:.2f},{current_hands[4]:.2f},{current_hands[5]:.2f}] L=[{current_hands[6]:.2f},{current_hands[7]:.2f},{current_hands[8]:.2f},{current_hands[9]:.2f},{current_hands[10]:.2f},{current_hands[11]:.2f}]")
|
|
except Exception as e:
|
|
logger.warning(f"Hand retarget failed: {e}")
|
|
|
|
else:
|
|
# No tracking data or stale
|
|
was_tracking = False
|
|
if last_valid_time > 0 and (now - last_valid_time) > tracking_timeout:
|
|
# Gradually return to zero
|
|
for i in range(12, NUM_JOINTS):
|
|
if abs(current_angles[i]) > return_to_zero_rate:
|
|
current_angles[i] -= np.sign(current_angles[i]) * return_to_zero_rate
|
|
else:
|
|
current_angles[i] = 0.0
|
|
smoothed_upper *= 0.95 # decay smoothed state too
|
|
current_hands = np.minimum(current_hands + 0.02, 1.0)
|
|
|
|
# --- Publish ---
|
|
dds.publish_body(current_angles)
|
|
if hand_data:
|
|
dds.publish_hands(current_hands)
|
|
|
|
# --- Status logging ---
|
|
step_count += 1
|
|
log_interval = int(args.hz * (1 if args.verbose else 5))
|
|
if log_interval > 0 and step_count % log_interval == 0:
|
|
tracking = data_time > 0 and (now - data_time) < tracking_timeout
|
|
status = "TRACKING" if (tracking and cal.calibrated) else (
|
|
"CALIBRATING" if (tracking and not cal.calibrated) else "WAITING")
|
|
ramp_pct = int(startup_ramp * 100)
|
|
waist = current_angles[12:15]
|
|
l_sh = current_angles[15:18]
|
|
l_el = current_angles[18]
|
|
r_sh = current_angles[22:25]
|
|
r_el = current_angles[25]
|
|
logger.info(
|
|
f"[{status}] step={step_count} ramp={ramp_pct}% "
|
|
f"waist=[{waist[0]:+.2f},{waist[1]:+.2f},{waist[2]:+.2f}] "
|
|
f"L_sh=[{l_sh[0]:+.2f},{l_sh[1]:+.2f},{l_sh[2]:+.2f}] L_el={l_el:+.2f} "
|
|
f"R_sh=[{r_sh[0]:+.2f},{r_sh[1]:+.2f},{r_sh[2]:+.2f}] R_el={r_el:+.2f}")
|
|
if tracking and cal.calibrated:
|
|
l_wr = current_angles[19:22]
|
|
r_wr = current_angles[26:29]
|
|
logger.info(
|
|
f" L_wr=[{l_wr[0]:+.2f},{l_wr[1]:+.2f},{l_wr[2]:+.2f}] "
|
|
f"R_wr=[{r_wr[0]:+.2f},{r_wr[1]:+.2f},{r_wr[2]:+.2f}]")
|
|
if args.verbose and tracking and has_body_joints:
|
|
logger.debug(
|
|
f" Full upper body: {np.array2string(current_angles[12:29], precision=3, separator=',')}")
|
|
# Log raw quaternions for L/R upper arms to diagnose tracking issues
|
|
l_upper_q = bj_raw[BJ_L_UPPER*7:(BJ_L_UPPER+1)*7]
|
|
r_upper_q = bj_raw[BJ_R_UPPER*7:(BJ_R_UPPER+1)*7]
|
|
l_lower_q = bj_raw[BJ_L_LOWER*7:(BJ_L_LOWER+1)*7]
|
|
r_lower_q = bj_raw[BJ_R_LOWER*7:(BJ_R_LOWER+1)*7]
|
|
logger.debug(
|
|
f" L_upper_q=[{l_upper_q[3]:+.4f},{l_upper_q[4]:+.4f},{l_upper_q[5]:+.4f},{l_upper_q[6]:+.4f}] "
|
|
f"R_upper_q=[{r_upper_q[3]:+.4f},{r_upper_q[4]:+.4f},{r_upper_q[5]:+.4f},{r_upper_q[6]:+.4f}]")
|
|
logger.debug(
|
|
f" L_lower_q=[{l_lower_q[3]:+.4f},{l_lower_q[4]:+.4f},{l_lower_q[5]:+.4f},{l_lower_q[6]:+.4f}] "
|
|
f"R_lower_q=[{r_lower_q[3]:+.4f},{r_lower_q[4]:+.4f},{r_lower_q[5]:+.4f},{r_lower_q[6]:+.4f}]")
|
|
if retarget_error_count > 0:
|
|
logger.debug(f" Retarget errors: {retarget_error_count}")
|
|
|
|
sleep_time = interval - (time.perf_counter() - t_start)
|
|
if sleep_time > 0:
|
|
time.sleep(sleep_time)
|
|
|
|
logger.info("Bridge shutting down")
|
|
|
|
|
|
def _run_test_mode(args, dds):
|
|
"""Run test mode: send known poses without Quest."""
|
|
logger.info(f"TEST MODE: '{args.test}' — sending fixed poses (no Quest needed)")
|
|
running = True
|
|
|
|
def signal_handler(sig, frame):
|
|
nonlocal running
|
|
running = False
|
|
|
|
signal.signal(signal.SIGINT, signal_handler)
|
|
signal.signal(signal.SIGTERM, signal_handler)
|
|
|
|
interval = 1.0 / args.hz
|
|
step_count = 0
|
|
t0 = time.time()
|
|
|
|
while running:
|
|
t_start = time.perf_counter()
|
|
angles = np.zeros(NUM_JOINTS)
|
|
elapsed = time.time() - t0
|
|
|
|
if args.test == "left":
|
|
angles[15] = 0.5; angles[16] = 0.3; angles[18] = 0.4
|
|
elif args.test == "right":
|
|
angles[22] = 0.5; angles[23] = -0.3; angles[25] = 0.4
|
|
elif args.test == "both":
|
|
angles[15] = 0.5; angles[16] = 0.3; angles[18] = 0.4
|
|
angles[22] = -0.5; angles[23] = -0.3; angles[25] = 0.8
|
|
elif args.test == "wave":
|
|
wave = np.sin(elapsed * 1.5)
|
|
angles[15] = 0.8 * wave; angles[18] = 0.3 + 0.3 * wave
|
|
angles[22] = -0.8 * wave; angles[25] = 0.3 - 0.3 * wave
|
|
|
|
dds.publish_body(angles)
|
|
|
|
if args.test_hands:
|
|
hand_vals = np.ones(12)
|
|
cycle = (elapsed % 4.0) / 4.0
|
|
curl = 1.0 - np.sin(cycle * np.pi)
|
|
for i in [0, 1, 2, 3, 6, 7, 8, 9]:
|
|
hand_vals[i] = curl
|
|
hand_vals[4] = 0.5 + 0.5 * curl
|
|
hand_vals[10] = 0.5 + 0.5 * curl
|
|
hand_vals[5] = 0.5; hand_vals[11] = 0.5
|
|
dds.publish_hands(hand_vals)
|
|
|
|
step_count += 1
|
|
if step_count % int(args.hz * 2) == 0:
|
|
logger.info(f"[TEST:{args.test}] step={step_count}")
|
|
|
|
sleep_time = interval - (time.perf_counter() - t_start)
|
|
if sleep_time > 0:
|
|
time.sleep(sleep_time)
|
|
|
|
logger.info("Test mode shutting down")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|