You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

828 lines
44 KiB

import time
import argparse
import struct
import numpy as np
from multiprocessing import Value, Array, Lock
import threading
import logging_mp
logging_mp.basicConfig(level=logging_mp.INFO)
logger_mp = logging_mp.getLogger(__name__)
import os
import sys
current_dir = os.path.dirname(os.path.abspath(__file__))
parent_dir = os.path.dirname(current_dir)
sys.path.append(parent_dir)
from unitree_sdk2py.core.channel import ChannelFactoryInitialize # dds
from televuer import TeleVuerWrapper
from teleop.robot_control.robot_arm import G1_29_ArmController, G1_23_ArmController, H1_2_ArmController, H1_ArmController
from teleop.robot_control.robot_arm_ik import G1_29_ArmIK, G1_23_ArmIK, H1_2_ArmIK, H1_ArmIK
from teleimager.image_client import ImageClient
from teleop.utils.episode_writer import EpisodeWriter
from teleop.utils.ipc import IPC_Server
from teleop.utils.motion_switcher import MotionSwitcher, LocoClientWrapper
from sshkeyboard import listen_keyboard, stop_listening
# for simulation
from unitree_sdk2py.core.channel import ChannelPublisher
from unitree_sdk2py.idl.std_msgs.msg.dds_ import String_
def publish_reset_category(category: int, publisher): # Scene Reset signal
    """Publish a scene-reset signal: *category* is serialized to a string payload
    and written on *publisher* (a DDS String_ channel)."""
    reset_msg = String_(data=str(category))
    publisher.Write(reset_msg)
    logger_mp.info(f"published reset category: {category}")
# state transition
# Module-level flags shared between the keyboard/IPC/R3 callbacks and the main loop.
START = False            # Enable to start robot following VR user motion
STOP = False             # Enable to begin system exit procedure
READY = False            # Ready to (1) enter START state, (2) enter RECORD_RUNNING state
RECORD_RUNNING = False   # True if [Recording]
RECORD_TOGGLE = False    # Toggle recording state
INTEGRAL_ENABLED = True  # Toggle integral drift correction via 'i' key
RESET_ARMS = False       # Trigger arm reset via 'h' key
# -------          ---------     -----------      -----------     ---------
# state            [Ready]   ==> [Recording]  ==> [AutoSave]  --> [Ready]
# -------          ---------  |  -----------   |  ----------- |   ---------
# START             True      |manual  True    |manual True    |    True
# READY             True      |set     False   |set    False   |auto True
# RECORD_RUNNING    False     |to      True    |to     False   |    False
#                             ∨                ∨               ∨
# RECORD_TOGGLE    False     True    False    True   False        False
# -------          ---------     -----------      -----------     ---------
# ==> manual: when READY is True, set RECORD_TOGGLE=True to transition.
# --> auto  : Auto-transition after saving data.
def parse_r3_buttons(data):
    """Parse R3 controller button bytes from wireless_remote. Returns dict of button states."""
    if len(data) < 4:
        return {}
    # byte 2: R1=0x01, L1=0x02, Start=0x04, Select=0x08, R2=0x10, L2=0x20
    # byte 3: A=0x01, B=0x02, X=0x04, Y=0x08, Up=0x10, Right=0x20, Down=0x40, Left=0x80
    button_masks = (
        ('A', 3, 0x01),
        ('B', 3, 0x02),
        ('X', 3, 0x04),
        ('Y', 3, 0x08),
        ('Start', 2, 0x04),
    )
    return {name: bool(data[byte_idx] & mask) for name, byte_idx, mask in button_masks}
def rotation_to_rotvec(R):
    """Convert 3x3 rotation matrix to axis-angle (rotation vector, 3-vec). Pure numpy.

    Fix: the skew-symmetric extraction divides by 2*sin(theta), which vanishes
    as theta -> pi, so 180-degree (and near-180) rotations previously produced
    garbage. Near pi we instead recover the axis from the symmetric part:
    at theta == pi, R = 2*a*a^T - I, hence (R + I)/2 == outer(a, a).
    """
    cos_theta = np.clip((np.trace(R) - 1.0) / 2.0, -1.0, 1.0)
    theta = np.arccos(cos_theta)
    if theta < 1e-6:
        # Negligible rotation: return the zero vector.
        return np.zeros(3)
    if theta > np.pi - 1e-3:
        # Near-pi branch: off-diagonal differences ~ 0, use (R + I)/2 = a a^T.
        A = (R + np.eye(3)) / 2.0
        # Magnitudes of axis components from the diagonal (clamped for noise).
        axis = np.sqrt(np.maximum(np.diag(A), 0.0))
        # Recover consistent signs from the column of the largest component.
        k = int(np.argmax(axis))
        if axis[k] > 0.0:
            axis = A[:, k] / axis[k]
        norm = np.linalg.norm(axis)
        if norm > 0.0:
            axis = axis / norm
        # Note: at exactly pi, +axis and -axis represent the same rotation.
        return axis * theta
    axis = np.array([
        R[2, 1] - R[1, 2],
        R[0, 2] - R[2, 0],
        R[1, 0] - R[0, 1]
    ]) / (2.0 * np.sin(theta))
    return axis * theta
def rotvec_to_rotation(rv):
    """Convert rotation vector (3-vec) to 3x3 rotation matrix via Rodrigues. Pure numpy."""
    angle = np.linalg.norm(rv)
    if angle < 1e-6:
        # Negligible rotation: identity.
        return np.eye(3)
    ux, uy, uz = rv / angle
    # Skew-symmetric cross-product matrix of the unit axis.
    skew = np.array([[0.0, -uz, uy],
                     [uz, 0.0, -ux],
                     [-uy, ux, 0.0]])
    # Rodrigues' formula: I + sin(t) K + (1 - cos(t)) K^2.
    return np.eye(3) + np.sin(angle) * skew + (1.0 - np.cos(angle)) * (skew @ skew)
def scale_rotation(R, alpha):
    """Scale a rotation matrix's angle by alpha (0=identity, 1=original). Pure numpy.

    Round-trips through axis-angle: the axis is kept, the angle is multiplied
    by alpha, then converted back to a matrix.
    """
    return rotvec_to_rotation(rotation_to_rotvec(R) * alpha)
# Previous button state for edge detection
# Updated each poll in the main loop; a press fires only on False -> True transitions.
_r3_prev_buttons = {}
def on_press(key):
    """Keyboard / IPC callback: map a single key press onto the shared state flags.

    Guard-clause style: each recognized key handles its action and returns;
    anything unrecognized falls through to the warning at the bottom.
    """
    global STOP, START, RECORD_TOGGLE, INTEGRAL_ENABLED, RESET_ARMS
    if key == 'r':
        # Toggle arm tracking on/off.
        START = not START
        logger_mp.info(f"[key] r pressed → tracking {'STARTED' if START else 'PAUSED'}")
        return
    if key == 'q':
        # Request a clean shutdown.
        START = False
        STOP = True
        return
    if key == 's' and START == True:
        # Recording toggle is honored only while tracking is active.
        RECORD_TOGGLE = True
        return
    if key == 'i':
        INTEGRAL_ENABLED = not INTEGRAL_ENABLED
        logger_mp.info(f"[on_press] Integral correction {'ENABLED' if INTEGRAL_ENABLED else 'DISABLED'}")
        return
    if key == 'h':
        RESET_ARMS = True
        logger_mp.info("[on_press] Arm reset requested (return to home)")
        return
    logger_mp.warning(f"[on_press] {key} was pressed, but no action is defined for this key.")
def get_state() -> dict:
    """Return the current heartbeat state as a snapshot dict.

    Read-only access to the module flags, so no ``global`` declaration is
    needed; consumed by the IPC server for client status queries.
    """
    return {
        "START": START,
        "STOP": STOP,
        "READY": READY,
        "RECORD_RUNNING": RECORD_RUNNING,
    }
if __name__ == '__main__':
    # ---------------- CLI arguments ----------------
    parser = argparse.ArgumentParser()
    # basic control parameters
    parser.add_argument('--frequency', type = float, default = 30.0, help = 'control and record \'s frequency')
    parser.add_argument('--input-mode', type=str, choices=['hand', 'controller'], default='hand', help='Select XR device input tracking source')
    parser.add_argument('--display-mode', type=str, choices=['immersive', 'ego', 'pass-through'], default='immersive', help='Select XR device display mode')
    parser.add_argument('--arm', type=str, choices=['G1_29', 'G1_23', 'H1_2', 'H1'], default='G1_29', help='Select arm controller')
    parser.add_argument('--ee', type=str, choices=['dex1', 'dex3', 'inspire_ftp', 'inspire_dfx', 'brainco'], help='Select end effector controller')
    parser.add_argument('--img-server-ip', type=str, default='192.168.123.164', help='IP address of image server, used by teleimager and televuer')
    parser.add_argument('--network-interface', type=str, default=None, help='Network interface for dds communication, e.g., eth0, wlan0. If None, use default interface.')
    # mode flags
    parser.add_argument('--motion', action = 'store_true', help = 'Enable motion control mode')
    parser.add_argument('--headless', action='store_true', help='Enable headless mode (no display)')
    parser.add_argument('--sim', action = 'store_true', help = 'Enable isaac simulation mode')
    parser.add_argument('--ipc', action = 'store_true', help = 'Enable IPC server to handle input; otherwise enable sshkeyboard')
    parser.add_argument('--affinity', action = 'store_true', help = 'Enable high priority and set CPU affinity mode')
    # integral drift correction
    parser.add_argument('--ki', type=float, default=0.3, help='Integral gain for drift correction (0.0 = disabled)')
    parser.add_argument('--i-clamp', type=float, default=0.10, help='Anti-windup clamp for I term in meters (default: 0.10)')
    parser.add_argument('--i-decay', type=float, default=0.995, help='Per-frame decay factor for I term (default: 0.995)')
    # rotation integral drift correction
    parser.add_argument('--ki-rot', type=float, default=0.3, help='Rotation integral gain (0.0 = disabled)')
    parser.add_argument('--i-rot-clamp', type=float, default=0.3, help='Anti-windup clamp for rotation I term in radians (default: 0.3)')
    parser.add_argument('--no-calibration', action='store_true',
                        help='Disable start-time calibration (use absolute VP poses)')
    parser.add_argument('--settle-time', type=float, default=0.5,
                        help='Seconds to ramp tracking after calibration (default: 0.5)')
    parser.add_argument('--rot-blend', type=float, default=0.3,
                        help='Rotation tracking blend (0=position-only, 1=full rotation, default: 0.3)')
    # record mode and task info
    parser.add_argument('--record', action = 'store_true', help = 'Enable data recording mode')
    parser.add_argument('--task-dir', type = str, default = './utils/data/', help = 'path to save data')
    parser.add_argument('--task-name', type = str, default = 'pick cube', help = 'task file name for recording')
    parser.add_argument('--task-goal', type = str, default = 'pick up cube.', help = 'task goal for recording at json file')
    parser.add_argument('--task-desc', type = str, default = 'task description', help = 'task description for recording at json file')
    parser.add_argument('--task-steps', type = str, default = 'step1: do this; step2: do that;', help = 'task steps for recording at json file')
    args = parser.parse_args()
    logger_mp.info(f"args: {args}")
    try:
        # ---------------- Setup ----------------
        # setup dds communication domains id (domain 1 for simulation, 0 for real robot)
        if args.sim:
            ChannelFactoryInitialize(1, networkInterface=args.network_interface)
        else:
            ChannelFactoryInitialize(0, networkInterface=args.network_interface)
        # ipc communication mode. client usage: see utils/ipc.py
        if args.ipc:
            ipc_server = IPC_Server(on_press=on_press, get_state=get_state)
            ipc_server.start()
        # sshkeyboard communication mode
        else:
            listen_keyboard_thread = threading.Thread(target=listen_keyboard,
                                                      kwargs={"on_press": on_press, "until": None, "sequential": False,},
                                                      daemon=True)
            listen_keyboard_thread.start()
        # image client: pulls camera frames and the camera configuration from the image server
        img_client = ImageClient(host=args.img_server_ip, request_bgr=True)
        camera_config = img_client.get_cam_config()
        logger_mp.debug(f"Camera config: {camera_config}")
        # local rendering to the XR headset is only needed when neither pass-through
        # display nor a webrtc stream supplies the view
        xr_need_local_img = not (args.display_mode == 'pass-through' or camera_config['head_camera']['enable_webrtc'])
        # televuer_wrapper: obtain hand pose data from the XR device and transmit the robot's head camera image to the XR device.
        tv_wrapper = TeleVuerWrapper(use_hand_tracking=args.input_mode == "hand",
                                     binocular=camera_config['head_camera']['binocular'],
                                     img_shape=camera_config['head_camera']['image_shape'],
                                     # maybe should decrease fps for better performance?
                                     # https://github.com/unitreerobotics/xr_teleoperate/issues/172
                                     # display_fps=camera_config['head_camera']['fps'] ? args.frequency? 30.0?
                                     display_mode=args.display_mode,
                                     zmq=camera_config['head_camera']['enable_zmq'],
                                     webrtc=camera_config['head_camera']['enable_webrtc'],
                                     webrtc_url=f"https://{args.img_server_ip}:{camera_config['head_camera']['webrtc_port']}/offer",
                                     )
        # motion mode (G1: Regular mode R1+X, not Running mode R2+A)
        if args.motion:
            if args.input_mode == "controller":
                loco_wrapper = LocoClientWrapper()
            else:
                motion_switcher = MotionSwitcher()
                status, result = motion_switcher.Enter_Debug_Mode()
                logger_mp.info(f"Enter debug mode: {'Success' if status == 0 else 'Failed'}")
        # arm: pick the IK solver and the low-level arm controller for the selected robot
        if args.arm == "G1_29":
            arm_ik = G1_29_ArmIK()
            arm_ctrl = G1_29_ArmController(motion_mode=args.motion, simulation_mode=args.sim)
        elif args.arm == "G1_23":
            arm_ik = G1_23_ArmIK()
            arm_ctrl = G1_23_ArmController(motion_mode=args.motion, simulation_mode=args.sim)
        elif args.arm == "H1_2":
            arm_ik = H1_2_ArmIK()
            arm_ctrl = H1_2_ArmController(motion_mode=args.motion, simulation_mode=args.sim)
        elif args.arm == "H1":
            arm_ik = H1_ArmIK()
            arm_ctrl = H1_ArmController(simulation_mode=args.sim)
        # end-effector: each branch wires shared-memory arrays between this process
        # and the hand/gripper controller process (inputs are 25x3 hand keypoints
        # flattened to 75 doubles, or a single gripper value per side)
        if args.ee == "dex3":
            from teleop.robot_control.robot_hand_unitree import Dex3_1_Controller
            left_hand_pos_array = Array('d', 75, lock = True)   # [input]
            right_hand_pos_array = Array('d', 75, lock = True)  # [input]
            dual_hand_data_lock = Lock()
            dual_hand_state_array = Array('d', 14, lock = False)   # [output] current left, right hand state(14) data.
            dual_hand_action_array = Array('d', 14, lock = False)  # [output] current left, right hand action(14) data.
            hand_ctrl = Dex3_1_Controller(left_hand_pos_array, right_hand_pos_array, dual_hand_data_lock,
                                          dual_hand_state_array, dual_hand_action_array, simulation_mode=args.sim)
        elif args.ee == "dex1":
            from teleop.robot_control.robot_hand_unitree import Dex1_1_Gripper_Controller
            left_gripper_value = Value('d', 0.0, lock=True)   # [input]
            right_gripper_value = Value('d', 0.0, lock=True)  # [input]
            dual_gripper_data_lock = Lock()
            dual_gripper_state_array = Array('d', 2, lock=False)   # current left, right gripper state(2) data.
            dual_gripper_action_array = Array('d', 2, lock=False)  # current left, right gripper action(2) data.
            gripper_ctrl = Dex1_1_Gripper_Controller(left_gripper_value, right_gripper_value, dual_gripper_data_lock,
                                                     dual_gripper_state_array, dual_gripper_action_array, simulation_mode=args.sim)
        elif args.ee == "inspire_dfx":
            from teleop.robot_control.robot_hand_inspire import Inspire_Controller_DFX
            left_hand_pos_array = Array('d', 75, lock = True)   # [input]
            right_hand_pos_array = Array('d', 75, lock = True)  # [input]
            dual_hand_data_lock = Lock()
            dual_hand_state_array = Array('d', 12, lock = False)   # [output] current left, right hand state(12) data.
            dual_hand_action_array = Array('d', 12, lock = False)  # [output] current left, right hand action(12) data.
            hand_ctrl = Inspire_Controller_DFX(left_hand_pos_array, right_hand_pos_array, dual_hand_data_lock, dual_hand_state_array, dual_hand_action_array, simulation_mode=args.sim)
        elif args.ee == "inspire_ftp":
            from teleop.robot_control.robot_hand_inspire import Inspire_Controller_FTP
            left_hand_pos_array = Array('d', 75, lock = True)   # [input]
            right_hand_pos_array = Array('d', 75, lock = True)  # [input]
            dual_hand_data_lock = Lock()
            dual_hand_state_array = Array('d', 12, lock = False)   # [output] current left, right hand state(12) data.
            dual_hand_action_array = Array('d', 12, lock = False)  # [output] current left, right hand action(12) data.
            hand_ctrl = Inspire_Controller_FTP(left_hand_pos_array, right_hand_pos_array, dual_hand_data_lock, dual_hand_state_array, dual_hand_action_array, simulation_mode=args.sim)
        elif args.ee == "brainco":
            from teleop.robot_control.robot_hand_brainco import Brainco_Controller
            left_hand_pos_array = Array('d', 75, lock = True)   # [input]
            right_hand_pos_array = Array('d', 75, lock = True)  # [input]
            dual_hand_data_lock = Lock()
            dual_hand_state_array = Array('d', 12, lock = False)   # [output] current left, right hand state(12) data.
            dual_hand_action_array = Array('d', 12, lock = False)  # [output] current left, right hand action(12) data.
            hand_ctrl = Brainco_Controller(left_hand_pos_array, right_hand_pos_array, dual_hand_data_lock,
                                           dual_hand_state_array, dual_hand_action_array, simulation_mode=args.sim)
        else:
            # no end effector selected
            pass
        # affinity mode (if you dont know what it is, then you probably don't need it)
        if args.affinity:
            import psutil
            p = psutil.Process(os.getpid())
            p.cpu_affinity([0,1,2,3])  # Set CPU affinity to cores 0-3
            try:
                p.nice(-20)  # Set highest priority
                logger_mp.info("Set high priority successfully.")
            except psutil.AccessDenied:
                logger_mp.warning("Failed to set high priority. Please run as root.")
            # push child processes (hand/gripper controllers etc.) onto separate cores
            for child in p.children(recursive=True):
                try:
                    logger_mp.info(f"Child process {child.pid} name: {child.name()}")
                    child.cpu_affinity([5,6])
                    child.nice(-20)
                except psutil.AccessDenied:
                    pass
        # simulation mode: publisher for scene-reset signals + subscriber for sim state
        if args.sim:
            reset_pose_publisher = ChannelPublisher("rt/reset_pose/cmd", String_)
            reset_pose_publisher.Init()
            from teleop.utils.sim_state_topic import start_sim_state_subscribe
            sim_state_subscriber = start_sim_state_subscribe()
        # record + headless / non-headless mode
        if args.record:
            recorder = EpisodeWriter(task_dir = os.path.join(args.task_dir, args.task_name),
                                     task_goal = args.task_goal,
                                     task_desc = args.task_desc,
                                     task_steps = args.task_steps,
                                     frequency = args.frequency,
                                     rerun_log = not args.headless)
        # ---------------- Operator help banner ----------------
        logger_mp.info("----------------------------------------------------------------")
        logger_mp.info(" Keyboard  |  R3 Controller  |  Action")
        logger_mp.info(" ---------    ---------------    ------")
        logger_mp.info(" [r]       |  [A]            |  Toggle arm tracking")
        logger_mp.info(" [q]       |  [B]            |  Stop and exit")
        logger_mp.info(" [h]       |  [X]            |  Reset arms to home")
        logger_mp.info(" [i]       |  [Y]            |  Toggle I-term correction")
        if args.record:
            logger_mp.info(" [s]       |  [Start]        |  Toggle recording")
        if args.ki > 0.0:
            logger_mp.info(f" I-term: Ki={args.ki}, clamp={args.i_clamp}m, decay={args.i_decay}")
        logger_mp.info("----------------------------------------------------------------")
        logger_mp.info("⚠️  IMPORTANT: Please keep your distance and stay safe.")
        READY = True  # now ready to (1) enter START state
        # ---------------- Wait for start ----------------
        while not START and not STOP:  # wait for start or stop signal.
            time.sleep(0.033)
            # Poll R3 A button in wait loop (edge-detected)
            r3_data = arm_ctrl.get_wireless_remote()
            r3_btns = parse_r3_buttons(r3_data)
            if r3_btns.get('A') and not _r3_prev_buttons.get('A'):
                START = True
                logger_mp.info("[R3] A pressed → START tracking")
            _r3_prev_buttons = r3_btns
            # keep streaming the head camera to the XR device while idle
            if camera_config['head_camera']['enable_zmq'] and xr_need_local_img:
                head_img = img_client.get_head_frame()
                tv_wrapper.render_to_xr(head_img)
        logger_mp.info("---------------------🚀start Tracking🚀-------------------------")
        arm_ctrl.speed_gradual_max()
        # Integral drift correction state
        i_error_left = np.zeros(3)       # Accumulated task-space position error (meters)
        i_error_right = np.zeros(3)
        i_rot_error_left = np.zeros(3)   # Accumulated rotation error (axis-angle, radians)
        i_rot_error_right = np.zeros(3)
        last_loop_time = time.time()
        i_frame_count = 0
        # Relative-mode calibration state
        calibrated = False
        calibration_time = 0.0   # When calibration was captured (for settle ramp)
        vp_ref_left = None       # 4x4: VP wrist pose at calibration moment
        vp_ref_right = None
        robot_ref_left = None    # 4x4: Robot FK pose at calibration moment
        robot_ref_right = None
        head_ref = None          # 3-vec: Head position at calibration (for decoupling)
        prev_start = False
        # Initial calibration: capture reference poses on first START
        if not args.no_calibration:
            _cal_tele = tv_wrapper.get_tele_data()
            _cal_arm_q = arm_ctrl.get_current_dual_arm_q()
            robot_ref_left, robot_ref_right = arm_ik.compute_fk_pose(_cal_arm_q)
            vp_ref_left = _cal_tele.left_wrist_pose.copy()
            vp_ref_right = _cal_tele.right_wrist_pose.copy()
            head_ref = _cal_tele.head_pose[:3, 3].copy()
            calibrated = True
            calibration_time = time.time()
            prev_start = True
            logger_mp.info(f"[calibration] Initial reference captured. "
                           f"Robot L={robot_ref_left[:3,3]}, R={robot_ref_right[:3,3]} "
                           f"Head={head_ref}")
        # ---------------- Main loop: robot follows VR user's motion ----------------
        while not STOP:
            start_time = time.time()
            # handle arm reset request ('h' key or R3 [X])
            if RESET_ARMS:
                RESET_ARMS = False
                logger_mp.info("[main] Resetting arms to home position...")
                i_error_left = np.zeros(3)
                i_error_right = np.zeros(3)
                i_rot_error_left = np.zeros(3)
                i_rot_error_right = np.zeros(3)
                # command all 14 arm joints to zero, then wait (max 3 s) until they settle
                arm_ctrl.ctrl_dual_arm(np.zeros(14), np.zeros(14))
                reset_start = time.time()
                while time.time() - reset_start < 3.0:
                    if np.all(np.abs(arm_ctrl.get_current_dual_arm_q()) < 0.1):
                        break
                    time.sleep(0.05)
                logger_mp.info("[main] Arms reset complete. Tracking PAUSED — press A/r to recalibrate and resume.")
                START = False
                prev_start = False
                calibrated = False
                arm_ctrl.speed_gradual_max()
                last_loop_time = time.time()
                continue
            # Poll R3 controller buttons (edge-detected)
            r3_data = arm_ctrl.get_wireless_remote()
            r3_btns = parse_r3_buttons(r3_data)
            if r3_btns.get('A') and not _r3_prev_buttons.get('A'):
                START = not START
                logger_mp.info(f"[R3] A pressed → tracking {'STARTED' if START else 'PAUSED'}")
            if r3_btns.get('B') and not _r3_prev_buttons.get('B'):
                START = False
                STOP = True
                logger_mp.info("[R3] B pressed → STOP")
            if r3_btns.get('X') and not _r3_prev_buttons.get('X'):
                RESET_ARMS = True
                logger_mp.info("[R3] X pressed → Reset arms")
            if r3_btns.get('Y') and not _r3_prev_buttons.get('Y'):
                INTEGRAL_ENABLED = not INTEGRAL_ENABLED
                logger_mp.info(f"[R3] Y pressed → Integral {'ENABLED' if INTEGRAL_ENABLED else 'DISABLED'}")
            if r3_btns.get('Start') and not _r3_prev_buttons.get('Start') and START:
                RECORD_TOGGLE = True
                logger_mp.info("[R3] Start pressed → Toggle recording")
            _r3_prev_buttons = r3_btns
            # Detect START resume transition → re-calibrate
            if START and not prev_start and not args.no_calibration:
                _cal_tele = tv_wrapper.get_tele_data()
                _cal_arm_q = arm_ctrl.get_current_dual_arm_q()
                robot_ref_left, robot_ref_right = arm_ik.compute_fk_pose(_cal_arm_q)
                vp_ref_left = _cal_tele.left_wrist_pose.copy()
                vp_ref_right = _cal_tele.right_wrist_pose.copy()
                head_ref = _cal_tele.head_pose[:3, 3].copy()
                calibrated = True
                calibration_time = time.time()
                i_error_left = np.zeros(3)
                i_error_right = np.zeros(3)
                i_rot_error_left = np.zeros(3)
                i_rot_error_right = np.zeros(3)
                logger_mp.info(f"[calibration] Re-calibrated on resume. "
                               f"Robot L={robot_ref_left[:3,3]}, R={robot_ref_right[:3,3]}")
            prev_start = START
            # Skip arm tracking while paused (buttons + reset still work)
            if not START:
                i_error_left = np.zeros(3)
                i_error_right = np.zeros(3)
                i_rot_error_left = np.zeros(3)
                i_rot_error_right = np.zeros(3)
                current_time = time.time()
                time_elapsed = current_time - start_time
                sleep_time = max(0, (1 / args.frequency) - time_elapsed)
                time.sleep(sleep_time)
                continue
            # get image
            # NOTE(review): head_img / left_wrist_img / right_wrist_img are only bound
            # when the corresponding enable_zmq flag is set; the recording branch below
            # reads them unconditionally — confirm the camera config always enables zmq
            # when --record is used, otherwise this raises NameError.
            if camera_config['head_camera']['enable_zmq']:
                if args.record or xr_need_local_img:
                    head_img = img_client.get_head_frame()
                    if xr_need_local_img:
                        tv_wrapper.render_to_xr(head_img)
            if camera_config['left_wrist_camera']['enable_zmq']:
                if args.record:
                    left_wrist_img = img_client.get_left_wrist_frame()
            if camera_config['right_wrist_camera']['enable_zmq']:
                if args.record:
                    right_wrist_img = img_client.get_right_wrist_frame()
            # record mode: toggle opens a new episode or saves the current one
            if args.record and RECORD_TOGGLE:
                RECORD_TOGGLE = False
                if not RECORD_RUNNING:
                    if recorder.create_episode():
                        RECORD_RUNNING = True
                    else:
                        logger_mp.error("Failed to create episode. Recording not started.")
                else:
                    RECORD_RUNNING = False
                    recorder.save_episode()
                    if args.sim:
                        publish_reset_category(1, reset_pose_publisher)
            # get xr's tele data and forward hand/gripper inputs to the EE controller
            tele_data = tv_wrapper.get_tele_data()
            if (args.ee == "dex3" or args.ee == "inspire_dfx" or args.ee == "inspire_ftp" or args.ee == "brainco") and args.input_mode == "hand":
                with left_hand_pos_array.get_lock():
                    left_hand_pos_array[:] = tele_data.left_hand_pos.flatten()
                with right_hand_pos_array.get_lock():
                    right_hand_pos_array[:] = tele_data.right_hand_pos.flatten()
            elif args.ee == "dex1" and args.input_mode == "controller":
                with left_gripper_value.get_lock():
                    left_gripper_value.value = tele_data.left_ctrl_triggerValue
                with right_gripper_value.get_lock():
                    right_gripper_value.value = tele_data.right_ctrl_triggerValue
            elif args.ee == "dex1" and args.input_mode == "hand":
                with left_gripper_value.get_lock():
                    left_gripper_value.value = tele_data.left_hand_pinchValue
                with right_gripper_value.get_lock():
                    right_gripper_value.value = tele_data.right_hand_pinchValue
            else:
                pass
            # high level control (locomotion via controller thumbsticks)
            if args.input_mode == "controller" and args.motion:
                # quit teleoperate
                if tele_data.right_ctrl_aButton:
                    START = False
                    STOP = True
                # command robot to enter damping mode. soft emergency stop function
                if tele_data.left_ctrl_thumbstick and tele_data.right_ctrl_thumbstick:
                    loco_wrapper.Damp()
                # https://github.com/unitreerobotics/xr_teleoperate/issues/135, control, limit velocity to within 0.3
                loco_wrapper.Move(-tele_data.left_ctrl_thumbstickValue[1] * 0.3,
                                  -tele_data.left_ctrl_thumbstickValue[0] * 0.3,
                                  -tele_data.right_ctrl_thumbstickValue[0]* 0.3)
            # get current robot state data.
            current_lr_arm_q = arm_ctrl.get_current_dual_arm_q()
            current_lr_arm_dq = arm_ctrl.get_current_dual_arm_dq()
            # --- Integral drift correction ---
            now = time.time()
            dt = min(now - last_loop_time, 0.1)  # clamp to prevent huge jumps after stalls
            last_loop_time = now
            # --- Apply relative-mode calibration ---
            if calibrated:
                # Settle ramp: smoothly blend from 0% to 100% delta over settle_time
                settle_elapsed = now - calibration_time
                settle_alpha = min(1.0, settle_elapsed / max(args.settle_time, 0.01))
                # Head decoupling: tv_wrapper subtracts live head position from wrists.
                # Add back head drift so only hand movement (not head movement) drives arms.
                head_drift = tele_data.head_pose[:3, 3] - head_ref
                # Position: robot_ref + settle_alpha * (vp_current + head_drift - vp_ref)
                # head_drift cancels tv_wrapper's live head subtraction, replacing it with fixed ref
                left_delta_pos = (tele_data.left_wrist_pose[:3, 3] + head_drift) - vp_ref_left[:3, 3]
                right_delta_pos = (tele_data.right_wrist_pose[:3, 3] + head_drift) - vp_ref_right[:3, 3]
                left_wrist_adjusted = np.eye(4)
                right_wrist_adjusted = np.eye(4)
                left_wrist_adjusted[:3, 3] = robot_ref_left[:3, 3] + settle_alpha * left_delta_pos
                right_wrist_adjusted[:3, 3] = robot_ref_right[:3, 3] + settle_alpha * right_delta_pos
                # Rotation: compute delta, scale by rot_blend (and settle_alpha)
                left_delta_R = tele_data.left_wrist_pose[:3, :3] @ vp_ref_left[:3, :3].T
                right_delta_R = tele_data.right_wrist_pose[:3, :3] @ vp_ref_right[:3, :3].T
                rot_alpha = args.rot_blend * settle_alpha
                if rot_alpha < 0.01:
                    # effectively position-only tracking: hold calibrated orientation
                    left_wrist_adjusted[:3, :3] = robot_ref_left[:3, :3]
                    right_wrist_adjusted[:3, :3] = robot_ref_right[:3, :3]
                else:
                    left_wrist_adjusted[:3, :3] = scale_rotation(left_delta_R, rot_alpha) @ robot_ref_left[:3, :3]
                    right_wrist_adjusted[:3, :3] = scale_rotation(right_delta_R, rot_alpha) @ robot_ref_right[:3, :3]
            else:
                # absolute mode: use the raw VP wrist poses as IK targets
                left_wrist_adjusted = tele_data.left_wrist_pose.copy()
                right_wrist_adjusted = tele_data.right_wrist_pose.copy()
            if (args.ki > 0.0 or args.ki_rot > 0.0) and INTEGRAL_ENABLED:
                # FK: where are the robot hands actually? (full SE(3) for rotation I-term)
                left_ee_pose, right_ee_pose = arm_ik.compute_fk_pose(current_lr_arm_q)
                left_ee_pos = left_ee_pose[:3, 3]
                right_ee_pos = right_ee_pose[:3, 3]
                # --- Position I-term ---
                if args.ki > 0.0:
                    left_error = left_wrist_adjusted[:3, 3] - left_ee_pos
                    right_error = right_wrist_adjusted[:3, 3] - right_ee_pos
                    # Leaky integrator with anti-windup
                    i_error_left = i_error_left * args.i_decay + left_error * dt
                    i_error_right = i_error_right * args.i_decay + right_error * dt
                    i_error_left = np.clip(i_error_left, -args.i_clamp, args.i_clamp)
                    i_error_right = np.clip(i_error_right, -args.i_clamp, args.i_clamp)
                    # Bias IK target position
                    left_wrist_adjusted[:3, 3] += args.ki * i_error_left
                    right_wrist_adjusted[:3, 3] += args.ki * i_error_right
                # --- Rotation I-term ---
                if args.ki_rot > 0.0:
                    # Rotation error: target_R @ actual_R^T → how far off is actual from target
                    left_rot_err = rotation_to_rotvec(
                        left_wrist_adjusted[:3, :3] @ left_ee_pose[:3, :3].T)
                    right_rot_err = rotation_to_rotvec(
                        right_wrist_adjusted[:3, :3] @ right_ee_pose[:3, :3].T)
                    # Leaky integrator
                    i_rot_error_left = i_rot_error_left * args.i_decay + left_rot_err * dt
                    i_rot_error_right = i_rot_error_right * args.i_decay + right_rot_err * dt
                    # Anti-windup: clamp by magnitude (radians)
                    l_rot_mag = np.linalg.norm(i_rot_error_left)
                    if l_rot_mag > args.i_rot_clamp:
                        i_rot_error_left = i_rot_error_left * (args.i_rot_clamp / l_rot_mag)
                    r_rot_mag = np.linalg.norm(i_rot_error_right)
                    if r_rot_mag > args.i_rot_clamp:
                        i_rot_error_right = i_rot_error_right * (args.i_rot_clamp / r_rot_mag)
                    # Apply rotation correction to IK target
                    left_wrist_adjusted[:3, :3] = (
                        rotvec_to_rotation(args.ki_rot * i_rot_error_left)
                        @ left_wrist_adjusted[:3, :3])
                    right_wrist_adjusted[:3, :3] = (
                        rotvec_to_rotation(args.ki_rot * i_rot_error_right)
                        @ right_wrist_adjusted[:3, :3])
                # Log every ~1 second (every 30 frames at 30Hz)
                i_frame_count += 1
                if i_frame_count % 30 == 0:
                    pos_info = ""
                    rot_info = ""
                    if args.ki > 0.0:
                        pos_info = (
                            f"pos L_err={np.linalg.norm(left_error):.4f}m "
                            f"R_err={np.linalg.norm(right_error):.4f}m "
                            f"L_I={np.linalg.norm(i_error_left):.4f} "
                            f"R_I={np.linalg.norm(i_error_right):.4f}")
                    if args.ki_rot > 0.0:
                        rot_info = (
                            f"rot L_err={np.linalg.norm(left_rot_err):.3f}rad "
                            f"R_err={np.linalg.norm(right_rot_err):.3f}rad "
                            f"L_I={np.linalg.norm(i_rot_error_left):.3f} "
                            f"R_I={np.linalg.norm(i_rot_error_right):.3f}")
                    logger_mp.info(f"[I-term] {pos_info} {rot_info}".strip())
            # solve ik using motor data and adjusted wrist pose
            time_ik_start = time.time()
            sol_q, sol_tauff = arm_ik.solve_ik(left_wrist_adjusted, right_wrist_adjusted, current_lr_arm_q, current_lr_arm_dq)
            time_ik_end = time.time()
            logger_mp.debug(f"ik:\t{round(time_ik_end - time_ik_start, 6)}")
            arm_ctrl.ctrl_dual_arm(sol_q, sol_tauff)
            # record data
            if args.record:
                READY = recorder.is_ready()  # now ready to (2) enter RECORD_RUNNING state
                # dex hand or gripper: snapshot EE state/action from the shared arrays
                if args.ee == "dex3" and args.input_mode == "hand":
                    with dual_hand_data_lock:
                        left_ee_state = dual_hand_state_array[:7]
                        right_ee_state = dual_hand_state_array[-7:]
                        left_hand_action = dual_hand_action_array[:7]
                        right_hand_action = dual_hand_action_array[-7:]
                    current_body_state = []
                    current_body_action = []
                elif args.ee == "dex1" and args.input_mode == "hand":
                    with dual_gripper_data_lock:
                        left_ee_state = [dual_gripper_state_array[0]]
                        right_ee_state = [dual_gripper_state_array[1]]
                        left_hand_action = [dual_gripper_action_array[0]]
                        right_hand_action = [dual_gripper_action_array[1]]
                    current_body_state = []
                    current_body_action = []
                elif args.ee == "dex1" and args.input_mode == "controller":
                    with dual_gripper_data_lock:
                        left_ee_state = [dual_gripper_state_array[0]]
                        right_ee_state = [dual_gripper_state_array[1]]
                        left_hand_action = [dual_gripper_action_array[0]]
                        right_hand_action = [dual_gripper_action_array[1]]
                    current_body_state = arm_ctrl.get_current_motor_q().tolist()
                    # body action mirrors the Move() command sent above
                    current_body_action = [-tele_data.left_ctrl_thumbstickValue[1] * 0.3,
                                           -tele_data.left_ctrl_thumbstickValue[0] * 0.3,
                                           -tele_data.right_ctrl_thumbstickValue[0] * 0.3]
                elif (args.ee == "inspire_dfx" or args.ee == "inspire_ftp" or args.ee == "brainco") and args.input_mode == "hand":
                    with dual_hand_data_lock:
                        left_ee_state = dual_hand_state_array[:6]
                        right_ee_state = dual_hand_state_array[-6:]
                        left_hand_action = dual_hand_action_array[:6]
                        right_hand_action = dual_hand_action_array[-6:]
                    current_body_state = []
                    current_body_action = []
                else:
                    left_ee_state = []
                    right_ee_state = []
                    left_hand_action = []
                    right_hand_action = []
                    current_body_state = []
                    current_body_action = []
                # arm state and action (7 joints per arm)
                left_arm_state = current_lr_arm_q[:7]
                right_arm_state = current_lr_arm_q[-7:]
                left_arm_action = sol_q[:7]
                right_arm_action = sol_q[-7:]
                if RECORD_RUNNING:
                    colors = {}
                    depths = {}
                    if camera_config['head_camera']['binocular']:
                        # binocular: split the side-by-side head frame into left/right halves
                        if head_img is not None:
                            colors[f"color_{0}"] = head_img.bgr[:, :camera_config['head_camera']['image_shape'][1]//2]
                            colors[f"color_{1}"] = head_img.bgr[:, camera_config['head_camera']['image_shape'][1]//2:]
                        else:
                            logger_mp.warning("Head image is None!")
                        if camera_config['left_wrist_camera']['enable_zmq']:
                            if left_wrist_img is not None:
                                colors[f"color_{2}"] = left_wrist_img.bgr
                            else:
                                logger_mp.warning("Left wrist image is None!")
                        if camera_config['right_wrist_camera']['enable_zmq']:
                            if right_wrist_img is not None:
                                colors[f"color_{3}"] = right_wrist_img.bgr
                            else:
                                logger_mp.warning("Right wrist image is None!")
                    else:
                        # monocular head camera
                        # NOTE(review): this branch stores head_img directly while the
                        # binocular branch stores head_img.bgr — confirm the intended type.
                        if head_img is not None:
                            colors[f"color_{0}"] = head_img
                        else:
                            logger_mp.warning("Head image is None!")
                        if camera_config['left_wrist_camera']['enable_zmq']:
                            if left_wrist_img is not None:
                                colors[f"color_{1}"] = left_wrist_img.bgr
                            else:
                                logger_mp.warning("Left wrist image is None!")
                        if camera_config['right_wrist_camera']['enable_zmq']:
                            if right_wrist_img is not None:
                                colors[f"color_{2}"] = right_wrist_img.bgr
                            else:
                                logger_mp.warning("Right wrist image is None!")
                    states = {
                        "left_arm": {
                            "qpos": left_arm_state.tolist(),  # numpy.array -> list
                            "qvel": [],
                            "torque": [],
                        },
                        "right_arm": {
                            "qpos": right_arm_state.tolist(),
                            "qvel": [],
                            "torque": [],
                        },
                        "left_ee": {
                            "qpos": left_ee_state,
                            "qvel": [],
                            "torque": [],
                        },
                        "right_ee": {
                            "qpos": right_ee_state,
                            "qvel": [],
                            "torque": [],
                        },
                        "body": {
                            "qpos": current_body_state,
                        },
                    }
                    actions = {
                        "left_arm": {
                            "qpos": left_arm_action.tolist(),
                            "qvel": [],
                            "torque": [],
                        },
                        "right_arm": {
                            "qpos": right_arm_action.tolist(),
                            "qvel": [],
                            "torque": [],
                        },
                        "left_ee": {
                            "qpos": left_hand_action,
                            "qvel": [],
                            "torque": [],
                        },
                        "right_ee": {
                            "qpos": right_hand_action,
                            "qvel": [],
                            "torque": [],
                        },
                        "body": {
                            "qpos": current_body_action,
                        },
                    }
                    if args.sim:
                        sim_state = sim_state_subscriber.read_data()
                        recorder.add_item(colors=colors, depths=depths, states=states, actions=actions, sim_state=sim_state)
                    else:
                        recorder.add_item(colors=colors, depths=depths, states=states, actions=actions)
            # keep the loop at the requested control frequency
            current_time = time.time()
            time_elapsed = current_time - start_time
            sleep_time = max(0, (1 / args.frequency) - time_elapsed)
            time.sleep(sleep_time)
            logger_mp.debug(f"main process sleep: {sleep_time}")
    except KeyboardInterrupt:
        logger_mp.info("⛔ KeyboardInterrupt, exiting program...")
    except Exception:
        import traceback
        logger_mp.error(traceback.format_exc())
    finally:
        # Best-effort teardown: each step is isolated so one failure does not
        # prevent the remaining cleanup from running.
        try:
            arm_ctrl.ctrl_dual_arm_go_home()
        except Exception as e:
            logger_mp.error(f"Failed to ctrl_dual_arm_go_home: {e}")
        try:
            if args.ipc:
                ipc_server.stop()
            else:
                stop_listening()
                listen_keyboard_thread.join()
        except Exception as e:
            logger_mp.error(f"Failed to stop keyboard listener or ipc server: {e}")
        try:
            img_client.close()
        except Exception as e:
            logger_mp.error(f"Failed to close image client: {e}")
        try:
            tv_wrapper.close()
        except Exception as e:
            logger_mp.error(f"Failed to close televuer wrapper: {e}")
        try:
            if not args.motion:
                pass
                # status, result = motion_switcher.Exit_Debug_Mode()
                # logger_mp.info(f"Exit debug mode: {'Success' if status == 3104 else 'Failed'}")
        except Exception as e:
            logger_mp.error(f"Failed to exit debug mode: {e}")
        try:
            if args.sim:
                sim_state_subscriber.stop_subscribe()
        except Exception as e:
            logger_mp.error(f"Failed to stop sim state subscriber: {e}")
        try:
            if args.record:
                recorder.close()
        except Exception as e:
            logger_mp.error(f"Failed to close recorder: {e}")
        logger_mp.info("✅ Finally, exiting program.")
        exit(0)