diff --git a/teleop/robot_control/robot_hand_inspire.py b/teleop/robot_control/robot_hand_inspire.py new file mode 100644 index 0000000..367728d --- /dev/null +++ b/teleop/robot_control/robot_hand_inspire.py @@ -0,0 +1,346 @@ +from unitree_sdk2py.core.channel import ChannelPublisher, ChannelSubscriber, ChannelFactoryInitialize # dds +from unitree_sdk2py.idl.unitree_go.msg.dds_ import MotorCmds_, MotorStates_ # idl +from unitree_sdk2py.idl.default import unitree_go_msg_dds__MotorCmd_ +from teleop.robot_control.hand_retargeting import HandRetargeting, HandType +import numpy as np +from enum import IntEnum +import threading +import time +from multiprocessing import Process, Array + +import logging_mp +logger_mp = logging_mp.get_logger(__name__) + +Inspire_Num_Motors = 6 +kTopicInspireDFXCommand = "rt/inspire/cmd" +kTopicInspireDFXState = "rt/inspire/state" + +class Inspire_Controller_DFX: + def __init__(self, left_hand_array, right_hand_array, dual_hand_data_lock = None, dual_hand_state_array = None, + dual_hand_action_array = None, fps = 100.0, Unit_Test = False, simulation_mode = False): + logger_mp.info("Initialize Inspire_Controller_DFX...") + self.fps = fps + self.Unit_Test = Unit_Test + self.simulation_mode = simulation_mode + if not self.Unit_Test: + self.hand_retargeting = HandRetargeting(HandType.INSPIRE_HAND) + else: + self.hand_retargeting = HandRetargeting(HandType.INSPIRE_HAND_Unit_Test) + + if self.simulation_mode: + ChannelFactoryInitialize(1) + else: + ChannelFactoryInitialize(0) + + # initialize handcmd publisher and handstate subscriber + self.HandCmb_publisher = ChannelPublisher(kTopicInspireDFXCommand, MotorCmds_) + self.HandCmb_publisher.Init() + + self.HandState_subscriber = ChannelSubscriber(kTopicInspireDFXState, MotorStates_) + self.HandState_subscriber.Init() + + # Shared Arrays for hand states + self.left_hand_state_array = Array('d', Inspire_Num_Motors, lock=True) + self.right_hand_state_array = Array('d', Inspire_Num_Motors, lock=True) + + # initialize subscribe thread + self.subscribe_state_thread = threading.Thread(target=self._subscribe_hand_state) + self.subscribe_state_thread.daemon = True + self.subscribe_state_thread.start() + + while True: + if any(self.right_hand_state_array): # any(self.left_hand_state_array) and + break + time.sleep(0.01) + logger_mp.warning("[Inspire_Controller_DFX] Waiting to subscribe dds...") + logger_mp.info("[Inspire_Controller_DFX] Subscribe dds ok.") + + hand_control_process = Process(target=self.control_process, args=(left_hand_array, right_hand_array, self.left_hand_state_array, self.right_hand_state_array, + dual_hand_data_lock, dual_hand_state_array, dual_hand_action_array)) + hand_control_process.daemon = True + hand_control_process.start() + + logger_mp.info("Initialize Inspire_Controller_DFX OK!") + + def _subscribe_hand_state(self): + while True: + hand_msg = self.HandState_subscriber.Read() + if hand_msg is not None: + for idx, id in enumerate(Inspire_Left_Hand_JointIndex): + self.left_hand_state_array[idx] = hand_msg.states[id].q + for idx, id in enumerate(Inspire_Right_Hand_JointIndex): + self.right_hand_state_array[idx] = hand_msg.states[id].q + time.sleep(0.002) + + def ctrl_dual_hand(self, left_q_target, right_q_target): + """ + Set current left, right hand motor state target q + """ + for idx, id in enumerate(Inspire_Left_Hand_JointIndex): + self.hand_msg.cmds[id].q = left_q_target[idx] + for idx, id in enumerate(Inspire_Right_Hand_JointIndex): + self.hand_msg.cmds[id].q = right_q_target[idx] + + self.HandCmb_publisher.Write(self.hand_msg) + # logger_mp.debug("hand ctrl publish ok.") + + def control_process(self, left_hand_array, right_hand_array, left_hand_state_array, right_hand_state_array, + dual_hand_data_lock = None, dual_hand_state_array = None, dual_hand_action_array = None): + self.running = True + + left_q_target = np.full(Inspire_Num_Motors, 1.0) + right_q_target = np.full(Inspire_Num_Motors, 1.0) + + # initialize inspire hand's cmd msg + self.hand_msg = MotorCmds_() + self.hand_msg.cmds = [unitree_go_msg_dds__MotorCmd_() for _ in range(len(Inspire_Right_Hand_JointIndex) + len(Inspire_Left_Hand_JointIndex))] + + for idx, id in enumerate(Inspire_Left_Hand_JointIndex): + self.hand_msg.cmds[id].q = 1.0 + for idx, id in enumerate(Inspire_Right_Hand_JointIndex): + self.hand_msg.cmds[id].q = 1.0 + + try: + while self.running: + start_time = time.time() + # get dual hand state + with left_hand_array.get_lock(): + left_hand_data = np.array(left_hand_array[:]).reshape(25, 3).copy() + with right_hand_array.get_lock(): + right_hand_data = np.array(right_hand_array[:]).reshape(25, 3).copy() + + # Read left and right q_state from shared arrays + state_data = np.concatenate((np.array(left_hand_state_array[:]), np.array(right_hand_state_array[:]))) + + if not np.all(right_hand_data == 0.0) and not np.all(left_hand_data[4] == np.array([-1.13, 0.3, 0.15])): # if hand data has been initialized. + ref_left_value = left_hand_data[self.hand_retargeting.left_indices[1,:]] - left_hand_data[self.hand_retargeting.left_indices[0,:]] + ref_right_value = right_hand_data[self.hand_retargeting.right_indices[1,:]] - right_hand_data[self.hand_retargeting.right_indices[0,:]] + + left_q_target = self.hand_retargeting.left_retargeting.retarget(ref_left_value)[self.hand_retargeting.left_dex_retargeting_to_hardware] + right_q_target = self.hand_retargeting.right_retargeting.retarget(ref_right_value)[self.hand_retargeting.right_dex_retargeting_to_hardware] + + # In website https://support.unitree.com/home/en/G1_developer/inspire_dfx_dexterous_hand, you can find + # In the official document, the angles are in the range [0, 1] ==> 0.0: fully closed 1.0: fully open + # The q_target now is in radians, ranges: + # - idx 0~3: 0~1.7 (1.7 = closed) + # - idx 4: 0~0.5 + # - idx 5: -0.1~1.3 + # We normalize them using (max - value) / range + def normalize(val, min_val, max_val): + return np.clip((max_val - val) / (max_val - min_val), 0.0, 1.0) + + for idx in range(Inspire_Num_Motors): + if idx <= 3: + left_q_target[idx] = normalize(left_q_target[idx], 0.0, 1.7) + right_q_target[idx] = normalize(right_q_target[idx], 0.0, 1.7) + elif idx == 4: + left_q_target[idx] = normalize(left_q_target[idx], 0.0, 0.5) + right_q_target[idx] = normalize(right_q_target[idx], 0.0, 0.5) + elif idx == 5: + left_q_target[idx] = normalize(left_q_target[idx], -0.1, 1.3) + right_q_target[idx] = normalize(right_q_target[idx], -0.1, 1.3) + + # get dual hand action + action_data = np.concatenate((left_q_target, right_q_target)) + if dual_hand_state_array and dual_hand_action_array: + with dual_hand_data_lock: + dual_hand_state_array[:] = state_data + dual_hand_action_array[:] = action_data + + self.ctrl_dual_hand(left_q_target, right_q_target) + current_time = time.time() + time_elapsed = current_time - start_time + sleep_time = max(0, (1 / self.fps) - time_elapsed) + time.sleep(sleep_time) + finally: + logger_mp.info("Inspire_Controller_DFX has been closed.") + + + +kTopicInspireFTPLeftCommand = "rt/inspire_hand/ctrl/l" +kTopicInspireFTPRightCommand = "rt/inspire_hand/ctrl/r" +kTopicInspireFTPLeftState = "rt/inspire_hand/state/l" +kTopicInspireFTPRightState = "rt/inspire_hand/state/r" + +class Inspire_Controller_FTP: + def __init__(self, left_hand_array, right_hand_array, dual_hand_data_lock = None, dual_hand_state_array = None, + dual_hand_action_array = None, fps = 100.0, Unit_Test = False, simulation_mode = False): + logger_mp.info("Initialize Inspire_Controller_FTP...") + from inspire_sdkpy import inspire_dds, inspire_hand_defaut # lazy import + self.fps = fps + self.Unit_Test = Unit_Test + self.simulation_mode = simulation_mode + if not self.Unit_Test: + self.hand_retargeting = HandRetargeting(HandType.INSPIRE_HAND) + else: + self.hand_retargeting = HandRetargeting(HandType.INSPIRE_HAND_Unit_Test) + + if self.simulation_mode: + ChannelFactoryInitialize(1) + else: + ChannelFactoryInitialize(0) + + # Initialize hand command publishers + self.LeftHandCmd_publisher = ChannelPublisher(kTopicInspireFTPLeftCommand, inspire_dds.inspire_hand_ctrl) + self.LeftHandCmd_publisher.Init() + self.RightHandCmd_publisher = ChannelPublisher(kTopicInspireFTPRightCommand, inspire_dds.inspire_hand_ctrl) + self.RightHandCmd_publisher.Init() + + # Initialize hand state subscribers + self.LeftHandState_subscriber = ChannelSubscriber(kTopicInspireFTPLeftState, inspire_dds.inspire_hand_state) + self.LeftHandState_subscriber.Init() # Consider using callback if preferred: Init(callback_func, period_ms) + self.RightHandState_subscriber = ChannelSubscriber(kTopicInspireFTPRightState, inspire_dds.inspire_hand_state) + self.RightHandState_subscriber.Init() + + # Shared Arrays for hand states ([0,1] normalized values) + self.left_hand_state_array = Array('d', Inspire_Num_Motors, lock=True) + self.right_hand_state_array = Array('d', Inspire_Num_Motors, lock=True) + + # Initialize subscribe thread + self.subscribe_state_thread = threading.Thread(target=self._subscribe_hand_state) + self.subscribe_state_thread.daemon = True + self.subscribe_state_thread.start() + + # Wait for initial DDS messages (optional, but good for ensuring connection) + wait_count = 0 + while not (any(self.left_hand_state_array) or any(self.right_hand_state_array)): + if wait_count % 100 == 0: # Print every second + logger_mp.info(f"[Inspire_Controller_FTP] Waiting to subscribe to hand states from DDS (L: {any(self.left_hand_state_array)}, R: {any(self.right_hand_state_array)})...") + time.sleep(0.01) + wait_count += 1 + if wait_count > 500: # Timeout after 5 seconds + logger_mp.warning("[Inspire_Controller_FTP] Warning: Timeout waiting for initial hand states. Proceeding anyway.") + break + logger_mp.info("[Inspire_Controller_FTP] Initial hand states received or timeout.") + + hand_control_process = Process(target=self.control_process, args=(left_hand_array, right_hand_array, self.left_hand_state_array, self.right_hand_state_array, + dual_hand_data_lock, dual_hand_state_array, dual_hand_action_array)) + hand_control_process.daemon = True + hand_control_process.start() + + logger_mp.info("Initialize Inspire_Controller_FTP OK!\n") + + def _subscribe_hand_state(self): + logger_mp.info("[Inspire_Controller_FTP] Subscribe thread started.") + while True: + # Left Hand + left_state_msg = self.LeftHandState_subscriber.Read() + if left_state_msg is not None: + if hasattr(left_state_msg, 'angle_act') and len(left_state_msg.angle_act) == Inspire_Num_Motors: + with self.left_hand_state_array.get_lock(): + for i in range(Inspire_Num_Motors): + self.left_hand_state_array[i] = left_state_msg.angle_act[i] / 1000.0 + else: + logger_mp.warning(f"[Inspire_Controller_FTP] Received left_state_msg but attributes are missing or incorrect. Type: {type(left_state_msg)}, Content: {str(left_state_msg)[:100]}") + # Right Hand + right_state_msg = self.RightHandState_subscriber.Read() + if right_state_msg is not None: + if hasattr(right_state_msg, 'angle_act') and len(right_state_msg.angle_act) == Inspire_Num_Motors: + with self.right_hand_state_array.get_lock(): + for i in range(Inspire_Num_Motors): + self.right_hand_state_array[i] = right_state_msg.angle_act[i] / 1000.0 + else: + logger_mp.warning(f"[Inspire_Controller_FTP] Received right_state_msg but attributes are missing or incorrect. Type: {type(right_state_msg)}, Content: {str(right_state_msg)[:100]}") + + time.sleep(0.002) + + def _send_hand_command(self, left_angle_cmd_scaled, right_angle_cmd_scaled): + """ + Send scaled angle commands [0-1000] to both hands. + """ + # Left Hand Command + left_cmd_msg = inspire_hand_defaut.get_inspire_hand_ctrl() + left_cmd_msg.angle_set = left_angle_cmd_scaled + left_cmd_msg.mode = 0b0001 # Mode 1: Angle control + self.LeftHandCmd_publisher.Write(left_cmd_msg) + + # Right Hand Command + right_cmd_msg = inspire_hand_defaut.get_inspire_hand_ctrl() + right_cmd_msg.angle_set = right_angle_cmd_scaled + right_cmd_msg.mode = 0b0001 # Mode 1: Angle control + self.RightHandCmd_publisher.Write(right_cmd_msg) + + def control_process(self, left_hand_array, right_hand_array, left_hand_state_array, right_hand_state_array, + dual_hand_data_lock = None, dual_hand_state_array = None, dual_hand_action_array = None): + logger_mp.info("[Inspire_Controller_FTP] Control process started.") + self.running = True + + left_q_target = np.full(Inspire_Num_Motors, 1.0) + right_q_target = np.full(Inspire_Num_Motors, 1.0) + + try: + while self.running: + start_time = time.time() + # get dual hand state + with left_hand_array.get_lock(): + left_hand_data = np.array(left_hand_array[:]).reshape(25, 3).copy() + with right_hand_array.get_lock(): + right_hand_data = np.array(right_hand_array[:]).reshape(25, 3).copy() + + # Read left and right q_state from shared arrays + state_data = np.concatenate((np.array(left_hand_state_array[:]), np.array(right_hand_state_array[:]))) + + if not np.all(right_hand_data == 0.0) and not np.all(left_hand_data[4] == np.array([-1.13, 0.3, 0.15])): # if hand data has been initialized. + ref_left_value = left_hand_data[self.hand_retargeting.left_indices[1,:]] - left_hand_data[self.hand_retargeting.left_indices[0,:]] + ref_right_value = right_hand_data[self.hand_retargeting.right_indices[1,:]] - right_hand_data[self.hand_retargeting.right_indices[0,:]] + + left_q_target = self.hand_retargeting.left_retargeting.retarget(ref_left_value)[self.hand_retargeting.left_dex_retargeting_to_hardware] + right_q_target = self.hand_retargeting.right_retargeting.retarget(ref_right_value)[self.hand_retargeting.right_dex_retargeting_to_hardware] + + def normalize(val, min_val, max_val): + return np.clip((max_val - val) / (max_val - min_val), 0.0, 1.0) + + for idx in range(Inspire_Num_Motors): + if idx <= 3: + left_q_target[idx] = normalize(left_q_target[idx], 0.0, 1.7) + right_q_target[idx] = normalize(right_q_target[idx], 0.0, 1.7) + elif idx == 4: + left_q_target[idx] = normalize(left_q_target[idx], 0.0, 0.5) + right_q_target[idx] = normalize(right_q_target[idx], 0.0, 0.5) + elif idx == 5: + left_q_target[idx] = normalize(left_q_target[idx], -0.1, 1.3) + right_q_target[idx] = normalize(right_q_target[idx], -0.1, 1.3) + + scaled_left_cmd = [int(np.clip(val * 1000, 0, 1000)) for val in left_q_target] + scaled_right_cmd = [int(np.clip(val * 1000, 0, 1000)) for val in right_q_target] + + # get dual hand action + action_data = np.concatenate((left_q_target, right_q_target)) + if dual_hand_state_array and dual_hand_action_array: + with dual_hand_data_lock: + dual_hand_state_array[:] = state_data + dual_hand_action_array[:] = action_data + + self._send_hand_command(scaled_left_cmd, scaled_right_cmd) + current_time = time.time() + time_elapsed = current_time - start_time + sleep_time = max(0, (1 / self.fps) - time_elapsed) + time.sleep(sleep_time) + finally: + logger_mp.info("Inspire_Controller_FTP has been closed.") + +# Update hand state, according to the official documentation: +# 1. https://support.unitree.com/home/en/G1_developer/inspire_dfx_dexterous_hand +# 2. https://support.unitree.com/home/en/G1_developer/inspire_ftp_dexterity_hand +# the state sequence is as shown in the table below +# ┌──────┬───────┬──────┬────────┬────────┬────────────┬────────────────┬───────┬──────┬────────┬────────┬────────────┬────────────────┐ +# │ Id │ 0 │ 1 │ 2 │ 3 │ 4 │ 5 │ 6 │ 7 │ 8 │ 9 │ 10 │ 11 │ +# ├──────┼───────┼──────┼────────┼────────┼────────────┼────────────────┼───────┼──────┼────────┼────────┼────────────┼────────────────┤ +# │ │ Right Hand │ Left Hand │ +# │Joint │ pinky │ ring │ middle │ index │ thumb-bend │ thumb-rotation │ pinky │ ring │ middle │ index │ thumb-bend │ thumb-rotation │ +# └──────┴───────┴──────┴────────┴────────┴────────────┴────────────────┴───────┴──────┴────────┴────────┴────────────┴────────────────┘ +class Inspire_Right_Hand_JointIndex(IntEnum): + kRightHandPinky = 0 + kRightHandRing = 1 + kRightHandMiddle = 2 + kRightHandIndex = 3 + kRightHandThumbBend = 4 + kRightHandThumbRotation = 5 + +class Inspire_Left_Hand_JointIndex(IntEnum): + kLeftHandPinky = 6 + kLeftHandRing = 7 + kLeftHandMiddle = 8 + kLeftHandIndex = 9 + kLeftHandThumbBend = 10 + kLeftHandThumbRotation = 11 \ No newline at end of file diff --git a/teleop/robot_control/robot_hand_inspire_dfx.py b/teleop/robot_control/robot_hand_inspire_dfx.py deleted file mode 100644 index d223482..0000000 --- a/teleop/robot_control/robot_hand_inspire_dfx.py +++ /dev/null @@ -1,181 +0,0 @@ -from unitree_sdk2py.core.channel import ChannelPublisher, ChannelSubscriber, ChannelFactoryInitialize # dds -from unitree_sdk2py.idl.unitree_go.msg.dds_ import MotorCmds_, MotorStates_ # idl -from unitree_sdk2py.idl.default import unitree_go_msg_dds__MotorCmd_ - -from teleop.robot_control.hand_retargeting import HandRetargeting, HandType -import numpy as np -from enum import IntEnum -import threading -import time -from multiprocessing import Process, Array - -import logging_mp -logger_mp = logging_mp.get_logger(__name__) - -Inspire_Num_Motors = 6 -kTopicInspireCommand = "rt/inspire/cmd" -kTopicInspireState = "rt/inspire/state" - -class Inspire_Controller_DFX: - def __init__(self, left_hand_array, right_hand_array, dual_hand_data_lock = None, dual_hand_state_array = None, - dual_hand_action_array = None, fps = 100.0, Unit_Test = False, simulation_mode = False): - logger_mp.info("Initialize Inspire_Controller...") - self.fps = fps - self.Unit_Test = Unit_Test - self.simulation_mode = simulation_mode - if not self.Unit_Test: - self.hand_retargeting = HandRetargeting(HandType.INSPIRE_HAND) - else: - self.hand_retargeting = HandRetargeting(HandType.INSPIRE_HAND_Unit_Test) - - if self.simulation_mode: - ChannelFactoryInitialize(1) - else: - ChannelFactoryInitialize(0) - - # initialize handcmd publisher and handstate subscriber - self.HandCmb_publisher = ChannelPublisher(kTopicInspireCommand, MotorCmds_) - self.HandCmb_publisher.Init() - - self.HandState_subscriber = ChannelSubscriber(kTopicInspireState, MotorStates_) - self.HandState_subscriber.Init() - - # Shared Arrays for hand states - self.left_hand_state_array = Array('d', Inspire_Num_Motors, lock=True) - self.right_hand_state_array = Array('d', Inspire_Num_Motors, lock=True) - - # initialize subscribe thread - self.subscribe_state_thread = threading.Thread(target=self._subscribe_hand_state) - self.subscribe_state_thread.daemon = True - self.subscribe_state_thread.start() - - while True: - if any(self.right_hand_state_array): # any(self.left_hand_state_array) and - break - time.sleep(0.01) - logger_mp.warning("[Inspire_Controller] Waiting to subscribe dds...") - logger_mp.info("[Inspire_Controller] Subscribe dds ok.") - - hand_control_process = Process(target=self.control_process, args=(left_hand_array, right_hand_array, self.left_hand_state_array, self.right_hand_state_array, - dual_hand_data_lock, dual_hand_state_array, dual_hand_action_array)) - hand_control_process.daemon = True - hand_control_process.start() - - logger_mp.info("Initialize Inspire_Controller OK!") - - def _subscribe_hand_state(self): - while True: - hand_msg = self.HandState_subscriber.Read() - if hand_msg is not None: - for idx, id in enumerate(Inspire_Left_Hand_JointIndex): - self.left_hand_state_array[idx] = hand_msg.states[id].q - for idx, id in enumerate(Inspire_Right_Hand_JointIndex): - self.right_hand_state_array[idx] = hand_msg.states[id].q - time.sleep(0.002) - - def ctrl_dual_hand(self, left_q_target, right_q_target): - """ - Set current left, right hand motor state target q - """ - for idx, id in enumerate(Inspire_Left_Hand_JointIndex): - self.hand_msg.cmds[id].q = left_q_target[idx] - for idx, id in enumerate(Inspire_Right_Hand_JointIndex): - self.hand_msg.cmds[id].q = right_q_target[idx] - - self.HandCmb_publisher.Write(self.hand_msg) - # logger_mp.debug("hand ctrl publish ok.") - - def control_process(self, left_hand_array, right_hand_array, left_hand_state_array, right_hand_state_array, - dual_hand_data_lock = None, dual_hand_state_array = None, dual_hand_action_array = None): - self.running = True - - left_q_target = np.full(Inspire_Num_Motors, 1.0) - right_q_target = np.full(Inspire_Num_Motors, 1.0) - - # initialize inspire hand's cmd msg - self.hand_msg = MotorCmds_() - self.hand_msg.cmds = [unitree_go_msg_dds__MotorCmd_() for _ in range(len(Inspire_Right_Hand_JointIndex) + len(Inspire_Left_Hand_JointIndex))] - - for idx, id in enumerate(Inspire_Left_Hand_JointIndex): - self.hand_msg.cmds[id].q = 1.0 - for idx, id in enumerate(Inspire_Right_Hand_JointIndex): - self.hand_msg.cmds[id].q = 1.0 - - try: - while self.running: - start_time = time.time() - # get dual hand state - with left_hand_array.get_lock(): - left_hand_data = np.array(left_hand_array[:]).reshape(25, 3).copy() - with right_hand_array.get_lock(): - right_hand_data = np.array(right_hand_array[:]).reshape(25, 3).copy() - - # Read left and right q_state from shared arrays - state_data = np.concatenate((np.array(left_hand_state_array[:]), np.array(right_hand_state_array[:]))) - - if not np.all(right_hand_data == 0.0) and not np.all(left_hand_data[4] == np.array([-1.13, 0.3, 0.15])): # if hand data has been initialized. - ref_left_value = left_hand_data[self.hand_retargeting.left_indices[1,:]] - left_hand_data[self.hand_retargeting.left_indices[0,:]] - ref_right_value = right_hand_data[self.hand_retargeting.right_indices[1,:]] - right_hand_data[self.hand_retargeting.right_indices[0,:]] - - left_q_target = self.hand_retargeting.left_retargeting.retarget(ref_left_value)[self.hand_retargeting.left_dex_retargeting_to_hardware] - right_q_target = self.hand_retargeting.right_retargeting.retarget(ref_right_value)[self.hand_retargeting.right_dex_retargeting_to_hardware] - - # In website https://support.unitree.com/home/en/G1_developer/inspire_dfx_dexterous_hand, you can find - # In the official document, the angles are in the range [0, 1] ==> 0.0: fully closed 1.0: fully open - # The q_target now is in radians, ranges: - # - idx 0~3: 0~1.7 (1.7 = closed) - # - idx 4: 0~0.5 - # - idx 5: -0.1~1.3 - # We normalize them using (max - value) / range - def normalize(val, min_val, max_val): - return np.clip((max_val - val) / (max_val - min_val), 0.0, 1.0) - - for idx in range(Inspire_Num_Motors): - if idx <= 3: - left_q_target[idx] = normalize(left_q_target[idx], 0.0, 1.7) - right_q_target[idx] = normalize(right_q_target[idx], 0.0, 1.7) - elif idx == 4: - left_q_target[idx] = normalize(left_q_target[idx], 0.0, 0.5) - right_q_target[idx] = normalize(right_q_target[idx], 0.0, 0.5) - elif idx == 5: - left_q_target[idx] = normalize(left_q_target[idx], -0.1, 1.3) - right_q_target[idx] = normalize(right_q_target[idx], -0.1, 1.3) - - # get dual hand action - action_data = np.concatenate((left_q_target, right_q_target)) - if dual_hand_state_array and dual_hand_action_array: - with dual_hand_data_lock: - dual_hand_state_array[:] = state_data - dual_hand_action_array[:] = action_data - - self.ctrl_dual_hand(left_q_target, right_q_target) - current_time = time.time() - time_elapsed = current_time - start_time - sleep_time = max(0, (1 / self.fps) - time_elapsed) - time.sleep(sleep_time) - finally: - logger_mp.info("Inspire_Controller has been closed.") - -# Update hand state, according to the official documentation, https://support.unitree.com/home/en/G1_developer/inspire_dfx_dexterous_hand -# the state sequence is as shown in the table below -# ┌──────┬───────┬──────┬────────┬────────┬────────────┬────────────────┬───────┬──────┬────────┬────────┬────────────┬────────────────┐ -# │ Id │ 0 │ 1 │ 2 │ 3 │ 4 │ 5 │ 6 │ 7 │ 8 │ 9 │ 10 │ 11 │ -# ├──────┼───────┼──────┼────────┼────────┼────────────┼────────────────┼───────┼──────┼────────┼────────┼────────────┼────────────────┤ -# │ │ Right Hand │ Left Hand │ -# │Joint │ pinky │ ring │ middle │ index │ thumb-bend │ thumb-rotation │ pinky │ ring │ middle │ index │ thumb-bend │ thumb-rotation │ -# └──────┴───────┴──────┴────────┴────────┴────────────┴────────────────┴───────┴──────┴────────┴────────┴────────────┴────────────────┘ -class Inspire_Right_Hand_JointIndex(IntEnum): - kRightHandPinky = 0 - kRightHandRing = 1 - kRightHandMiddle = 2 - kRightHandIndex = 3 - kRightHandThumbBend = 4 - kRightHandThumbRotation = 5 - -class Inspire_Left_Hand_JointIndex(IntEnum): - kLeftHandPinky = 6 - kLeftHandRing = 7 - kLeftHandMiddle = 8 - kLeftHandIndex = 9 - kLeftHandThumbBend = 10 - kLeftHandThumbRotation = 11 \ No newline at end of file diff --git a/teleop/robot_control/robot_hand_inspire_ftp.py b/teleop/robot_control/robot_hand_inspire_ftp.py deleted file mode 100644 index 385f942..0000000 --- a/teleop/robot_control/robot_hand_inspire_ftp.py +++ /dev/null @@ -1,229 +0,0 @@ -from unitree_sdk2py.core.channel import ChannelPublisher, ChannelSubscriber, ChannelFactoryInitialize -from inspire_sdkpy import inspire_dds, inspire_hand_defaut - -from teleop.robot_control.hand_retargeting import HandRetargeting, HandType -import numpy as np -import threading -import time -from multiprocessing import Process, Array, Lock - -import logging_mp -logger_mp = logging_mp.get_logger(__name__) - -inspire_tip_indices = [4, 9, 14, 19, 24] -Inspire_Num_Motors = 6 - -kTopicInspireCtrlLeft = "rt/inspire_hand/ctrl/l" -kTopicInspireCtrlRight = "rt/inspire_hand/ctrl/r" -kTopicInspireStateLeft = "rt/inspire_hand/state/l" -kTopicInspireStateRight = "rt/inspire_hand/state/r" - -class Inspire_Controller_FTP: - def __init__(self, left_hand_array, right_hand_array, dual_hand_data_lock = None, dual_hand_state_array = None, - dual_hand_action_array = None, fps = 100.0, Unit_Test = False, simulation_mode = False): - logger_mp.info("Initialize Inspire_Controller...") - self.fps = fps - self.Unit_Test = Unit_Test - self.simulation_mode = simulation_mode - if not self.Unit_Test: - self.hand_retargeting = HandRetargeting(HandType.INSPIRE_HAND) - else: - self.hand_retargeting = HandRetargeting(HandType.INSPIRE_HAND_Unit_Test) - - if self.simulation_mode: - ChannelFactoryInitialize(1) - else: - ChannelFactoryInitialize(0) - - # Initialize hand command publishers - self.LeftHandCmd_publisher = ChannelPublisher(kTopicInspireCtrlLeft, inspire_dds.inspire_hand_ctrl) - self.LeftHandCmd_publisher.Init() - self.RightHandCmd_publisher = ChannelPublisher(kTopicInspireCtrlRight, inspire_dds.inspire_hand_ctrl) - self.RightHandCmd_publisher.Init() - - # Initialize hand state subscribers - self.LeftHandState_subscriber = ChannelSubscriber(kTopicInspireStateLeft, inspire_dds.inspire_hand_state) - self.LeftHandState_subscriber.Init() # Consider using callback if preferred: Init(callback_func, period_ms) - self.RightHandState_subscriber = ChannelSubscriber(kTopicInspireStateRight, inspire_dds.inspire_hand_state) - self.RightHandState_subscriber.Init() - - # Shared Arrays for hand states ([0,1] normalized values) - self.left_hand_state_array = Array('d', Inspire_Num_Motors, lock=True) - self.right_hand_state_array = Array('d', Inspire_Num_Motors, lock=True) - - # Initialize subscribe thread - self.subscribe_state_thread = threading.Thread(target=self._subscribe_hand_state) - self.subscribe_state_thread.daemon = True - self.subscribe_state_thread.start() - - # Wait for initial DDS messages (optional, but good for ensuring connection) - wait_count = 0 - while not (any(self.left_hand_state_array) or any(self.right_hand_state_array)): - if wait_count % 100 == 0: # Print every second - logger_mp.info(f"[Inspire_Controller] Waiting to subscribe to hand states from DDS (L: {any(self.left_hand_state_array)}, R: {any(self.right_hand_state_array)})...") - time.sleep(0.01) - wait_count += 1 - if wait_count > 500: # Timeout after 5 seconds - logger_mp.warning("[Inspire_Controller] Warning: Timeout waiting for initial hand states. Proceeding anyway.") - break - logger_mp.info("[Inspire_Controller] Initial hand states received or timeout.") - - hand_control_process = Process(target=self.control_process, args=(left_hand_array, right_hand_array, self.left_hand_state_array, self.right_hand_state_array, - dual_hand_data_lock, dual_hand_state_array, dual_hand_action_array)) - hand_control_process.daemon = True - hand_control_process.start() - - logger_mp.info("Initialize Inspire_Controller OK!\n") - - def _subscribe_hand_state(self): - logger_mp.info("[Inspire_Controller] Subscribe thread started.") - while True: - # Left Hand - left_state_msg = self.LeftHandState_subscriber.Read() - if left_state_msg is not None: - if hasattr(left_state_msg, 'angle_act') and len(left_state_msg.angle_act) == Inspire_Num_Motors: - with self.left_hand_state_array.get_lock(): - for i in range(Inspire_Num_Motors): - self.left_hand_state_array[i] = left_state_msg.angle_act[i] / 1000.0 - else: - logger_mp.warning(f"[Inspire_Controller] Received left_state_msg but attributes are missing or incorrect. Type: {type(left_state_msg)}, Content: {str(left_state_msg)[:100]}") - # Right Hand - right_state_msg = self.RightHandState_subscriber.Read() - if right_state_msg is not None: - if hasattr(right_state_msg, 'angle_act') and len(right_state_msg.angle_act) == Inspire_Num_Motors: - with self.right_hand_state_array.get_lock(): - for i in range(Inspire_Num_Motors): - self.right_hand_state_array[i] = right_state_msg.angle_act[i] / 1000.0 - else: - logger_mp.warning(f"[Inspire_Controller] Received right_state_msg but attributes are missing or incorrect. Type: {type(right_state_msg)}, Content: {str(right_state_msg)[:100]}") - - time.sleep(0.002) - - def _send_hand_command(self, left_angle_cmd_scaled, right_angle_cmd_scaled): - """ - Send scaled angle commands [0-1000] to both hands. - """ - # Left Hand Command - left_cmd_msg = inspire_hand_defaut.get_inspire_hand_ctrl() - left_cmd_msg.angle_set = left_angle_cmd_scaled - left_cmd_msg.mode = 0b0001 # Mode 1: Angle control - self.LeftHandCmd_publisher.Write(left_cmd_msg) - - # Right Hand Command - right_cmd_msg = inspire_hand_defaut.get_inspire_hand_ctrl() - right_cmd_msg.angle_set = right_angle_cmd_scaled - right_cmd_msg.mode = 0b0001 # Mode 1: Angle control - self.RightHandCmd_publisher.Write(right_cmd_msg) - - def control_process(self, left_hand_array, right_hand_array, left_hand_state_array, right_hand_state_array, - dual_hand_data_lock = None, dual_hand_state_array = None, dual_hand_action_array = None): - logger_mp.info("[Inspire_Controller] Control process started.") - self.running = True - - left_q_target = np.full(Inspire_Num_Motors, 1.0) - right_q_target = np.full(Inspire_Num_Motors, 1.0) - - try: - while self.running: - start_time = time.time() - # get dual hand state - with left_hand_array.get_lock(): - left_hand_data = np.array(left_hand_array[:]).reshape(25, 3).copy() - with right_hand_array.get_lock(): - right_hand_data = np.array(right_hand_array[:]).reshape(25, 3).copy() - - # Read left and right q_state from shared arrays - state_data = np.concatenate((np.array(left_hand_state_array[:]), np.array(right_hand_state_array[:]))) - - if not np.all(right_hand_data == 0.0) and not np.all(left_hand_data[4] == np.array([-1.13, 0.3, 0.15])): # if hand data has been initialized. - ref_left_value = left_hand_data[self.hand_retargeting.left_indices[1,:]] - left_hand_data[self.hand_retargeting.left_indices[0,:]] - ref_right_value = right_hand_data[self.hand_retargeting.right_indices[1,:]] - right_hand_data[self.hand_retargeting.right_indices[0,:]] - - left_q_target = self.hand_retargeting.left_retargeting.retarget(ref_left_value)[self.hand_retargeting.left_dex_retargeting_to_hardware] - right_q_target = self.hand_retargeting.right_retargeting.retarget(ref_right_value)[self.hand_retargeting.right_dex_retargeting_to_hardware] - - def normalize(val, min_val, max_val): - return np.clip((max_val - val) / (max_val - min_val), 0.0, 1.0) - - for idx in range(Inspire_Num_Motors): - if idx <= 3: - left_q_target[idx] = normalize(left_q_target[idx], 0.0, 1.7) - right_q_target[idx] = normalize(right_q_target[idx], 0.0, 1.7) - elif idx == 4: - left_q_target[idx] = normalize(left_q_target[idx], 0.0, 0.5) - right_q_target[idx] = normalize(right_q_target[idx], 0.0, 0.5) - elif idx == 5: - left_q_target[idx] = normalize(left_q_target[idx], -0.1, 1.3) - right_q_target[idx] = normalize(right_q_target[idx], -0.1, 1.3) - - scaled_left_cmd = [int(np.clip(val * 1000, 0, 1000)) for val in left_q_target] - scaled_right_cmd = [int(np.clip(val * 1000, 0, 1000)) for val in right_q_target] - - # get dual hand action - action_data = np.concatenate((left_q_target, right_q_target)) - if dual_hand_state_array and dual_hand_action_array: - with dual_hand_data_lock: - dual_hand_state_array[:] = state_data - dual_hand_action_array[:] = action_data - - self._send_hand_command(scaled_left_cmd, scaled_right_cmd) - current_time = time.time() - time_elapsed = current_time - start_time - sleep_time = max(0, (1 / self.fps) - time_elapsed) - time.sleep(sleep_time) - finally: - logger_mp.info("Inspire_Controller has been closed.") - -if __name__ == '__main__': - logger_mp.info("Starting Inspire_Controller example...") - mock_left_hand_input = Array('d', 75, lock=True) - mock_right_hand_input = Array('d', 75, lock=True) - - with mock_right_hand_input.get_lock(): - for i in range(len(mock_right_hand_input)): - mock_right_hand_input[i] = (i % 10) * 0.01 - - with mock_left_hand_input.get_lock(): - temp_left_mat = np.zeros((25,3)) - temp_left_mat[4] = np.array([-1.13, 0.3, 0.15]) - mock_left_hand_input[:] = temp_left_mat.flatten() - - shared_lock = Lock() - shared_state = Array('d', Inspire_Num_Motors * 2, lock=False) - shared_action = Array('d', Inspire_Num_Motors * 2, lock=False) - - try: - controller = Inspire_Controller( - left_hand_array=mock_left_hand_input, - right_hand_array=mock_right_hand_input, - dual_hand_data_lock=shared_lock, - dual_hand_state_array=shared_state, - dual_hand_action_array=shared_action, - fps=50.0, - Unit_Test=False, - ) - - count = 0 - main_loop_running = True - while main_loop_running: - try: - time.sleep(1.0) - # Simulate a slight change in human hand input - with mock_right_hand_input.get_lock(): - mock_right_hand_input[inspire_tip_indices[0]*3 + 1] = 0.1 + (count % 10) * 0.02 - - with shared_lock: - print(f"Cycle {count} - Logged State: {[f'{x:.3f}' for x in shared_state[:]]}, Logged Action: {[f'{x:.3f}' for x in shared_action[:]]}") - count +=1 - if count > 3000 : # Increased run time for more observation - print("Example finished after 3000 cycles.") - main_loop_running = False - except KeyboardInterrupt: - print("Main loop interrupted. Finishing example.") - main_loop_running = False - - - except Exception as e: - print(f"An error occurred in the example: {e}") - finally: - print("Exiting main program.") diff --git a/teleop/teleop_hand_and_arm.py b/teleop/teleop_hand_and_arm.py index 684daed..da090f0 100644 --- a/teleop/teleop_hand_and_arm.py +++ b/teleop/teleop_hand_and_arm.py @@ -16,8 +16,7 @@ from televuer import TeleVuerWrapper from teleop.robot_control.robot_arm import G1_29_ArmController, G1_23_ArmController, H1_2_ArmController, H1_ArmController from teleop.robot_control.robot_arm_ik import G1_29_ArmIK, G1_23_ArmIK, H1_2_ArmIK, H1_ArmIK from teleop.robot_control.robot_hand_unitree import Dex3_1_Controller, Dex1_1_Gripper_Controller -from teleop.robot_control.robot_hand_inspire_dfx import Inspire_Controller_DFX -from teleop.robot_control.robot_hand_inspire_ftp import Inspire_Controller_FTP +from teleop.robot_control.robot_hand_inspire import Inspire_Controller_DFX, Inspire_Controller_FTP from teleop.robot_control.robot_hand_brainco import Brainco_Controller from teleimager.image_client import ImageClient from teleop.utils.episode_writer import EpisodeWriter