From a09f024e5f05795aec29b97ea6ae25394e00d564 Mon Sep 17 00:00:00 2001 From: silencht Date: Mon, 21 Jul 2025 16:07:41 +0800 Subject: [PATCH] [update] Upgrade the Dex1_1 gripper control code --- README.md | 8 + README_zh-CN.md | 4 + teleop/robot_control/robot_hand_unitree.py | 176 ++++++++++++--------- teleop/teleop_hand_and_arm.py | 16 +- 4 files changed, 117 insertions(+), 87 deletions(-) diff --git a/README.md b/README.md index 928d16d..1799e3e 100644 --- a/README.md +++ b/README.md @@ -33,6 +33,10 @@ # 🔖 Release Note +## 🏷️ v1.2 + +1. Upgrade the Dex1_1 gripper control code to be compatible with the [dex1_1 service](https://github.com/unitreerobotics/dex1_1_service) driver. + ## 🏷️ v1.1 1. Added support for a new end-effector type: **`brainco`**, which refers to the [Brain Hand](https://www.brainco-hz.com/docs/revolimb-hand/) developed by [BrainCo](https://www.brainco.cn/#/product/dexterous). @@ -244,6 +248,7 @@ Next steps: 3. Open a browser (e.g. Safari or PICO Browser) and go to: `https://192.168.123.2:8012?ws=wss://192.168.123.2:8012` > **Note 1**: This IP must match your **Host** IP (check with `ifconfig`). + > > **Note 2**: You may see a warning page. Click **Advanced**, then **Proceed to IP (unsafe)**.

@@ -275,6 +280,7 @@ Next steps:

Recording Process

> **Note 1**: Recorded data is stored in `xr_teleoperate/teleop/utils/data` by default, with usage instructions at this repo: [unitree_IL_lerobot](https://github.com/unitreerobotics/unitree_IL_lerobot/tree/main?tab=readme-ov-file#data-collection-and-conversion). +> > **Note 2**: Please pay attention to your disk space size during data recording. ## 2.3 🔚 Exit @@ -323,7 +329,9 @@ After image service is started, you can use `image_client.py` **in the Host** te ## 3.2 ✋ Inspire Hand Service (optional) > **Note 1**: Skip this if your config does not use the Inspire hand. +> > **Note 2**: For the G1 robot with [Inspire DFX hand](https://support.unitree.com/home/zh/G1_developer/inspire_dfx_dexterous_hand), see [issue #46](https://github.com/unitreerobotics/xr_teleoperate/issues/46). +> > **Note 3**: For [Inspire FTP hand]((https://support.unitree.com/home/zh/G1_developer/inspire_ftp_dexterity_hand)), see [issue #48](https://github.com/unitreerobotics/xr_teleoperate/issues/48). You can refer to [Dexterous Hand Development](https://support.unitree.com/home/zh/H1_developer/Dexterous_hand) to configure related environments and compile control programs. First, use [this URL](https://oss-global-cdn.unitree.com/static/0a8335f7498548d28412c31ea047d4be.zip) to download the dexterous hand control interface program. Copy it to **PC2** of Unitree robots. diff --git a/README_zh-CN.md b/README_zh-CN.md index 26b17aa..c8b623a 100644 --- a/README_zh-CN.md +++ b/README_zh-CN.md @@ -33,6 +33,10 @@ # 🔖 版本说明 +## 🏷️ v1.2 + +1. 升级Dex1_1夹爪控制代码,匹配 [dex1_1 service](https://github.com/unitreerobotics/dex1_1_service) 驱动 + ## 🏷️ v1.1 1. 末端执行器类型新增'brainco',这是[强脑科技第二代灵巧手](https://www.brainco-hz.com/docs/revolimb-hand/) diff --git a/teleop/robot_control/robot_hand_unitree.py b/teleop/robot_control/robot_hand_unitree.py index 1870fdf..9d808a5 100644 --- a/teleop/robot_control/robot_hand_unitree.py +++ b/teleop/robot_control/robot_hand_unitree.py @@ -13,7 +13,7 @@ import time import os import sys import threading -from multiprocessing import Process, shared_memory, Array, Lock +from multiprocessing import Process, shared_memory, Array, Value, Lock parent2_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) sys.path.append(parent2_dir) @@ -50,6 +50,8 @@ class Dex3_1_Controller: fps: Control frequency Unit_Test: Whether to enable unit testing + + simulation_mode: Whether to use simulation mode (default is False, which means using real robot) """ logger_mp.info("Initialize Dex3_1_Controller...") @@ -227,10 +229,10 @@ class Dex3_1_Right_JointIndex(IntEnum): kRightHandMiddle1 = 6 -unitree_gripper_indices = [4, 9] # [thumb, index] -Gripper_Num_Motors = 2 -kTopicGripperCommand = "rt/unitree_actuator/cmd" -kTopicGripperState = "rt/unitree_actuator/state" +kTopicGripperLeftCommand = "rt/dex1/left/cmd" +kTopicGripperLeftState = "rt/dex1/left/state" +kTopicGripperRightCommand = "rt/dex1/right/cmd" +kTopicGripperRightState = "rt/dex1/right/state" class Dex1_1_Gripper_Controller: def __init__(self, left_gripper_value_in, right_gripper_value_in, dual_gripper_data_lock = None, dual_gripper_state_out = None, dual_gripper_action_out = None, @@ -251,16 +253,19 @@ class Dex1_1_Gripper_Controller: fps: Control frequency Unit_Test: Whether to enable unit testing + + simulation_mode: Whether to use simulation mode (default is False, which means using real robot) """ logger_mp.info("Initialize Dex1_1_Gripper_Controller...") self.fps = fps self.Unit_Test = Unit_Test + self.gripper_sub_ready = False self.simulation_mode = simulation_mode if filter: - self.smooth_filter = WeightedMovingFilter(np.array([0.5, 0.3, 0.2]), Gripper_Num_Motors) + self.smooth_filter = WeightedMovingFilter(np.array([0.5, 0.3, 0.2]), 2) else: self.smooth_filter = None @@ -270,27 +275,31 @@ class Dex1_1_Gripper_Controller: ChannelFactoryInitialize(0) # initialize handcmd publisher and handstate subscriber - self.GripperCmb_publisher = ChannelPublisher(kTopicGripperCommand, MotorCmds_) - self.GripperCmb_publisher.Init() + self.LeftGripperCmb_publisher = ChannelPublisher(kTopicGripperLeftCommand, MotorCmds_) + self.LeftGripperCmb_publisher.Init() + self.RightGripperCmb_publisher = ChannelPublisher(kTopicGripperRightCommand, MotorCmds_) + self.RightGripperCmb_publisher.Init() - self.GripperState_subscriber = ChannelSubscriber(kTopicGripperState, MotorStates_) - self.GripperState_subscriber.Init() + self.LeftGripperState_subscriber = ChannelSubscriber(kTopicGripperLeftState, MotorStates_) + self.LeftGripperState_subscriber.Init() + self.RightGripperState_subscriber = ChannelSubscriber(kTopicGripperRightState, MotorStates_) + self.RightGripperState_subscriber.Init() - self.dual_gripper_state = [0.0] * len(Gripper_JointIndex) + # Shared Arrays for gripper states + self.left_gripper_state_value = Value('d', 0.0, lock=True) + self.right_gripper_state_value = Value('d', 0.0, lock=True) # initialize subscribe thread self.subscribe_state_thread = threading.Thread(target=self._subscribe_gripper_state) self.subscribe_state_thread.daemon = True self.subscribe_state_thread.start() - while True: - if any(state != 0.0 for state in self.dual_gripper_state): - break + while not self.gripper_sub_ready: time.sleep(0.01) logger_mp.warning("[Dex1_1_Gripper_Controller] Waiting to subscribe dds...") logger_mp.info("[Dex1_1_Gripper_Controller] Subscribe dds ok.") - self.gripper_control_thread = threading.Thread(target=self.control_thread, args=(left_gripper_value_in, right_gripper_value_in, self.dual_gripper_state, + self.gripper_control_thread = threading.Thread(target=self.control_thread, args=(left_gripper_value_in, right_gripper_value_in, self.left_gripper_state_value, self.right_gripper_state_value, dual_gripper_data_lock, dual_gripper_state_out, dual_gripper_action_out)) self.gripper_control_thread.daemon = True self.gripper_control_thread.start() @@ -299,27 +308,30 @@ class Dex1_1_Gripper_Controller: def _subscribe_gripper_state(self): while True: - gripper_msg = self.GripperState_subscriber.Read() - if gripper_msg is not None: - for idx, id in enumerate(Gripper_JointIndex): - self.dual_gripper_state[idx] = gripper_msg.states[id].q + left_gripper_msg = self.LeftGripperState_subscriber.Read() + right_gripper_msg = self.RightGripperState_subscriber.Read() + self.gripper_sub_ready = True + if left_gripper_msg is not None and right_gripper_msg is not None: + self.left_gripper_state_value.value = left_gripper_msg.states[0].q + self.right_gripper_state_value.value = right_gripper_msg.states[0].q time.sleep(0.002) - def ctrl_dual_gripper(self, gripper_q_target): - """set current left, right gripper motor state target q""" - for idx, id in enumerate(Gripper_JointIndex): - self.gripper_msg.cmds[id].q = gripper_q_target[idx] + def ctrl_dual_gripper(self, dual_gripper_action): + """set current left, right gripper motor cmd target q""" + self.left_gripper_msg.cmds[0].q = dual_gripper_action[0] + self.right_gripper_msg.cmds[0].q = dual_gripper_action[1] - self.GripperCmb_publisher.Write(self.gripper_msg) + self.LeftGripperCmb_publisher.Write(self.left_gripper_msg) + self.RightGripperCmb_publisher.Write(self.right_gripper_msg) # logger_mp.debug("gripper ctrl publish ok.") - def control_thread(self, left_gripper_value_in, right_gripper_value_in, dual_gripper_state_in, dual_hand_data_lock = None, + def control_thread(self, left_gripper_value_in, right_gripper_value_in, left_gripper_state_value, right_gripper_state_value, dual_hand_data_lock = None, dual_gripper_state_out = None, dual_gripper_action_out = None): self.running = True if self.simulation_mode: DELTA_GRIPPER_CMD = 1.0 else: - DELTA_GRIPPER_CMD = 0.18 # The motor rotates 5.4 radians, the clamping jaw slide open 9 cm, so 0.6 rad <==> 1 cm, 0.18 rad <==> 3 mm + DELTA_GRIPPER_CMD = 0.18 # The motor rotates 5.4 radians, the clamping jaw slide open 9 cm, so 0.6 rad <==> 1 cm, 0.18 rad <==> 3 mm THUMB_INDEX_DISTANCE_MIN = 5.0 THUMB_INDEX_DISTANCE_MAX = 7.0 LEFT_MAPPED_MIN = 0.0 # The minimum initial motor position when the gripper closes at startup. @@ -335,14 +347,20 @@ class Dex1_1_Gripper_Controller: kp = 5.00 kd = 0.05 # initialize gripper cmd msg - self.gripper_msg = MotorCmds_() - self.gripper_msg.cmds = [unitree_go_msg_dds__MotorCmd_() for _ in range(len(Gripper_JointIndex))] - for id in Gripper_JointIndex: - self.gripper_msg.cmds[id].dq = dq - self.gripper_msg.cmds[id].tau = tau - self.gripper_msg.cmds[id].kp = kp - self.gripper_msg.cmds[id].kd = kd - + self.left_gripper_msg = MotorCmds_() + self.left_gripper_msg.cmds = [unitree_go_msg_dds__MotorCmd_()] + self.right_gripper_msg = MotorCmds_() + self.right_gripper_msg.cmds = [unitree_go_msg_dds__MotorCmd_()] + + self.left_gripper_msg.cmds[0].dq = dq + self.left_gripper_msg.cmds[0].tau = tau + self.left_gripper_msg.cmds[0].kp = kp + self.left_gripper_msg.cmds[0].kd = kd + + self.right_gripper_msg.cmds[0].dq = dq + self.right_gripper_msg.cmds[0].tau = tau + self.right_gripper_msg.cmds[0].kp = kp + self.right_gripper_msg.cmds[0].kd = kd try: while self.running: start_time = time.time() @@ -352,27 +370,19 @@ class Dex1_1_Gripper_Controller: with right_gripper_value_in.get_lock(): right_gripper_value = right_gripper_value_in.value - if left_gripper_value != 0.0 or right_gripper_value != 0.0: # if hand data has been initialized. + if left_gripper_value != 0.0 or right_gripper_value != 0.0: # if input data has been initialized. # Linear mapping from [0, THUMB_INDEX_DISTANCE_MAX] to gripper action range left_target_action = np.interp(left_gripper_value, [THUMB_INDEX_DISTANCE_MIN, THUMB_INDEX_DISTANCE_MAX], [LEFT_MAPPED_MIN, LEFT_MAPPED_MAX]) right_target_action = np.interp(right_gripper_value, [THUMB_INDEX_DISTANCE_MIN, THUMB_INDEX_DISTANCE_MAX], [RIGHT_MAPPED_MIN, RIGHT_MAPPED_MAX]) - # else: # TEST WITHOUT XR DEVICE - # current_time = time.time() - # period = 2.5 - # import math - # left_euclidean_distance = THUMB_INDEX_DISTANCE_MAX * (math.sin(2 * math.pi * current_time / period) + 1) / 2 - # right_euclidean_distance = THUMB_INDEX_DISTANCE_MAX * (math.sin(2 * math.pi * current_time / period) + 1) / 2 - # left_target_action = np.interp(left_euclidean_distance, [THUMB_INDEX_DISTANCE_MIN, THUMB_INDEX_DISTANCE_MAX], [LEFT_MAPPED_MIN, LEFT_MAPPED_MAX]) - # right_target_action = np.interp(right_euclidean_distance, [THUMB_INDEX_DISTANCE_MIN, THUMB_INDEX_DISTANCE_MAX], [RIGHT_MAPPED_MIN, RIGHT_MAPPED_MAX]) # get current dual gripper motor state - dual_gripper_state = np.array(dual_gripper_state_in[:]) + dual_gripper_state = np.array([left_gripper_state_value.value, right_gripper_state_value.value]) # clip dual gripper action to avoid overflow - left_actual_action = np.clip(left_target_action, dual_gripper_state[1] - DELTA_GRIPPER_CMD, dual_gripper_state[1] + DELTA_GRIPPER_CMD) - right_actual_action = np.clip(right_target_action, dual_gripper_state[0] - DELTA_GRIPPER_CMD, dual_gripper_state[0] + DELTA_GRIPPER_CMD) + left_actual_action = np.clip(left_target_action, dual_gripper_state[0] - DELTA_GRIPPER_CMD, dual_gripper_state[0] + DELTA_GRIPPER_CMD) + right_actual_action = np.clip(right_target_action, dual_gripper_state[1] - DELTA_GRIPPER_CMD, dual_gripper_state[1] + DELTA_GRIPPER_CMD) - dual_gripper_action = np.array([right_actual_action, left_actual_action]) + dual_gripper_action = np.array([left_actual_action, right_actual_action]) if self.smooth_filter: self.smooth_filter.add_data(dual_gripper_action) @@ -380,13 +390,8 @@ class Dex1_1_Gripper_Controller: if dual_gripper_state_out and dual_gripper_action_out: with dual_hand_data_lock: - dual_gripper_state_out[:] = dual_gripper_state - np.array([RIGHT_MAPPED_MIN, LEFT_MAPPED_MIN]) - dual_gripper_action_out[:] = dual_gripper_action - np.array([RIGHT_MAPPED_MIN, LEFT_MAPPED_MIN]) - - # logger_mp.debug(f"LEFT: euclidean:{left_euclidean_distance:.4f} \tstate:{dual_gripper_state_out[1]:.4f}\ - # \ttarget_action:{right_target_action - RIGHT_MAPPED_MIN:.4f} \tactual_action:{dual_gripper_action_out[1]:.4f}") - # logger_mp.debug(f"RIGHT:euclidean:{right_euclidean_distance:.4f} \tstate:{dual_gripper_state_out[0]:.4f}\ - # \ttarget_action:{left_target_action - LEFT_MAPPED_MIN:.4f} \tactual_action:{dual_gripper_action_out[0]:.4f}") + dual_gripper_state_out[:] = dual_gripper_state - np.array([LEFT_MAPPED_MIN, RIGHT_MAPPED_MIN]) + dual_gripper_action_out[:] = dual_gripper_action - np.array([LEFT_MAPPED_MIN, RIGHT_MAPPED_MIN]) self.ctrl_dual_gripper(dual_gripper_action) current_time = time.time() @@ -397,8 +402,7 @@ class Dex1_1_Gripper_Controller: logger_mp.info("Dex1_1_Gripper_Controller has been closed.") class Gripper_JointIndex(IntEnum): - kLeftGripper = 0 - kRightGripper = 1 + kGripper = 0 if __name__ == "__main__": @@ -407,9 +411,8 @@ if __name__ == "__main__": from teleop.image_server.image_client import ImageClient parser = argparse.ArgumentParser() - parser.add_argument('--dex', action='store_true', help='Use dex3-1 hand') - parser.add_argument('--gripper', dest='dex', action='store_false', help='Use gripper') - parser.set_defaults(dex=True) + parser.add_argument('--xr-mode', type=str, choices=['hand', 'controller'], default='hand', help='Select XR device tracking source') + parser.add_argument('--ee', type=str, choices=['dex1', 'dex3', 'inspire1', 'brainco'], help='Select end effector controller') args = parser.parse_args() logger_mp.info(f"args:{args}\n") @@ -431,39 +434,54 @@ if __name__ == "__main__": else: tv_img_shape = (img_config['head_camera_image_shape'][0], img_config['head_camera_image_shape'][1], 3) - img_shm = shared_memory.SharedMemory(create = True, size = np.prod(tv_img_shape) * np.uint8().itemsize) - img_array = np.ndarray(tv_img_shape, dtype = np.uint8, buffer = img_shm.buf) - img_client = ImageClient(tv_img_shape = tv_img_shape, tv_img_shm_name = img_shm.name) + tv_img_shm = shared_memory.SharedMemory(create = True, size = np.prod(tv_img_shape) * np.uint8().itemsize) + tv_img_array = np.ndarray(tv_img_shape, dtype = np.uint8, buffer = tv_img_shm.buf) + img_client = ImageClient(tv_img_shape = tv_img_shape, tv_img_shm_name = tv_img_shm.name) image_receive_thread = threading.Thread(target = img_client.receive_process, daemon = True) image_receive_thread.daemon = True image_receive_thread.start() - # television and arm - tv_wrapper = TeleVuerWrapper(BINOCULAR, tv_img_shape, img_shm.name) + # television: obtain hand pose data from the XR device and transmit the robot's head camera image to the XR device. + tv_wrapper = TeleVuerWrapper(binocular=BINOCULAR, use_hand_tracking=args.xr_mode == "hand", img_shape=tv_img_shape, img_shm_name=tv_img_shm.name, + return_state_data=True, return_hand_rot_data = False) - if args.dex: - left_hand_array = Array('d', 75, lock=True) - right_hand_array = Array('d', 75, lock=True) +# end-effector + if args.ee == "dex3": + left_hand_pos_array = Array('d', 75, lock = True) # [input] + right_hand_pos_array = Array('d', 75, lock = True) # [input] dual_hand_data_lock = Lock() - dual_hand_state_array = Array('d', 14, lock=False) # current left, right hand state(14) data. - dual_hand_action_array = Array('d', 14, lock=False) # current left, right hand action(14) data. - hand_ctrl = Dex3_1_Controller(left_hand_array, right_hand_array, dual_hand_data_lock, dual_hand_state_array, dual_hand_action_array, Unit_Test = True) - else: - left_hand_array = Array('d', 75, lock=True) - right_hand_array = Array('d', 75, lock=True) + dual_hand_state_array = Array('d', 14, lock = False) # [output] current left, right hand state(14) data. + dual_hand_action_array = Array('d', 14, lock = False) # [output] current left, right hand action(14) data. + hand_ctrl = Dex3_1_Controller(left_hand_pos_array, right_hand_pos_array, dual_hand_data_lock, dual_hand_state_array, dual_hand_action_array) + elif args.ee == "dex1": + left_gripper_value = Value('d', 0.0, lock=True) # [input] + right_gripper_value = Value('d', 0.0, lock=True) # [input] dual_gripper_data_lock = Lock() dual_gripper_state_array = Array('d', 2, lock=False) # current left, right gripper state(2) data. dual_gripper_action_array = Array('d', 2, lock=False) # current left, right gripper action(2) data. - gripper_ctrl = Dex1_1_Gripper_Controller(left_hand_array, right_hand_array, dual_gripper_data_lock, dual_gripper_state_array, dual_gripper_action_array, Unit_Test = True) - + gripper_ctrl = Dex1_1_Gripper_Controller(left_gripper_value, right_gripper_value, dual_gripper_data_lock, dual_gripper_state_array, dual_gripper_action_array) user_input = input("Please enter the start signal (enter 's' to start the subsequent program):\n") if user_input.lower() == 's': while True: - head_rmat, left_wrist, right_wrist, left_hand, right_hand = tv_wrapper.get_data() - # send hand skeleton data to hand_ctrl.control_process - left_hand_array[:] = left_hand.flatten() - right_hand_array[:] = right_hand.flatten() + tele_data = tv_wrapper.get_motion_state_data() + if args.ee == "dex3" and args.xr_mode == "hand": + with left_hand_pos_array.get_lock(): + left_hand_pos_array[:] = tele_data.left_hand_pos.flatten() + with right_hand_pos_array.get_lock(): + right_hand_pos_array[:] = tele_data.right_hand_pos.flatten() + elif args.ee == "dex1" and args.xr_mode == "controller": + with left_gripper_value.get_lock(): + left_gripper_value.value = tele_data.left_trigger_value + with right_gripper_value.get_lock(): + right_gripper_value.value = tele_data.right_trigger_value + elif args.ee == "dex1" and args.xr_mode == "hand": + with left_gripper_value.get_lock(): + left_gripper_value.value = tele_data.left_pinch_value + with right_gripper_value.get_lock(): + right_gripper_value.value = tele_data.right_pinch_value + else: + pass # with dual_hand_data_lock: # logger_mp.info(f"state : {list(dual_hand_state_array)} \naction: {list(dual_hand_action_array)} \n") diff --git a/teleop/teleop_hand_and_arm.py b/teleop/teleop_hand_and_arm.py index 62fc2ff..323986a 100644 --- a/teleop/teleop_hand_and_arm.py +++ b/teleop/teleop_hand_and_arm.py @@ -292,18 +292,18 @@ if __name__ == '__main__': current_body_action = [] elif args.ee == "dex1" and args.xr_mode == "hand": with dual_gripper_data_lock: - left_ee_state = [dual_gripper_state_array[1]] - right_ee_state = [dual_gripper_state_array[0]] - left_hand_action = [dual_gripper_action_array[1]] - right_hand_action = [dual_gripper_action_array[0]] + left_ee_state = [dual_gripper_state_array[0]] + right_ee_state = [dual_gripper_state_array[1]] + left_hand_action = [dual_gripper_action_array[0]] + right_hand_action = [dual_gripper_action_array[1]] current_body_state = [] current_body_action = [] elif args.ee == "dex1" and args.xr_mode == "controller": with dual_gripper_data_lock: - left_ee_state = [dual_gripper_state_array[1]] - right_ee_state = [dual_gripper_state_array[0]] - left_hand_action = [dual_gripper_action_array[1]] - right_hand_action = [dual_gripper_action_array[0]] + left_ee_state = [dual_gripper_state_array[0]] + right_ee_state = [dual_gripper_state_array[1]] + left_hand_action = [dual_gripper_action_array[0]] + right_hand_action = [dual_gripper_action_array[1]] current_body_state = arm_ctrl.get_current_motor_q().tolist() current_body_action = [-tele_data.tele_state.left_thumbstick_value[1] * 0.3, -tele_data.tele_state.left_thumbstick_value[0] * 0.3,