You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

265 lines
9.7 KiB

#!/usr/bin/env python3
"""
Joint calibration tool: animates ONE robot joint while recording Quest tracking data.
The user copies the robot's motion. By comparing the known robot joint angle
with the raw XR tracking deltas, we determine the correct axis mapping and sign.
Usage:
python joint_calibrate.py l_shoulder_pitch [--period 6] [--amplitude 0.8]
"""
import sys
import os
import time
import argparse
import signal
import numpy as np
from scipy.spatial.transform import Rotation
# Add xr_teleoperate to path
sys.path.insert(0, "/home/sparky/git/xr_teleoperate")
import logging_mp
if not hasattr(logging_mp, 'getLogger'):
logging_mp.getLogger = logging_mp.get_logger
import dex_retargeting
if not hasattr(dex_retargeting, 'RetargetingConfig'):
from dex_retargeting.retargeting_config import RetargetingConfig
dex_retargeting.RetargetingConfig = RetargetingConfig
from teleop_server import TeleopServer
from native_tv_wrapper import T_ROBOT_OPENXR, NativeTeleWrapper
NUM_JOINTS = 29
R3_ROBOT_OPENXR = T_ROBOT_OPENXR[:3, :3]
JOINTS = {
'l_shoulder_pitch': 15, 'l_shoulder_roll': 16, 'l_shoulder_yaw': 17,
'l_elbow': 18,
'l_wrist_roll': 19, 'l_wrist_pitch': 20, 'l_wrist_yaw': 21,
'r_shoulder_pitch': 22, 'r_shoulder_roll': 23, 'r_shoulder_yaw': 24,
'r_elbow': 25,
}
# Which XR parent-child pair to monitor for each robot joint
# Using hips (not chest) for shoulder to avoid head rotation contamination
JOINT_PARENTS = {
'l_shoulder_pitch': ('hips', 'l_upper'),
'l_shoulder_roll': ('hips', 'l_upper'),
'l_shoulder_yaw': ('hips', 'l_upper'),
'l_elbow': ('l_upper', 'l_lower'),
'l_wrist_roll': ('l_lower', 'l_wrist'),
'l_wrist_pitch': ('l_lower', 'l_wrist'),
'l_wrist_yaw': ('l_lower', 'l_wrist'),
'r_shoulder_pitch': ('hips', 'r_upper'),
'r_shoulder_roll': ('hips', 'r_upper'),
'r_shoulder_yaw': ('hips', 'r_upper'),
'r_elbow': ('r_upper', 'r_lower'),
}
KP = [60,60,60,100,40,40, 60,60,60,100,40,40, 60,40,40, 40,40,40,40,40,40,40, 40,40,40,40,40,40,40]
KD = [1,1,1,2,1,1, 1,1,1,2,1,1, 1,1,1, 1,1,1,1,1,1,1, 1,1,1,1,1,1,1]
# Calibrated extents from joint testing
DEFAULT_AMP = {
'l_shoulder_pitch': 1.6, 'l_shoulder_roll': 0.8, 'l_shoulder_yaw': 1.15,
'l_elbow': 1.065,
'l_wrist_roll': 1.35, 'l_wrist_pitch': 0.8, 'l_wrist_yaw': 0.89,
'r_shoulder_pitch': 1.6, 'r_shoulder_roll': 0.8, 'r_shoulder_yaw': 1.15,
'r_elbow': 1.065,
}
DEFAULT_OFFSET = {
'l_shoulder_pitch': -0.8, 'l_shoulder_roll': 0.785, 'l_shoulder_yaw': 0.0,
'l_elbow': 0.265,
'l_wrist_roll': 0.0, 'l_wrist_pitch': 0.0, 'l_wrist_yaw': -0.39,
'r_shoulder_pitch': -0.8, 'r_shoulder_roll': -0.785, 'r_shoulder_yaw': 0.0,
'r_elbow': 0.265,
}
def pose7_to_mat4(pose7):
mat = np.eye(4)
r = Rotation.from_quat(pose7[3:7])
mat[:3, :3] = r.as_matrix()
mat[:3, 3] = pose7[:3]
return mat
def parse_body_joints(flat):
names = ['hips', 'chest', 'l_upper', 'l_lower', 'l_wrist',
'r_upper', 'r_lower', 'r_wrist']
joints = {}
for i, name in enumerate(names):
p7 = flat[i*7:(i+1)*7]
if np.all(p7 == 0):
joints[name] = None
else:
joints[name] = pose7_to_mat4(p7)
return joints
def main():
parser = argparse.ArgumentParser(description="Joint calibration with Quest tracking")
parser.add_argument("joint", choices=list(JOINTS.keys()))
parser.add_argument("--amplitude", "-a", type=float, default=None)
parser.add_argument("--offset", "-o", type=float, default=None)
parser.add_argument("--period", "-p", type=float, default=8.0)
parser.add_argument("--hz", type=float, default=50.0)
parser.add_argument("--port", type=int, default=8765)
parser.add_argument("--dds-domain", type=int, default=1)
parser.add_argument("--cal-delay", type=float, default=10.0,
help="Seconds to wait before calibrating XR (default: 10.0)")
args = parser.parse_args()
joint_idx = JOINTS[args.joint]
amplitude = args.amplitude if args.amplitude is not None else DEFAULT_AMP.get(args.joint, 0.5)
offset = args.offset if args.offset is not None else DEFAULT_OFFSET.get(args.joint, 0.0)
parent_name, child_name = JOINT_PARENTS[args.joint]
print(f"\n{'='*70}")
print(f"JOINT CALIBRATION: {args.joint} (index {joint_idx})")
print(f"Monitoring XR: {parent_name} -> {child_name}")
print(f"Animation: sine amp={amplitude:.2f} offset={offset:.2f} rad, period {args.period:.1f}s")
print(f"Range: [{offset-amplitude:.2f}, {offset+amplitude:.2f}] rad")
print(f"XR cal delay: {args.cal_delay}s")
print(f"{'='*70}")
print(f"\nStarting WebSocket server on port {args.port}...")
# Start teleop server
tv = NativeTeleWrapper(port=args.port, host="0.0.0.0")
tv.start()
time.sleep(0.5)
# Init DDS
from unitree_sdk2py.core.channel import ChannelPublisher, ChannelFactoryInitialize
from unitree_sdk2py.idl.unitree_hg.msg.dds_ import LowCmd_
from unitree_sdk2py.idl.default import unitree_hg_msg_dds__LowCmd_
from unitree_sdk2py.utils.crc import CRC
ChannelFactoryInitialize(args.dds_domain)
pub = ChannelPublisher("rt/lowcmd", LowCmd_)
pub.Init()
cmd = unitree_hg_msg_dds__LowCmd_()
crc = CRC()
running = True
def handler(sig, frame):
nonlocal running
running = False
signal.signal(signal.SIGINT, handler)
signal.signal(signal.SIGTERM, handler)
# Wait for Quest connection
print("\nConnect Quest 3 and hold the ROBOT'S REST POSE for calibration...")
print("(Arms at sides, elbows bent 90 degrees, forearms forward)")
print(f"XR calibration will happen {args.cal_delay}s after first tracking frame.\n")
cal_ref = None
first_tracking_time = None
interval = 1.0 / args.hz
t0 = time.time() # Start animation immediately
step = 0
while running:
t_start = time.perf_counter()
now = time.time()
step += 1
# 1) Compute robot animation value
elapsed = now - t0
robot_val = offset + amplitude * np.sin(2 * np.pi * elapsed / args.period)
angles = np.zeros(NUM_JOINTS)
angles[joint_idx] = robot_val
# 2) Read XR tracking and compute comparison (if available)
xr_info = None
with tv.server.body_joints_shared.get_lock():
bj_raw = np.array(tv.server.body_joints_shared[:])
has_data = np.any(bj_raw != 0)
data_time = tv.last_data_time
tracking = data_time > 0 and (now - data_time) < 0.5
if tracking and has_data:
if first_tracking_time is None:
first_tracking_time = now
print(f"\nFirst tracking data! Calibrating in {args.cal_delay}s — hold the rest pose...")
bj = parse_body_joints(bj_raw)
parent = bj.get(parent_name)
child = bj.get(child_name)
if parent is not None and child is not None:
R_parent = parent[:3, :3]
R_child = child[:3, :3]
R_rel = R_parent.T @ R_child
# Calibrate after delay
if cal_ref is None and first_tracking_time is not None and (now - first_tracking_time) >= args.cal_delay:
cal_ref = R_rel.copy()
print("\nCALIBRATED! Now copy the robot's motion.\n")
print(f"{'Robot Value':>12s} | {'XR Delta (robot frame YXZ Euler)':>40s} | {'XR Delta (robot frame XYZ)':>35s}")
print("-" * 95)
if cal_ref is not None:
# Simple basis change: XR frame -> robot frame
R_delta = cal_ref.T @ R_rel
R_delta_robot = R3_ROBOT_OPENXR @ R_delta @ R3_ROBOT_OPENXR.T
yxz = Rotation.from_matrix(R_delta_robot).as_euler('YXZ')
xyz = Rotation.from_matrix(R_delta_robot).as_euler('XYZ')
xr_info = (yxz, xyz)
# 3) Publish DDS (once per frame, always with current angles)
cmd.mode_pr = 0
cmd.mode_machine = 0
for i in range(NUM_JOINTS):
cmd.motor_cmd[i].mode = 1
cmd.motor_cmd[i].q = float(angles[i])
cmd.motor_cmd[i].dq = 0.0
cmd.motor_cmd[i].tau = 0.0
cmd.motor_cmd[i].kp = float(KP[i])
cmd.motor_cmd[i].kd = float(KD[i])
for i in range(12):
cmd.motor_cmd[i].mode = 0
cmd.motor_cmd[i].kp = 0.0
cmd.motor_cmd[i].kd = 0.0
cmd.crc = crc.Crc(cmd)
pub.Write(cmd)
# 4) Log
if xr_info and step % int(args.hz) == 0:
yxz, xyz = xr_info
print(f" {robot_val:+.3f} rad | "
f"Y={yxz[0]:+.3f} X={yxz[1]:+.3f} Z={yxz[2]:+.3f} | "
f"X={xyz[0]:+.3f} Y={xyz[1]:+.3f} Z={xyz[2]:+.3f}")
elif not xr_info and step % int(args.hz * 2) == 0:
print(f" robot={robot_val:+.3f} (Waiting for Quest tracking data...)")
sleep_time = interval - (time.perf_counter() - t_start)
if sleep_time > 0:
time.sleep(sleep_time)
# Return to zero
print("\nReturning to zero...")
angles = np.zeros(NUM_JOINTS)
for _ in range(25):
cmd.mode_pr = 0
cmd.mode_machine = 0
for i in range(NUM_JOINTS):
cmd.motor_cmd[i].mode = 1
cmd.motor_cmd[i].q = 0.0
cmd.motor_cmd[i].dq = 0.0
cmd.motor_cmd[i].tau = 0.0
cmd.motor_cmd[i].kp = float(KP[i])
cmd.motor_cmd[i].kd = float(KD[i])
for i in range(12):
cmd.motor_cmd[i].mode = 0
cmd.motor_cmd[i].kp = 0.0
cmd.motor_cmd[i].kd = 0.0
cmd.crc = crc.Crc(cmd)
pub.Write(cmd)
time.sleep(0.02)
print("Done.")
if __name__ == "__main__":
main()