Browse Source

[update] support new vuer version

main
silencht 10 months ago
parent
commit
0e9918f937
  1. 8
      requirements.txt
  2. 4
      teleop/open_television/__init__.py
  3. 77
      teleop/open_television/_test_television.py
  4. 80
      teleop/open_television/_test_tv_wrapper.py
  5. 53
      teleop/open_television/constants.py
  6. 20
      teleop/open_television/setup.py
  7. 476
      teleop/open_television/television.py
  8. 470
      teleop/open_television/tv_wrapper.py
  9. 5
      teleop/robot_control/robot_arm_ik.py
  10. 6
      teleop/robot_control/robot_hand_inspire.py
  11. 70
      teleop/robot_control/robot_hand_unitree.py
  12. 90
      teleop/teleop_hand_and_arm.py
  13. 14
      teleop/utils/mat_tool.py

8
requirements.txt

@ -1,14 +1,7 @@
aiohttp==3.9.5
aiohttp_cors==0.7.0
aiortc==1.8.0
av==11.0.0
einops==0.8.0 einops==0.8.0
h5py==3.11.0 h5py==3.11.0
ipython==8.12.3 ipython==8.12.3
matplotlib==3.7.5 matplotlib==3.7.5
numpy==1.23.0
opencv_contrib_python==4.10.0.82
opencv_python==4.9.0.80
packaging==24.1 packaging==24.1
pandas==2.0.3 pandas==2.0.3
params_proto==2.12.1 params_proto==2.12.1
@ -20,7 +13,6 @@ seaborn==0.13.2
setuptools==69.5.1 setuptools==69.5.1
torch==2.3.0 torch==2.3.0
torchvision==0.18.0 torchvision==0.18.0
vuer==0.0.32rc7
rerun-sdk==0.20.1 rerun-sdk==0.20.1
nlopt>=2.6.1,<2.8.0 nlopt>=2.6.1,<2.8.0
trimesh>=4.4.0 trimesh>=4.4.0

4
teleop/open_television/__init__.py

@ -0,0 +1,4 @@
# open_television/__init__.py
# openssl req -x509 -nodes -days 365 -newkey rsa:2048 -keyout key.pem -out cert.pem
from .television import TeleVision
from .tv_wrapper import TeleVisionWrapper, TeleData, TeleStateData

77
teleop/open_television/_test_television.py

@ -0,0 +1,77 @@
import os, sys
# Path bootstrap: when this file is run directly (not as part of the installed
# package), put the parent directory (teleop/) on sys.path so that
# `from open_television import ...` below resolves.
this_file = os.path.abspath(__file__)
project_root = os.path.abspath(os.path.join(os.path.dirname(this_file), '..'))
if project_root not in sys.path:
    sys.path.insert(0, project_root)
import time
import numpy as np
from multiprocessing import shared_memory
from open_television import TeleVision
def run_test_television():
    """Interactive smoke test for TeleVision.

    Creates a shared-memory image buffer, starts a TeleVision server
    (binocular, WebRTC streaming), then prints head/arm poses plus either
    hand-tracking or controller state at ~30 Hz until Ctrl-C.

    Side effects: blocks on stdin, spawns the Vuer server process, and
    terminates the interpreter via sys.exit(0) on teardown.
    """
    # image: stereo frame, two 640-wide views side by side.
    image_shape = (480, 640 * 2, 3)
    image_shm = shared_memory.SharedMemory(create=True, size=np.prod(image_shape) * np.uint8().itemsize)
    # Keep a live view so the mapping stays valid while TeleVision reads it.
    image_array = np.ndarray(image_shape, dtype=np.uint8, buffer=image_shm.buf)
    # television: set use_hand_track=True to exercise the hand-tracking path instead.
    use_hand_track = False
    tv = TeleVision(binocular=True, use_hand_tracking=use_hand_track, img_shape=image_shape,
                    img_shm_name=image_shm.name, webrtc=True)
    try:
        input("Press Enter to start television test...")
        while True:
            print("=" * 80)
            print("Common Data (always available):")
            print(f"head_pose shape: {tv.head_pose.shape}\n{tv.head_pose}\n")
            print(f"left_arm_pose shape: {tv.left_arm_pose.shape}\n{tv.left_arm_pose}\n")
            print(f"right_arm_pose shape: {tv.right_arm_pose.shape}\n{tv.right_arm_pose}\n")
            print("=" * 80)
            if use_hand_track:
                print("Hand Tracking Data:")
                print(f"left_hand_positions shape: {tv.left_hand_positions.shape}\n{tv.left_hand_positions}\n")
                print(f"right_hand_positions shape: {tv.right_hand_positions.shape}\n{tv.right_hand_positions}\n")
                print(f"left_hand_orientations shape: {tv.left_hand_orientations.shape}\n{tv.left_hand_orientations}\n")
                print(f"right_hand_orientations shape: {tv.right_hand_orientations.shape}\n{tv.right_hand_orientations}\n")
                print(f"left_hand_pinch_state: {tv.left_hand_pinch_state}")
                print(f"left_hand_pinch_value: {tv.left_hand_pinch_value}")
                print(f"left_hand_squeeze_state: {tv.left_hand_squeeze_state}")
                print(f"left_hand_squeeze_value: {tv.left_hand_squeeze_value}")
                print(f"right_hand_pinch_state: {tv.right_hand_pinch_state}")
                print(f"right_hand_pinch_value: {tv.right_hand_pinch_value}")
                print(f"right_hand_squeeze_state: {tv.right_hand_squeeze_state}")
                print(f"right_hand_squeeze_value: {tv.right_hand_squeeze_value}")
            else:
                print("Controller Data:")
                print(f"left_controller_trigger_state: {tv.left_controller_trigger_state}")
                print(f"left_controller_trigger_value: {tv.left_controller_trigger_value}")
                print(f"left_controller_squeeze_state: {tv.left_controller_squeeze_state}")
                print(f"left_controller_squeeze_value: {tv.left_controller_squeeze_value}")
                print(f"left_controller_thumbstick_state: {tv.left_controller_thumbstick_state}")
                print(f"left_controller_thumbstick_value: {tv.left_controller_thumbstick_value}")
                print(f"left_controller_aButton: {tv.left_controller_aButton}")
                print(f"left_controller_bButton: {tv.left_controller_bButton}")
                print(f"right_controller_trigger_state: {tv.right_controller_trigger_state}")
                print(f"right_controller_trigger_value: {tv.right_controller_trigger_value}")
                print(f"right_controller_squeeze_state: {tv.right_controller_squeeze_state}")
                print(f"right_controller_squeeze_value: {tv.right_controller_squeeze_value}")
                print(f"right_controller_thumbstick_state: {tv.right_controller_thumbstick_state}")
                print(f"right_controller_thumbstick_value: {tv.right_controller_thumbstick_value}")
                print(f"right_controller_aButton: {tv.right_controller_aButton}")
                print(f"right_controller_bButton: {tv.right_controller_bButton}")
            print("=" * 80)
            time.sleep(0.03)
    except KeyboardInterrupt:
        print("KeyboardInterrupt, exiting program...")
    finally:
        # Drop the numpy view first: SharedMemory.close() raises BufferError
        # while an exported buffer still references the underlying mmap.
        del image_array
        # Documented teardown order: close() the local mapping, then unlink()
        # the segment (the original code unlinked before closing).
        image_shm.close()
        image_shm.unlink()
        print("Finally, exiting program...")
        sys.exit(0)
if __name__ == '__main__':
    run_test_television()

80
teleop/open_television/_test_tv_wrapper.py

@ -0,0 +1,80 @@
import os, sys
# Path bootstrap: when this file is run directly (not as part of the installed
# package), put the parent directory (teleop/) on sys.path so that
# `from open_television import ...` below resolves.
this_file = os.path.abspath(__file__)
project_root = os.path.abspath(os.path.join(os.path.dirname(this_file), '..'))
if project_root not in sys.path:
    sys.path.insert(0, project_root)
import numpy as np
import time
from multiprocessing import shared_memory
from open_television import TeleVisionWrapper
def run_test_tv_wrapper():
    """Interactive smoke test for TeleVisionWrapper.

    Creates a shared-memory image buffer, starts a TeleVisionWrapper
    (binocular, state + hand-rotation data enabled), then polls
    get_motion_state_data() at ~30 Hz and prints a snapshot until Ctrl-C.

    Side effects: blocks on stdin, spawns the Vuer server process, and
    terminates the interpreter via sys.exit(0) on teardown.
    """
    # Stereo frame: two 640-wide views side by side.
    image_shape = (480, 640 * 2, 3)
    image_shm = shared_memory.SharedMemory(create=True, size=np.prod(image_shape) * np.uint8().itemsize)
    # Keep a live view so the mapping stays valid while the wrapper reads it.
    image_array = np.ndarray(image_shape, dtype=np.uint8, buffer=image_shm.buf)
    # Set use_hand_track=True to exercise the hand-tracking path instead.
    use_hand_track = False
    tv_wrapper = TeleVisionWrapper(binocular=True, use_hand_tracking=use_hand_track,
                                   img_shape=image_shape, img_shm_name=image_shm.name,
                                   return_state_data=True, return_hand_rot_data=True)
    try:
        input("Press Enter to start tv_wrapper test...")
        while True:
            start_time = time.time()
            teleData = tv_wrapper.get_motion_state_data()
            print("=== TeleData Snapshot ===")
            print("[Head Rotation Matrix]:\n", teleData.head_rotation)
            print("[Left EE Pose]:\n", teleData.left_arm_pose)
            print("[Right EE Pose]:\n", teleData.right_arm_pose)
            if use_hand_track:
                print("[Left Hand Position] shape {}:\n{}".format(teleData.left_hand_pos.shape, teleData.left_hand_pos))
                print("[Right Hand Position] shape {}:\n{}".format(teleData.right_hand_pos.shape, teleData.right_hand_pos))
                if teleData.left_hand_rot is not None:
                    print("[Left Hand Rotation] shape {}:\n{}".format(teleData.left_hand_rot.shape, teleData.left_hand_rot))
                if teleData.right_hand_rot is not None:
                    print("[Right Hand Rotation] shape {}:\n{}".format(teleData.right_hand_rot.shape, teleData.right_hand_rot))
                if teleData.left_pinch_value is not None:
                    print("[Left Pinch Value]: {:.2f}".format(teleData.left_pinch_value))
                if teleData.right_pinch_value is not None:
                    print("[Right Pinch Value]: {:.2f}".format(teleData.right_pinch_value))
                if teleData.tele_state:
                    state = teleData.tele_state
                    print("[Hand State]:")
                    print(f" Left Pinch state: {state.left_pinch_state}")
                    print(f" Left Squeeze: {state.left_squeeze_state} ({state.left_squeeze_value:.2f})")
                    print(f" Right Pinch state: {state.right_pinch_state}")
                    print(f" Right Squeeze: {state.right_squeeze_state} ({state.right_squeeze_value:.2f})")
            else:
                print(f"[Left Trigger Value]: {teleData.left_trigger_value:.2f}")
                print(f"[Right Trigger Value]: {teleData.right_trigger_value:.2f}")
                if teleData.tele_state:
                    state = teleData.tele_state
                    print("[Controller State]:")
                    print(f" Left Trigger: {state.left_trigger_state}")
                    print(f" Left Squeeze: {state.left_squeeze_ctrl_state} ({state.left_squeeze_ctrl_value:.2f})")
                    print(f" Left Thumbstick: {state.left_thumbstick_state} ({state.left_thumbstick_value})")
                    print(f" Left A/B Buttons: A={state.left_aButton}, B={state.left_bButton}")
                    print(f" Right Trigger: {state.right_trigger_state}")
                    print(f" Right Squeeze: {state.right_squeeze_ctrl_state} ({state.right_squeeze_ctrl_value:.2f})")
                    print(f" Right Thumbstick: {state.right_thumbstick_state} ({state.right_thumbstick_value})")
                    print(f" Right A/B Buttons: A={state.right_aButton}, B={state.right_bButton}")
            # Pace the loop to roughly 30 Hz regardless of print/poll cost.
            time_elapsed = time.time() - start_time
            sleep_time = max(0, 0.033 - time_elapsed)
            time.sleep(sleep_time)
    except KeyboardInterrupt:
        print("KeyboardInterrupt, exiting program...")
    finally:
        # Drop the numpy view first: SharedMemory.close() raises BufferError
        # while an exported buffer still references the underlying mmap.
        del image_array
        # Documented teardown order: close() the local mapping, then unlink()
        # the segment (the original code unlinked before closing).
        image_shm.close()
        image_shm.unlink()
        print("Finally, exiting program...")
        sys.exit(0)
if __name__ == '__main__':
    run_test_tv_wrapper()

53
teleop/open_television/constants.py

@ -1,53 +0,0 @@
import numpy as np

# Homogeneous (4x4) transforms used by the teleop pipeline. Names suggest
# basis changes between the XR-device (OpenXR) wrist/hand frames and the
# Unitree URDF frames — confirm axis conventions against tv_wrapper.py.

# Left wrist: XR convention -> Unitree left-wrist URDF convention.
T_to_unitree_left_wrist = np.array([
    [1, 0,  0, 0],
    [0, 0, -1, 0],
    [0, 1,  0, 0],
    [0, 0,  0, 1],
])

# Right wrist: XR convention -> Unitree right-wrist URDF convention
# (rotation is the inverse of the left-wrist one).
T_to_unitree_right_wrist = np.array([
    [1,  0, 0, 0],
    [0,  0, 1, 0],
    [0, -1, 0, 0],
    [0,  0, 0, 1],
])

# Hand-landmark frame -> Unitree hand frame (per the constant's name).
T_to_unitree_hand = np.array([
    [ 0,  0, 1, 0],
    [-1,  0, 0, 0],
    [ 0, -1, 0, 0],
    [ 0,  0, 0, 1],
])

# Robot convention (z up, y left, x front) vs. OpenXR convention
# (y up, z back, x right) basis change.
T_robot_openxr = np.array([
    [ 0, 0, -1, 0],
    [-1, 0,  0, 0],
    [ 0, 1,  0, 0],
    [ 0, 0,  0, 1],
])

# Fixed head pose (translation presumably in metres — verify against callers).
const_head_vuer_mat = np.array([
    [1, 0, 0, 0],
    [0, 1, 0, 1.5],
    [0, 0, 1, -0.2],
    [0, 0, 0, 1],
])

# Initial right-wrist pose for the G1 robot's start configuration.
const_right_wrist_vuer_mat = np.array([
    [1, 0, 0, 0.15],
    [0, 1, 0, 1.13],
    [0, 0, 1, -0.3],
    [0, 0, 0, 1],
])

# Initial left-wrist pose for the G1 robot's start configuration.
# (Earlier revisions placed the wrists at x=+-0.5, y=1, z=-0.5.)
const_left_wrist_vuer_mat = np.array([
    [1, 0, 0, -0.15],
    [0, 1, 0, 1.13],
    [0, 0, 1, -0.3],
    [0, 0, 0, 1],
])

20
teleop/open_television/setup.py

@ -0,0 +1,20 @@
from setuptools import setup, find_packages

# Pinned runtime dependencies for the standalone open_television package.
_REQUIREMENTS = [
    'numpy==1.23.0',
    'opencv_contrib_python==4.10.0.82',
    'opencv_python==4.9.0.80',
    'aiohttp==3.9.5',
    'aiohttp_cors==0.7.0',
    'aiortc==1.8.0',
    'av==11.0.0',
    'vuer==0.0.42rc16',
]

# Package metadata and distribution configuration.
setup(
    name='open_television',
    version='0.0.1',
    description='XR vision and hand/controller interface for unitree robotics',
    author='silencht',
    packages=find_packages(),
    install_requires=_REQUIREMENTS,
    python_requires='>=3.8',
)

476
teleop/open_television/television.py

@ -1,74 +1,220 @@
import time
from vuer import Vuer from vuer import Vuer
from vuer.schemas import ImageBackground, Hands
from multiprocessing import Array, Process, shared_memory
from vuer.schemas import ImageBackground, Hands, MotionControllers, WebRTCVideoPlane, WebRTCStereoVideoPlane
from multiprocessing import Value, Array, Process, shared_memory
import numpy as np import numpy as np
import asyncio import asyncio
import cv2 import cv2
from multiprocessing import context
Value = context._default_context.Value
import os
class TeleVision: class TeleVision:
def __init__(self, binocular, img_shape, img_shm_name, cert_file="./cert.pem", key_file="./key.pem", ngrok=False):
def __init__(self, binocular: bool, use_hand_tracking: bool, img_shape, img_shm_name, cert_file=None, key_file=None, ngrok=False, webrtc=False):
"""
TeleVision class for OpenXR-based VR/AR applications.
This class handles the communication with the Vuer server and manages the shared memory for image and pose data.
:param binocular: bool, whether the application is binocular (stereoscopic) or monocular.
:param use_hand_tracking: bool, whether to use hand tracking or controller tracking.
:param img_shape: tuple, shape of the image (height, width, channels).
:param img_shm_name: str, name of the shared memory for the image.
:param cert_file: str, path to the SSL certificate file.
:param key_file: str, path to the SSL key file.
:param ngrok: bool, whether to use ngrok for tunneling.
"""
self.binocular = binocular self.binocular = binocular
self.use_hand_tracking = use_hand_tracking
self.img_height = img_shape[0] self.img_height = img_shape[0]
if binocular:
if self.binocular:
self.img_width = img_shape[1] // 2 self.img_width = img_shape[1] // 2
else: else:
self.img_width = img_shape[1] self.img_width = img_shape[1]
current_module_dir = os.path.dirname(os.path.abspath(__file__))
print(f"current_module_dir: {current_module_dir}")
if cert_file is None:
cert_file = os.path.join(current_module_dir, "cert.pem")
if key_file is None:
key_file = os.path.join(current_module_dir, "key.pem")
if ngrok: if ngrok:
self.vuer = Vuer(host='0.0.0.0', queries=dict(grid=False), queue_len=3) self.vuer = Vuer(host='0.0.0.0', queries=dict(grid=False), queue_len=3)
else: else:
self.vuer = Vuer(host='0.0.0.0', cert=cert_file, key=key_file, queries=dict(grid=False), queue_len=3) self.vuer = Vuer(host='0.0.0.0', cert=cert_file, key=key_file, queries=dict(grid=False), queue_len=3)
self.vuer.add_handler("HAND_MOVE")(self.on_hand_move)
self.vuer.add_handler("CAMERA_MOVE")(self.on_cam_move) self.vuer.add_handler("CAMERA_MOVE")(self.on_cam_move)
if self.use_hand_tracking:
self.vuer.add_handler("HAND_MOVE")(self.on_hand_move)
else:
self.vuer.add_handler("CONTROLLER_MOVE")(self.on_controller_move)
existing_shm = shared_memory.SharedMemory(name=img_shm_name) existing_shm = shared_memory.SharedMemory(name=img_shm_name)
self.img_array = np.ndarray(img_shape, dtype=np.uint8, buffer=existing_shm.buf) self.img_array = np.ndarray(img_shape, dtype=np.uint8, buffer=existing_shm.buf)
if binocular:
self.webrtc = webrtc
if self.binocular and not self.webrtc:
self.vuer.spawn(start=False)(self.main_image_binocular) self.vuer.spawn(start=False)(self.main_image_binocular)
else:
elif not self.binocular and not self.webrtc:
self.vuer.spawn(start=False)(self.main_image_monocular) self.vuer.spawn(start=False)(self.main_image_monocular)
elif self.webrtc:
self.vuer.spawn(start=False)(self.main_image_webrtc)
self.left_hand_shared = Array('d', 16, lock=True)
self.right_hand_shared = Array('d', 16, lock=True)
self.left_landmarks_shared = Array('d', 75, lock=True)
self.right_landmarks_shared = Array('d', 75, lock=True)
self.head_matrix_shared = Array('d', 16, lock=True)
self.aspect_shared = Value('d', 1.0, lock=True)
self.head_pose_shared = Array('d', 16, lock=True)
self.left_arm_pose_shared = Array('d', 16, lock=True)
self.right_arm_pose_shared = Array('d', 16, lock=True)
if self.use_hand_tracking:
self.left_hand_position_shared = Array('d', 75, lock=True)
self.right_hand_position_shared = Array('d', 75, lock=True)
self.left_hand_orientation_shared = Array('d', 25 * 9, lock=True)
self.right_hand_orientation_shared = Array('d', 25 * 9, lock=True)
self.left_pinch_state_shared = Value('b', False, lock=True)
self.left_pinch_value_shared = Value('d', 0.0, lock=True)
self.left_squeeze_state_shared = Value('b', False, lock=True)
self.left_squeeze_value_shared = Value('d', 0.0, lock=True)
self.right_pinch_state_shared = Value('b', False, lock=True)
self.right_pinch_value_shared = Value('d', 0.0, lock=True)
self.right_squeeze_state_shared = Value('b', False, lock=True)
self.right_squeeze_value_shared = Value('d', 0.0, lock=True)
else:
self.left_trigger_state_shared = Value('b', False, lock=True)
self.left_trigger_value_shared = Value('d', 0.0, lock=True)
self.left_squeeze_state_shared = Value('b', False, lock=True)
self.left_squeeze_value_shared = Value('d', 0.0, lock=True)
self.left_thumbstick_state_shared = Value('b', False, lock=True)
self.left_thumbstick_value_shared = Array('d', 2, lock=True)
self.left_aButton_shared = Value('b', False, lock=True)
self.left_bButton_shared = Value('b', False, lock=True)
self.right_trigger_state_shared = Value('b', False, lock=True)
self.right_trigger_value_shared = Value('d', 0.0, lock=True)
self.right_squeeze_state_shared = Value('b', False, lock=True)
self.right_squeeze_value_shared = Value('d', 0.0, lock=True)
self.right_thumbstick_state_shared = Value('b', False, lock=True)
self.right_thumbstick_value_shared = Array('d', 2, lock=True)
self.right_aButton_shared = Value('b', False, lock=True)
self.right_bButton_shared = Value('b', False, lock=True)
self.process = Process(target=self.vuer_run) self.process = Process(target=self.vuer_run)
self.process.daemon = True self.process.daemon = True
self.process.start() self.process.start()
def vuer_run(self): def vuer_run(self):
self.vuer.run() self.vuer.run()
async def on_cam_move(self, event, session, fps=60): async def on_cam_move(self, event, session, fps=60):
try: try:
self.head_matrix_shared[:] = event.value["camera"]["matrix"]
self.aspect_shared.value = event.value['camera']['aspect']
with self.head_pose_shared.get_lock():
self.head_pose_shared[:] = event.value["camera"]["matrix"]
except:
pass
async def on_controller_move(self, event, session, fps=60):
try:
with self.left_arm_pose_shared.get_lock():
self.left_arm_pose_shared[:] = event.value["left"]
with self.right_arm_pose_shared.get_lock():
self.right_arm_pose_shared[:] = event.value["right"]
left_controller_state = event.value["leftState"]
right_controller_state = event.value["rightState"]
def extract_controller_states(state_dict, prefix):
# trigger
with getattr(self, f"{prefix}_trigger_state_shared").get_lock():
getattr(self, f"{prefix}_trigger_state_shared").value = bool(state_dict.get("trigger", False))
with getattr(self, f"{prefix}_trigger_value_shared").get_lock():
getattr(self, f"{prefix}_trigger_value_shared").value = float(state_dict.get("triggerValue", 0.0))
# squeeze
with getattr(self, f"{prefix}_squeeze_state_shared").get_lock():
getattr(self, f"{prefix}_squeeze_state_shared").value = bool(state_dict.get("squeeze", False))
with getattr(self, f"{prefix}_squeeze_value_shared").get_lock():
getattr(self, f"{prefix}_squeeze_value_shared").value = float(state_dict.get("squeezeValue", 0.0))
# thumbstick
with getattr(self, f"{prefix}_thumbstick_state_shared").get_lock():
getattr(self, f"{prefix}_thumbstick_state_shared").value = bool(state_dict.get("thumbstick", False))
with getattr(self, f"{prefix}_thumbstick_value_shared").get_lock():
getattr(self, f"{prefix}_thumbstick_value_shared")[:] = state_dict.get("thumbstickValue", [0.0, 0.0])
# buttons
with getattr(self, f"{prefix}_aButton_shared").get_lock():
getattr(self, f"{prefix}_aButton_shared").value = bool(state_dict.get("aButton", False))
with getattr(self, f"{prefix}_bButton_shared").get_lock():
getattr(self, f"{prefix}_bButton_shared").value = bool(state_dict.get("bButton", False))
extract_controller_states(left_controller_state, "left")
extract_controller_states(right_controller_state, "right")
except: except:
pass pass
async def on_hand_move(self, event, session, fps=60): async def on_hand_move(self, event, session, fps=60):
try: try:
self.left_hand_shared[:] = event.value["leftHand"]
self.right_hand_shared[:] = event.value["rightHand"]
self.left_landmarks_shared[:] = np.array(event.value["leftLandmarks"]).flatten()
self.right_landmarks_shared[:] = np.array(event.value["rightLandmarks"]).flatten()
except:
left_hand_data = event.value["left"]
right_hand_data = event.value["right"]
left_hand_state = event.value["leftState"]
right_hand_state = event.value["rightState"]
def extract_hand_poses(hand_data, arm_pose_shared, hand_position_shared, hand_orientation_shared):
with arm_pose_shared.get_lock():
arm_pose_shared[:] = hand_data[0:16]
with hand_position_shared.get_lock():
for i in range(25):
base = i * 16
hand_position_shared[i * 3: i * 3 + 3] = [hand_data[base + 12], hand_data[base + 13], hand_data[base + 14]]
with hand_orientation_shared.get_lock():
for i in range(25):
base = i * 16
hand_orientation_shared[i * 9: i * 9 + 9] = [
hand_data[base + 0], hand_data[base + 1], hand_data[base + 2],
hand_data[base + 4], hand_data[base + 5], hand_data[base + 6],
hand_data[base + 8], hand_data[base + 9], hand_data[base + 10],
]
def extract_hand_states(state_dict, prefix):
# pinch
with getattr(self, f"{prefix}_pinch_state_shared").get_lock():
getattr(self, f"{prefix}_pinch_state_shared").value = bool(state_dict.get("pinch", False))
with getattr(self, f"{prefix}_pinch_value_shared").get_lock():
getattr(self, f"{prefix}_pinch_value_shared").value = float(state_dict.get("pinchValue", 0.0))
# squeeze
with getattr(self, f"{prefix}_squeeze_state_shared").get_lock():
getattr(self, f"{prefix}_squeeze_state_shared").value = bool(state_dict.get("squeeze", False))
with getattr(self, f"{prefix}_squeeze_value_shared").get_lock():
getattr(self, f"{prefix}_squeeze_value_shared").value = float(state_dict.get("squeezeValue", 0.0))
extract_hand_poses(left_hand_data, self.left_arm_pose_shared, self.left_hand_position_shared, self.left_hand_orientation_shared)
extract_hand_poses(right_hand_data, self.right_arm_pose_shared, self.right_hand_position_shared, self.right_hand_orientation_shared)
extract_hand_states(left_hand_state, "left")
extract_hand_states(right_hand_state, "right")
except:
pass pass
async def main_image_binocular(self, session, fps=60): async def main_image_binocular(self, session, fps=60):
session.upsert @ Hands(fps=fps, stream=True, key="hands", showLeft=False, showRight=False)
if self.use_hand_tracking:
session.upsert(
Hands(
stream=True,
key="hands",
showLeft=False,
showRight=False
),
# we place this into the background children list, so that it is
# not affected by the global rotation
to="bgChildren",
)
else:
session.upsert(
MotionControllers(
stream=True,
key="motion-controller",
showLeft=False,
showRight=False,
),
to="bgChildren",
)
while True: while True:
display_image = cv2.cvtColor(self.img_array, cv2.COLOR_BGR2RGB) display_image = cv2.cvtColor(self.img_array, cv2.COLOR_BGR2RGB)
# aspect_ratio = self.img_width / self.img_height # aspect_ratio = self.img_width / self.img_height
@ -106,7 +252,26 @@ class TeleVision:
await asyncio.sleep(0.016 * 2) await asyncio.sleep(0.016 * 2)
async def main_image_monocular(self, session, fps=60): async def main_image_monocular(self, session, fps=60):
session.upsert @ Hands(fps=fps, stream=True, key="hands", showLeft=False, showRight=False)
if self.use_hand_tracking:
session.upsert(
Hands(
stream=True,
key="hands",
showLeft=False,
showRight=False
),
to="bgChildren",
)
else:
session.upsert(
MotionControllers(
stream=True,
key="motion-controller",
showLeft=False,
showRight=False,
),
to="bgChildren",
)
while True: while True:
display_image = cv2.cvtColor(self.img_array, cv2.COLOR_BGR2RGB) display_image = cv2.cvtColor(self.img_array, cv2.COLOR_BGR2RGB)
# aspect_ratio = self.img_width / self.img_height # aspect_ratio = self.img_width / self.img_height
@ -127,52 +292,225 @@ class TeleVision:
) )
await asyncio.sleep(0.016) await asyncio.sleep(0.016)
@property
def left_hand(self):
return np.array(self.left_hand_shared[:]).reshape(4, 4, order="F")
async def main_image_webrtc(self, session, fps=60):
if self.use_hand_tracking:
session.upsert(
Hands(
stream=True,
key="hands",
showLeft=False,
showRight=False
),
to="bgChildren",
)
else:
session.upsert(
MotionControllers(
stream=True,
key="motion-controller",
showLeft=False,
showRight=False,
)
)
session.upsert(
WebRTCVideoPlane(
# WebRTCStereoVideoPlane(
src="https://10.0.7.49:8080/offer",
iceServer={},
key="webrtc",
aspect=1.778,
height = 7,
),
to="bgChildren",
)
while True:
await asyncio.sleep(1)
# ==================== common data ====================
@property @property
def right_hand(self):
return np.array(self.right_hand_shared[:]).reshape(4, 4, order="F")
def head_pose(self):
"""np.ndarray, shape (4, 4), head SE(3) pose matrix from Vuer (basis OpenXR Convention)."""
with self.head_pose_shared.get_lock():
return np.array(self.head_pose_shared[:]).reshape(4, 4, order="F")
@property @property
def left_landmarks(self):
return np.array(self.left_landmarks_shared[:]).reshape(25, 3)
def left_arm_pose(self):
"""np.ndarray, shape (4, 4), left arm SE(3) pose matrix from Vuer (basis OpenXR Convention)."""
with self.left_arm_pose_shared.get_lock():
return np.array(self.left_arm_pose_shared[:]).reshape(4, 4, order="F")
@property @property
def right_landmarks(self):
return np.array(self.right_landmarks_shared[:]).reshape(25, 3)
def right_arm_pose(self):
"""np.ndarray, shape (4, 4), right arm SE(3) pose matrix from Vuer (basis OpenXR Convention)."""
with self.right_arm_pose_shared.get_lock():
return np.array(self.right_arm_pose_shared[:]).reshape(4, 4, order="F")
# ==================== Hand Tracking Data ====================
@property @property
def head_matrix(self):
return np.array(self.head_matrix_shared[:]).reshape(4, 4, order="F")
def left_hand_positions(self):
"""np.ndarray, shape (25, 3), left hand 25 landmarks' 3D positions."""
with self.left_hand_position_shared.get_lock():
return np.array(self.left_hand_position_shared[:]).reshape(25, 3)
@property @property
def aspect(self):
return float(self.aspect_shared.value)
if __name__ == '__main__':
import os
import sys
current_dir = os.path.dirname(os.path.abspath(__file__))
parent_dir = os.path.dirname(current_dir)
sys.path.append(parent_dir)
import threading
from image_server.image_client import ImageClient
# image
img_shape = (480, 640 * 2, 3)
img_shm = shared_memory.SharedMemory(create=True, size=np.prod(img_shape) * np.uint8().itemsize)
img_array = np.ndarray(img_shape, dtype=np.uint8, buffer=img_shm.buf)
img_client = ImageClient(tv_img_shape = img_shape, tv_img_shm_name = img_shm.name)
image_receive_thread = threading.Thread(target=img_client.receive_process, daemon=True)
image_receive_thread.start()
# television
tv = TeleVision(True, img_shape, img_shm.name)
print("vuer unit test program running...")
print("you can press ^C to interrupt program.")
while True:
time.sleep(0.03)
def right_hand_positions(self):
"""np.ndarray, shape (25, 3), right hand 25 landmarks' 3D positions."""
with self.right_hand_position_shared.get_lock():
return np.array(self.right_hand_position_shared[:]).reshape(25, 3)
@property
def left_hand_orientations(self):
"""np.ndarray, shape (25, 3, 3), left hand 25 landmarks' orientations (flattened 3x3 matrices, column-major)."""
with self.left_hand_orientation_shared.get_lock():
return np.array(self.left_hand_orientation_shared[:]).reshape(25, 9).reshape(25, 3, 3, order="F")
@property
def right_hand_orientations(self):
"""np.ndarray, shape (25, 3, 3), right hand 25 landmarks' orientations (flattened 3x3 matrices, column-major)."""
with self.right_hand_orientation_shared.get_lock():
return np.array(self.right_hand_orientation_shared[:]).reshape(25, 9).reshape(25, 3, 3, order="F")
@property
def left_hand_pinch_state(self):
"""bool, whether left hand is pinching."""
with self.left_pinch_state_shared.get_lock():
return self.left_pinch_state_shared.value
@property
def left_hand_pinch_value(self):
"""float, pinch strength of left hand."""
with self.left_pinch_value_shared.get_lock():
return self.left_pinch_value_shared.value
@property
def left_hand_squeeze_state(self):
"""bool, whether left hand is squeezing."""
with self.left_squeeze_state_shared.get_lock():
return self.left_squeeze_state_shared.value
@property
def left_hand_squeeze_value(self):
"""float, squeeze strength of left hand."""
with self.left_squeeze_value_shared.get_lock():
return self.left_squeeze_value_shared.value
@property
def right_hand_pinch_state(self):
"""bool, whether right hand is pinching."""
with self.right_pinch_state_shared.get_lock():
return self.right_pinch_state_shared.value
@property
def right_hand_pinch_value(self):
"""float, pinch strength of right hand."""
with self.right_pinch_value_shared.get_lock():
return self.right_pinch_value_shared.value
@property
def right_hand_squeeze_state(self):
"""bool, whether right hand is squeezing."""
with self.right_squeeze_state_shared.get_lock():
return self.right_squeeze_state_shared.value
@property
def right_hand_squeeze_value(self):
"""float, squeeze strength of right hand."""
with self.right_squeeze_value_shared.get_lock():
return self.right_squeeze_value_shared.value
# ==================== Controller Data ====================
@property
def left_controller_trigger_state(self):
"""bool, left controller trigger pressed or not."""
with self.left_trigger_state_shared.get_lock():
return self.left_trigger_state_shared.value
@property
def left_controller_trigger_value(self):
"""float, left controller trigger analog value (0.0 ~ 1.0)."""
with self.left_trigger_value_shared.get_lock():
return self.left_trigger_value_shared.value
@property
def left_controller_squeeze_state(self):
"""bool, left controller squeeze pressed or not."""
with self.left_squeeze_state_shared.get_lock():
return self.left_squeeze_state_shared.value
@property
def left_controller_squeeze_value(self):
"""float, left controller squeeze analog value (0.0 ~ 1.0)."""
with self.left_squeeze_value_shared.get_lock():
return self.left_squeeze_value_shared.value
@property
def left_controller_thumbstick_state(self):
"""bool, whether left thumbstick is touched or clicked."""
with self.left_thumbstick_state_shared.get_lock():
return self.left_thumbstick_state_shared.value
@property
def left_controller_thumbstick_value(self):
"""np.ndarray, shape (2,), left thumbstick 2D axis values (x, y)."""
with self.left_thumbstick_value_shared.get_lock():
return np.array(self.left_thumbstick_value_shared[:])
@property
def left_controller_aButton(self):
"""bool, left controller 'A' button pressed."""
with self.left_aButton_shared.get_lock():
return self.left_aButton_shared.value
@property
def left_controller_bButton(self):
"""bool, left controller 'B' button pressed."""
with self.left_bButton_shared.get_lock():
return self.left_bButton_shared.value
@property
def right_controller_trigger_state(self):
"""bool, right controller trigger pressed or not."""
with self.right_trigger_state_shared.get_lock():
return self.right_trigger_state_shared.value
@property
def right_controller_trigger_value(self):
"""float, right controller trigger analog value (0.0 ~ 1.0)."""
with self.right_trigger_value_shared.get_lock():
return self.right_trigger_value_shared.value
@property
def right_controller_squeeze_state(self):
"""bool, right controller squeeze pressed or not."""
with self.right_squeeze_state_shared.get_lock():
return self.right_squeeze_state_shared.value
@property
def right_controller_squeeze_value(self):
"""float, right controller squeeze analog value (0.0 ~ 1.0)."""
with self.right_squeeze_value_shared.get_lock():
return self.right_squeeze_value_shared.value
@property
def right_controller_thumbstick_state(self):
"""bool, whether right thumbstick is touched or clicked."""
with self.right_thumbstick_state_shared.get_lock():
return self.right_thumbstick_state_shared.value
@property
def right_controller_thumbstick_value(self):
"""np.ndarray, shape (2,), right thumbstick 2D axis values (x, y)."""
with self.right_thumbstick_value_shared.get_lock():
return np.array(self.right_thumbstick_value_shared[:])
@property
def right_controller_aButton(self):
"""bool, right controller 'A' button pressed."""
with self.right_aButton_shared.get_lock():
return self.right_aButton_shared.value
@property
def right_controller_bButton(self):
"""bool, right controller 'B' button pressed."""
with self.right_bButton_shared.get_lock():
return self.right_bButton_shared.value

470
teleop/open_television/tv_wrapper.py

@ -1,147 +1,411 @@
import numpy as np
from dataclasses import dataclass, field

from open_television.television import TeleVision

"""
(basis) OpenXR Convention : y up, z back, x right.
(basis) Robot Convention  : z up, y left, x front.

under (basis) Robot Convention, humanoid arm's initial pose convention:

# (initial pose) OpenXR Left Arm Pose Convention (hand tracking):
  - the x-axis pointing from wrist toward middle.
  - the y-axis pointing from index toward pinky.
  - the z-axis pointing from palm toward back of the hand.

# (initial pose) OpenXR Right Arm Pose Convention (hand tracking):
  - the x-axis pointing from wrist toward middle.
  - the y-axis pointing from pinky toward index.
  - the z-axis pointing from palm toward back of the hand.

# (initial pose) Unitree Humanoid Left Arm URDF Convention:
  - the x-axis pointing from wrist toward middle.
  - the y-axis pointing from palm toward back of the hand.
  - the z-axis pointing from pinky toward index.

# (initial pose) Unitree Humanoid Right Arm URDF Convention:
  - the x-axis pointing from wrist toward middle.
  - the y-axis pointing from back of the hand toward palm.
  - the z-axis pointing from pinky toward index.

under (basis) Robot Convention, humanoid hand's initial pose convention:

# (initial pose) OpenXR Left Hand Pose Convention (hand tracking):
  - the x-axis pointing from wrist toward middle.
  - the y-axis pointing from index toward pinky.
  - the z-axis pointing from palm toward back of the hand.

# (initial pose) OpenXR Right Hand Pose Convention (hand tracking):
  - the x-axis pointing from wrist toward middle.
  - the y-axis pointing from pinky toward index.
  - the z-axis pointing from palm toward back of the hand.

# (initial pose) Unitree Humanoid Left Hand URDF Convention:
  - The x-axis pointing from palm toward back of the hand.
  - The y-axis pointing from middle toward wrist.
  - The z-axis pointing from pinky toward index.

# (initial pose) Unitree Humanoid Right Hand URDF Convention:
  - The x-axis pointing from palm toward back of the hand.
  - The y-axis pointing from middle toward wrist.
  - The z-axis pointing from index toward pinky.

p.s. TeleVision (Vuer) obtains all raw data under the (basis) OpenXR Convention.
     In addition, arm pose data (hand tracking) follows the (initial pose) OpenXR Arm Pose Convention,
     while arm pose data (controller tracking) directly follows the (initial pose) Unitree Humanoid Arm URDF Convention (thus no transform is needed).
     Meanwhile, all raw data is in the WORLD frame defined by XR device odometry.

p.s. From website: https://registry.khronos.org/OpenXR/specs/1.1/man/html/openxr.html.
     You can find **(initial pose) OpenXR Left/Right Arm Pose Convention** related information like this below:
     "The wrist joint is located at the pivot point of the wrist, which is location invariant when twisting the hand without moving the forearm.
     The backward (+Z) direction is parallel to the line from wrist joint to middle finger metacarpal joint, and points away from the finger tips.
     The up (+Y) direction points out towards back of the hand and perpendicular to the skin at wrist.
     The X direction is perpendicular to the Y and Z directions and follows the right hand rule."
     Note: The above context is of course under **(basis) OpenXR Convention**.

p.s. **Unitree Arm/Hand URDF initial pose Convention** information come from URDF files.
"""
def safe_mat_update(prev_mat, mat):
    """Validate a pose matrix, falling back to the previous valid one.

    Returns (mat, True) when *mat* is usable.  Returns (prev_mat, False) when
    the determinant of *mat* is non-finite or near zero, i.e. the matrix is
    singular or contains NaN/Inf (typical while the XR device loses tracking).

    Fixes the original inline comment, which said the fallback happens when
    the matrix is "non-singular" — the code rejects *singular* matrices.
    """
    det = np.linalg.det(mat)
    # Reject singular or NaN/Inf matrices — keep the last valid pose instead.
    if not np.isfinite(det) or np.isclose(det, 0.0, atol=1e-6):
        return prev_mat, False
    return mat, True
def fast_mat_inv(mat):
    """Invert a 4x4 rigid-body (SE(3)) transform analytically.

    Uses inv([R t; 0 1]) = [R^T, -R^T t; 0 1], which is cheaper than a
    general matrix inverse; assumes the 3x3 rotation block is orthonormal.
    """
    rot_t = mat[:3, :3].T
    inv = np.eye(4)
    inv[:3, :3] = rot_t
    inv[:3, 3] = -rot_t @ mat[:3, 3]
    return inv
def safe_rot_update(prev_rot_array, rot_array):
    """Validate a batch of rotation matrices, falling back to the previous batch.

    Accepts *rot_array* (stacked (N,3,3) matrices) only when every per-matrix
    determinant is finite and away from zero; otherwise returns
    (prev_rot_array, False).
    """
    dets = np.linalg.det(rot_array)
    all_finite = np.all(np.isfinite(dets))
    any_singular = np.any(np.isclose(dets, 0.0, atol=1e-6))
    if all_finite and not any_singular:
        return rot_array, True
    return prev_rot_array, False
# ---- constant transforms and fallback poses ----
# Initial-pose change of convention: OpenXR arm pose -> Unitree humanoid left-arm URDF pose.
# Right-multiplied onto an arm pose; rotates 90 degrees counterclockwise about the pose's own x-axis.
T_TO_UNITREE_HUMANOID_LEFT_ARM = np.array([[1, 0, 0, 0],
                                           [0, 0,-1, 0],
                                           [0, 1, 0, 0],
                                           [0, 0, 0, 1]])
# Same for the right arm; rotates 90 degrees clockwise about the pose's own x-axis.
T_TO_UNITREE_HUMANOID_RIGHT_ARM = np.array([[1, 0, 0, 0],
                                            [0, 0, 1, 0],
                                            [0,-1, 0, 0],
                                            [0, 0, 0, 1]])
# Initial-pose change of convention: OpenXR hand joints -> Unitree hand URDF frame.
# Left-multiplied onto homogeneous (4, N) joint-position columns.
T_TO_UNITREE_HAND = np.array([[0, 0, 1, 0],
                              [-1, 0, 0, 0],
                              [0, -1, 0, 0],
                              [0, 0, 0, 1]])
# Change of basis: OpenXR (y up, z back, x right) -> Robot (z up, y left, x front).
T_ROBOT_OPENXR = np.array([[ 0, 0,-1, 0],
                           [-1, 0, 0, 0],
                           [ 0, 1, 0, 0],
                           [ 0, 0, 0, 1]])
# Inverse change of basis; equal to fast_mat_inv(T_ROBOT_OPENXR).
T_OPENXR_ROBOT = np.array([[ 0,-1, 0, 0],
                           [ 0, 0, 1, 0],
                           [-1, 0, 0, 0],
                           [ 0, 0, 0, 1]])
# 3x3 rotation-only versions of the two basis changes above (for hand-joint rotations).
R_ROBOT_OPENXR = np.array([[ 0, 0,-1],
                           [-1, 0, 0],
                           [ 0, 1, 0]])
R_OPENXR_ROBOT = np.array([[ 0,-1, 0],
                           [ 0, 0, 1],
                           [-1, 0, 0]])
# Fallback poses returned by safe_mat_update while tracking is invalid.
CONST_HEAD_POSE = np.array([[1, 0, 0, 0],
                            [0, 1, 0, 1.5],
                            [0, 0, 1, -0.2],
                            [0, 0, 0, 1]])
# For Robot initial position
CONST_RIGHT_ARM_POSE = np.array([[1, 0, 0, 0.15],
                                 [0, 1, 0, 1.13],
                                 [0, 0, 1, -0.3],
                                 [0, 0, 0, 1]])
CONST_LEFT_ARM_POSE = np.array([[1, 0, 0, -0.15],
                                [0, 1, 0, 1.13],
                                [0, 0, 1, -0.3],
                                [0, 0, 0, 1]])
# Fallback hand-joint orientations: 25 identity rotation matrices, shape (25, 3, 3).
CONST_HAND_ROT = np.tile(np.eye(3)[None, :, :], (25, 1, 1))
@dataclass
class TeleStateData:
    """Discrete button / gesture states reported by the XR device.

    Only the group matching the active tracking mode is filled in by
    TeleVisionWrapper (hand-tracking fields or controller fields); the other
    group keeps its defaults.
    """
    # hand tracking
    left_pinch_state: bool = False          # True if index and thumb are pinching
    left_squeeze_state: bool = False        # True if hand is making a fist
    left_squeeze_value: float = 0.0         # (0.0 ~ 1.0) degree of hand squeeze
    right_pinch_state: bool = False         # True if index and thumb are pinching
    right_squeeze_state: bool = False       # True if hand is making a fist
    right_squeeze_value: float = 0.0        # (0.0 ~ 1.0) degree of hand squeeze
    # controller tracking
    left_trigger_state: bool = False        # True if trigger is actively pressed
    left_squeeze_ctrl_state: bool = False   # True if grip button is pressed
    left_squeeze_ctrl_value: float = 0.0    # (0.0 ~ 1.0) grip pull depth
    left_thumbstick_state: bool = False     # True if thumbstick button is pressed
    left_thumbstick_value: np.ndarray = field(default_factory=lambda: np.zeros(2))  # 2D vector (x, y), normalized
    left_aButton: bool = False              # True if A button is pressed
    left_bButton: bool = False              # True if B button is pressed
    right_trigger_state: bool = False       # True if trigger is actively pressed
    right_squeeze_ctrl_state: bool = False  # True if grip button is pressed
    right_squeeze_ctrl_value: float = 0.0   # (0.0 ~ 1.0) grip pull depth
    right_thumbstick_state: bool = False    # True if thumbstick button is pressed
    right_thumbstick_value: np.ndarray = field(default_factory=lambda: np.zeros(2)) # 2D vector (x, y), normalized
    right_aButton: bool = False             # True if A button is pressed
    right_bButton: bool = False             # True if B button is pressed
@dataclass
class TeleData:
    """One frame of processed teleoperation data, already converted to the
    (basis) Robot / (initial pose) Unitree conventions by TeleVisionWrapper.

    Fields defaulting to None are only populated in the matching tracking
    mode: hand joint data and pinch values in hand-tracking mode, trigger
    values in controller mode.
    """
    head_rotation: np.ndarray        # (3,3) Head orientation matrix
    left_arm_pose: np.ndarray        # (4,4) SE(3) pose of left arm (origin near the WAIST joint)
    right_arm_pose: np.ndarray       # (4,4) SE(3) pose of right arm (origin near the WAIST joint)
    left_hand_pos: np.ndarray = None    # (25,3) 3D positions of left hand joints
    right_hand_pos: np.ndarray = None   # (25,3) 3D positions of right hand joints
    left_hand_rot: np.ndarray = None    # (25,3,3) rotation matrices of left hand joints
    right_hand_rot: np.ndarray = None   # (25,3,3) rotation matrices of right hand joints
    left_pinch_value: float = None      # (100.0 ~ 0.0) scaled pinch distance between index and thumb
    right_pinch_value: float = None     # (100.0 ~ 0.0) scaled pinch distance between index and thumb
    left_trigger_value: float = None    # (10.0 ~ 0.0) inverted trigger pull depth: 10.0 released, 0.0 fully pressed
    right_trigger_value: float = None   # (10.0 ~ 0.0) inverted trigger pull depth: 10.0 released, 0.0 fully pressed
    tele_state: TeleStateData = field(default_factory=TeleStateData)
class TeleVisionWrapper:
def __init__(self, binocular, img_shape, img_shm_name):
    """Wrap a TeleVision instance (pre-update, hand-tracking-only signature).

    :param binocular: whether the head camera device is binocular.
    :param img_shape: shape of the shared camera image buffer.
    :param img_shm_name: name of the shared memory segment holding the image.
    """
    self.tv = TeleVision(binocular, img_shape, img_shm_name)
def get_data(self):
    """Return one frame of processed teleoperation data.

    :return: (head_rmat, unitree_left_wrist, unitree_right_wrist,
        unitree_left_hand, unitree_right_hand) — head rotation (3,3), wrist
        SE(3) poses (4,4) in a waist-origin frame, and hand joint positions
        (25,3) in the wrist frame, all converted to Unitree conventions.
    """
    # --------------------------------wrist-------------------------------------
    # TeleVision obtains a basis coordinate that is OpenXR Convention
    head_vuer_mat, head_flag = mat_update(const_head_vuer_mat, self.tv.head_matrix.copy())
    left_wrist_vuer_mat, left_wrist_flag = mat_update(const_left_wrist_vuer_mat, self.tv.left_hand.copy())
    right_wrist_vuer_mat, right_wrist_flag = mat_update(const_right_wrist_vuer_mat, self.tv.right_hand.copy())
    # Change basis convention: VuerMat ((basis) OpenXR Convention) to WristMat ((basis) Robot Convention)
    # p.s. WristMat = T_{robot}_{openxr} * VuerMat * T_{robot}_{openxr}^T
    # Reason for right multiply fast_mat_inv(T_robot_openxr):
    #   This is similarity transformation: B = PAP^{-1}, that is B ~ A
    #   For example:
    #   - For a pose data T_r under the Robot Convention, left-multiplying WristMat means:
    #     - WristMat * T_r ==> T_{robot}_{openxr} * VuerMat * T_{openxr}_{robot} * T_r
    #     - First, transform to the OpenXR Convention (The function of T_{openxr}_{robot})
    #     - then, apply the rotation VuerMat in the OpenXR Convention (The function of VuerMat)
    #     - finally, transform back to the Robot Convention (The function of T_{robot}_{openxr})
    #   This results in the same rotation effect under the Robot Convention as in the OpenXR Convention.
    head_mat = T_robot_openxr @ head_vuer_mat @ fast_mat_inv(T_robot_openxr)
    left_wrist_mat = T_robot_openxr @ left_wrist_vuer_mat @ fast_mat_inv(T_robot_openxr)
    right_wrist_mat = T_robot_openxr @ right_wrist_vuer_mat @ fast_mat_inv(T_robot_openxr)
    # Change wrist convention: WristMat ((Left Wrist) XR/AppleVisionPro Convention) to UnitreeWristMat((Left Wrist URDF) Unitree Convention)
    # Reason for right multiply (T_to_unitree_left_wrist) : Rotate 90 degrees counterclockwise about its own x-axis.
    # Reason for right multiply (T_to_unitree_right_wrist): Rotate 90 degrees clockwise about its own x-axis.
    unitree_left_wrist = left_wrist_mat @ (T_to_unitree_left_wrist if left_wrist_flag else np.eye(4))
    unitree_right_wrist = right_wrist_mat @ (T_to_unitree_right_wrist if right_wrist_flag else np.eye(4))
    # Transfer from WORLD to HEAD coordinate (translation only).
    unitree_left_wrist[0:3, 3] = unitree_left_wrist[0:3, 3] - head_mat[0:3, 3]
    unitree_right_wrist[0:3, 3] = unitree_right_wrist[0:3, 3] - head_mat[0:3, 3]
    # --------------------------------hand-------------------------------------
    # Homogeneous, [xyz] to [xyz1]
    # p.s. np.concatenate([25,3]^T,(1,25)) ==> hand_vuer_mat.shape is (4,25)
    # Now under (basis) OpenXR Convention, mat shape like this:
    #    x0 x1 x2 ··· x23 x24
    #    y0 y1 y1 ··· y23 y24
    #    z0 z1 z2 ··· z23 z24
    #     1  1  1 ···   1   1
    left_hand_vuer_mat = np.concatenate([self.tv.left_landmarks.copy().T, np.ones((1, self.tv.left_landmarks.shape[0]))])
    right_hand_vuer_mat = np.concatenate([self.tv.right_landmarks.copy().T, np.ones((1, self.tv.right_landmarks.shape[0]))])
    # Change basis convention: from (basis) OpenXR Convention to (basis) Robot Convention
    # Just a change of basis for 3D points. No rotation, only translation. No need to right-multiply fast_mat_inv(T_robot_openxr).
    left_hand_mat = T_robot_openxr @ left_hand_vuer_mat
    right_hand_mat = T_robot_openxr @ right_hand_vuer_mat
    # Transfer from WORLD to WRIST coordinate. (this process under (basis) Robot Convention)
    # p.s. HandMat_WristBased = WristMat_{world}_{wrist}^T * HandMat_{world}
    #      HandMat_WristBased = WristMat_{wrist}_{world} * HandMat_{world}, that is HandMat_{wrist}
    left_hand_mat_wb = fast_mat_inv(left_wrist_mat) @ left_hand_mat
    right_hand_mat_wb = fast_mat_inv(right_wrist_mat) @ right_hand_mat
    # Change hand convention: HandMat ((Left Hand) XR/AppleVisionPro Convention) to UnitreeHandMat((Left Hand URDF) Unitree Convention)
    # Reason for left multiply : T_to_unitree_hand @ left_hand_mat_wb ==> (4,4) @ (4,25) ==> (4,25), (4,25)[0:3, :] ==> (3,25), (3,25).T ==> (25,3)
    # Now under (Left Hand URDF) Unitree Convention, mat shape like this:
    #    [x0, y0, z0]
    #    [x1, y1, z1]
    #    ···
    #    [x23,y23,z23]
    #    [x24,y24,z24]
    unitree_left_hand = (T_to_unitree_hand @ left_hand_mat_wb)[0:3, :].T
    unitree_right_hand = (T_to_unitree_hand @ right_hand_mat_wb)[0:3, :].T
    # --------------------------------offset-------------------------------------
    head_rmat = head_mat[:3, :3]
    # The origin of the coordinate for IK Solve is the WAIST joint motor. You can use teleop/robot_control/robot_arm_ik.py Unit_Test to check it.
    # The origin of the coordinate of unitree_left_wrist is HEAD. So it is necessary to translate the origin of unitree_left_wrist from HEAD to WAIST.
    unitree_left_wrist[0, 3] +=0.15
    unitree_right_wrist[0,3] +=0.15
    unitree_left_wrist[2, 3] +=0.45
    unitree_right_wrist[2,3] +=0.45
    return head_rmat, unitree_left_wrist, unitree_right_wrist, unitree_left_hand, unitree_right_hand
def __init__(self, binocular: bool, use_hand_tracking: bool, img_shape, img_shm_name, return_state_data: bool = True, return_hand_rot_data: bool = False,
             cert_file = None, key_file = None):
    """
    TeleVisionWrapper is a wrapper for the TeleVision class, which handles XR device's data suit for robot control.
    It initializes the TeleVision instance with the specified parameters and provides a method to get motion state data.

    :param binocular: A boolean indicating whether the head camera device is binocular or not.
    :param use_hand_tracking: A boolean indicating whether to use hand tracking or use controller tracking.
    :param img_shape: The shape of the image to be processed.
    :param img_shm_name: The name of the shared memory for the image.
    :param return_state_data: A boolean indicating whether to return the state of the hand or controller.
    :param return_hand_rot_data: A boolean indicating whether to return the hand rotation data.
    :param cert_file: The path to the certificate file for secure connection.
    :param key_file: The path to the key file for secure connection.
    """
    self.use_hand_tracking = use_hand_tracking
    self.return_state_data = return_state_data
    self.return_hand_rot_data = return_hand_rot_data
    self.tvuer = TeleVision(binocular, use_hand_tracking, img_shape, img_shm_name, cert_file=cert_file, key_file=key_file)
def get_motion_state_data(self):
    """
    Get processed motion state data from the TeleVision instance.
    All returned data are transformed from the OpenXR Convention to the (Robot & Unitree) Convention.

    :return: a TeleData instance — head rotation and arm poses in a frame whose
        origin is near the WAIST joint; in hand-tracking mode also hand joint
        positions (and optionally rotations) plus pinch values, in controller
        mode inverted trigger values instead.
    """
    # Variable Naming Convention below,
    # ┌────────────┬───────────────────────────┬──────────────────────────────────┬────────────────────────────────────┬────────────────────────────────────┐
    # │left / right│  Bxr                      │  Brobot                          │  IPxr                              │  IPunitree                         │
    # │────────────│───────────────────────────│──────────────────────────────────│────────────────────────────────────│────────────────────────────────────│
    # │    side    │ (basis) OpenXR Convention │ (basis) Robot Convention         │  (initial pose) OpenXR Convention  │ (initial pose) Unitree Convention  │
    # └────────────┴───────────────────────────┴──────────────────────────────────┴────────────────────────────────────┴────────────────────────────────────┘
    # ┌───────────────────────────────────┬─────────────────────┐
    # │    world / arm / head / waist     │  arm / head / hand  │
    # │───────────────────────────────────│─────────────────────│
    # │    source frame                   │  target frame       │
    # └───────────────────────────────────┴─────────────────────┘
    # TeleVision (Vuer) obtains all raw data under the (basis) OpenXR Convention.
    Bxr_world_head, head_pose_is_valid = safe_mat_update(CONST_HEAD_POSE, self.tvuer.head_pose)
    if self.use_hand_tracking:
        # 'Arm' pose data follows (basis) OpenXR Convention and (initial pose) OpenXR Arm Convention.
        left_IPxr_Bxr_world_arm, left_arm_is_valid = safe_mat_update(CONST_LEFT_ARM_POSE, self.tvuer.left_arm_pose)
        right_IPxr_Bxr_world_arm, right_arm_is_valid = safe_mat_update(CONST_RIGHT_ARM_POSE, self.tvuer.right_arm_pose)
        # Change basis convention
        # From (basis) OpenXR Convention to (basis) Robot Convention:
        #   Brobot_Pose = T_{robot}_{openxr} * Bxr_Pose * T_{robot}_{openxr}^T ==>
        #   Brobot_Pose = T_{robot}_{openxr} * Bxr_Pose * T_{openxr}_{robot}
        # Reason for right multiply T_OPENXR_ROBOT = fast_mat_inv(T_ROBOT_OPENXR):
        #   This is similarity transformation: B = PAP^{-1}, that is B ~ A
        #   For example:
        #   - For a pose data T_r under the (basis) Robot Convention, left-multiplying Brobot_Pose means:
        #     Brobot_Pose * T_r ==> T_{robot}_{openxr} * PoseMatrix_openxr * T_{openxr}_{robot} * T_r
        #   - First, transform T_r to the (basis) OpenXR Convention (The function of T_{openxr}_{robot})
        #   - Then, apply the rotation PoseMatrix_openxr in the OpenXR Convention (The function of PoseMatrix_openxr)
        #   - Finally, transform back to the Robot Convention (The function of T_{robot}_{openxr})
        #   - This results in the same rotation effect under the Robot Convention as in the OpenXR Convention.
        Brobot_world_head = T_ROBOT_OPENXR @ Bxr_world_head @ T_OPENXR_ROBOT
        left_IPxr_Brobot_world_arm = T_ROBOT_OPENXR @ left_IPxr_Bxr_world_arm @ T_OPENXR_ROBOT
        right_IPxr_Brobot_world_arm = T_ROBOT_OPENXR @ right_IPxr_Bxr_world_arm @ T_OPENXR_ROBOT
        # Change initial pose convention
        # From (initial pose) OpenXR Arm Convention to (initial pose) Unitree Humanoid Arm URDF Convention
        # Reason for right multiply (T_TO_UNITREE_HUMANOID_LEFT_ARM) : Rotate 90 degrees counterclockwise about its own x-axis.
        # Reason for right multiply (T_TO_UNITREE_HUMANOID_RIGHT_ARM): Rotate 90 degrees clockwise about its own x-axis.
        left_IPunitree_Brobot_world_arm = left_IPxr_Brobot_world_arm @ (T_TO_UNITREE_HUMANOID_LEFT_ARM if left_arm_is_valid else np.eye(4))
        right_IPunitree_Brobot_world_arm = right_IPxr_Brobot_world_arm @ (T_TO_UNITREE_HUMANOID_RIGHT_ARM if right_arm_is_valid else np.eye(4))
        # Transfer from WORLD to HEAD coordinate (translation adjustment only)
        left_IPunitree_Brobot_head_arm = left_IPunitree_Brobot_world_arm.copy()
        right_IPunitree_Brobot_head_arm = right_IPunitree_Brobot_world_arm.copy()
        left_IPunitree_Brobot_head_arm[0:3, 3] = left_IPunitree_Brobot_head_arm[0:3, 3] - Brobot_world_head[0:3, 3]
        # NOTE(review): the right side reads from ..._world_arm while the left reads from ..._head_arm;
        # equivalent here because the copy is still unmodified at this point.
        right_IPunitree_Brobot_head_arm[0:3, 3] = right_IPunitree_Brobot_world_arm[0:3, 3] - Brobot_world_head[0:3, 3]
        # =====coordinate origin offset=====
        Brobot_world_head_rot = Brobot_world_head[:3, :3]
        # The origin of the coordinate for IK Solve is near the WAIST joint motor. You can use teleop/robot_control/robot_arm_ik.py Unit_Test to visualize it.
        # The origin of the coordinate of IPunitree_Brobot_head_arm is HEAD.
        # So it is necessary to translate the origin of IPunitree_Brobot_head_arm from HEAD to WAIST.
        left_IPunitree_Brobot_waist_arm = left_IPunitree_Brobot_head_arm.copy()
        right_IPunitree_Brobot_waist_arm = right_IPunitree_Brobot_head_arm.copy()
        left_IPunitree_Brobot_waist_arm[0, 3] +=0.15   # x
        right_IPunitree_Brobot_waist_arm[0,3] +=0.15
        left_IPunitree_Brobot_waist_arm[2, 3] +=0.45   # z
        right_IPunitree_Brobot_waist_arm[2,3] +=0.45
        # -----------------------------------hand position----------------------------------------
        if left_arm_is_valid and right_arm_is_valid:
            # Homogeneous, [xyz] to [xyz1]
            # np.concatenate([25,3]^T,(1,25)) ==> Bxr_world_hand_pos.shape is (4,25)
            # Now under (basis) OpenXR Convention, Bxr_world_hand_pos data like this:
            #    [x0 x1 x2 ··· x23 x24]
            #    [y0 y1 y1 ··· y23 y24]
            #    [z0 z1 z2 ··· z23 z24]
            #    [ 1  1  1 ···   1   1]
            left_IPxr_Bxr_world_hand_pos = np.concatenate([self.tvuer.left_hand_positions.T, np.ones((1, self.tvuer.left_hand_positions.shape[0]))])
            right_IPxr_Bxr_world_hand_pos = np.concatenate([self.tvuer.right_hand_positions.T, np.ones((1, self.tvuer.right_hand_positions.shape[0]))])
            # Change basis convention
            # From (basis) OpenXR Convention to (basis) Robot Convention
            # Just a change of basis for 3D points. No rotation, only translation. So, no need to right-multiply fast_mat_inv(T_ROBOT_OPENXR).
            left_IPxr_Brobot_world_hand_pos = T_ROBOT_OPENXR @ left_IPxr_Bxr_world_hand_pos
            right_IPxr_Brobot_world_hand_pos = T_ROBOT_OPENXR @ right_IPxr_Bxr_world_hand_pos
            # Transfer from WORLD to ARM frame under (basis) Robot Convention:
            # Brobot_{world}_{arm}^T * Brobot_{world}_pos ==> Brobot_{arm}_{world} * Brobot_{world}_pos ==> Brobot_arm_hand_pos, Now it's based on the arm frame.
            left_IPxr_Brobot_arm_hand_pos = fast_mat_inv(left_IPxr_Brobot_world_arm) @ left_IPxr_Brobot_world_hand_pos
            right_IPxr_Brobot_arm_hand_pos = fast_mat_inv(right_IPxr_Brobot_world_arm) @ right_IPxr_Brobot_world_hand_pos
            # Change initial pose convention
            # From (initial pose) XR Hand Convention to (initial pose) Unitree Humanoid Hand URDF Convention:
            #   T_TO_UNITREE_HAND @ IPxr_Brobot_arm_hand_pos ==> IPunitree_Brobot_arm_hand_pos
            #   ((4,4) @ (4,25))[0:3, :].T ==> (4,25)[0:3, :].T ==> (3,25).T ==> (25,3)
            # Now under (initial pose) Unitree Humanoid Hand URDF Convention, matrix shape like this:
            #    [x0, y0, z0]
            #    [x1, y1, z1]
            #    ···
            #    [x23,y23,z23]
            #    [x24,y24,z24]
            left_IPunitree_Brobot_arm_hand_pos = (T_TO_UNITREE_HAND @ left_IPxr_Brobot_arm_hand_pos)[0:3, :].T
            right_IPunitree_Brobot_arm_hand_pos = (T_TO_UNITREE_HAND @ right_IPxr_Brobot_arm_hand_pos)[0:3, :].T
        else:
            # Tracking invalid: fall back to all-zero joint positions.
            left_IPunitree_Brobot_arm_hand_pos = np.zeros((25, 3))
            right_IPunitree_Brobot_arm_hand_pos = np.zeros((25, 3))
        # -----------------------------------hand rotation----------------------------------------
        if self.return_hand_rot_data:
            left_Bxr_world_hand_rot, left_hand_rot_is_valid = safe_rot_update(CONST_HAND_ROT, self.tvuer.left_hand_orientations)   # [25, 3, 3]
            right_Bxr_world_hand_rot, right_hand_rot_is_valid = safe_rot_update(CONST_HAND_ROT, self.tvuer.right_hand_orientations)
            if left_hand_rot_is_valid and right_hand_rot_is_valid:
                # Express each joint rotation relative to the arm (wrist) frame: R_arm^T @ R_world per joint.
                left_Bxr_arm_hand_rot = np.einsum('ij,njk->nik', left_IPxr_Bxr_world_arm[:3, :3].T, left_Bxr_world_hand_rot)
                right_Bxr_arm_hand_rot = np.einsum('ij,njk->nik', right_IPxr_Bxr_world_arm[:3, :3].T, right_Bxr_world_hand_rot)
                # Change basis convention
                left_Brobot_arm_hand_rot = np.einsum('ij,njk,kl->nil', R_ROBOT_OPENXR, left_Bxr_arm_hand_rot, R_OPENXR_ROBOT)
                right_Brobot_arm_hand_rot = np.einsum('ij,njk,kl->nil', R_ROBOT_OPENXR, right_Bxr_arm_hand_rot, R_OPENXR_ROBOT)
            else:
                # Invalid rotations: pass through the fallback (identity) batch unchanged.
                left_Brobot_arm_hand_rot = left_Bxr_world_hand_rot
                right_Brobot_arm_hand_rot = right_Bxr_world_hand_rot
        else:
            left_Brobot_arm_hand_rot = None
            right_Brobot_arm_hand_rot = None
        if self.return_state_data:
            hand_state = TeleStateData(
                left_pinch_state=self.tvuer.left_hand_pinch_state,
                left_squeeze_state=self.tvuer.left_hand_squeeze_state,
                left_squeeze_value=self.tvuer.left_hand_squeeze_value,
                right_pinch_state=self.tvuer.right_hand_pinch_state,
                right_squeeze_state=self.tvuer.right_hand_squeeze_state,
                right_squeeze_value=self.tvuer.right_hand_squeeze_value,
            )
        else:
            hand_state = None
        return TeleData(
            head_rotation=Brobot_world_head_rot,
            left_arm_pose=left_IPunitree_Brobot_waist_arm,
            right_arm_pose=right_IPunitree_Brobot_waist_arm,
            left_hand_pos=left_IPunitree_Brobot_arm_hand_pos,
            right_hand_pos=right_IPunitree_Brobot_arm_hand_pos,
            left_hand_rot=left_Brobot_arm_hand_rot,
            right_hand_rot=right_Brobot_arm_hand_rot,
            left_pinch_value=self.tvuer.left_hand_pinch_value * 100.0,
            right_pinch_value=self.tvuer.right_hand_pinch_value * 100.0,
            tele_state=hand_state
        )
    else:
        # Controller pose data directly follows the (initial pose) Unitree Humanoid Arm URDF Convention (thus no transform is needed).
        left_IPunitree_Bxr_world_arm, left_arm_is_valid = safe_mat_update(CONST_LEFT_ARM_POSE, self.tvuer.left_arm_pose)
        right_IPunitree_Bxr_world_arm, right_arm_is_valid = safe_mat_update(CONST_RIGHT_ARM_POSE, self.tvuer.right_arm_pose)
        # Change basis convention
        Brobot_world_head = T_ROBOT_OPENXR @ Bxr_world_head @ T_OPENXR_ROBOT
        left_IPunitree_Brobot_world_arm = T_ROBOT_OPENXR @ left_IPunitree_Bxr_world_arm @ T_OPENXR_ROBOT
        right_IPunitree_Brobot_world_arm = T_ROBOT_OPENXR @ right_IPunitree_Bxr_world_arm @ T_OPENXR_ROBOT
        # Transfer from WORLD to HEAD coordinate (translation adjustment only)
        left_IPunitree_Brobot_head_arm = left_IPunitree_Brobot_world_arm.copy()
        right_IPunitree_Brobot_head_arm = right_IPunitree_Brobot_world_arm.copy()
        left_IPunitree_Brobot_head_arm[0:3, 3] = left_IPunitree_Brobot_head_arm[0:3, 3] - Brobot_world_head[0:3, 3]
        right_IPunitree_Brobot_head_arm[0:3, 3] = right_IPunitree_Brobot_head_arm[0:3, 3] - Brobot_world_head[0:3, 3]
        # =====coordinate origin offset=====
        Brobot_world_head_rot = Brobot_world_head[:3, :3]
        # The origin of the coordinate for IK Solve is near the WAIST joint motor. You can use teleop/robot_control/robot_arm_ik.py Unit_Test to check it.
        # The origin of the coordinate of IPunitree_Brobot_head_arm is HEAD.
        # So it is necessary to translate the origin of IPunitree_Brobot_head_arm from HEAD to WAIST.
        left_IPunitree_Brobot_waist_arm = left_IPunitree_Brobot_head_arm.copy()
        right_IPunitree_Brobot_waist_arm = right_IPunitree_Brobot_head_arm.copy()
        left_IPunitree_Brobot_waist_arm[0, 3] +=0.15   # x
        right_IPunitree_Brobot_waist_arm[0,3] +=0.15
        left_IPunitree_Brobot_waist_arm[2, 3] +=0.45   # z
        right_IPunitree_Brobot_waist_arm[2,3] +=0.45
        # left_IPunitree_Brobot_waist_arm[1, 3] +=0.02  # y
        # right_IPunitree_Brobot_waist_arm[1,3] +=0.02
        # return data
        if self.return_state_data:
            controller_state = TeleStateData(
                left_trigger_state=self.tvuer.left_controller_trigger_state,
                left_squeeze_ctrl_state=self.tvuer.left_controller_squeeze_state,
                left_squeeze_ctrl_value=self.tvuer.left_controller_squeeze_value,
                left_thumbstick_state=self.tvuer.left_controller_thumbstick_state,
                left_thumbstick_value=self.tvuer.left_controller_thumbstick_value,
                left_aButton=self.tvuer.left_controller_aButton,
                left_bButton=self.tvuer.left_controller_bButton,
                right_trigger_state=self.tvuer.right_controller_trigger_state,
                right_squeeze_ctrl_state=self.tvuer.right_controller_squeeze_state,
                right_squeeze_ctrl_value=self.tvuer.right_controller_squeeze_value,
                right_thumbstick_state=self.tvuer.right_controller_thumbstick_state,
                right_thumbstick_value=self.tvuer.right_controller_thumbstick_value,
                right_aButton=self.tvuer.right_controller_aButton,
                right_bButton=self.tvuer.right_controller_bButton,
            )
        else:
            controller_state = None
        return TeleData(
            head_rotation=Brobot_world_head_rot,
            left_arm_pose=left_IPunitree_Brobot_waist_arm,
            right_arm_pose=right_IPunitree_Brobot_waist_arm,
            left_trigger_value=10.0 - self.tvuer.left_controller_trigger_value * 10,
            right_trigger_value=10.0 - self.tvuer.right_controller_trigger_value * 10,
            tele_state=controller_state
        )

5
teleop/robot_control/robot_arm_ik.py

@ -84,8 +84,9 @@ class G1_29_ArmIK:
# frame = self.reduced_robot.model.frames[i] # frame = self.reduced_robot.model.frames[i]
# frame_id = self.reduced_robot.model.getFrameId(frame.name) # frame_id = self.reduced_robot.model.getFrameId(frame.name)
# print(f"Frame ID: {frame_id}, Name: {frame.name}") # print(f"Frame ID: {frame_id}, Name: {frame.name}")
for idx, name in enumerate(self.reduced_robot.model.names):
print(f"{idx}: {name}")
# for idx, name in enumerate(self.reduced_robot.model.names):
# print(f"{idx}: {name}")
# Creating Casadi models and data for symbolic computing # Creating Casadi models and data for symbolic computing
self.cmodel = cpin.Model(self.reduced_robot.model) self.cmodel = cpin.Model(self.reduced_robot.model)
self.cdata = self.cmodel.createData() self.cdata = self.cmodel.createData()

6
teleop/robot_control/robot_hand_inspire.py

@ -98,8 +98,10 @@ class Inspire_Controller:
while self.running: while self.running:
start_time = time.time() start_time = time.time()
# get dual hand state # get dual hand state
left_hand_mat = np.array(left_hand_array[:]).reshape(25, 3).copy()
right_hand_mat = np.array(right_hand_array[:]).reshape(25, 3).copy()
with left_hand_array.get_lock():
left_hand_mat = np.array(left_hand_array[:]).reshape(25, 3).copy()
with right_hand_array.get_lock():
right_hand_mat = np.array(right_hand_array[:]).reshape(25, 3).copy()
# Read left and right q_state from shared arrays # Read left and right q_state from shared arrays
state_data = np.concatenate((np.array(left_hand_state_array[:]), np.array(right_hand_state_array[:]))) state_data = np.concatenate((np.array(left_hand_state_array[:]), np.array(right_hand_state_array[:])))

70
teleop/robot_control/robot_hand_unitree.py

@ -30,20 +30,20 @@ kTopicDex3RightState = "rt/dex3/right/state"
class Dex3_1_Controller: class Dex3_1_Controller:
def __init__(self, left_hand_array, right_hand_array, dual_hand_data_lock = None, dual_hand_state_array = None,
dual_hand_action_array = None, fps = 100.0, Unit_Test = False):
def __init__(self, left_hand_array_in, right_hand_array_in, dual_hand_data_lock = None, dual_hand_state_array_out = None,
dual_hand_action_array_out = None, fps = 100.0, Unit_Test = False):
""" """
[note] A *_array type parameter requires using a multiprocessing Array, because it needs to be passed to the internal child process [note] A *_array type parameter requires using a multiprocessing Array, because it needs to be passed to the internal child process
left_hand_array: [input] Left hand skeleton data (required from XR device) to hand_ctrl.control_process
left_hand_array_in: [input] Left hand skeleton data (required from XR device) to hand_ctrl.control_process
right_hand_array: [input] Right hand skeleton data (required from XR device) to hand_ctrl.control_process
right_hand_array_in: [input] Right hand skeleton data (required from XR device) to hand_ctrl.control_process
dual_hand_data_lock: Data synchronization lock for dual_hand_state_array and dual_hand_action_array dual_hand_data_lock: Data synchronization lock for dual_hand_state_array and dual_hand_action_array
dual_hand_state_array: [output] Return left(7), right(7) hand motor state
dual_hand_state_array_out: [output] Return left(7), right(7) hand motor state
dual_hand_action_array: [output] Return left(7), right(7) hand motor action
dual_hand_action_array_out: [output] Return left(7), right(7) hand motor action
fps: Control frequency fps: Control frequency
@ -85,8 +85,8 @@ class Dex3_1_Controller:
time.sleep(0.01) time.sleep(0.01)
print("[Dex3_1_Controller] Waiting to subscribe dds...") print("[Dex3_1_Controller] Waiting to subscribe dds...")
hand_control_process = Process(target=self.control_process, args=(left_hand_array, right_hand_array, self.left_hand_state_array, self.right_hand_state_array,
dual_hand_data_lock, dual_hand_state_array, dual_hand_action_array))
hand_control_process = Process(target=self.control_process, args=(left_hand_array_in, right_hand_array_in, self.left_hand_state_array, self.right_hand_state_array,
dual_hand_data_lock, dual_hand_state_array_out, dual_hand_action_array_out))
hand_control_process.daemon = True hand_control_process.daemon = True
hand_control_process.start() hand_control_process.start()
@ -129,8 +129,8 @@ class Dex3_1_Controller:
self.RightHandCmb_publisher.Write(self.right_msg) self.RightHandCmb_publisher.Write(self.right_msg)
# print("hand ctrl publish ok.") # print("hand ctrl publish ok.")
def control_process(self, left_hand_array, right_hand_array, left_hand_state_array, right_hand_state_array,
dual_hand_data_lock = None, dual_hand_state_array = None, dual_hand_action_array = None):
def control_process(self, left_hand_array_in, right_hand_array_in, left_hand_state_array, right_hand_state_array,
dual_hand_data_lock = None, dual_hand_state_array_out = None, dual_hand_action_array_out = None):
self.running = True self.running = True
left_q_target = np.full(Dex3_Num_Motors, 0) left_q_target = np.full(Dex3_Num_Motors, 0)
@ -170,15 +170,17 @@ class Dex3_1_Controller:
while self.running: while self.running:
start_time = time.time() start_time = time.time()
# get dual hand state # get dual hand state
left_hand_mat = np.array(left_hand_array[:]).reshape(25, 3).copy()
right_hand_mat = np.array(right_hand_array[:]).reshape(25, 3).copy()
with left_hand_array_in.get_lock():
left_hand_data = np.array(left_hand_array_in[:]).reshape(25, 3).copy()
with right_hand_array_in.get_lock():
right_hand_data = np.array(right_hand_array_in[:]).reshape(25, 3).copy()
# Read left and right q_state from shared arrays # Read left and right q_state from shared arrays
state_data = np.concatenate((np.array(left_hand_state_array[:]), np.array(right_hand_state_array[:]))) state_data = np.concatenate((np.array(left_hand_state_array[:]), np.array(right_hand_state_array[:])))
if not np.all(right_hand_mat == 0.0) and not np.all(left_hand_mat[4] == np.array([-1.13, 0.3, 0.15])): # if hand data has been initialized.
ref_left_value = left_hand_mat[unitree_tip_indices]
ref_right_value = right_hand_mat[unitree_tip_indices]
if not np.all(right_hand_data == 0.0) and not np.all(left_hand_data[4] == np.array([-1.13, 0.3, 0.15])): # if hand data has been initialized.
ref_left_value = left_hand_data[unitree_tip_indices]
ref_right_value = right_hand_data[unitree_tip_indices]
ref_left_value[0] = ref_left_value[0] * 1.15 ref_left_value[0] = ref_left_value[0] * 1.15
ref_left_value[1] = ref_left_value[1] * 1.05 ref_left_value[1] = ref_left_value[1] * 1.05
ref_left_value[2] = ref_left_value[2] * 0.95 ref_left_value[2] = ref_left_value[2] * 0.95
@ -191,10 +193,10 @@ class Dex3_1_Controller:
# get dual hand action # get dual hand action
action_data = np.concatenate((left_q_target, right_q_target)) action_data = np.concatenate((left_q_target, right_q_target))
if dual_hand_state_array and dual_hand_action_array:
if dual_hand_state_array_out and dual_hand_action_array_out:
with dual_hand_data_lock: with dual_hand_data_lock:
dual_hand_state_array[:] = state_data
dual_hand_action_array[:] = action_data
dual_hand_state_array_out[:] = state_data
dual_hand_action_array_out[:] = action_data
self.ctrl_dual_hand(left_q_target, right_q_target) self.ctrl_dual_hand(left_q_target, right_q_target)
current_time = time.time() current_time = time.time()
@ -229,20 +231,20 @@ kTopicGripperCommand = "rt/unitree_actuator/cmd"
kTopicGripperState = "rt/unitree_actuator/state" kTopicGripperState = "rt/unitree_actuator/state"
class Gripper_Controller: class Gripper_Controller:
def __init__(self, left_hand_array, right_hand_array, dual_gripper_data_lock = None, dual_gripper_state_out = None, dual_gripper_action_out = None,
def __init__(self, left_gripper_value_in, right_gripper_value_in, dual_gripper_data_lock = None, dual_gripper_state_out = None, dual_gripper_action_out = None,
filter = True, fps = 200.0, Unit_Test = False): filter = True, fps = 200.0, Unit_Test = False):
""" """
[note] A *_array type parameter requires using a multiprocessing Array, because it needs to be passed to the internal child process [note] A *_array type parameter requires using a multiprocessing Array, because it needs to be passed to the internal child process
left_hand_array: [input] Left hand skeleton data (required from XR device) to control_thread
left_gripper_value_in: [input] Left ctrl data (required from XR device) to control_thread
right_hand_array: [input] Right hand skeleton data (required from XR device) to control_thread
right_gripper_value_in: [input] Right ctrl data (required from XR device) to control_thread
dual_gripper_data_lock: Data synchronization lock for dual_gripper_state_array and dual_gripper_action_array dual_gripper_data_lock: Data synchronization lock for dual_gripper_state_array and dual_gripper_action_array
dual_gripper_state: [output] Return left(1), right(1) gripper motor state
dual_gripper_state_out: [output] Return left(1), right(1) gripper motor state
dual_gripper_action: [output] Return left(1), right(1) gripper motor action
dual_gripper_action_out: [output] Return left(1), right(1) gripper motor action
fps: Control frequency fps: Control frequency
@ -281,7 +283,7 @@ class Gripper_Controller:
time.sleep(0.01) time.sleep(0.01)
print("[Gripper_Controller] Waiting to subscribe dds...") print("[Gripper_Controller] Waiting to subscribe dds...")
self.gripper_control_thread = threading.Thread(target=self.control_thread, args=(left_hand_array, right_hand_array, self.dual_gripper_state,
self.gripper_control_thread = threading.Thread(target=self.control_thread, args=(left_gripper_value_in, right_gripper_value_in, self.dual_gripper_state,
dual_gripper_data_lock, dual_gripper_state_out, dual_gripper_action_out)) dual_gripper_data_lock, dual_gripper_state_out, dual_gripper_action_out))
self.gripper_control_thread.daemon = True self.gripper_control_thread.daemon = True
self.gripper_control_thread.start() self.gripper_control_thread.start()
@ -304,13 +306,13 @@ class Gripper_Controller:
self.GripperCmb_publisher.Write(self.gripper_msg) self.GripperCmb_publisher.Write(self.gripper_msg)
# print("gripper ctrl publish ok.") # print("gripper ctrl publish ok.")
def control_thread(self, left_hand_array, right_hand_array, dual_gripper_state_in, dual_hand_data_lock = None,
def control_thread(self, left_gripper_value_in, right_gripper_value_in, dual_gripper_state_in, dual_hand_data_lock = None,
dual_gripper_state_out = None, dual_gripper_action_out = None): dual_gripper_state_out = None, dual_gripper_action_out = None):
self.running = True self.running = True
DELTA_GRIPPER_CMD = 0.18 # The motor rotates 5.4 radians, the clamping jaw slide open 9 cm, so 0.6 rad <==> 1 cm, 0.18 rad <==> 3 mm DELTA_GRIPPER_CMD = 0.18 # The motor rotates 5.4 radians, the clamping jaw slide open 9 cm, so 0.6 rad <==> 1 cm, 0.18 rad <==> 3 mm
THUMB_INDEX_DISTANCE_MIN = 0.05 # Assuming a minimum Euclidean distance is 5 cm between thumb and index.
THUMB_INDEX_DISTANCE_MAX = 0.07 # Assuming a maximum Euclidean distance is 9 cm between thumb and index.
THUMB_INDEX_DISTANCE_MIN = 5.0
THUMB_INDEX_DISTANCE_MAX = 7.0
LEFT_MAPPED_MIN = 0.0 # The minimum initial motor position when the gripper closes at startup. LEFT_MAPPED_MIN = 0.0 # The minimum initial motor position when the gripper closes at startup.
RIGHT_MAPPED_MIN = 0.0 # The minimum initial motor position when the gripper closes at startup. RIGHT_MAPPED_MIN = 0.0 # The minimum initial motor position when the gripper closes at startup.
# The maximum initial motor position when the gripper closes before calibration (with the rail stroke calculated as 0.6 cm/rad * 9 rad = 5.4 cm). # The maximum initial motor position when the gripper closes before calibration (with the rail stroke calculated as 0.6 cm/rad * 9 rad = 5.4 cm).
@ -336,15 +338,15 @@ class Gripper_Controller:
while self.running: while self.running:
start_time = time.time() start_time = time.time()
# get dual hand skeletal point state from XR device # get dual hand skeletal point state from XR device
left_hand_mat = np.array(left_hand_array[:]).reshape(25, 3).copy()
right_hand_mat = np.array(right_hand_array[:]).reshape(25, 3).copy()
with left_gripper_value_in.get_lock():
left_gripper_value = left_gripper_value_in
with right_gripper_value_in.get_lock():
right_gripper_value = right_gripper_value_in
if not np.all(right_hand_mat == 0.0) and not np.all(left_hand_mat[4] == np.array([-1.13, 0.3, 0.15])): # if hand data has been initialized.
left_euclidean_distance = np.linalg.norm(left_hand_mat[unitree_gripper_indices[1]] - left_hand_mat[unitree_gripper_indices[0]])
right_euclidean_distance = np.linalg.norm(right_hand_mat[unitree_gripper_indices[1]] - right_hand_mat[unitree_gripper_indices[0]])
if left_gripper_value != 0.0 or right_gripper_value != 0.0: # if hand data has been initialized.
# Linear mapping from [0, THUMB_INDEX_DISTANCE_MAX] to gripper action range # Linear mapping from [0, THUMB_INDEX_DISTANCE_MAX] to gripper action range
left_target_action = np.interp(left_euclidean_distance, [THUMB_INDEX_DISTANCE_MIN, THUMB_INDEX_DISTANCE_MAX], [LEFT_MAPPED_MIN, LEFT_MAPPED_MAX])
right_target_action = np.interp(right_euclidean_distance, [THUMB_INDEX_DISTANCE_MIN, THUMB_INDEX_DISTANCE_MAX], [RIGHT_MAPPED_MIN, RIGHT_MAPPED_MAX])
left_target_action = np.interp(left_gripper_value, [THUMB_INDEX_DISTANCE_MIN, THUMB_INDEX_DISTANCE_MAX], [LEFT_MAPPED_MIN, LEFT_MAPPED_MAX])
right_target_action = np.interp(right_gripper_value, [THUMB_INDEX_DISTANCE_MIN, THUMB_INDEX_DISTANCE_MAX], [RIGHT_MAPPED_MIN, RIGHT_MAPPED_MAX])
# else: # TEST WITHOUT XR DEVICE # else: # TEST WITHOUT XR DEVICE
# current_time = time.time() # current_time = time.time()
# period = 2.5 # period = 2.5

90
teleop/teleop_hand_and_arm.py

@ -2,7 +2,7 @@ import numpy as np
import time import time
import argparse import argparse
import cv2 import cv2
from multiprocessing import shared_memory, Array, Lock
from multiprocessing import shared_memory, Value, Array, Lock
import threading import threading
import os import os
@ -11,7 +11,7 @@ current_dir = os.path.dirname(os.path.abspath(__file__))
parent_dir = os.path.dirname(current_dir) parent_dir = os.path.dirname(current_dir)
sys.path.append(parent_dir) sys.path.append(parent_dir)
from teleop.open_television.tv_wrapper import TeleVisionWrapper
from teleop.open_television import TeleVisionWrapper
from teleop.robot_control.robot_arm import G1_29_ArmController, G1_23_ArmController, H1_2_ArmController, H1_ArmController from teleop.robot_control.robot_arm import G1_29_ArmController, G1_23_ArmController, H1_2_ArmController, H1_ArmController
from teleop.robot_control.robot_arm_ik import G1_29_ArmIK, G1_23_ArmIK, H1_2_ArmIK, H1_ArmIK from teleop.robot_control.robot_arm_ik import G1_29_ArmIK, G1_23_ArmIK, H1_2_ArmIK, H1_ArmIK
from teleop.robot_control.robot_hand_unitree import Dex3_1_Controller, Gripper_Controller from teleop.robot_control.robot_hand_unitree import Dex3_1_Controller, Gripper_Controller
@ -23,14 +23,15 @@ from teleop.utils.episode_writer import EpisodeWriter
if __name__ == '__main__': if __name__ == '__main__':
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument('--task_dir', type = str, default = './utils/data', help = 'path to save data') parser.add_argument('--task_dir', type = str, default = './utils/data', help = 'path to save data')
parser.add_argument('--frequency', type = int, default = 30.0, help = 'save data\'s frequency')
parser.add_argument('--frequency', type = float, default = 30.0, help = 'save data\'s frequency')
parser.add_argument('--record', action = 'store_true', help = 'Save data or not') parser.add_argument('--record', action = 'store_true', help = 'Save data or not')
parser.add_argument('--no-record', dest = 'record', action = 'store_false', help = 'Do not save data') parser.add_argument('--no-record', dest = 'record', action = 'store_false', help = 'Do not save data')
parser.set_defaults(record = False) parser.set_defaults(record = False)
parser.add_argument('--xr_mode', type=str, choices=['hand', 'controller'], default='hand', help='Select XR device tracking source')
parser.add_argument('--arm', type=str, choices=['G1_29', 'G1_23', 'H1_2', 'H1'], default='G1_29', help='Select arm controller') parser.add_argument('--arm', type=str, choices=['G1_29', 'G1_23', 'H1_2', 'H1'], default='G1_29', help='Select arm controller')
parser.add_argument('--hand', type=str, choices=['dex3', 'gripper', 'inspire1'], help='Select hand controller')
parser.add_argument('--ee', type=str, choices=['dex3', 'gripper', 'inspire1'], help='Select end effector controller')
args = parser.parse_args() args = parser.parse_args()
print(f"args:{args}\n") print(f"args:{args}\n")
@ -77,7 +78,7 @@ if __name__ == '__main__':
image_receive_thread.start() image_receive_thread.start()
# television: obtain hand pose data from the XR device and transmit the robot's head camera image to the XR device. # television: obtain hand pose data from the XR device and transmit the robot's head camera image to the XR device.
tv_wrapper = TeleVisionWrapper(BINOCULAR, tv_img_shape, tv_img_shm.name)
tv_wrapper = TeleVisionWrapper(BINOCULAR, args.xr_mode == 'hand', tv_img_shape, tv_img_shm.name)
# arm # arm
if args.arm == 'G1_29': if args.arm == 'G1_29':
@ -93,28 +94,28 @@ if __name__ == '__main__':
arm_ctrl = H1_ArmController() arm_ctrl = H1_ArmController()
arm_ik = H1_ArmIK() arm_ik = H1_ArmIK()
# hand
if args.hand == "dex3":
left_hand_array = Array('d', 75, lock = True) # [input]
right_hand_array = Array('d', 75, lock = True) # [input]
# end-effector
if args.ee == "dex3":
left_hand_pos_array = Array('d', 75, lock = True) # [input]
right_hand_pos_array = Array('d', 75, lock = True) # [input]
dual_hand_data_lock = Lock() dual_hand_data_lock = Lock()
dual_hand_state_array = Array('d', 14, lock = False) # [output] current left, right hand state(14) data.
dual_hand_action_array = Array('d', 14, lock = False) # [output] current left, right hand action(14) data.
hand_ctrl = Dex3_1_Controller(left_hand_array, right_hand_array, dual_hand_data_lock, dual_hand_state_array, dual_hand_action_array)
elif args.hand == "gripper":
left_hand_array = Array('d', 75, lock=True)
right_hand_array = Array('d', 75, lock=True)
dual_hand_state_array = Array('d', 14, lock = False) # [output] current left, right hand state(14) data.
dual_hand_action_array = Array('d', 14, lock = False) # [output] current left, right hand action(14) data.
hand_ctrl = Dex3_1_Controller(left_hand_pos_array, right_hand_pos_array, dual_hand_data_lock, dual_hand_state_array, dual_hand_action_array)
elif args.ee == "gripper":
left_gripper_value = Value('d', 0.0, lock=True) # [input]
right_gripper_value = Value('d', 0.0, lock=True) # [input]
dual_gripper_data_lock = Lock() dual_gripper_data_lock = Lock()
dual_gripper_state_array = Array('d', 2, lock=False) # current left, right gripper state(2) data. dual_gripper_state_array = Array('d', 2, lock=False) # current left, right gripper state(2) data.
dual_gripper_action_array = Array('d', 2, lock=False) # current left, right gripper action(2) data. dual_gripper_action_array = Array('d', 2, lock=False) # current left, right gripper action(2) data.
gripper_ctrl = Gripper_Controller(left_hand_array, right_hand_array, dual_gripper_data_lock, dual_gripper_state_array, dual_gripper_action_array)
elif args.hand == "inspire1":
left_hand_array = Array('d', 75, lock = True) # [input]
right_hand_array = Array('d', 75, lock = True) # [input]
gripper_ctrl = Gripper_Controller(left_gripper_value, right_gripper_value, dual_gripper_data_lock, dual_gripper_state_array, dual_gripper_action_array)
elif args.ee == "inspire1":
left_hand_pos_array = Array('d', 75, lock = True) # [input]
right_hand_pos_array = Array('d', 75, lock = True) # [input]
dual_hand_data_lock = Lock() dual_hand_data_lock = Lock()
dual_hand_state_array = Array('d', 12, lock = False) # [output] current left, right hand state(12) data. dual_hand_state_array = Array('d', 12, lock = False) # [output] current left, right hand state(12) data.
dual_hand_action_array = Array('d', 12, lock = False) # [output] current left, right hand action(12) data. dual_hand_action_array = Array('d', 12, lock = False) # [output] current left, right hand action(12) data.
hand_ctrl = Inspire_Controller(left_hand_array, right_hand_array, dual_hand_data_lock, dual_hand_state_array, dual_hand_action_array)
hand_ctrl = Inspire_Controller(left_hand_pos_array, right_hand_pos_array, dual_hand_data_lock, dual_hand_state_array, dual_hand_action_array)
else: else:
pass pass
@ -126,16 +127,28 @@ if __name__ == '__main__':
user_input = input("Please enter the start signal (enter 'r' to start the subsequent program):\n") user_input = input("Please enter the start signal (enter 'r' to start the subsequent program):\n")
if user_input.lower() == 'r': if user_input.lower() == 'r':
arm_ctrl.speed_gradual_max() arm_ctrl.speed_gradual_max()
running = True running = True
while running: while running:
start_time = time.time() start_time = time.time()
head_rmat, left_wrist, right_wrist, left_hand, right_hand = tv_wrapper.get_data()
tele_data = tv_wrapper.get_motion_state_data()
# send hand skeleton data to hand_ctrl.control_process
if args.hand:
left_hand_array[:] = left_hand.flatten()
right_hand_array[:] = right_hand.flatten()
if (args.ee == 'dex3' or args.ee == 'inspire1') and args.xr_mode == 'hand':
with left_hand_pos_array.get_lock():
left_hand_pos_array[:] = tele_data.left_hand_pos.flatten()
with right_hand_pos_array.get_lock():
right_hand_pos_array[:] = tele_data.right_hand_pos.flatten()
elif args.ee == 'gripper' and args.xr_mode == 'controller':
with left_gripper_value.get_lock():
left_gripper_value.value = tele_data.left_trigger_value
with right_gripper_value.get_lock():
right_gripper_value.value = tele_data.right_trigger_value
elif args.ee == 'gripper' and args.xr_mode == 'hand':
with left_gripper_value.get_lock():
left_gripper_value.value = tele_data.left_pinch_value
with right_gripper_value.get_lock():
right_gripper_value.value = tele_data.right_pinch_value
else:
pass
# get current state data. # get current state data.
current_lr_arm_q = arm_ctrl.get_current_dual_arm_q() current_lr_arm_q = arm_ctrl.get_current_dual_arm_q()
@ -143,7 +156,7 @@ if __name__ == '__main__':
# solve ik using motor data and wrist pose, then use ik results to control arms. # solve ik using motor data and wrist pose, then use ik results to control arms.
time_ik_start = time.time() time_ik_start = time.time()
sol_q, sol_tauff = arm_ik.solve_ik(left_wrist, right_wrist, current_lr_arm_q, current_lr_arm_dq)
sol_q, sol_tauff = arm_ik.solve_ik(tele_data.left_arm_pose, tele_data.right_arm_pose, current_lr_arm_q, current_lr_arm_dq)
time_ik_end = time.time() time_ik_end = time.time()
# print(f"ik:\t{round(time_ik_end - time_ik_start, 6)}") # print(f"ik:\t{round(time_ik_end - time_ik_start, 6)}")
arm_ctrl.ctrl_dual_arm(sol_q, sol_tauff) arm_ctrl.ctrl_dual_arm(sol_q, sol_tauff)
@ -164,27 +177,29 @@ if __name__ == '__main__':
# record data # record data
if args.record: if args.record:
# dex hand or gripper # dex hand or gripper
if args.hand == "dex3":
if args.ee == "dex3":
with dual_hand_data_lock: with dual_hand_data_lock:
left_hand_state = dual_hand_state_array[:7] left_hand_state = dual_hand_state_array[:7]
right_hand_state = dual_hand_state_array[-7:] right_hand_state = dual_hand_state_array[-7:]
left_hand_action = dual_hand_action_array[:7] left_hand_action = dual_hand_action_array[:7]
right_hand_action = dual_hand_action_array[-7:] right_hand_action = dual_hand_action_array[-7:]
elif args.hand == "gripper":
elif args.ee == "gripper":
with dual_gripper_data_lock: with dual_gripper_data_lock:
left_hand_state = [dual_gripper_state_array[1]] left_hand_state = [dual_gripper_state_array[1]]
right_hand_state = [dual_gripper_state_array[0]] right_hand_state = [dual_gripper_state_array[0]]
left_hand_action = [dual_gripper_action_array[1]] left_hand_action = [dual_gripper_action_array[1]]
right_hand_action = [dual_gripper_action_array[0]] right_hand_action = [dual_gripper_action_array[0]]
elif args.hand == "inspire1":
elif args.ee == "inspire1":
with dual_hand_data_lock: with dual_hand_data_lock:
left_hand_state = dual_hand_state_array[:6] left_hand_state = dual_hand_state_array[:6]
right_hand_state = dual_hand_state_array[-6:] right_hand_state = dual_hand_state_array[-6:]
left_hand_action = dual_hand_action_array[:6] left_hand_action = dual_hand_action_array[:6]
right_hand_action = dual_hand_action_array[-6:] right_hand_action = dual_hand_action_array[-6:]
else: else:
print("No dexterous hand set.")
pass
left_hand_state = []
right_hand_state = []
left_hand_action = []
right_hand_action = []
# head image # head image
current_tv_image = tv_img_array.copy() current_tv_image = tv_img_array.copy()
# wrist image # wrist image
@ -195,7 +210,6 @@ if __name__ == '__main__':
right_arm_state = current_lr_arm_q[-7:] right_arm_state = current_lr_arm_q[-7:]
left_arm_action = sol_q[:7] left_arm_action = sol_q[:7]
right_arm_action = sol_q[-7:] right_arm_action = sol_q[-7:]
if recording: if recording:
colors = {} colors = {}
depths = {} depths = {}
@ -221,12 +235,12 @@ if __name__ == '__main__':
"qvel": [], "qvel": [],
"torque": [], "torque": [],
}, },
"left_hand": {
"left_hand_pos": {
"qpos": left_hand_state, "qpos": left_hand_state,
"qvel": [], "qvel": [],
"torque": [], "torque": [],
}, },
"right_hand": {
"right_hand_pos": {
"qpos": right_hand_state, "qpos": right_hand_state,
"qvel": [], "qvel": [],
"torque": [], "torque": [],
@ -244,12 +258,12 @@ if __name__ == '__main__':
"qvel": [], "qvel": [],
"torque": [], "torque": [],
}, },
"left_hand": {
"left_hand_pos": {
"qpos": left_hand_action, "qpos": left_hand_action,
"qvel": [], "qvel": [],
"torque": [], "torque": [],
}, },
"right_hand": {
"right_hand_pos": {
"qpos": right_hand_action, "qpos": right_hand_action,
"qvel": [], "qvel": [],
"torque": [], "torque": [],
@ -260,7 +274,7 @@ if __name__ == '__main__':
current_time = time.time() current_time = time.time()
time_elapsed = current_time - start_time time_elapsed = current_time - start_time
sleep_time = max(0, (1 / float(args.frequency)) - time_elapsed)
sleep_time = max(0, (1 / args.frequency) - time_elapsed)
time.sleep(sleep_time) time.sleep(sleep_time)
# print(f"main process sleep: {sleep_time}") # print(f"main process sleep: {sleep_time}")

14
teleop/utils/mat_tool.py

@ -1,14 +0,0 @@
import numpy as np
def mat_update(prev_mat, mat):
if np.linalg.det(mat) == 0:
return prev_mat, False # Return previous matrix and False flag if the new matrix is non-singular (determinant ≠ 0).
else:
return mat, True
def fast_mat_inv(mat):
ret = np.eye(4)
ret[:3, :3] = mat[:3, :3].T
ret[:3, 3] = -mat[:3, :3].T @ mat[:3, 3]
return ret
Loading…
Cancel
Save