Browse Source

[update] merge classes into unified module. lazy loading for optional dependencies.

main
silencht 5 months ago
parent
commit
204636dea7
  1. 346
      teleop/robot_control/robot_hand_inspire.py
  2. 181
      teleop/robot_control/robot_hand_inspire_dfx.py
  3. 229
      teleop/robot_control/robot_hand_inspire_ftp.py
  4. 3
      teleop/teleop_hand_and_arm.py

346
teleop/robot_control/robot_hand_inspire.py

@ -0,0 +1,346 @@
from unitree_sdk2py.core.channel import ChannelPublisher, ChannelSubscriber, ChannelFactoryInitialize # dds
from unitree_sdk2py.idl.unitree_go.msg.dds_ import MotorCmds_, MotorStates_ # idl
from unitree_sdk2py.idl.default import unitree_go_msg_dds__MotorCmd_
from teleop.robot_control.hand_retargeting import HandRetargeting, HandType
import numpy as np
from enum import IntEnum
import threading
import time
from multiprocessing import Process, Array
import logging_mp
logger_mp = logging_mp.get_logger(__name__)
Inspire_Num_Motors = 6
kTopicInspireDFXCommand = "rt/inspire/cmd"
kTopicInspireDFXState = "rt/inspire/state"
class Inspire_Controller_DFX:
def __init__(self, left_hand_array, right_hand_array, dual_hand_data_lock = None, dual_hand_state_array = None,
dual_hand_action_array = None, fps = 100.0, Unit_Test = False, simulation_mode = False):
logger_mp.info("Initialize Inspire_Controller_DFX...")
self.fps = fps
self.Unit_Test = Unit_Test
self.simulation_mode = simulation_mode
if not self.Unit_Test:
self.hand_retargeting = HandRetargeting(HandType.INSPIRE_HAND)
else:
self.hand_retargeting = HandRetargeting(HandType.INSPIRE_HAND_Unit_Test)
if self.simulation_mode:
ChannelFactoryInitialize(1)
else:
ChannelFactoryInitialize(0)
# initialize handcmd publisher and handstate subscriber
self.HandCmb_publisher = ChannelPublisher(kTopicInspireDFXCommand, MotorCmds_)
self.HandCmb_publisher.Init()
self.HandState_subscriber = ChannelSubscriber(kTopicInspireDFXState, MotorStates_)
self.HandState_subscriber.Init()
# Shared Arrays for hand states
self.left_hand_state_array = Array('d', Inspire_Num_Motors, lock=True)
self.right_hand_state_array = Array('d', Inspire_Num_Motors, lock=True)
# initialize subscribe thread
self.subscribe_state_thread = threading.Thread(target=self._subscribe_hand_state)
self.subscribe_state_thread.daemon = True
self.subscribe_state_thread.start()
while True:
if any(self.right_hand_state_array): # any(self.left_hand_state_array) and
break
time.sleep(0.01)
logger_mp.warning("[Inspire_Controller_DFX] Waiting to subscribe dds...")
logger_mp.info("[Inspire_Controller_DFX] Subscribe dds ok.")
hand_control_process = Process(target=self.control_process, args=(left_hand_array, right_hand_array, self.left_hand_state_array, self.right_hand_state_array,
dual_hand_data_lock, dual_hand_state_array, dual_hand_action_array))
hand_control_process.daemon = True
hand_control_process.start()
logger_mp.info("Initialize Inspire_Controller_DFX OK!")
def _subscribe_hand_state(self):
while True:
hand_msg = self.HandState_subscriber.Read()
if hand_msg is not None:
for idx, id in enumerate(Inspire_Left_Hand_JointIndex):
self.left_hand_state_array[idx] = hand_msg.states[id].q
for idx, id in enumerate(Inspire_Right_Hand_JointIndex):
self.right_hand_state_array[idx] = hand_msg.states[id].q
time.sleep(0.002)
def ctrl_dual_hand(self, left_q_target, right_q_target):
"""
Set current left, right hand motor state target q
"""
for idx, id in enumerate(Inspire_Left_Hand_JointIndex):
self.hand_msg.cmds[id].q = left_q_target[idx]
for idx, id in enumerate(Inspire_Right_Hand_JointIndex):
self.hand_msg.cmds[id].q = right_q_target[idx]
self.HandCmb_publisher.Write(self.hand_msg)
# logger_mp.debug("hand ctrl publish ok.")
def control_process(self, left_hand_array, right_hand_array, left_hand_state_array, right_hand_state_array,
dual_hand_data_lock = None, dual_hand_state_array = None, dual_hand_action_array = None):
self.running = True
left_q_target = np.full(Inspire_Num_Motors, 1.0)
right_q_target = np.full(Inspire_Num_Motors, 1.0)
# initialize inspire hand's cmd msg
self.hand_msg = MotorCmds_()
self.hand_msg.cmds = [unitree_go_msg_dds__MotorCmd_() for _ in range(len(Inspire_Right_Hand_JointIndex) + len(Inspire_Left_Hand_JointIndex))]
for idx, id in enumerate(Inspire_Left_Hand_JointIndex):
self.hand_msg.cmds[id].q = 1.0
for idx, id in enumerate(Inspire_Right_Hand_JointIndex):
self.hand_msg.cmds[id].q = 1.0
try:
while self.running:
start_time = time.time()
# get dual hand state
with left_hand_array.get_lock():
left_hand_data = np.array(left_hand_array[:]).reshape(25, 3).copy()
with right_hand_array.get_lock():
right_hand_data = np.array(right_hand_array[:]).reshape(25, 3).copy()
# Read left and right q_state from shared arrays
state_data = np.concatenate((np.array(left_hand_state_array[:]), np.array(right_hand_state_array[:])))
if not np.all(right_hand_data == 0.0) and not np.all(left_hand_data[4] == np.array([-1.13, 0.3, 0.15])): # if hand data has been initialized.
ref_left_value = left_hand_data[self.hand_retargeting.left_indices[1,:]] - left_hand_data[self.hand_retargeting.left_indices[0,:]]
ref_right_value = right_hand_data[self.hand_retargeting.right_indices[1,:]] - right_hand_data[self.hand_retargeting.right_indices[0,:]]
left_q_target = self.hand_retargeting.left_retargeting.retarget(ref_left_value)[self.hand_retargeting.left_dex_retargeting_to_hardware]
right_q_target = self.hand_retargeting.right_retargeting.retarget(ref_right_value)[self.hand_retargeting.right_dex_retargeting_to_hardware]
# In website https://support.unitree.com/home/en/G1_developer/inspire_dfx_dexterous_hand, you can find
# In the official document, the angles are in the range [0, 1] ==> 0.0: fully closed 1.0: fully open
# The q_target now is in radians, ranges:
# - idx 0~3: 0~1.7 (1.7 = closed)
# - idx 4: 0~0.5
# - idx 5: -0.1~1.3
# We normalize them using (max - value) / range
def normalize(val, min_val, max_val):
return np.clip((max_val - val) / (max_val - min_val), 0.0, 1.0)
for idx in range(Inspire_Num_Motors):
if idx <= 3:
left_q_target[idx] = normalize(left_q_target[idx], 0.0, 1.7)
right_q_target[idx] = normalize(right_q_target[idx], 0.0, 1.7)
elif idx == 4:
left_q_target[idx] = normalize(left_q_target[idx], 0.0, 0.5)
right_q_target[idx] = normalize(right_q_target[idx], 0.0, 0.5)
elif idx == 5:
left_q_target[idx] = normalize(left_q_target[idx], -0.1, 1.3)
right_q_target[idx] = normalize(right_q_target[idx], -0.1, 1.3)
# get dual hand action
action_data = np.concatenate((left_q_target, right_q_target))
if dual_hand_state_array and dual_hand_action_array:
with dual_hand_data_lock:
dual_hand_state_array[:] = state_data
dual_hand_action_array[:] = action_data
self.ctrl_dual_hand(left_q_target, right_q_target)
current_time = time.time()
time_elapsed = current_time - start_time
sleep_time = max(0, (1 / self.fps) - time_elapsed)
time.sleep(sleep_time)
finally:
logger_mp.info("Inspire_Controller_DFX has been closed.")
kTopicInspireFTPLeftCommand = "rt/inspire_hand/ctrl/l"
kTopicInspireFTPRightCommand = "rt/inspire_hand/ctrl/r"
kTopicInspireFTPLeftState = "rt/inspire_hand/state/l"
kTopicInspireFTPRightState = "rt/inspire_hand/state/r"
class Inspire_Controller_FTP:
def __init__(self, left_hand_array, right_hand_array, dual_hand_data_lock = None, dual_hand_state_array = None,
dual_hand_action_array = None, fps = 100.0, Unit_Test = False, simulation_mode = False):
logger_mp.info("Initialize Inspire_Controller_FTP...")
from inspire_sdkpy import inspire_dds, inspire_hand_defaut # lazy import
self.fps = fps
self.Unit_Test = Unit_Test
self.simulation_mode = simulation_mode
if not self.Unit_Test:
self.hand_retargeting = HandRetargeting(HandType.INSPIRE_HAND)
else:
self.hand_retargeting = HandRetargeting(HandType.INSPIRE_HAND_Unit_Test)
if self.simulation_mode:
ChannelFactoryInitialize(1)
else:
ChannelFactoryInitialize(0)
# Initialize hand command publishers
self.LeftHandCmd_publisher = ChannelPublisher(kTopicInspireFTPLeftCommand, inspire_dds.inspire_hand_ctrl)
self.LeftHandCmd_publisher.Init()
self.RightHandCmd_publisher = ChannelPublisher(kTopicInspireFTPRightCommand, inspire_dds.inspire_hand_ctrl)
self.RightHandCmd_publisher.Init()
# Initialize hand state subscribers
self.LeftHandState_subscriber = ChannelSubscriber(kTopicInspireFTPLeftState, inspire_dds.inspire_hand_state)
self.LeftHandState_subscriber.Init() # Consider using callback if preferred: Init(callback_func, period_ms)
self.RightHandState_subscriber = ChannelSubscriber(kTopicInspireFTPRightState, inspire_dds.inspire_hand_state)
self.RightHandState_subscriber.Init()
# Shared Arrays for hand states ([0,1] normalized values)
self.left_hand_state_array = Array('d', Inspire_Num_Motors, lock=True)
self.right_hand_state_array = Array('d', Inspire_Num_Motors, lock=True)
# Initialize subscribe thread
self.subscribe_state_thread = threading.Thread(target=self._subscribe_hand_state)
self.subscribe_state_thread.daemon = True
self.subscribe_state_thread.start()
# Wait for initial DDS messages (optional, but good for ensuring connection)
wait_count = 0
while not (any(self.left_hand_state_array) or any(self.right_hand_state_array)):
if wait_count % 100 == 0: # Print every second
logger_mp.info(f"[Inspire_Controller_FTP] Waiting to subscribe to hand states from DDS (L: {any(self.left_hand_state_array)}, R: {any(self.right_hand_state_array)})...")
time.sleep(0.01)
wait_count += 1
if wait_count > 500: # Timeout after 5 seconds
logger_mp.warning("[Inspire_Controller_FTP] Warning: Timeout waiting for initial hand states. Proceeding anyway.")
break
logger_mp.info("[Inspire_Controller_FTP] Initial hand states received or timeout.")
hand_control_process = Process(target=self.control_process, args=(left_hand_array, right_hand_array, self.left_hand_state_array, self.right_hand_state_array,
dual_hand_data_lock, dual_hand_state_array, dual_hand_action_array))
hand_control_process.daemon = True
hand_control_process.start()
logger_mp.info("Initialize Inspire_Controller_FTP OK!\n")
def _subscribe_hand_state(self):
logger_mp.info("[Inspire_Controller_FTP] Subscribe thread started.")
while True:
# Left Hand
left_state_msg = self.LeftHandState_subscriber.Read()
if left_state_msg is not None:
if hasattr(left_state_msg, 'angle_act') and len(left_state_msg.angle_act) == Inspire_Num_Motors:
with self.left_hand_state_array.get_lock():
for i in range(Inspire_Num_Motors):
self.left_hand_state_array[i] = left_state_msg.angle_act[i] / 1000.0
else:
logger_mp.warning(f"[Inspire_Controller_FTP] Received left_state_msg but attributes are missing or incorrect. Type: {type(left_state_msg)}, Content: {str(left_state_msg)[:100]}")
# Right Hand
right_state_msg = self.RightHandState_subscriber.Read()
if right_state_msg is not None:
if hasattr(right_state_msg, 'angle_act') and len(right_state_msg.angle_act) == Inspire_Num_Motors:
with self.right_hand_state_array.get_lock():
for i in range(Inspire_Num_Motors):
self.right_hand_state_array[i] = right_state_msg.angle_act[i] / 1000.0
else:
logger_mp.warning(f"[Inspire_Controller_FTP] Received right_state_msg but attributes are missing or incorrect. Type: {type(right_state_msg)}, Content: {str(right_state_msg)[:100]}")
time.sleep(0.002)
def _send_hand_command(self, left_angle_cmd_scaled, right_angle_cmd_scaled):
"""
Send scaled angle commands [0-1000] to both hands.
"""
# Left Hand Command
left_cmd_msg = inspire_hand_defaut.get_inspire_hand_ctrl()
left_cmd_msg.angle_set = left_angle_cmd_scaled
left_cmd_msg.mode = 0b0001 # Mode 1: Angle control
self.LeftHandCmd_publisher.Write(left_cmd_msg)
# Right Hand Command
right_cmd_msg = inspire_hand_defaut.get_inspire_hand_ctrl()
right_cmd_msg.angle_set = right_angle_cmd_scaled
right_cmd_msg.mode = 0b0001 # Mode 1: Angle control
self.RightHandCmd_publisher.Write(right_cmd_msg)
def control_process(self, left_hand_array, right_hand_array, left_hand_state_array, right_hand_state_array,
dual_hand_data_lock = None, dual_hand_state_array = None, dual_hand_action_array = None):
logger_mp.info("[Inspire_Controller_FTP] Control process started.")
self.running = True
left_q_target = np.full(Inspire_Num_Motors, 1.0)
right_q_target = np.full(Inspire_Num_Motors, 1.0)
try:
while self.running:
start_time = time.time()
# get dual hand state
with left_hand_array.get_lock():
left_hand_data = np.array(left_hand_array[:]).reshape(25, 3).copy()
with right_hand_array.get_lock():
right_hand_data = np.array(right_hand_array[:]).reshape(25, 3).copy()
# Read left and right q_state from shared arrays
state_data = np.concatenate((np.array(left_hand_state_array[:]), np.array(right_hand_state_array[:])))
if not np.all(right_hand_data == 0.0) and not np.all(left_hand_data[4] == np.array([-1.13, 0.3, 0.15])): # if hand data has been initialized.
ref_left_value = left_hand_data[self.hand_retargeting.left_indices[1,:]] - left_hand_data[self.hand_retargeting.left_indices[0,:]]
ref_right_value = right_hand_data[self.hand_retargeting.right_indices[1,:]] - right_hand_data[self.hand_retargeting.right_indices[0,:]]
left_q_target = self.hand_retargeting.left_retargeting.retarget(ref_left_value)[self.hand_retargeting.left_dex_retargeting_to_hardware]
right_q_target = self.hand_retargeting.right_retargeting.retarget(ref_right_value)[self.hand_retargeting.right_dex_retargeting_to_hardware]
def normalize(val, min_val, max_val):
return np.clip((max_val - val) / (max_val - min_val), 0.0, 1.0)
for idx in range(Inspire_Num_Motors):
if idx <= 3:
left_q_target[idx] = normalize(left_q_target[idx], 0.0, 1.7)
right_q_target[idx] = normalize(right_q_target[idx], 0.0, 1.7)
elif idx == 4:
left_q_target[idx] = normalize(left_q_target[idx], 0.0, 0.5)
right_q_target[idx] = normalize(right_q_target[idx], 0.0, 0.5)
elif idx == 5:
left_q_target[idx] = normalize(left_q_target[idx], -0.1, 1.3)
right_q_target[idx] = normalize(right_q_target[idx], -0.1, 1.3)
scaled_left_cmd = [int(np.clip(val * 1000, 0, 1000)) for val in left_q_target]
scaled_right_cmd = [int(np.clip(val * 1000, 0, 1000)) for val in right_q_target]
# get dual hand action
action_data = np.concatenate((left_q_target, right_q_target))
if dual_hand_state_array and dual_hand_action_array:
with dual_hand_data_lock:
dual_hand_state_array[:] = state_data
dual_hand_action_array[:] = action_data
self._send_hand_command(scaled_left_cmd, scaled_right_cmd)
current_time = time.time()
time_elapsed = current_time - start_time
sleep_time = max(0, (1 / self.fps) - time_elapsed)
time.sleep(sleep_time)
finally:
logger_mp.info("Inspire_Controller_FTP has been closed.")
# Update hand state, according to the official documentation:
# 1. https://support.unitree.com/home/en/G1_developer/inspire_dfx_dexterous_hand
# 2. https://support.unitree.com/home/en/G1_developer/inspire_ftp_dexterity_hand
# the state sequence is as shown in the table below
# ┌──────┬───────┬──────┬────────┬────────┬────────────┬────────────────┬───────┬──────┬────────┬────────┬────────────┬────────────────┐
# │ Id │ 0 │ 1 │ 2 │ 3 │ 4 │ 5 │ 6 │ 7 │ 8 │ 9 │ 10 │ 11 │
# ├──────┼───────┼──────┼────────┼────────┼────────────┼────────────────┼───────┼──────┼────────┼────────┼────────────┼────────────────┤
# │ │ Right Hand │ Left Hand │
# │Joint │ pinky │ ring │ middle │ index │ thumb-bend │ thumb-rotation │ pinky │ ring │ middle │ index │ thumb-bend │ thumb-rotation │
# └──────┴───────┴──────┴────────┴────────┴────────────┴────────────────┴───────┴──────┴────────┴────────┴────────────┴────────────────┘
class Inspire_Right_Hand_JointIndex(IntEnum):
kRightHandPinky = 0
kRightHandRing = 1
kRightHandMiddle = 2
kRightHandIndex = 3
kRightHandThumbBend = 4
kRightHandThumbRotation = 5
class Inspire_Left_Hand_JointIndex(IntEnum):
kLeftHandPinky = 6
kLeftHandRing = 7
kLeftHandMiddle = 8
kLeftHandIndex = 9
kLeftHandThumbBend = 10
kLeftHandThumbRotation = 11

181
teleop/robot_control/robot_hand_inspire_dfx.py

@ -1,181 +0,0 @@
from unitree_sdk2py.core.channel import ChannelPublisher, ChannelSubscriber, ChannelFactoryInitialize # dds
from unitree_sdk2py.idl.unitree_go.msg.dds_ import MotorCmds_, MotorStates_ # idl
from unitree_sdk2py.idl.default import unitree_go_msg_dds__MotorCmd_
from teleop.robot_control.hand_retargeting import HandRetargeting, HandType
import numpy as np
from enum import IntEnum
import threading
import time
from multiprocessing import Process, Array
import logging_mp
logger_mp = logging_mp.get_logger(__name__)
Inspire_Num_Motors = 6
kTopicInspireCommand = "rt/inspire/cmd"
kTopicInspireState = "rt/inspire/state"
class Inspire_Controller_DFX:
def __init__(self, left_hand_array, right_hand_array, dual_hand_data_lock = None, dual_hand_state_array = None,
dual_hand_action_array = None, fps = 100.0, Unit_Test = False, simulation_mode = False):
logger_mp.info("Initialize Inspire_Controller...")
self.fps = fps
self.Unit_Test = Unit_Test
self.simulation_mode = simulation_mode
if not self.Unit_Test:
self.hand_retargeting = HandRetargeting(HandType.INSPIRE_HAND)
else:
self.hand_retargeting = HandRetargeting(HandType.INSPIRE_HAND_Unit_Test)
if self.simulation_mode:
ChannelFactoryInitialize(1)
else:
ChannelFactoryInitialize(0)
# initialize handcmd publisher and handstate subscriber
self.HandCmb_publisher = ChannelPublisher(kTopicInspireCommand, MotorCmds_)
self.HandCmb_publisher.Init()
self.HandState_subscriber = ChannelSubscriber(kTopicInspireState, MotorStates_)
self.HandState_subscriber.Init()
# Shared Arrays for hand states
self.left_hand_state_array = Array('d', Inspire_Num_Motors, lock=True)
self.right_hand_state_array = Array('d', Inspire_Num_Motors, lock=True)
# initialize subscribe thread
self.subscribe_state_thread = threading.Thread(target=self._subscribe_hand_state)
self.subscribe_state_thread.daemon = True
self.subscribe_state_thread.start()
while True:
if any(self.right_hand_state_array): # any(self.left_hand_state_array) and
break
time.sleep(0.01)
logger_mp.warning("[Inspire_Controller] Waiting to subscribe dds...")
logger_mp.info("[Inspire_Controller] Subscribe dds ok.")
hand_control_process = Process(target=self.control_process, args=(left_hand_array, right_hand_array, self.left_hand_state_array, self.right_hand_state_array,
dual_hand_data_lock, dual_hand_state_array, dual_hand_action_array))
hand_control_process.daemon = True
hand_control_process.start()
logger_mp.info("Initialize Inspire_Controller OK!")
def _subscribe_hand_state(self):
while True:
hand_msg = self.HandState_subscriber.Read()
if hand_msg is not None:
for idx, id in enumerate(Inspire_Left_Hand_JointIndex):
self.left_hand_state_array[idx] = hand_msg.states[id].q
for idx, id in enumerate(Inspire_Right_Hand_JointIndex):
self.right_hand_state_array[idx] = hand_msg.states[id].q
time.sleep(0.002)
def ctrl_dual_hand(self, left_q_target, right_q_target):
"""
Set current left, right hand motor state target q
"""
for idx, id in enumerate(Inspire_Left_Hand_JointIndex):
self.hand_msg.cmds[id].q = left_q_target[idx]
for idx, id in enumerate(Inspire_Right_Hand_JointIndex):
self.hand_msg.cmds[id].q = right_q_target[idx]
self.HandCmb_publisher.Write(self.hand_msg)
# logger_mp.debug("hand ctrl publish ok.")
def control_process(self, left_hand_array, right_hand_array, left_hand_state_array, right_hand_state_array,
dual_hand_data_lock = None, dual_hand_state_array = None, dual_hand_action_array = None):
self.running = True
left_q_target = np.full(Inspire_Num_Motors, 1.0)
right_q_target = np.full(Inspire_Num_Motors, 1.0)
# initialize inspire hand's cmd msg
self.hand_msg = MotorCmds_()
self.hand_msg.cmds = [unitree_go_msg_dds__MotorCmd_() for _ in range(len(Inspire_Right_Hand_JointIndex) + len(Inspire_Left_Hand_JointIndex))]
for idx, id in enumerate(Inspire_Left_Hand_JointIndex):
self.hand_msg.cmds[id].q = 1.0
for idx, id in enumerate(Inspire_Right_Hand_JointIndex):
self.hand_msg.cmds[id].q = 1.0
try:
while self.running:
start_time = time.time()
# get dual hand state
with left_hand_array.get_lock():
left_hand_data = np.array(left_hand_array[:]).reshape(25, 3).copy()
with right_hand_array.get_lock():
right_hand_data = np.array(right_hand_array[:]).reshape(25, 3).copy()
# Read left and right q_state from shared arrays
state_data = np.concatenate((np.array(left_hand_state_array[:]), np.array(right_hand_state_array[:])))
if not np.all(right_hand_data == 0.0) and not np.all(left_hand_data[4] == np.array([-1.13, 0.3, 0.15])): # if hand data has been initialized.
ref_left_value = left_hand_data[self.hand_retargeting.left_indices[1,:]] - left_hand_data[self.hand_retargeting.left_indices[0,:]]
ref_right_value = right_hand_data[self.hand_retargeting.right_indices[1,:]] - right_hand_data[self.hand_retargeting.right_indices[0,:]]
left_q_target = self.hand_retargeting.left_retargeting.retarget(ref_left_value)[self.hand_retargeting.left_dex_retargeting_to_hardware]
right_q_target = self.hand_retargeting.right_retargeting.retarget(ref_right_value)[self.hand_retargeting.right_dex_retargeting_to_hardware]
# In website https://support.unitree.com/home/en/G1_developer/inspire_dfx_dexterous_hand, you can find
# In the official document, the angles are in the range [0, 1] ==> 0.0: fully closed 1.0: fully open
# The q_target now is in radians, ranges:
# - idx 0~3: 0~1.7 (1.7 = closed)
# - idx 4: 0~0.5
# - idx 5: -0.1~1.3
# We normalize them using (max - value) / range
def normalize(val, min_val, max_val):
return np.clip((max_val - val) / (max_val - min_val), 0.0, 1.0)
for idx in range(Inspire_Num_Motors):
if idx <= 3:
left_q_target[idx] = normalize(left_q_target[idx], 0.0, 1.7)
right_q_target[idx] = normalize(right_q_target[idx], 0.0, 1.7)
elif idx == 4:
left_q_target[idx] = normalize(left_q_target[idx], 0.0, 0.5)
right_q_target[idx] = normalize(right_q_target[idx], 0.0, 0.5)
elif idx == 5:
left_q_target[idx] = normalize(left_q_target[idx], -0.1, 1.3)
right_q_target[idx] = normalize(right_q_target[idx], -0.1, 1.3)
# get dual hand action
action_data = np.concatenate((left_q_target, right_q_target))
if dual_hand_state_array and dual_hand_action_array:
with dual_hand_data_lock:
dual_hand_state_array[:] = state_data
dual_hand_action_array[:] = action_data
self.ctrl_dual_hand(left_q_target, right_q_target)
current_time = time.time()
time_elapsed = current_time - start_time
sleep_time = max(0, (1 / self.fps) - time_elapsed)
time.sleep(sleep_time)
finally:
logger_mp.info("Inspire_Controller has been closed.")
# Update hand state, according to the official documentation, https://support.unitree.com/home/en/G1_developer/inspire_dfx_dexterous_hand
# the state sequence is as shown in the table below
# ┌──────┬───────┬──────┬────────┬────────┬────────────┬────────────────┬───────┬──────┬────────┬────────┬────────────┬────────────────┐
# │ Id │ 0 │ 1 │ 2 │ 3 │ 4 │ 5 │ 6 │ 7 │ 8 │ 9 │ 10 │ 11 │
# ├──────┼───────┼──────┼────────┼────────┼────────────┼────────────────┼───────┼──────┼────────┼────────┼────────────┼────────────────┤
# │ │ Right Hand │ Left Hand │
# │Joint │ pinky │ ring │ middle │ index │ thumb-bend │ thumb-rotation │ pinky │ ring │ middle │ index │ thumb-bend │ thumb-rotation │
# └──────┴───────┴──────┴────────┴────────┴────────────┴────────────────┴───────┴──────┴────────┴────────┴────────────┴────────────────┘
class Inspire_Right_Hand_JointIndex(IntEnum):
kRightHandPinky = 0
kRightHandRing = 1
kRightHandMiddle = 2
kRightHandIndex = 3
kRightHandThumbBend = 4
kRightHandThumbRotation = 5
class Inspire_Left_Hand_JointIndex(IntEnum):
kLeftHandPinky = 6
kLeftHandRing = 7
kLeftHandMiddle = 8
kLeftHandIndex = 9
kLeftHandThumbBend = 10
kLeftHandThumbRotation = 11

229
teleop/robot_control/robot_hand_inspire_ftp.py

@ -1,229 +0,0 @@
from unitree_sdk2py.core.channel import ChannelPublisher, ChannelSubscriber, ChannelFactoryInitialize
from inspire_sdkpy import inspire_dds, inspire_hand_defaut
from teleop.robot_control.hand_retargeting import HandRetargeting, HandType
import numpy as np
import threading
import time
from multiprocessing import Process, Array, Lock
import logging_mp
logger_mp = logging_mp.get_logger(__name__)
inspire_tip_indices = [4, 9, 14, 19, 24]
Inspire_Num_Motors = 6
kTopicInspireCtrlLeft = "rt/inspire_hand/ctrl/l"
kTopicInspireCtrlRight = "rt/inspire_hand/ctrl/r"
kTopicInspireStateLeft = "rt/inspire_hand/state/l"
kTopicInspireStateRight = "rt/inspire_hand/state/r"
class Inspire_Controller_FTP:
def __init__(self, left_hand_array, right_hand_array, dual_hand_data_lock = None, dual_hand_state_array = None,
dual_hand_action_array = None, fps = 100.0, Unit_Test = False, simulation_mode = False):
logger_mp.info("Initialize Inspire_Controller...")
self.fps = fps
self.Unit_Test = Unit_Test
self.simulation_mode = simulation_mode
if not self.Unit_Test:
self.hand_retargeting = HandRetargeting(HandType.INSPIRE_HAND)
else:
self.hand_retargeting = HandRetargeting(HandType.INSPIRE_HAND_Unit_Test)
if self.simulation_mode:
ChannelFactoryInitialize(1)
else:
ChannelFactoryInitialize(0)
# Initialize hand command publishers
self.LeftHandCmd_publisher = ChannelPublisher(kTopicInspireCtrlLeft, inspire_dds.inspire_hand_ctrl)
self.LeftHandCmd_publisher.Init()
self.RightHandCmd_publisher = ChannelPublisher(kTopicInspireCtrlRight, inspire_dds.inspire_hand_ctrl)
self.RightHandCmd_publisher.Init()
# Initialize hand state subscribers
self.LeftHandState_subscriber = ChannelSubscriber(kTopicInspireStateLeft, inspire_dds.inspire_hand_state)
self.LeftHandState_subscriber.Init() # Consider using callback if preferred: Init(callback_func, period_ms)
self.RightHandState_subscriber = ChannelSubscriber(kTopicInspireStateRight, inspire_dds.inspire_hand_state)
self.RightHandState_subscriber.Init()
# Shared Arrays for hand states ([0,1] normalized values)
self.left_hand_state_array = Array('d', Inspire_Num_Motors, lock=True)
self.right_hand_state_array = Array('d', Inspire_Num_Motors, lock=True)
# Initialize subscribe thread
self.subscribe_state_thread = threading.Thread(target=self._subscribe_hand_state)
self.subscribe_state_thread.daemon = True
self.subscribe_state_thread.start()
# Wait for initial DDS messages (optional, but good for ensuring connection)
wait_count = 0
while not (any(self.left_hand_state_array) or any(self.right_hand_state_array)):
if wait_count % 100 == 0: # Print every second
logger_mp.info(f"[Inspire_Controller] Waiting to subscribe to hand states from DDS (L: {any(self.left_hand_state_array)}, R: {any(self.right_hand_state_array)})...")
time.sleep(0.01)
wait_count += 1
if wait_count > 500: # Timeout after 5 seconds
logger_mp.warning("[Inspire_Controller] Warning: Timeout waiting for initial hand states. Proceeding anyway.")
break
logger_mp.info("[Inspire_Controller] Initial hand states received or timeout.")
hand_control_process = Process(target=self.control_process, args=(left_hand_array, right_hand_array, self.left_hand_state_array, self.right_hand_state_array,
dual_hand_data_lock, dual_hand_state_array, dual_hand_action_array))
hand_control_process.daemon = True
hand_control_process.start()
logger_mp.info("Initialize Inspire_Controller OK!\n")
def _subscribe_hand_state(self):
logger_mp.info("[Inspire_Controller] Subscribe thread started.")
while True:
# Left Hand
left_state_msg = self.LeftHandState_subscriber.Read()
if left_state_msg is not None:
if hasattr(left_state_msg, 'angle_act') and len(left_state_msg.angle_act) == Inspire_Num_Motors:
with self.left_hand_state_array.get_lock():
for i in range(Inspire_Num_Motors):
self.left_hand_state_array[i] = left_state_msg.angle_act[i] / 1000.0
else:
logger_mp.warning(f"[Inspire_Controller] Received left_state_msg but attributes are missing or incorrect. Type: {type(left_state_msg)}, Content: {str(left_state_msg)[:100]}")
# Right Hand
right_state_msg = self.RightHandState_subscriber.Read()
if right_state_msg is not None:
if hasattr(right_state_msg, 'angle_act') and len(right_state_msg.angle_act) == Inspire_Num_Motors:
with self.right_hand_state_array.get_lock():
for i in range(Inspire_Num_Motors):
self.right_hand_state_array[i] = right_state_msg.angle_act[i] / 1000.0
else:
logger_mp.warning(f"[Inspire_Controller] Received right_state_msg but attributes are missing or incorrect. Type: {type(right_state_msg)}, Content: {str(right_state_msg)[:100]}")
time.sleep(0.002)
def _send_hand_command(self, left_angle_cmd_scaled, right_angle_cmd_scaled):
"""
Send scaled angle commands [0-1000] to both hands.
"""
# Left Hand Command
left_cmd_msg = inspire_hand_defaut.get_inspire_hand_ctrl()
left_cmd_msg.angle_set = left_angle_cmd_scaled
left_cmd_msg.mode = 0b0001 # Mode 1: Angle control
self.LeftHandCmd_publisher.Write(left_cmd_msg)
# Right Hand Command
right_cmd_msg = inspire_hand_defaut.get_inspire_hand_ctrl()
right_cmd_msg.angle_set = right_angle_cmd_scaled
right_cmd_msg.mode = 0b0001 # Mode 1: Angle control
self.RightHandCmd_publisher.Write(right_cmd_msg)
def control_process(self, left_hand_array, right_hand_array, left_hand_state_array, right_hand_state_array,
dual_hand_data_lock = None, dual_hand_state_array = None, dual_hand_action_array = None):
logger_mp.info("[Inspire_Controller] Control process started.")
self.running = True
left_q_target = np.full(Inspire_Num_Motors, 1.0)
right_q_target = np.full(Inspire_Num_Motors, 1.0)
try:
while self.running:
start_time = time.time()
# get dual hand state
with left_hand_array.get_lock():
left_hand_data = np.array(left_hand_array[:]).reshape(25, 3).copy()
with right_hand_array.get_lock():
right_hand_data = np.array(right_hand_array[:]).reshape(25, 3).copy()
# Read left and right q_state from shared arrays
state_data = np.concatenate((np.array(left_hand_state_array[:]), np.array(right_hand_state_array[:])))
if not np.all(right_hand_data == 0.0) and not np.all(left_hand_data[4] == np.array([-1.13, 0.3, 0.15])): # if hand data has been initialized.
ref_left_value = left_hand_data[self.hand_retargeting.left_indices[1,:]] - left_hand_data[self.hand_retargeting.left_indices[0,:]]
ref_right_value = right_hand_data[self.hand_retargeting.right_indices[1,:]] - right_hand_data[self.hand_retargeting.right_indices[0,:]]
left_q_target = self.hand_retargeting.left_retargeting.retarget(ref_left_value)[self.hand_retargeting.left_dex_retargeting_to_hardware]
right_q_target = self.hand_retargeting.right_retargeting.retarget(ref_right_value)[self.hand_retargeting.right_dex_retargeting_to_hardware]
def normalize(val, min_val, max_val):
return np.clip((max_val - val) / (max_val - min_val), 0.0, 1.0)
for idx in range(Inspire_Num_Motors):
if idx <= 3:
left_q_target[idx] = normalize(left_q_target[idx], 0.0, 1.7)
right_q_target[idx] = normalize(right_q_target[idx], 0.0, 1.7)
elif idx == 4:
left_q_target[idx] = normalize(left_q_target[idx], 0.0, 0.5)
right_q_target[idx] = normalize(right_q_target[idx], 0.0, 0.5)
elif idx == 5:
left_q_target[idx] = normalize(left_q_target[idx], -0.1, 1.3)
right_q_target[idx] = normalize(right_q_target[idx], -0.1, 1.3)
scaled_left_cmd = [int(np.clip(val * 1000, 0, 1000)) for val in left_q_target]
scaled_right_cmd = [int(np.clip(val * 1000, 0, 1000)) for val in right_q_target]
# get dual hand action
action_data = np.concatenate((left_q_target, right_q_target))
if dual_hand_state_array and dual_hand_action_array:
with dual_hand_data_lock:
dual_hand_state_array[:] = state_data
dual_hand_action_array[:] = action_data
self._send_hand_command(scaled_left_cmd, scaled_right_cmd)
current_time = time.time()
time_elapsed = current_time - start_time
sleep_time = max(0, (1 / self.fps) - time_elapsed)
time.sleep(sleep_time)
finally:
logger_mp.info("Inspire_Controller has been closed.")
if __name__ == '__main__':
logger_mp.info("Starting Inspire_Controller example...")
mock_left_hand_input = Array('d', 75, lock=True)
mock_right_hand_input = Array('d', 75, lock=True)
with mock_right_hand_input.get_lock():
for i in range(len(mock_right_hand_input)):
mock_right_hand_input[i] = (i % 10) * 0.01
with mock_left_hand_input.get_lock():
temp_left_mat = np.zeros((25,3))
temp_left_mat[4] = np.array([-1.13, 0.3, 0.15])
mock_left_hand_input[:] = temp_left_mat.flatten()
shared_lock = Lock()
shared_state = Array('d', Inspire_Num_Motors * 2, lock=False)
shared_action = Array('d', Inspire_Num_Motors * 2, lock=False)
try:
controller = Inspire_Controller(
left_hand_array=mock_left_hand_input,
right_hand_array=mock_right_hand_input,
dual_hand_data_lock=shared_lock,
dual_hand_state_array=shared_state,
dual_hand_action_array=shared_action,
fps=50.0,
Unit_Test=False,
)
count = 0
main_loop_running = True
while main_loop_running:
try:
time.sleep(1.0)
# Simulate a slight change in human hand input
with mock_right_hand_input.get_lock():
mock_right_hand_input[inspire_tip_indices[0]*3 + 1] = 0.1 + (count % 10) * 0.02
with shared_lock:
print(f"Cycle {count} - Logged State: {[f'{x:.3f}' for x in shared_state[:]]}, Logged Action: {[f'{x:.3f}' for x in shared_action[:]]}")
count +=1
if count > 3000 : # Increased run time for more observation
print("Example finished after 3000 cycles.")
main_loop_running = False
except KeyboardInterrupt:
print("Main loop interrupted. Finishing example.")
main_loop_running = False
except Exception as e:
print(f"An error occurred in the example: {e}")
finally:
print("Exiting main program.")

3
teleop/teleop_hand_and_arm.py

@ -16,8 +16,7 @@ from televuer import TeleVuerWrapper
from teleop.robot_control.robot_arm import G1_29_ArmController, G1_23_ArmController, H1_2_ArmController, H1_ArmController
from teleop.robot_control.robot_arm_ik import G1_29_ArmIK, G1_23_ArmIK, H1_2_ArmIK, H1_ArmIK
from teleop.robot_control.robot_hand_unitree import Dex3_1_Controller, Dex1_1_Gripper_Controller
from teleop.robot_control.robot_hand_inspire_dfx import Inspire_Controller_DFX
from teleop.robot_control.robot_hand_inspire_ftp import Inspire_Controller_FTP
from teleop.robot_control.robot_hand_inspire import Inspire_Controller_DFX, Inspire_Controller_FTP
from teleop.robot_control.robot_hand_brainco import Brainco_Controller
from teleimager.image_client import ImageClient
from teleop.utils.episode_writer import EpisodeWriter

Loading…
Cancel
Save