Browse Source
[update] 1. discard temp dds 2.mono|bino camera support 3. optim hand_unitree 4. more comments
main
[update] 1. discard temp dds 2.mono|bino camera support 3. optim hand_unitree 4. more comments
main
8 changed files with 404 additions and 203 deletions
-
22teleop/image_server/image_client.py
-
28teleop/image_server/image_server.py
-
43teleop/open_television/television.py
-
4teleop/open_television/tv_wrapper.py
-
36teleop/robot_control/hand_retargeting.py
-
126teleop/robot_control/robot_arm.py
-
272teleop/robot_control/robot_hand_unitree.py
-
62teleop/teleop_hand_and_arm.py
@ -1,97 +1,249 @@ |
|||
from unitree_dds_wrapper.robots.trihand.trihand_pub_cmd import UnitreeTrihand as trihand_pub |
|||
from unitree_dds_wrapper.robots.trihand.trihand_sub_state import UnitreeTrihand as trihand_sub |
|||
from unitree_sdk2py.core.channel import ChannelPublisher, ChannelSubscriber, ChannelFactoryInitialize # dds |
|||
from unitree_sdk2py.idl.unitree_hg.msg.dds_ import HandCmd_, HandState_ # idl |
|||
from unitree_sdk2py.idl.default import unitree_hg_msg_dds__HandCmd_ |
|||
|
|||
import numpy as np |
|||
from enum import IntEnum |
|||
import time |
|||
from multiprocessing import Array |
|||
from teleop.robot_control.hand_retargeting import HandRetargeting, HandType |
|||
import os |
|||
import sys |
|||
import threading |
|||
from multiprocessing import Process, shared_memory, Array, Lock |
|||
|
|||
parent2_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) |
|||
sys.path.append(parent2_dir) |
|||
from teleop.robot_control.hand_retargeting import HandRetargeting, HandType |
|||
|
|||
|
|||
unitree_tip_indices = [4, 9, 14] |
|||
|
|||
Dex3_Num_Motors = 7 |
|||
kTopicDex3LeftCommand = "rt/dex3/left/cmd" |
|||
kTopicDex3RightCommand = "rt/dex3/right/cmd" |
|||
kTopicDex3LeftState = "rt/dex3/left/state" |
|||
kTopicDex3RightState = "rt/dex3/right/state" |
|||
|
|||
|
|||
class Dex3_1_Controller: |
|||
def __init__(self, fps = 100.0): |
|||
self.dex3_pub = trihand_pub() |
|||
|
|||
kp = np.full(7, 1.5) |
|||
kd = np.full(7, 0.2) |
|||
q = np.full(7,0.0) |
|||
dq = np.full(7,0.0) |
|||
tau = np.full(7,0.0) |
|||
self.dex3_pub.left_hand.kp = kp |
|||
self.dex3_pub.left_hand.kd = kd |
|||
self.dex3_pub.left_hand.q = q |
|||
self.dex3_pub.left_hand.dq = dq |
|||
self.dex3_pub.left_hand.tau = tau |
|||
|
|||
self.dex3_pub.right_hand.kp = kp |
|||
self.dex3_pub.right_hand.kd = kd |
|||
self.dex3_pub.right_hand.q = q |
|||
self.dex3_pub.right_hand.dq = dq |
|||
self.dex3_pub.right_hand.tau = tau |
|||
|
|||
self.dual_hand_state_array = [0.0] * 14 |
|||
self.lr_hand_state_lock = threading.Lock() |
|||
# self.dual_hand_state_array = Array('d', 14, lock=True) |
|||
|
|||
self.sub_state = trihand_sub() |
|||
self.sub_state.wait_for_connection() |
|||
|
|||
self.subscribe_state_thread = threading.Thread(target=self.subscribe_state) |
|||
def __init__(self, left_hand_array, right_hand_array, dual_hand_data_lock = None, dual_hand_state_array = None, |
|||
dual_hand_action_array = None, fps = 100.0, Unit_Test = False): |
|||
""" |
|||
[note] A *_array type parameter requires using a multiprocessing Array, because it needs to be passed to the internal child process |
|||
|
|||
left_hand_array: [input] Left hand skeleton data (required from XR device) to hand_ctrl.control_process |
|||
|
|||
right_hand_array: [input] Right hand skeleton data (required from XR device) to hand_ctrl.control_process |
|||
|
|||
dual_hand_data_lock: Data synchronization lock for dual_hand_state_array and dual_hand_action_array |
|||
|
|||
dual_hand_state_array: [output] Return left(7), right(7) hand motor state |
|||
|
|||
dual_hand_action_array: [output] Return left(7), right(7) hand motor action |
|||
|
|||
fps: Control frequency |
|||
|
|||
Unit_Test: Whether to enable unit testing |
|||
""" |
|||
print("Initialize Dex3_1_Controller...") |
|||
|
|||
self.fps = fps |
|||
self.Unit_Test = Unit_Test |
|||
if not self.Unit_Test: |
|||
self.hand_retargeting = HandRetargeting(HandType.UNITREE_DEX3) |
|||
else: |
|||
self.hand_retargeting = HandRetargeting(HandType.UNITREE_DEX3_Unit_Test) |
|||
ChannelFactoryInitialize(0) |
|||
|
|||
# initialize handcmd publisher and handstate subscriber |
|||
self.LeftHandCmb_publisher = ChannelPublisher(kTopicDex3LeftCommand, HandCmd_) |
|||
self.LeftHandCmb_publisher.Init() |
|||
self.RightHandCmb_publisher = ChannelPublisher(kTopicDex3RightCommand, HandCmd_) |
|||
self.RightHandCmb_publisher.Init() |
|||
|
|||
self.LeftHandState_subscriber = ChannelSubscriber(kTopicDex3LeftState, HandState_) |
|||
self.LeftHandState_subscriber.Init() |
|||
self.RightHandState_subscriber = ChannelSubscriber(kTopicDex3RightState, HandState_) |
|||
self.RightHandState_subscriber.Init() |
|||
|
|||
# Shared Arrays for hand states |
|||
self.left_hand_state_array = Array('d', Dex3_Num_Motors, lock=True) |
|||
self.right_hand_state_array = Array('d', Dex3_Num_Motors, lock=True) |
|||
|
|||
# initialize subscribe thread |
|||
self.subscribe_state_thread = threading.Thread(target=self._subscribe_hand_state) |
|||
self.subscribe_state_thread.daemon = True |
|||
self.subscribe_state_thread.start() |
|||
|
|||
self.hand_retargeting = HandRetargeting(HandType.UNITREE_DEX3) |
|||
while True: |
|||
if any(self.left_hand_state_array) and any(self.right_hand_state_array): |
|||
break |
|||
time.sleep(0.01) |
|||
print("[Dex3_1_Controller] Waiting to subscribe dds...") |
|||
|
|||
self.running = True |
|||
self.fps = fps |
|||
hand_control_process = Process(target=self.control_process, args=(left_hand_array, right_hand_array, self.left_hand_state_array, self.right_hand_state_array, |
|||
dual_hand_data_lock, dual_hand_state_array, dual_hand_action_array)) |
|||
hand_control_process.daemon = True |
|||
hand_control_process.start() |
|||
|
|||
print("UnitreeDex3 Controller init ok.\n") |
|||
print("Initialize Dex3_1_Controller OK!\n") |
|||
|
|||
def subscribe_state(self): |
|||
def _subscribe_hand_state(self): |
|||
while True: |
|||
lq,rq= self.sub_state.sub() |
|||
with self.lr_hand_state_lock: |
|||
self.dual_hand_state_array[:] = np.concatenate((lq, rq)) |
|||
# self.dual_hand_state_array[:] = np.concatenate((lq,rq)) |
|||
left_hand_msg = self.LeftHandState_subscriber.Read() |
|||
right_hand_msg = self.RightHandState_subscriber.Read() |
|||
if left_hand_msg is not None and right_hand_msg is not None: |
|||
# Update left hand state |
|||
for idx, id in enumerate(Dex3_1_Left_JointIndex): |
|||
self.left_hand_state_array[idx] = left_hand_msg.motor_state[id].q |
|||
# Update right hand state |
|||
for idx, id in enumerate(Dex3_1_Right_JointIndex): |
|||
self.right_hand_state_array[idx] = right_hand_msg.motor_state[id].q |
|||
time.sleep(0.002) |
|||
|
|||
class _RIS_Mode: |
|||
def __init__(self, id=0, status=0x01, timeout=0): |
|||
self.motor_mode = 0 |
|||
self.id = id & 0x0F # 4 bits for id |
|||
self.status = status & 0x07 # 3 bits for status |
|||
self.timeout = timeout & 0x01 # 1 bit for timeout |
|||
|
|||
def ctrl(self, left_angles, right_angles): |
|||
def _mode_to_uint8(self): |
|||
self.motor_mode |= (self.id & 0x0F) |
|||
self.motor_mode |= (self.status & 0x07) << 4 |
|||
self.motor_mode |= (self.timeout & 0x01) << 7 |
|||
return self.motor_mode |
|||
|
|||
def ctrl_dual_hand(self, left_q_target, right_q_target): |
|||
"""set current left, right hand motor state target q""" |
|||
self.dex3_pub.left_hand.q = left_angles |
|||
self.dex3_pub.right_hand.q = right_angles |
|||
self.dex3_pub.pub() |
|||
for idx, id in enumerate(Dex3_1_Left_JointIndex): |
|||
self.left_msg.motor_cmd[id].q = left_q_target[idx] |
|||
for idx, id in enumerate(Dex3_1_Right_JointIndex): |
|||
self.right_msg.motor_cmd[id].q = right_q_target[idx] |
|||
|
|||
self.LeftHandCmb_publisher.Write(self.left_msg) |
|||
self.RightHandCmb_publisher.Write(self.right_msg) |
|||
# print("hand ctrl publish ok.") |
|||
|
|||
def get_current_dual_hand_q(self): |
|||
"""return current left, right hand motor state q""" |
|||
with self.lr_hand_state_lock: |
|||
return self.dual_hand_state_array[:].copy() |
|||
def control_process(self, left_hand_array, right_hand_array, left_hand_state_array, right_hand_state_array, |
|||
dual_hand_data_lock = None, dual_hand_state_array = None, dual_hand_action_array = None): |
|||
self.running = True |
|||
|
|||
left_q_target = np.full(Dex3_Num_Motors, 0) |
|||
right_q_target = np.full(Dex3_Num_Motors, 0) |
|||
|
|||
q = 0.0 |
|||
dq = 0.0 |
|||
tau = 0.0 |
|||
kp = 1.5 |
|||
kd = 0.2 |
|||
|
|||
# initialize dex3-1's left hand cmd msg |
|||
self.left_msg = unitree_hg_msg_dds__HandCmd_() |
|||
for id in Dex3_1_Left_JointIndex: |
|||
ris_mode = self._RIS_Mode(id = id, status = 0x01) |
|||
motor_mode = ris_mode._mode_to_uint8() |
|||
self.left_msg.motor_cmd[id].mode = motor_mode |
|||
self.left_msg.motor_cmd[id].q = q |
|||
self.left_msg.motor_cmd[id].dq = dq |
|||
self.left_msg.motor_cmd[id].tau = tau |
|||
self.left_msg.motor_cmd[id].kp = kp |
|||
self.left_msg.motor_cmd[id].kd = kd |
|||
|
|||
# initialize dex3-1's right hand cmd msg |
|||
self.right_msg = unitree_hg_msg_dds__HandCmd_() |
|||
for id in Dex3_1_Right_JointIndex: |
|||
ris_mode = self._RIS_Mode(id = id, status = 0x01) |
|||
motor_mode = ris_mode._mode_to_uint8() |
|||
self.right_msg.motor_cmd[id].mode = motor_mode |
|||
self.right_msg.motor_cmd[id].q = q |
|||
self.right_msg.motor_cmd[id].dq = dq |
|||
self.right_msg.motor_cmd[id].tau = tau |
|||
self.right_msg.motor_cmd[id].kp = kp |
|||
self.right_msg.motor_cmd[id].kd = kd |
|||
|
|||
def control_process(self, left_hand_array, right_hand_array, dual_hand_state_array = None, dual_hand_aciton_array = None): |
|||
left_qpos = np.full(7, 0) |
|||
right_qpos = np.full(7, 0) |
|||
try: |
|||
while self.running: |
|||
start_time = time.time() |
|||
# get dual hand state |
|||
left_hand_mat = np.array(left_hand_array[:]).reshape(25, 3).copy() |
|||
right_hand_mat = np.array(right_hand_array[:]).reshape(25, 3).copy() |
|||
|
|||
is_initial = np.all(left_hand_mat == 0.0) |
|||
if not is_initial: |
|||
left_qpos = self.hand_retargeting.left_retargeting.retarget(left_hand_mat[unitree_tip_indices])[[0,1,2,3,4,5,6]] |
|||
right_qpos = self.hand_retargeting.right_retargeting.retarget(right_hand_mat[unitree_tip_indices])[[0,1,2,3,4,5,6]] |
|||
# Read left and right q_state from shared arrays |
|||
state_data = np.concatenate((np.array(left_hand_state_array[:]), np.array(right_hand_state_array[:]))) |
|||
|
|||
self.ctrl(left_qpos, right_qpos) |
|||
is_hand_data_initialized = not np.all(left_hand_mat == 0.0) |
|||
if is_hand_data_initialized: |
|||
left_q_target = self.hand_retargeting.left_retargeting.retarget(left_hand_mat[unitree_tip_indices])[[0,1,2,3,4,5,6]] |
|||
right_q_target = self.hand_retargeting.right_retargeting.retarget(right_hand_mat[unitree_tip_indices])[[0,1,2,3,4,5,6]] |
|||
|
|||
if dual_hand_state_array and dual_hand_aciton_array: |
|||
dual_hand_state_array[:] = self.get_current_dual_hand_q() |
|||
dual_hand_aciton_array[:] = np.concatenate((left_qpos, right_qpos)) |
|||
# get dual hand action |
|||
action_data = np.concatenate((left_q_target, right_q_target)) |
|||
if dual_hand_state_array and dual_hand_action_array: |
|||
with dual_hand_data_lock: |
|||
dual_hand_state_array[:] = state_data |
|||
dual_hand_action_array[:] = action_data |
|||
|
|||
self.ctrl_dual_hand(left_q_target, right_q_target) |
|||
current_time = time.time() |
|||
time_elapsed = current_time - start_time |
|||
sleep_time = max(0, (1 / self.fps) - time_elapsed) |
|||
time.sleep(sleep_time) |
|||
finally: |
|||
print("Dex3_1_Controller has been closed.") |
|||
|
|||
class Dex3_1_Left_JointIndex(IntEnum): |
|||
kLeftHandZero = 0 # thumb_0 |
|||
kLeftHandOne = 1 # thumb_1 |
|||
kLeftHandTwo = 2 # thumb_2 |
|||
kLeftHandFive = 3 # middle_0 |
|||
kLeftHandSix = 4 # middle_0 |
|||
kLeftHandThree = 5 # index_0 |
|||
kLeftHandFour = 6 # index_1 |
|||
|
|||
class Dex3_1_Right_JointIndex(IntEnum): |
|||
kRightHandZero = 0 # thumb_0 |
|||
kRightHandOne = 1 # thumb_1 |
|||
kRightHandTwo = 2 # thumb_2 |
|||
kRightHandThree = 3 # index_0 |
|||
kRightHandFour = 4 # index_1 |
|||
kRightHandFive = 5 # middle_0 |
|||
kRightHandSix = 6 # middle_1 |
|||
|
|||
|
|||
if __name__ == "__main__": |
|||
import argparse |
|||
from teleop.open_television.tv_wrapper import TeleVisionWrapper |
|||
from teleop.image_server.image_client import ImageClient |
|||
|
|||
parser = argparse.ArgumentParser() |
|||
parser.add_argument('--binocular', action='store_true', help='Use binocular camera') |
|||
parser.add_argument('--monocular', dest='binocular', action='store_false', help='Use monocular camera') |
|||
parser.set_defaults(binocular=True) |
|||
args = parser.parse_args() |
|||
print(f"args:{args}\n") |
|||
|
|||
# image |
|||
img_shape = (720, 1280, 3) |
|||
img_shm = shared_memory.SharedMemory(create=True, size=np.prod(img_shape) * np.uint8().itemsize) |
|||
img_array = np.ndarray(img_shape, dtype=np.uint8, buffer=img_shm.buf) |
|||
img_client = ImageClient(img_shape = img_shape, img_shm_name = img_shm.name) |
|||
image_receive_thread = threading.Thread(target=img_client.receive_process, daemon=True) |
|||
image_receive_thread.daemon = True |
|||
image_receive_thread.start() |
|||
|
|||
# television and arm |
|||
tv_wrapper = TeleVisionWrapper(args.binocular, img_shape, img_shm.name) |
|||
|
|||
left_hand_array = Array('d', 75, lock=True) |
|||
right_hand_array = Array('d', 75, lock=True) |
|||
dual_hand_data_lock = Lock() |
|||
dual_hand_state_array = Array('d', 14, lock=False) # current left, right hand state(14) data. |
|||
dual_hand_action_array = Array('d', 14, lock=False) # current left, right hand action(14) data. |
|||
hand_ctrl = Dex3_1_Controller(left_hand_array, right_hand_array, dual_hand_data_lock, dual_hand_state_array, dual_hand_action_array, Unit_Test = True) |
|||
|
|||
|
|||
user_input = input("Please enter the start signal (enter 's' to start the subsequent program):\n") |
|||
if user_input.lower() == 's': |
|||
while True: |
|||
print(f"{dual_hand_state_array[1]}") |
|||
time.sleep(0.1) |
|||
Write
Preview
Loading…
Cancel
Save
Reference in new issue