Browse Source

[update] optimize interface

main
silencht 10 months ago
parent
commit
85ffd6b8de
  1. 11
      teleop/open_television/_test_television.py
  2. 10
      teleop/open_television/_test_tv_wrapper.py
  3. 24
      teleop/open_television/setup.py
  4. 47
      teleop/open_television/television.py
  5. 5
      teleop/open_television/tv_wrapper.py
  6. 28
      teleop/robot_control/robot_arm.py
  7. 50
      teleop/teleop_hand_and_arm.py

11
teleop/open_television/_test_television.py

@@ -15,9 +15,16 @@ def run_test_television():
image_shm = shared_memory.SharedMemory(create=True, size=np.prod(image_shape) * np.uint8().itemsize) image_shm = shared_memory.SharedMemory(create=True, size=np.prod(image_shape) * np.uint8().itemsize)
image_array = np.ndarray(image_shape, dtype=np.uint8, buffer=image_shm.buf) image_array = np.ndarray(image_shape, dtype=np.uint8, buffer=image_shm.buf)
# from image_server.image_client import ImageClient
# import threading
# image_client = ImageClient(tv_img_shape = image_shape, tv_img_shm_name = image_shm.name, image_show=True, server_address="127.0.0.1")
# image_receive_thread = threading.Thread(target = image_client.receive_process, daemon = True)
# image_receive_thread.daemon = True
# image_receive_thread.start()
# television # television
use_hand_track = False
tv = TeleVision(binocular = True, use_hand_tracking = use_hand_track, img_shape = image_shape, img_shm_name = image_shm.name, webrtc=True)
use_hand_track = True
tv = TeleVision(binocular = True, use_hand_tracking = use_hand_track, img_shape = image_shape, img_shm_name = image_shm.name, webrtc=False)
try: try:
input("Press Enter to start television test...") input("Press Enter to start television test...")

10
teleop/open_television/_test_tv_wrapper.py

@@ -13,7 +13,15 @@ def run_test_tv_wrapper():
image_shape = (480, 640 * 2, 3) image_shape = (480, 640 * 2, 3)
image_shm = shared_memory.SharedMemory(create=True, size=np.prod(image_shape) * np.uint8().itemsize) image_shm = shared_memory.SharedMemory(create=True, size=np.prod(image_shape) * np.uint8().itemsize)
image_array = np.ndarray(image_shape, dtype=np.uint8, buffer=image_shm.buf) image_array = np.ndarray(image_shape, dtype=np.uint8, buffer=image_shm.buf)
use_hand_track=False
# from image_server.image_client import ImageClient
# import threading
# image_client = ImageClient(tv_img_shape = image_shape, tv_img_shm_name = image_shm.name, image_show=True, server_address="127.0.0.1")
# image_receive_thread = threading.Thread(target = image_client.receive_process, daemon = True)
# image_receive_thread.daemon = True
# image_receive_thread.start()
use_hand_track=True
tv_wrapper = TeleVisionWrapper(binocular=True, use_hand_tracking=use_hand_track, img_shape=image_shape, img_shm_name=image_shm.name, tv_wrapper = TeleVisionWrapper(binocular=True, use_hand_tracking=use_hand_track, img_shape=image_shape, img_shm_name=image_shm.name,
return_state_data=True, return_hand_rot_data = True) return_state_data=True, return_hand_rot_data = True)
try: try:

24
teleop/open_television/setup.py

@@ -14,7 +14,29 @@ setup(
'aiohttp_cors==0.7.0', 'aiohttp_cors==0.7.0',
'aiortc==1.8.0', 'aiortc==1.8.0',
'av==11.0.0', 'av==11.0.0',
'vuer==0.0.42rc16',
# 'vuer==0.0.32rc7', # avp \ pico, hand tracking all work fine.
# 'vuer==0.0.35', # avp hand tracking works fine. pico is fine too, but the right eye occasionally goes black for a short time at start.
# from 'vuer==0.0.36rc1', # avp hand tracking only shows a flat RGB image — there's no stereo view. Pico (hand / controller tracking) is the same,
# and sometimes the right eye occasionally goes black for a short time at start.
# Both avp / pico can display the hand or controller marker, which looks like a black box.
# to 'vuer==0.0.42rc16', # avp hand tracking only shows a flat RGB image — there's no stereo view. Pico (hand / controller tracking) is the same,
# and sometimes the right eye occasionally goes black for a short time at start.
# Both avp / pico can display the hand or controller marker, which looks like RGB three-axis color coordinates.
# from 'vuer==0.0.42', # avp hand tracking only shows a flat RGB image — there's no stereo view.
# to 'vuer==0.0.45', pico hand tracking also only shows a flat RGB image — there's no stereo view. Unable to retrieve hand tracking data.
# pico controller tracking also only shows a flat RGB image — there's no stereo view. Controller data can be retrieved.
# from 'vuer==0.0.46' # avp hand tracking work fine.
# to
'vuer==0.0.56', # In pico hand tracking mode, using a hand gesture to click the "Virtual Reality" button
# causes the screen to stay black and stuck loading. But if the button is clicked with the controller instead, everything works fine.
# In pico controller tracking mode, using controller to click the "Virtual Reality" button
# causes the screen to stay black and stuck loading. But if the button is clicked with a hand gesture instead, everything works fine.
# avp \ pico all have hand marker visualization (RGB three-axis color coordinates).
], ],
python_requires='>=3.8', python_requires='>=3.8',
) )

47
teleop/open_television/television.py

@@ -30,7 +30,6 @@ class TeleVision:
self.img_width = img_shape[1] self.img_width = img_shape[1]
current_module_dir = os.path.dirname(os.path.abspath(__file__)) current_module_dir = os.path.dirname(os.path.abspath(__file__))
print(f"current_module_dir: {current_module_dir}")
if cert_file is None: if cert_file is None:
cert_file = os.path.join(current_module_dir, "cert.pem") cert_file = os.path.join(current_module_dir, "cert.pem")
if key_file is None: if key_file is None:
@@ -193,9 +192,25 @@ class TeleVision:
async def main_image_binocular(self, session, fps=60): async def main_image_binocular(self, session, fps=60):
if self.use_hand_tracking: if self.use_hand_tracking:
session.upsert @ Hands(fps=fps, stream=True, key="hands", showLeft=False, showRight=False)
session.upsert(
Hands(
stream=True,
key="hands",
showLeft=False,
showRight=False
),
to="bgChildren",
)
else: else:
session.upsert @ MotionControllers(fps=fps, stream=True, key="motion-controller", showLeft=False, showRight=False)
session.upsert(
MotionControllers(
stream=True,
key="motionControllers",
showLeft=False,
showRight=False,
),
to="bgChildren",
)
while True: while True:
display_image = cv2.cvtColor(self.img_array, cv2.COLOR_BGR2RGB) display_image = cv2.cvtColor(self.img_array, cv2.COLOR_BGR2RGB)
@@ -212,7 +227,7 @@ class TeleVision:
# Note that these two masks are associated with left eye’s camera and the right eye’s camera. # Note that these two masks are associated with left eye’s camera and the right eye’s camera.
layers=1, layers=1,
format="jpeg", format="jpeg",
quality=90,
quality=100,
key="background-left", key="background-left",
interpolate=True, interpolate=True,
), ),
@@ -223,7 +238,7 @@ class TeleVision:
distanceToCamera=1, distanceToCamera=1,
layers=2, layers=2,
format="jpeg", format="jpeg",
quality=90,
quality=100,
key="background-right", key="background-right",
interpolate=True, interpolate=True,
), ),
@@ -235,9 +250,25 @@ class TeleVision:
async def main_image_monocular(self, session, fps=60): async def main_image_monocular(self, session, fps=60):
if self.use_hand_tracking: if self.use_hand_tracking:
session.upsert @ Hands(fps=fps, stream=True, key="hands", showLeft=False, showRight=False)
session.upsert(
Hands(
stream=True,
key="hands",
showLeft=False,
showRight=False
),
to="bgChildren",
)
else: else:
session.upsert @ MotionControllers(fps=fps, stream=True, key="motion-controller", showLeft=False, showRight=False)
session.upsert(
MotionControllers(
stream=True,
key="motionControllers",
showLeft=False,
showRight=False,
),
to="bgChildren",
)
while True: while True:
display_image = cv2.cvtColor(self.img_array, cv2.COLOR_BGR2RGB) display_image = cv2.cvtColor(self.img_array, cv2.COLOR_BGR2RGB)
@@ -274,7 +305,7 @@ class TeleVision:
session.upsert( session.upsert(
MotionControllers( MotionControllers(
stream=True, stream=True,
key="motion-controller",
key="motionControllers",
showLeft=False, showLeft=False,
showRight=False, showRight=False,
) )

5
teleop/open_television/tv_wrapper.py

@@ -182,7 +182,7 @@ class TeleData:
class TeleVisionWrapper: class TeleVisionWrapper:
def __init__(self, binocular: bool, use_hand_tracking: bool, img_shape, img_shm_name, return_state_data: bool = True, return_hand_rot_data: bool = False, def __init__(self, binocular: bool, use_hand_tracking: bool, img_shape, img_shm_name, return_state_data: bool = True, return_hand_rot_data: bool = False,
cert_file = None, key_file = None):
cert_file = None, key_file = None, ngrok = False, webrtc = False):
""" """
TeleVisionWrapper is a wrapper for the TeleVision class, which handles XR device's data suit for robot control. TeleVisionWrapper is a wrapper for the TeleVision class, which handles XR device's data suit for robot control.
It initializes the TeleVision instance with the specified parameters and provides a method to get motion state data. It initializes the TeleVision instance with the specified parameters and provides a method to get motion state data.
@@ -199,7 +199,8 @@ class TeleVisionWrapper:
self.use_hand_tracking = use_hand_tracking self.use_hand_tracking = use_hand_tracking
self.return_state_data = return_state_data self.return_state_data = return_state_data
self.return_hand_rot_data = return_hand_rot_data self.return_hand_rot_data = return_hand_rot_data
self.tvuer = TeleVision(binocular, use_hand_tracking, img_shape, img_shm_name, cert_file=cert_file, key_file=key_file)
self.tvuer = TeleVision(binocular, use_hand_tracking, img_shape, img_shm_name, cert_file=cert_file, key_file=key_file,
ngrok=ngrok, webrtc=webrtc)
def get_motion_state_data(self): def get_motion_state_data(self):
""" """

28
teleop/robot_control/robot_arm.py

@@ -11,9 +11,9 @@ from unitree_sdk2py.utils.crc import CRC
from unitree_sdk2py.idl.unitree_go.msg.dds_ import ( LowCmd_ as go_LowCmd, LowState_ as go_LowState) # idl for h1 from unitree_sdk2py.idl.unitree_go.msg.dds_ import ( LowCmd_ as go_LowCmd, LowState_ as go_LowState) # idl for h1
from unitree_sdk2py.idl.default import unitree_go_msg_dds__LowCmd_ from unitree_sdk2py.idl.default import unitree_go_msg_dds__LowCmd_
# kTopicLowCommand = "rt/lowcmd"
kTopicLowCommand_Debug = "rt/lowcmd"
kTopicLowCommand_Motion = "rt/arm_sdk"
kTopicLowState = "rt/lowstate" kTopicLowState = "rt/lowstate"
kTopicLowCommand = "rt/arm_sdk"
G1_29_Num_Motors = 35 G1_29_Num_Motors = 35
G1_23_Num_Motors = 35 G1_23_Num_Motors = 35
@@ -56,10 +56,11 @@ class DataBuffer:
self.data = data self.data = data
class G1_29_ArmController: class G1_29_ArmController:
def __init__(self):
def __init__(self, debug_mode = True):
print("Initialize G1_29_ArmController...") print("Initialize G1_29_ArmController...")
self.q_target = np.zeros(14) self.q_target = np.zeros(14)
self.tauff_target = np.zeros(14) self.tauff_target = np.zeros(14)
self.debug_mode = debug_mode
self.kp_high = 300.0 self.kp_high = 300.0
self.kd_high = 3.0 self.kd_high = 3.0
@@ -78,7 +79,10 @@ class G1_29_ArmController:
# initialize lowcmd publisher and lowstate subscriber # initialize lowcmd publisher and lowstate subscriber
ChannelFactoryInitialize(0) ChannelFactoryInitialize(0)
self.lowcmd_publisher = ChannelPublisher(kTopicLowCommand, hg_LowCmd)
if self.debug_mode:
self.lowcmd_publisher = ChannelPublisher(kTopicLowCommand_Debug, hg_LowCmd)
else:
self.lowcmd_publisher = ChannelPublisher(kTopicLowCommand_Motion, hg_LowCmd)
self.lowcmd_publisher.Init() self.lowcmd_publisher.Init()
self.lowstate_subscriber = ChannelSubscriber(kTopicLowState, hg_LowState) self.lowstate_subscriber = ChannelSubscriber(kTopicLowState, hg_LowState)
self.lowstate_subscriber.Init() self.lowstate_subscriber.Init()
@@ -151,7 +155,8 @@ class G1_29_ArmController:
return cliped_arm_q_target return cliped_arm_q_target
def _ctrl_motor_state(self): def _ctrl_motor_state(self):
self.msg.motor_cmd[G1_29_JointIndex.kNotUsedJoint0].q = 1.0;
if self.debug_mode:
self.msg.motor_cmd[G1_29_JointIndex.kNotUsedJoint0].q = 1.0;
while True: while True:
start_time = time.time() start_time = time.time()
@@ -213,9 +218,10 @@ class G1_29_ArmController:
while True: while True:
current_q = self.get_current_dual_arm_q() current_q = self.get_current_dual_arm_q()
if np.all(np.abs(current_q) < tolerance): if np.all(np.abs(current_q) < tolerance):
for weight in np.arange(1, 0, -0.01):
self.msg.motor_cmd[G1_29_JointIndex.kNotUsedJoint0].q = weight;
time.sleep(0.02)
if self.debug_mode:
for weight in np.arange(1, 0, -0.01):
self.msg.motor_cmd[G1_29_JointIndex.kNotUsedJoint0].q = weight;
time.sleep(0.02)
print("[G1_29_ArmController] both arms have reached the home position.") print("[G1_29_ArmController] both arms have reached the home position.")
break break
time.sleep(0.05) time.sleep(0.05)
@@ -347,7 +353,7 @@ class G1_23_ArmController:
# initialize lowcmd publisher and lowstate subscriber # initialize lowcmd publisher and lowstate subscriber
ChannelFactoryInitialize(0) ChannelFactoryInitialize(0)
self.lowcmd_publisher = ChannelPublisher(kTopicLowCommand, hg_LowCmd)
self.lowcmd_publisher = ChannelPublisher(kTopicLowCommand_Debug, hg_LowCmd)
self.lowcmd_publisher.Init() self.lowcmd_publisher.Init()
self.lowstate_subscriber = ChannelSubscriber(kTopicLowState, hg_LowState) self.lowstate_subscriber = ChannelSubscriber(kTopicLowState, hg_LowState)
self.lowstate_subscriber.Init() self.lowstate_subscriber.Init()
@@ -603,7 +609,7 @@ class H1_2_ArmController:
# initialize lowcmd publisher and lowstate subscriber # initialize lowcmd publisher and lowstate subscriber
ChannelFactoryInitialize(0) ChannelFactoryInitialize(0)
self.lowcmd_publisher = ChannelPublisher(kTopicLowCommand, hg_LowCmd)
self.lowcmd_publisher = ChannelPublisher(kTopicLowCommand_Debug, hg_LowCmd)
self.lowcmd_publisher.Init() self.lowcmd_publisher.Init()
self.lowstate_subscriber = ChannelSubscriber(kTopicLowState, hg_LowState) self.lowstate_subscriber = ChannelSubscriber(kTopicLowState, hg_LowState)
self.lowstate_subscriber.Init() self.lowstate_subscriber.Init()
@@ -864,7 +870,7 @@ class H1_ArmController:
# initialize lowcmd publisher and lowstate subscriber # initialize lowcmd publisher and lowstate subscriber
ChannelFactoryInitialize(0) ChannelFactoryInitialize(0)
self.lowcmd_publisher = ChannelPublisher(kTopicLowCommand, go_LowCmd)
self.lowcmd_publisher = ChannelPublisher(kTopicLowCommand_Debug, go_LowCmd)
self.lowcmd_publisher.Init() self.lowcmd_publisher.Init()
self.lowstate_subscriber = ChannelSubscriber(kTopicLowState, go_LowState) self.lowstate_subscriber = ChannelSubscriber(kTopicLowState, go_LowState)
self.lowstate_subscriber.Init() self.lowstate_subscriber.Init()

50
teleop/teleop_hand_and_arm.py

@@ -78,7 +78,8 @@ if __name__ == '__main__':
image_receive_thread.start() image_receive_thread.start()
# television: obtain hand pose data from the XR device and transmit the robot's head camera image to the XR device. # television: obtain hand pose data from the XR device and transmit the robot's head camera image to the XR device.
tv_wrapper = TeleVisionWrapper(BINOCULAR, args.xr_mode == 'hand', tv_img_shape, tv_img_shm.name)
tv_wrapper = TeleVisionWrapper(binocular=BINOCULAR, use_hand_tracking=args.xr_mode == 'hand', img_shape=tv_img_shape, img_shm_name=tv_img_shm.name,
return_state_data=True, return_hand_rot_data = False)
# arm # arm
if args.arm == 'G1_29': if args.arm == 'G1_29':
@@ -96,8 +97,8 @@ if __name__ == '__main__':
# end-effector # end-effector
if args.ee == "dex3": if args.ee == "dex3":
left_hand_pos_array = Array('d', 75, lock = True) # [input]
right_hand_pos_array = Array('d', 75, lock = True) # [input]
left_hand_pos_array = Array('d', 75, lock = True) # [input]
right_hand_pos_array = Array('d', 75, lock = True) # [input]
dual_hand_data_lock = Lock() dual_hand_data_lock = Lock()
dual_hand_state_array = Array('d', 14, lock = False) # [output] current left, right hand state(14) data. dual_hand_state_array = Array('d', 14, lock = False) # [output] current left, right hand state(14) data.
dual_hand_action_array = Array('d', 14, lock = False) # [output] current left, right hand action(14) data. dual_hand_action_array = Array('d', 14, lock = False) # [output] current left, right hand action(14) data.
@@ -110,8 +111,8 @@ if __name__ == '__main__':
dual_gripper_action_array = Array('d', 2, lock=False) # current left, right gripper action(2) data. dual_gripper_action_array = Array('d', 2, lock=False) # current left, right gripper action(2) data.
gripper_ctrl = Gripper_Controller(left_gripper_value, right_gripper_value, dual_gripper_data_lock, dual_gripper_state_array, dual_gripper_action_array) gripper_ctrl = Gripper_Controller(left_gripper_value, right_gripper_value, dual_gripper_data_lock, dual_gripper_state_array, dual_gripper_action_array)
elif args.ee == "inspire1": elif args.ee == "inspire1":
left_hand_pos_array = Array('d', 75, lock = True) # [input]
right_hand_pos_array = Array('d', 75, lock = True) # [input]
left_hand_pos_array = Array('d', 75, lock = True) # [input]
right_hand_pos_array = Array('d', 75, lock = True) # [input]
dual_hand_data_lock = Lock() dual_hand_data_lock = Lock()
dual_hand_state_array = Array('d', 12, lock = False) # [output] current left, right hand state(12) data. dual_hand_state_array = Array('d', 12, lock = False) # [output] current left, right hand state(12) data.
dual_hand_action_array = Array('d', 12, lock = False) # [output] current left, right hand action(12) data. dual_hand_action_array = Array('d', 12, lock = False) # [output] current left, right hand action(12) data.
@@ -137,8 +138,23 @@ if __name__ == '__main__':
running = True running = True
while running: while running:
start_time = time.time() start_time = time.time()
tele_data = tv_wrapper.get_motion_state_data()
# opencv image
tv_resized_image = cv2.resize(tv_img_array, (tv_img_shape[1] // 2, tv_img_shape[0] // 2))
cv2.imshow("record image", tv_resized_image)
key = cv2.waitKey(1) & 0xFF
if key == ord('q'):
running = False
elif key == ord('s') and args.record:
recording = not recording # state flipping
if recording:
if not recorder.create_episode():
recording = False
else:
recorder.save_episode()
# get input data
tele_data = tv_wrapper.get_motion_state_data()
if (args.ee == 'dex3' or args.ee == 'inspire1') and args.xr_mode == 'hand': if (args.ee == 'dex3' or args.ee == 'inspire1') and args.xr_mode == 'hand':
with left_hand_pos_array.get_lock(): with left_hand_pos_array.get_lock():
left_hand_pos_array[:] = tele_data.left_hand_pos.flatten() left_hand_pos_array[:] = tele_data.left_hand_pos.flatten()
@@ -149,12 +165,13 @@ if __name__ == '__main__':
left_gripper_value.value = tele_data.left_trigger_value left_gripper_value.value = tele_data.left_trigger_value
with right_gripper_value.get_lock(): with right_gripper_value.get_lock():
right_gripper_value.value = tele_data.right_trigger_value right_gripper_value.value = tele_data.right_trigger_value
# quit or damp
# quit teleoperate
if tele_data.tele_state.right_aButton: if tele_data.tele_state.right_aButton:
running = False running = False
if tele_data.tele_state.right_thumbstick_state:
# command robot to enter damping mode. soft emergency stop function
if tele_data.tele_state.left_thumbstick_state and tele_data.tele_state.right_thumbstick_state:
sport_client.Damp() sport_client.Damp()
# control
# high level control, limit velocity to within 0.3
sport_client.Move(-tele_data.tele_state.left_thumbstick_value[1] * 0.3, sport_client.Move(-tele_data.tele_state.left_thumbstick_value[1] * 0.3,
-tele_data.tele_state.left_thumbstick_value[0] * 0.3, -tele_data.tele_state.left_thumbstick_value[0] * 0.3,
-tele_data.tele_state.right_thumbstick_value[0] * 0.3) -tele_data.tele_state.right_thumbstick_value[0] * 0.3)
@@ -166,7 +183,7 @@ if __name__ == '__main__':
else: else:
pass pass
# get current state data.
# get current robot state data.
current_lr_arm_q = arm_ctrl.get_current_dual_arm_q() current_lr_arm_q = arm_ctrl.get_current_dual_arm_q()
current_lr_arm_dq = arm_ctrl.get_current_dual_arm_dq() current_lr_arm_dq = arm_ctrl.get_current_dual_arm_dq()
@@ -177,19 +194,6 @@ if __name__ == '__main__':
# print(f"ik:\t{round(time_ik_end - time_ik_start, 6)}") # print(f"ik:\t{round(time_ik_end - time_ik_start, 6)}")
arm_ctrl.ctrl_dual_arm(sol_q, sol_tauff) arm_ctrl.ctrl_dual_arm(sol_q, sol_tauff)
tv_resized_image = cv2.resize(tv_img_array, (tv_img_shape[1] // 2, tv_img_shape[0] // 2))
cv2.imshow("record image", tv_resized_image)
key = cv2.waitKey(1) & 0xFF
if key == ord('q'):
running = False
elif key == ord('s') and args.record:
recording = not recording # state flipping
if recording:
if not recorder.create_episode():
recording = False
else:
recorder.save_episode()
# record data # record data
if args.record: if args.record:
# dex hand or gripper # dex hand or gripper

Loading…
Cancel
Save