diff --git a/requirements.txt b/requirements.txt index 178d506..281492b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,14 +1,7 @@ -aiohttp==3.9.5 -aiohttp_cors==0.7.0 -aiortc==1.8.0 -av==11.0.0 einops==0.8.0 h5py==3.11.0 ipython==8.12.3 matplotlib==3.7.5 -numpy==1.23.0 -opencv_contrib_python==4.10.0.82 -opencv_python==4.9.0.80 packaging==24.1 pandas==2.0.3 params_proto==2.12.1 @@ -20,7 +13,6 @@ seaborn==0.13.2 setuptools==69.5.1 torch==2.3.0 torchvision==0.18.0 -vuer==0.0.32rc7 rerun-sdk==0.20.1 nlopt>=2.6.1,<2.8.0 trimesh>=4.4.0 diff --git a/teleop/open_television/__init__.py b/teleop/open_television/__init__.py new file mode 100644 index 0000000..200cce1 --- /dev/null +++ b/teleop/open_television/__init__.py @@ -0,0 +1,4 @@ +# open_television/__init__.py +# openssl req -x509 -nodes -days 365 -newkey rsa:2048 -keyout key.pem -out cert.pem +from .television import TeleVision +from .tv_wrapper import TeleVisionWrapper, TeleData, TeleStateData \ No newline at end of file diff --git a/teleop/open_television/_test_television.py b/teleop/open_television/_test_television.py new file mode 100644 index 0000000..f311101 --- /dev/null +++ b/teleop/open_television/_test_television.py @@ -0,0 +1,77 @@ +import os, sys +this_file = os.path.abspath(__file__) +project_root = os.path.abspath(os.path.join(os.path.dirname(this_file), '..')) +if project_root not in sys.path: + sys.path.insert(0, project_root) + +import time +import numpy as np +from multiprocessing import shared_memory +from open_television import TeleVision + +def run_test_television(): + # image + image_shape = (480, 640 * 2, 3) + image_shm = shared_memory.SharedMemory(create=True, size=np.prod(image_shape) * np.uint8().itemsize) + image_array = np.ndarray(image_shape, dtype=np.uint8, buffer=image_shm.buf) + + # television + use_hand_track = False + tv = TeleVision(binocular = True, use_hand_tracking = use_hand_track, img_shape = image_shape, img_shm_name = image_shm.name, webrtc=True) + + try: + 
input("Press Enter to start television test...") + running = True + while running: + print("=" * 80) + print("Common Data (always available):") + print(f"head_pose shape: {tv.head_pose.shape}\n{tv.head_pose}\n") + print(f"left_arm_pose shape: {tv.left_arm_pose.shape}\n{tv.left_arm_pose}\n") + print(f"right_arm_pose shape: {tv.right_arm_pose.shape}\n{tv.right_arm_pose}\n") + print("=" * 80) + + if use_hand_track: + print("Hand Tracking Data:") + print(f"left_hand_positions shape: {tv.left_hand_positions.shape}\n{tv.left_hand_positions}\n") + print(f"right_hand_positions shape: {tv.right_hand_positions.shape}\n{tv.right_hand_positions}\n") + print(f"left_hand_orientations shape: {tv.left_hand_orientations.shape}\n{tv.left_hand_orientations}\n") + print(f"right_hand_orientations shape: {tv.right_hand_orientations.shape}\n{tv.right_hand_orientations}\n") + print(f"left_hand_pinch_state: {tv.left_hand_pinch_state}") + print(f"left_hand_pinch_value: {tv.left_hand_pinch_value}") + print(f"left_hand_squeeze_state: {tv.left_hand_squeeze_state}") + print(f"left_hand_squeeze_value: {tv.left_hand_squeeze_value}") + print(f"right_hand_pinch_state: {tv.right_hand_pinch_state}") + print(f"right_hand_pinch_value: {tv.right_hand_pinch_value}") + print(f"right_hand_squeeze_state: {tv.right_hand_squeeze_state}") + print(f"right_hand_squeeze_value: {tv.right_hand_squeeze_value}") + else: + print("Controller Data:") + print(f"left_controller_trigger_state: {tv.left_controller_trigger_state}") + print(f"left_controller_trigger_value: {tv.left_controller_trigger_value}") + print(f"left_controller_squeeze_state: {tv.left_controller_squeeze_state}") + print(f"left_controller_squeeze_value: {tv.left_controller_squeeze_value}") + print(f"left_controller_thumbstick_state: {tv.left_controller_thumbstick_state}") + print(f"left_controller_thumbstick_value: {tv.left_controller_thumbstick_value}") + print(f"left_controller_aButton: {tv.left_controller_aButton}") + print(f"left_controller_bButton: 
{tv.left_controller_bButton}") + print(f"right_controller_trigger_state: {tv.right_controller_trigger_state}") + print(f"right_controller_trigger_value: {tv.right_controller_trigger_value}") + print(f"right_controller_squeeze_state: {tv.right_controller_squeeze_state}") + print(f"right_controller_squeeze_value: {tv.right_controller_squeeze_value}") + print(f"right_controller_thumbstick_state: {tv.right_controller_thumbstick_state}") + print(f"right_controller_thumbstick_value: {tv.right_controller_thumbstick_value}") + print(f"right_controller_aButton: {tv.right_controller_aButton}") + print(f"right_controller_bButton: {tv.right_controller_bButton}") + print("=" * 80) + time.sleep(0.03) + except KeyboardInterrupt: + running = False + print("KeyboardInterrupt, exiting program...") + finally: + image_shm.unlink() + image_shm.close() + print("Finally, exiting program...") + exit(0) + +if __name__ == '__main__': + run_test_television() \ No newline at end of file diff --git a/teleop/open_television/_test_tv_wrapper.py b/teleop/open_television/_test_tv_wrapper.py new file mode 100644 index 0000000..938c0d3 --- /dev/null +++ b/teleop/open_television/_test_tv_wrapper.py @@ -0,0 +1,80 @@ +import os, sys +this_file = os.path.abspath(__file__) +project_root = os.path.abspath(os.path.join(os.path.dirname(this_file), '..')) +if project_root not in sys.path: + sys.path.insert(0, project_root) + +import numpy as np +import time +from multiprocessing import shared_memory +from open_television import TeleVisionWrapper + +def run_test_tv_wrapper(): + image_shape = (480, 640 * 2, 3) + image_shm = shared_memory.SharedMemory(create=True, size=np.prod(image_shape) * np.uint8().itemsize) + image_array = np.ndarray(image_shape, dtype=np.uint8, buffer=image_shm.buf) + use_hand_track=False + tv_wrapper = TeleVisionWrapper(binocular=True, use_hand_tracking=use_hand_track, img_shape=image_shape, img_shm_name=image_shm.name, + return_state_data=True, return_hand_rot_data = True) + try: + 
input("Press Enter to start tv_wrapper test...") + running = True + while running: + start_time = time.time() + teleData = tv_wrapper.get_motion_state_data() + + print("=== TeleData Snapshot ===") + print("[Head Rotation Matrix]:\n", teleData.head_rotation) + print("[Left EE Pose]:\n", teleData.left_arm_pose) + print("[Right EE Pose]:\n", teleData.right_arm_pose) + + if use_hand_track: + print("[Left Hand Position] shape {}:\n{}".format(teleData.left_hand_pos.shape, teleData.left_hand_pos)) + print("[Right Hand Position] shape {}:\n{}".format(teleData.right_hand_pos.shape, teleData.right_hand_pos)) + if teleData.left_hand_rot is not None: + print("[Left Hand Rotation] shape {}:\n{}".format(teleData.left_hand_rot.shape, teleData.left_hand_rot)) + if teleData.right_hand_rot is not None: + print("[Right Hand Rotation] shape {}:\n{}".format(teleData.right_hand_rot.shape, teleData.right_hand_rot)) + if teleData.left_pinch_value is not None: + print("[Left Pinch Value]: {:.2f}".format(teleData.left_pinch_value)) + if teleData.right_pinch_value is not None: + print("[Right Pinch Value]: {:.2f}".format(teleData.right_pinch_value)) + if teleData.tele_state: + state = teleData.tele_state + print("[Hand State]:") + print(f" Left Pinch state: {state.left_pinch_state}") + print(f" Left Squeeze: {state.left_squeeze_state} ({state.left_squeeze_value:.2f})") + print(f" Right Pinch state: {state.right_pinch_state}") + print(f" Right Squeeze: {state.right_squeeze_state} ({state.right_squeeze_value:.2f})") + else: + print(f"[Left Trigger Value]: {teleData.left_trigger_value:.2f}") + print(f"[Right Trigger Value]: {teleData.right_trigger_value:.2f}") + if teleData.tele_state: + state = teleData.tele_state + print("[Controller State]:") + print(f" Left Trigger: {state.left_trigger_state}") + print(f" Left Squeeze: {state.left_squeeze_ctrl_state} ({state.left_squeeze_ctrl_value:.2f})") + print(f" Left Thumbstick: {state.left_thumbstick_state} ({state.left_thumbstick_value})") + print(f" 
Left A/B Buttons: A={state.left_aButton}, B={state.left_bButton}") + print(f" Right Trigger: {state.right_trigger_state}") + print(f" Right Squeeze: {state.right_squeeze_ctrl_state} ({state.right_squeeze_ctrl_value:.2f})") + print(f" Right Thumbstick: {state.right_thumbstick_state} ({state.right_thumbstick_value})") + print(f" Right A/B Buttons: A={state.right_aButton}, B={state.right_bButton}") + + current_time = time.time() + time_elapsed = current_time - start_time + sleep_time = max(0, 0.033 - time_elapsed) + time.sleep(sleep_time) + # print(f"main process sleep: {sleep_time}") + + except KeyboardInterrupt: + running = False + print("KeyboardInterrupt, exiting program...") + finally: + image_shm.unlink() + image_shm.close() + print("Finally, exiting program...") + exit(0) + +if __name__ == '__main__': + run_test_tv_wrapper() \ No newline at end of file diff --git a/teleop/open_television/constants.py b/teleop/open_television/constants.py deleted file mode 100644 index 4b13257..0000000 --- a/teleop/open_television/constants.py +++ /dev/null @@ -1,53 +0,0 @@ -import numpy as np - - -T_to_unitree_left_wrist = np.array([[1, 0, 0, 0], - [0, 0, -1, 0], - [0, 1, 0, 0], - [0, 0, 0, 1]]) - -T_to_unitree_right_wrist = np.array([[1, 0, 0, 0], - [0, 0, 1, 0], - [0, -1, 0, 0], - [0, 0, 0, 1]]) - -T_to_unitree_hand = np.array([[0, 0, 1, 0], - [-1,0, 0, 0], - [0, -1,0, 0], - [0, 0, 0, 1]]) - -T_robot_openxr = np.array([[0, 0, -1, 0], - [-1, 0, 0, 0], - [0, 1, 0, 0], - [0, 0, 0, 1]]) - - - -const_head_vuer_mat = np.array([[1, 0, 0, 0], - [0, 1, 0, 1.5], - [0, 0, 1, -0.2], - [0, 0, 0, 1]]) - - -# For G1 initial position -const_right_wrist_vuer_mat = np.array([[1, 0, 0, 0.15], - [0, 1, 0, 1.13], - [0, 0, 1, -0.3], - [0, 0, 0, 1]]) - -# For G1 initial position -const_left_wrist_vuer_mat = np.array([[1, 0, 0, -0.15], - [0, 1, 0, 1.13], - [0, 0, 1, -0.3], - [0, 0, 0, 1]]) - -# legacy -# const_right_wrist_vuer_mat = np.array([[1, 0, 0, 0.5], -# [0, 1, 0, 1], -# [0, 0, 1, -0.5], -# 
[0, 0, 0, 1]]) - -# const_left_wrist_vuer_mat = np.array([[1, 0, 0, -0.5], -# [0, 1, 0, 1], -# [0, 0, 1, -0.5], -# [0, 0, 0, 1]]) diff --git a/teleop/open_television/setup.py b/teleop/open_television/setup.py new file mode 100644 index 0000000..7407009 --- /dev/null +++ b/teleop/open_television/setup.py @@ -0,0 +1,20 @@ +from setuptools import setup, find_packages + +setup( + name='open_television', + version='0.0.1', + description='XR vision and hand/controller interface for unitree robotics', + author='silencht', + packages=find_packages(), + install_requires=[ + 'numpy==1.23.0', + 'opencv_contrib_python==4.10.0.82', + 'opencv_python==4.9.0.80', + 'aiohttp==3.9.5', + 'aiohttp_cors==0.7.0', + 'aiortc==1.8.0', + 'av==11.0.0', + 'vuer==0.0.42rc16', + ], + python_requires='>=3.8', +) \ No newline at end of file diff --git a/teleop/open_television/television.py b/teleop/open_television/television.py index fbcc7c0..c0be8fc 100644 --- a/teleop/open_television/television.py +++ b/teleop/open_television/television.py @@ -1,74 +1,220 @@ -import time from vuer import Vuer -from vuer.schemas import ImageBackground, Hands -from multiprocessing import Array, Process, shared_memory +from vuer.schemas import ImageBackground, Hands, MotionControllers, WebRTCVideoPlane, WebRTCStereoVideoPlane +from multiprocessing import Value, Array, Process, shared_memory import numpy as np import asyncio import cv2 - -from multiprocessing import context -Value = context._default_context.Value +import os class TeleVision: - def __init__(self, binocular, img_shape, img_shm_name, cert_file="./cert.pem", key_file="./key.pem", ngrok=False): + def __init__(self, binocular: bool, use_hand_tracking: bool, img_shape, img_shm_name, cert_file=None, key_file=None, ngrok=False, webrtc=False): + """ + TeleVision class for OpenXR-based VR/AR applications. + This class handles the communication with the Vuer server and manages the shared memory for image and pose data. 
+ + :param binocular: bool, whether the application is binocular (stereoscopic) or monocular. + :param use_hand_tracking: bool, whether to use hand tracking or controller tracking. + :param img_shape: tuple, shape of the image (height, width, channels). + :param img_shm_name: str, name of the shared memory for the image. + :param cert_file: str, path to the SSL certificate file. + :param key_file: str, path to the SSL key file. + :param ngrok: bool, whether to use ngrok for tunneling. + """ self.binocular = binocular + self.use_hand_tracking = use_hand_tracking self.img_height = img_shape[0] - if binocular: + if self.binocular: self.img_width = img_shape[1] // 2 else: self.img_width = img_shape[1] + + current_module_dir = os.path.dirname(os.path.abspath(__file__)) + print(f"current_module_dir: {current_module_dir}") + if cert_file is None: + cert_file = os.path.join(current_module_dir, "cert.pem") + if key_file is None: + key_file = os.path.join(current_module_dir, "key.pem") if ngrok: self.vuer = Vuer(host='0.0.0.0', queries=dict(grid=False), queue_len=3) else: self.vuer = Vuer(host='0.0.0.0', cert=cert_file, key=key_file, queries=dict(grid=False), queue_len=3) - self.vuer.add_handler("HAND_MOVE")(self.on_hand_move) self.vuer.add_handler("CAMERA_MOVE")(self.on_cam_move) + if self.use_hand_tracking: + self.vuer.add_handler("HAND_MOVE")(self.on_hand_move) + else: + self.vuer.add_handler("CONTROLLER_MOVE")(self.on_controller_move) existing_shm = shared_memory.SharedMemory(name=img_shm_name) self.img_array = np.ndarray(img_shape, dtype=np.uint8, buffer=existing_shm.buf) - if binocular: + self.webrtc = webrtc + if self.binocular and not self.webrtc: self.vuer.spawn(start=False)(self.main_image_binocular) - else: + elif not self.binocular and not self.webrtc: self.vuer.spawn(start=False)(self.main_image_monocular) + elif self.webrtc: + self.vuer.spawn(start=False)(self.main_image_webrtc) - self.left_hand_shared = Array('d', 16, lock=True) - self.right_hand_shared = 
Array('d', 16, lock=True) - self.left_landmarks_shared = Array('d', 75, lock=True) - self.right_landmarks_shared = Array('d', 75, lock=True) - - self.head_matrix_shared = Array('d', 16, lock=True) - self.aspect_shared = Value('d', 1.0, lock=True) + self.head_pose_shared = Array('d', 16, lock=True) + self.left_arm_pose_shared = Array('d', 16, lock=True) + self.right_arm_pose_shared = Array('d', 16, lock=True) + if self.use_hand_tracking: + self.left_hand_position_shared = Array('d', 75, lock=True) + self.right_hand_position_shared = Array('d', 75, lock=True) + self.left_hand_orientation_shared = Array('d', 25 * 9, lock=True) + self.right_hand_orientation_shared = Array('d', 25 * 9, lock=True) + + self.left_pinch_state_shared = Value('b', False, lock=True) + self.left_pinch_value_shared = Value('d', 0.0, lock=True) + self.left_squeeze_state_shared = Value('b', False, lock=True) + self.left_squeeze_value_shared = Value('d', 0.0, lock=True) + + self.right_pinch_state_shared = Value('b', False, lock=True) + self.right_pinch_value_shared = Value('d', 0.0, lock=True) + self.right_squeeze_state_shared = Value('b', False, lock=True) + self.right_squeeze_value_shared = Value('d', 0.0, lock=True) + else: + self.left_trigger_state_shared = Value('b', False, lock=True) + self.left_trigger_value_shared = Value('d', 0.0, lock=True) + self.left_squeeze_state_shared = Value('b', False, lock=True) + self.left_squeeze_value_shared = Value('d', 0.0, lock=True) + self.left_thumbstick_state_shared = Value('b', False, lock=True) + self.left_thumbstick_value_shared = Array('d', 2, lock=True) + self.left_aButton_shared = Value('b', False, lock=True) + self.left_bButton_shared = Value('b', False, lock=True) + + self.right_trigger_state_shared = Value('b', False, lock=True) + self.right_trigger_value_shared = Value('d', 0.0, lock=True) + self.right_squeeze_state_shared = Value('b', False, lock=True) + self.right_squeeze_value_shared = Value('d', 0.0, lock=True) + 
self.right_thumbstick_state_shared = Value('b', False, lock=True) + self.right_thumbstick_value_shared = Array('d', 2, lock=True) + self.right_aButton_shared = Value('b', False, lock=True) + self.right_bButton_shared = Value('b', False, lock=True) self.process = Process(target=self.vuer_run) self.process.daemon = True self.process.start() - def vuer_run(self): self.vuer.run() async def on_cam_move(self, event, session, fps=60): try: - self.head_matrix_shared[:] = event.value["camera"]["matrix"] - self.aspect_shared.value = event.value['camera']['aspect'] + with self.head_pose_shared.get_lock(): + self.head_pose_shared[:] = event.value["camera"]["matrix"] + except: + pass + + async def on_controller_move(self, event, session, fps=60): + try: + with self.left_arm_pose_shared.get_lock(): + self.left_arm_pose_shared[:] = event.value["left"] + with self.right_arm_pose_shared.get_lock(): + self.right_arm_pose_shared[:] = event.value["right"] + + left_controller_state = event.value["leftState"] + right_controller_state = event.value["rightState"] + + def extract_controller_states(state_dict, prefix): + # trigger + with getattr(self, f"{prefix}_trigger_state_shared").get_lock(): + getattr(self, f"{prefix}_trigger_state_shared").value = bool(state_dict.get("trigger", False)) + with getattr(self, f"{prefix}_trigger_value_shared").get_lock(): + getattr(self, f"{prefix}_trigger_value_shared").value = float(state_dict.get("triggerValue", 0.0)) + # squeeze + with getattr(self, f"{prefix}_squeeze_state_shared").get_lock(): + getattr(self, f"{prefix}_squeeze_state_shared").value = bool(state_dict.get("squeeze", False)) + with getattr(self, f"{prefix}_squeeze_value_shared").get_lock(): + getattr(self, f"{prefix}_squeeze_value_shared").value = float(state_dict.get("squeezeValue", 0.0)) + # thumbstick + with getattr(self, f"{prefix}_thumbstick_state_shared").get_lock(): + getattr(self, f"{prefix}_thumbstick_state_shared").value = bool(state_dict.get("thumbstick", False)) + with 
getattr(self, f"{prefix}_thumbstick_value_shared").get_lock(): + getattr(self, f"{prefix}_thumbstick_value_shared")[:] = state_dict.get("thumbstickValue", [0.0, 0.0]) + # buttons + with getattr(self, f"{prefix}_aButton_shared").get_lock(): + getattr(self, f"{prefix}_aButton_shared").value = bool(state_dict.get("aButton", False)) + with getattr(self, f"{prefix}_bButton_shared").get_lock(): + getattr(self, f"{prefix}_bButton_shared").value = bool(state_dict.get("bButton", False)) + + extract_controller_states(left_controller_state, "left") + extract_controller_states(right_controller_state, "right") except: pass async def on_hand_move(self, event, session, fps=60): try: - self.left_hand_shared[:] = event.value["leftHand"] - self.right_hand_shared[:] = event.value["rightHand"] - self.left_landmarks_shared[:] = np.array(event.value["leftLandmarks"]).flatten() - self.right_landmarks_shared[:] = np.array(event.value["rightLandmarks"]).flatten() - except: + left_hand_data = event.value["left"] + right_hand_data = event.value["right"] + left_hand_state = event.value["leftState"] + right_hand_state = event.value["rightState"] + + def extract_hand_poses(hand_data, arm_pose_shared, hand_position_shared, hand_orientation_shared): + with arm_pose_shared.get_lock(): + arm_pose_shared[:] = hand_data[0:16] + + with hand_position_shared.get_lock(): + for i in range(25): + base = i * 16 + hand_position_shared[i * 3: i * 3 + 3] = [hand_data[base + 12], hand_data[base + 13], hand_data[base + 14]] + + with hand_orientation_shared.get_lock(): + for i in range(25): + base = i * 16 + hand_orientation_shared[i * 9: i * 9 + 9] = [ + hand_data[base + 0], hand_data[base + 1], hand_data[base + 2], + hand_data[base + 4], hand_data[base + 5], hand_data[base + 6], + hand_data[base + 8], hand_data[base + 9], hand_data[base + 10], + ] + + def extract_hand_states(state_dict, prefix): + # pinch + with getattr(self, f"{prefix}_pinch_state_shared").get_lock(): + getattr(self, 
f"{prefix}_pinch_state_shared").value = bool(state_dict.get("pinch", False)) + with getattr(self, f"{prefix}_pinch_value_shared").get_lock(): + getattr(self, f"{prefix}_pinch_value_shared").value = float(state_dict.get("pinchValue", 0.0)) + # squeeze + with getattr(self, f"{prefix}_squeeze_state_shared").get_lock(): + getattr(self, f"{prefix}_squeeze_state_shared").value = bool(state_dict.get("squeeze", False)) + with getattr(self, f"{prefix}_squeeze_value_shared").get_lock(): + getattr(self, f"{prefix}_squeeze_value_shared").value = float(state_dict.get("squeezeValue", 0.0)) + + extract_hand_poses(left_hand_data, self.left_arm_pose_shared, self.left_hand_position_shared, self.left_hand_orientation_shared) + extract_hand_poses(right_hand_data, self.right_arm_pose_shared, self.right_hand_position_shared, self.right_hand_orientation_shared) + extract_hand_states(left_hand_state, "left") + extract_hand_states(right_hand_state, "right") + + except: pass async def main_image_binocular(self, session, fps=60): - session.upsert @ Hands(fps=fps, stream=True, key="hands", showLeft=False, showRight=False) + if self.use_hand_tracking: + session.upsert( + Hands( + stream=True, + key="hands", + showLeft=False, + showRight=False + ), + # we place this into the background children list, so that it is + # not affected by the global rotation + to="bgChildren", + ) + else: + session.upsert( + MotionControllers( + stream=True, + key="motion-controller", + showLeft=False, + showRight=False, + ), + to="bgChildren", + ) + while True: display_image = cv2.cvtColor(self.img_array, cv2.COLOR_BGR2RGB) # aspect_ratio = self.img_width / self.img_height @@ -106,7 +252,26 @@ class TeleVision: await asyncio.sleep(0.016 * 2) async def main_image_monocular(self, session, fps=60): - session.upsert @ Hands(fps=fps, stream=True, key="hands", showLeft=False, showRight=False) + if self.use_hand_tracking: + session.upsert( + Hands( + stream=True, + key="hands", + showLeft=False, + showRight=False + ), + 
to="bgChildren", + ) + else: + session.upsert( + MotionControllers( + stream=True, + key="motion-controller", + showLeft=False, + showRight=False, + ), + to="bgChildren", + ) while True: display_image = cv2.cvtColor(self.img_array, cv2.COLOR_BGR2RGB) # aspect_ratio = self.img_width / self.img_height @@ -127,52 +292,225 @@ class TeleVision: ) await asyncio.sleep(0.016) - @property - def left_hand(self): - return np.array(self.left_hand_shared[:]).reshape(4, 4, order="F") - + async def main_image_webrtc(self, session, fps=60): + if self.use_hand_tracking: + session.upsert( + Hands( + stream=True, + key="hands", + showLeft=False, + showRight=False + ), + to="bgChildren", + ) + else: + session.upsert( + MotionControllers( + stream=True, + key="motion-controller", + showLeft=False, + showRight=False, + ) + ) + session.upsert( + WebRTCVideoPlane( + # WebRTCStereoVideoPlane( + src="https://10.0.7.49:8080/offer", + iceServer={}, + key="webrtc", + aspect=1.778, + height = 7, + ), + to="bgChildren", + ) + while True: + await asyncio.sleep(1) + # ==================== common data ==================== @property - def right_hand(self): - return np.array(self.right_hand_shared[:]).reshape(4, 4, order="F") - - + def head_pose(self): + """np.ndarray, shape (4, 4), head SE(3) pose matrix from Vuer (basis OpenXR Convention).""" + with self.head_pose_shared.get_lock(): + return np.array(self.head_pose_shared[:]).reshape(4, 4, order="F") + @property - def left_landmarks(self): - return np.array(self.left_landmarks_shared[:]).reshape(25, 3) - + def left_arm_pose(self): + """np.ndarray, shape (4, 4), left arm SE(3) pose matrix from Vuer (basis OpenXR Convention).""" + with self.left_arm_pose_shared.get_lock(): + return np.array(self.left_arm_pose_shared[:]).reshape(4, 4, order="F") + @property - def right_landmarks(self): - return np.array(self.right_landmarks_shared[:]).reshape(25, 3) + def right_arm_pose(self): + """np.ndarray, shape (4, 4), right arm SE(3) pose matrix from Vuer (basis 
OpenXR Convention).""" + with self.right_arm_pose_shared.get_lock(): + return np.array(self.right_arm_pose_shared[:]).reshape(4, 4, order="F") + # ==================== Hand Tracking Data ==================== @property - def head_matrix(self): - return np.array(self.head_matrix_shared[:]).reshape(4, 4, order="F") + def left_hand_positions(self): + """np.ndarray, shape (25, 3), left hand 25 landmarks' 3D positions.""" + with self.left_hand_position_shared.get_lock(): + return np.array(self.left_hand_position_shared[:]).reshape(25, 3) @property - def aspect(self): - return float(self.aspect_shared.value) - -if __name__ == '__main__': - import os - import sys - current_dir = os.path.dirname(os.path.abspath(__file__)) - parent_dir = os.path.dirname(current_dir) - sys.path.append(parent_dir) - import threading - from image_server.image_client import ImageClient - - # image - img_shape = (480, 640 * 2, 3) - img_shm = shared_memory.SharedMemory(create=True, size=np.prod(img_shape) * np.uint8().itemsize) - img_array = np.ndarray(img_shape, dtype=np.uint8, buffer=img_shm.buf) - img_client = ImageClient(tv_img_shape = img_shape, tv_img_shm_name = img_shm.name) - image_receive_thread = threading.Thread(target=img_client.receive_process, daemon=True) - image_receive_thread.start() - - # television - tv = TeleVision(True, img_shape, img_shm.name) - print("vuer unit test program running...") - print("you can press ^C to interrupt program.") - while True: - time.sleep(0.03) \ No newline at end of file + def right_hand_positions(self): + """np.ndarray, shape (25, 3), right hand 25 landmarks' 3D positions.""" + with self.right_hand_position_shared.get_lock(): + return np.array(self.right_hand_position_shared[:]).reshape(25, 3) + + @property + def left_hand_orientations(self): + """np.ndarray, shape (25, 3, 3), left hand 25 landmarks' orientations (flattened 3x3 matrices, column-major).""" + with self.left_hand_orientation_shared.get_lock(): + return 
np.array(self.left_hand_orientation_shared[:]).reshape(25, 9).reshape(25, 3, 3, order="F") + + @property + def right_hand_orientations(self): + """np.ndarray, shape (25, 3, 3), right hand 25 landmarks' orientations (flattened 3x3 matrices, column-major).""" + with self.right_hand_orientation_shared.get_lock(): + return np.array(self.right_hand_orientation_shared[:]).reshape(25, 9).reshape(25, 3, 3, order="F") + + @property + def left_hand_pinch_state(self): + """bool, whether left hand is pinching.""" + with self.left_pinch_state_shared.get_lock(): + return self.left_pinch_state_shared.value + + @property + def left_hand_pinch_value(self): + """float, pinch strength of left hand.""" + with self.left_pinch_value_shared.get_lock(): + return self.left_pinch_value_shared.value + + @property + def left_hand_squeeze_state(self): + """bool, whether left hand is squeezing.""" + with self.left_squeeze_state_shared.get_lock(): + return self.left_squeeze_state_shared.value + + @property + def left_hand_squeeze_value(self): + """float, squeeze strength of left hand.""" + with self.left_squeeze_value_shared.get_lock(): + return self.left_squeeze_value_shared.value + + @property + def right_hand_pinch_state(self): + """bool, whether right hand is pinching.""" + with self.right_pinch_state_shared.get_lock(): + return self.right_pinch_state_shared.value + + @property + def right_hand_pinch_value(self): + """float, pinch strength of right hand.""" + with self.right_pinch_value_shared.get_lock(): + return self.right_pinch_value_shared.value + + @property + def right_hand_squeeze_state(self): + """bool, whether right hand is squeezing.""" + with self.right_squeeze_state_shared.get_lock(): + return self.right_squeeze_state_shared.value + + @property + def right_hand_squeeze_value(self): + """float, squeeze strength of right hand.""" + with self.right_squeeze_value_shared.get_lock(): + return self.right_squeeze_value_shared.value + + # ==================== Controller Data 
==================== + @property + def left_controller_trigger_state(self): + """bool, left controller trigger pressed or not.""" + with self.left_trigger_state_shared.get_lock(): + return self.left_trigger_state_shared.value + + @property + def left_controller_trigger_value(self): + """float, left controller trigger analog value (0.0 ~ 1.0).""" + with self.left_trigger_value_shared.get_lock(): + return self.left_trigger_value_shared.value + + @property + def left_controller_squeeze_state(self): + """bool, left controller squeeze pressed or not.""" + with self.left_squeeze_state_shared.get_lock(): + return self.left_squeeze_state_shared.value + + @property + def left_controller_squeeze_value(self): + """float, left controller squeeze analog value (0.0 ~ 1.0).""" + with self.left_squeeze_value_shared.get_lock(): + return self.left_squeeze_value_shared.value + + @property + def left_controller_thumbstick_state(self): + """bool, whether left thumbstick is touched or clicked.""" + with self.left_thumbstick_state_shared.get_lock(): + return self.left_thumbstick_state_shared.value + + @property + def left_controller_thumbstick_value(self): + """np.ndarray, shape (2,), left thumbstick 2D axis values (x, y).""" + with self.left_thumbstick_value_shared.get_lock(): + return np.array(self.left_thumbstick_value_shared[:]) + + @property + def left_controller_aButton(self): + """bool, left controller 'A' button pressed.""" + with self.left_aButton_shared.get_lock(): + return self.left_aButton_shared.value + + @property + def left_controller_bButton(self): + """bool, left controller 'B' button pressed.""" + with self.left_bButton_shared.get_lock(): + return self.left_bButton_shared.value + + @property + def right_controller_trigger_state(self): + """bool, right controller trigger pressed or not.""" + with self.right_trigger_state_shared.get_lock(): + return self.right_trigger_state_shared.value + + @property + def right_controller_trigger_value(self): + """float, right controller 
trigger analog value (0.0 ~ 1.0).""" + with self.right_trigger_value_shared.get_lock(): + return self.right_trigger_value_shared.value + + @property + def right_controller_squeeze_state(self): + """bool, right controller squeeze pressed or not.""" + with self.right_squeeze_state_shared.get_lock(): + return self.right_squeeze_state_shared.value + + @property + def right_controller_squeeze_value(self): + """float, right controller squeeze analog value (0.0 ~ 1.0).""" + with self.right_squeeze_value_shared.get_lock(): + return self.right_squeeze_value_shared.value + + @property + def right_controller_thumbstick_state(self): + """bool, whether right thumbstick is touched or clicked.""" + with self.right_thumbstick_state_shared.get_lock(): + return self.right_thumbstick_state_shared.value + + @property + def right_controller_thumbstick_value(self): + """np.ndarray, shape (2,), right thumbstick 2D axis values (x, y).""" + with self.right_thumbstick_value_shared.get_lock(): + return np.array(self.right_thumbstick_value_shared[:]) + + @property + def right_controller_aButton(self): + """bool, right controller 'A' button pressed.""" + with self.right_aButton_shared.get_lock(): + return self.right_aButton_shared.value + + @property + def right_controller_bButton(self): + """bool, right controller 'B' button pressed.""" + with self.right_bButton_shared.get_lock(): + return self.right_bButton_shared.value \ No newline at end of file diff --git a/teleop/open_television/tv_wrapper.py b/teleop/open_television/tv_wrapper.py index 1363ba5..e4abfac 100644 --- a/teleop/open_television/tv_wrapper.py +++ b/teleop/open_television/tv_wrapper.py @@ -1,147 +1,411 @@ import numpy as np -from teleop.open_television.television import TeleVision -from teleop.open_television.constants import * -from teleop.utils.mat_tool import mat_update, fast_mat_inv +from open_television.television import TeleVision +from dataclasses import dataclass, field """ (basis) OpenXR Convention : y up, z back, x 
right. (basis) Robot Convention : z up, y left, x front. -p.s. Vuer's all raw data follows OpenXR Convention, WORLD coordinate. -under (basis) Robot Convention, wrist's initial pose convention: +under (basis) Robot Convention, humanoid arm's initial pose convention: - # (Left Wrist) XR/AppleVisionPro Convention: + # (initial pose) OpenXR Left Arm Pose Convention (hand tracking): - the x-axis pointing from wrist toward middle. - the y-axis pointing from index toward pinky. - the z-axis pointing from palm toward back of the hand. - # (Right Wrist) XR/AppleVisionPro Convention: + # (initial pose) OpenXR Right Arm Pose Convention (hand tracking): - the x-axis pointing from wrist toward middle. - the y-axis pointing from pinky toward index. - the z-axis pointing from palm toward back of the hand. - # (Left Wrist URDF) Unitree Convention: + # (initial pose) Unitree Humanoid Left Arm URDF Convention: - the x-axis pointing from wrist toward middle. - the y-axis pointing from palm toward back of the hand. - the z-axis pointing from pinky toward index. - # (Right Wrist URDF) Unitree Convention: + # (initial pose) Unitree Humanoid Right Arm URDF Convention: - the x-axis pointing from wrist toward middle. - the y-axis pointing from back of the hand toward palm. - the z-axis pointing from pinky toward index. -under (basis) Robot Convention, hand's initial pose convention: +under (basis) Robot Convention, humanoid hand's initial pose convention: - # (Left Hand) XR/AppleVisionPro Convention: + # (initial pose) OpenXR Left Hand Pose Convention (hand tracking): - the x-axis pointing from wrist toward middle. - the y-axis pointing from index toward pinky. - the z-axis pointing from palm toward back of the hand. - # (Right Hand) XR/AppleVisionPro Convention: + # (initial pose) OpenXR Right Hand Pose Convention (hand tracking): - the x-axis pointing from wrist toward middle. - the y-axis pointing from pinky toward index. - the z-axis pointing from palm toward back of the hand. 
- # (Left Hand URDF) Unitree Convention: + # (initial pose) Unitree Humanoid Left Hand URDF Convention: - The x-axis pointing from palm toward back of the hand. - The y-axis pointing from middle toward wrist. - The z-axis pointing from pinky toward index. - # (Right Hand URDF) Unitree Convention: + # (initial pose) Unitree Humanoid Right Hand URDF Convention: - The x-axis pointing from palm toward back of the hand. - The y-axis pointing from middle toward wrist. - The z-axis pointing from index toward pinky. + +p.s. TeleVision (Vuer) obtains all raw data under the (basis) OpenXR Convention. + In addition, arm pose data (hand tracking) follows the (initial pose) OpenXR Arm Pose Convention, + while arm pose data (controller tracking) directly follows the (initial pose) Unitree Humanoid Arm URDF Convention (thus no transform is needed). + Meanwhile, all raw data is in the WORLD frame defined by XR device odometry. - p.s. From website: https://registry.khronos.org/OpenXR/specs/1.1/man/html/openxr.html. - You can find **(Left/Right Wrist) XR/AppleVisionPro Convention** related information like this below: - "The wrist joint is located at the pivot point of the wrist, which is location invariant when twisting the hand without moving the forearm. - The backward (+Z) direction is parallel to the line from wrist joint to middle finger metacarpal joint, and points away from the finger tips. - The up (+Y) direction points out towards back of the hand and perpendicular to the skin at wrist. - The X direction is perpendicular to the Y and Z directions and follows the right hand rule." - Note: The above context is of course under **(basis) OpenXR Convention**. +p.s. From website: https://registry.khronos.org/OpenXR/specs/1.1/man/html/openxr.html. 
"""(module docstring, continued)
    You can find **(initial pose) OpenXR Left/Right Arm Pose Convention** related information like this below:
        "The wrist joint is located at the pivot point of the wrist, which is location invariant when twisting the hand without moving the forearm.
        The backward (+Z) direction is parallel to the line from wrist joint to middle finger metacarpal joint, and points away from the finger tips.
        The up (+Y) direction points out towards back of the hand and perpendicular to the skin at wrist.
        The X direction is perpendicular to the Y and Z directions and follows the right hand rule."
        Note: The above context is of course under **(basis) OpenXR Convention**.

p.s. **Unitree Arm/Hand URDF initial pose Convention** information comes from URDF files.
"""


def safe_mat_update(prev_mat, mat):
    """Validate a freshly-read pose matrix.

    Returns ``(mat, True)`` when ``mat`` is usable, and ``(prev_mat, False)``
    when it is rejected. A matrix is rejected when its determinant is
    non-finite or approximately zero (i.e. the matrix is singular/degenerate),
    which is what invalid or not-yet-initialized tracking data looks like.
    """
    det = np.linalg.det(mat)
    if not np.isfinite(det) or np.isclose(det, 0.0, atol=1e-6):
        return prev_mat, False
    return mat, True


def fast_mat_inv(mat):
    """Invert a 4x4 rigid-body (SE(3)) transform as [R^T, -R^T @ t].

    Cheaper than a general matrix inverse; valid only when the upper-left
    3x3 block of ``mat`` is a pure rotation.
    """
    ret = np.eye(4)
    ret[:3, :3] = mat[:3, :3].T
    ret[:3, 3] = -mat[:3, :3].T @ mat[:3, 3]
    return ret


def safe_rot_update(prev_rot_array, rot_array):
    """Batched counterpart of ``safe_mat_update`` for an array of rotations.

    Accepts ``rot_array`` only when every matrix in the batch has a finite,
    non-(near-)zero determinant; otherwise keeps ``prev_rot_array`` and
    returns a ``False`` validity flag.
    """
    dets = np.linalg.det(rot_array)
    if not np.all(np.isfinite(dets)) or np.any(np.isclose(dets, 0.0, atol=1e-6)):
        return prev_rot_array, False
    return rot_array, True


# ---------------------------- constant transforms ----------------------------
# (initial pose) OpenXR arm frame -> Unitree humanoid arm URDF frame:
# a 90-degree rotation about the local x-axis (counterclockwise for the left
# arm, clockwise for the right arm).
T_TO_UNITREE_HUMANOID_LEFT_ARM = np.array([[1, 0, 0, 0],
                                           [0, 0,-1, 0],
                                           [0, 1, 0, 0],
                                           [0, 0, 0, 1]])

T_TO_UNITREE_HUMANOID_RIGHT_ARM = np.array([[1, 0, 0, 0],
                                            [0, 0, 1, 0],
                                            [0,-1, 0, 0],
                                            [0, 0, 0, 1]])

# (initial pose) XR hand frame -> Unitree humanoid hand URDF frame.
T_TO_UNITREE_HAND = np.array([[0, 0, 1, 0],
                              [-1, 0, 0, 0],
                              [0, -1, 0, 0],
                              [0, 0, 0, 1]])

# Change of basis between the OpenXR convention (y up, z back, x right) and
# the Robot convention (z up, y left, x front). T_OPENXR_ROBOT is the exact
# inverse (transpose of the rotation part) of T_ROBOT_OPENXR.
T_ROBOT_OPENXR = np.array([[ 0, 0,-1, 0],
                           [-1, 0, 0, 0],
                           [ 0, 1, 0, 0],
                           [ 0, 0, 0, 1]])

T_OPENXR_ROBOT = np.array([[ 0,-1, 0, 0],
                           [ 0, 0, 1, 0],
                           [-1, 0, 0, 0],
                           [ 0, 0, 0, 1]])
+R_ROBOT_OPENXR = np.array([[ 0, 0,-1], + [-1, 0, 0], + [ 0, 1, 0]]) + +R_OPENXR_ROBOT = np.array([[ 0,-1, 0], + [ 0, 0, 1], + [-1, 0, 0]]) + +CONST_HEAD_POSE = np.array([[1, 0, 0, 0], + [0, 1, 0, 1.5], + [0, 0, 1, -0.2], + [0, 0, 0, 1]]) + +# For Robot initial position +CONST_RIGHT_ARM_POSE = np.array([[1, 0, 0, 0.15], + [0, 1, 0, 1.13], + [0, 0, 1, -0.3], + [0, 0, 0, 1]]) + +CONST_LEFT_ARM_POSE = np.array([[1, 0, 0, -0.15], + [0, 1, 0, 1.13], + [0, 0, 1, -0.3], + [0, 0, 0, 1]]) + +CONST_HAND_ROT = np.tile(np.eye(3)[None, :, :], (25, 1, 1)) + +@dataclass +class TeleStateData: + # hand tracking + left_pinch_state: bool = False # True if index and thumb are pinching + left_squeeze_state: bool = False # True if hand is making a fist + left_squeeze_value: float = 0.0 # (0.0 ~ 1.0) degree of hand squeeze + right_pinch_state: bool = False # True if index and thumb are pinching + right_squeeze_state: bool = False # True if hand is making a fist + right_squeeze_value: float = 0.0 # (0.0 ~ 1.0) degree of hand squeeze + + # controller tracking + left_trigger_state: bool = False # True if trigger is actively pressed + left_squeeze_ctrl_state: bool = False # True if grip button is pressed + left_squeeze_ctrl_value: float = 0.0 # (0.0 ~ 1.0) grip pull depth + left_thumbstick_state: bool = False # True if thumbstick button is pressed + left_thumbstick_value: np.ndarray = field(default_factory=lambda: np.zeros(2)) # 2D vector (x, y), normalized + left_aButton: bool = False # True if A button is pressed + left_bButton: bool = False # True if B button is pressed + right_trigger_state: bool = False # True if trigger is actively pressed + right_squeeze_ctrl_state: bool = False # True if grip button is pressed + right_squeeze_ctrl_value: float = 0.0 # (0.0 ~ 1.0) grip pull depth + right_thumbstick_state: bool = False # True if thumbstick button is pressed + right_thumbstick_value: np.ndarray = field(default_factory=lambda: np.zeros(2)) # 2D vector (x, y), normalized + right_aButton: 
bool = False # True if A button is pressed + right_bButton: bool = False # True if B button is pressed + +@dataclass +class TeleData: + head_rotation: np.ndarray # (3,3) Head orientation matrix + left_arm_pose: np.ndarray # (4,4) SE(3) pose of left arm + right_arm_pose: np.ndarray # (4,4) SE(3) pose of right arm + left_hand_pos: np.ndarray = None # (25,3) 3D positions of left hand joints + right_hand_pos: np.ndarray = None # (25,3) 3D positions of right hand joints + left_hand_rot: np.ndarray = None # (25,3,3) rotation matrices of left hand joints + right_hand_rot: np.ndarray = None # (25,3,3) rotation matrices of right hand joints + left_pinch_value: float = None # float (100.0 ~ 0.0) pinch distance between index and thumb + right_pinch_value: float = None # float (100.0 ~ 0.0) pinch distance between index and thumb + left_trigger_value: float = None # float (10.0 ~ 0.0) trigger pull depth + right_trigger_value: float = None # float (10.0 ~ 0.0) trigger pull depth + tele_state: TeleStateData = field(default_factory=TeleStateData) + + class TeleVisionWrapper: - def __init__(self, binocular, img_shape, img_shm_name): - self.tv = TeleVision(binocular, img_shape, img_shm_name) - - def get_data(self): - - # --------------------------------wrist------------------------------------- - - # TeleVision obtains a basis coordinate that is OpenXR Convention - head_vuer_mat, head_flag = mat_update(const_head_vuer_mat, self.tv.head_matrix.copy()) - left_wrist_vuer_mat, left_wrist_flag = mat_update(const_left_wrist_vuer_mat, self.tv.left_hand.copy()) - right_wrist_vuer_mat, right_wrist_flag = mat_update(const_right_wrist_vuer_mat, self.tv.right_hand.copy()) - - # Change basis convention: VuerMat ((basis) OpenXR Convention) to WristMat ((basis) Robot Convention) - # p.s. 
WristMat = T_{robot}_{openxr} * VuerMat * T_{robot}_{openxr}^T - # Reason for right multiply fast_mat_inv(T_robot_openxr): - # This is similarity transformation: B = PAP^{-1}, that is B ~ A - # For example: - # - For a pose data T_r under the Robot Convention, left-multiplying WristMat means: - # - WristMat * T_r ==> T_{robot}_{openxr} * VuerMat * T_{openxr}_{robot} * T_r - # - First, transform to the OpenXR Convention (The function of T_{openxr}_{robot}) - # - then, apply the rotation VuerMat in the OpenXR Convention (The function of VuerMat) - # - finally, transform back to the Robot Convention (The function of T_{robot}_{openxr}) - # This results in the same rotation effect under the Robot Convention as in the OpenXR Convention. - head_mat = T_robot_openxr @ head_vuer_mat @ fast_mat_inv(T_robot_openxr) - left_wrist_mat = T_robot_openxr @ left_wrist_vuer_mat @ fast_mat_inv(T_robot_openxr) - right_wrist_mat = T_robot_openxr @ right_wrist_vuer_mat @ fast_mat_inv(T_robot_openxr) - - # Change wrist convention: WristMat ((Left Wrist) XR/AppleVisionPro Convention) to UnitreeWristMat((Left Wrist URDF) Unitree Convention) - # Reason for right multiply (T_to_unitree_left_wrist) : Rotate 90 degrees counterclockwise about its own x-axis. - # Reason for right multiply (T_to_unitree_right_wrist): Rotate 90 degrees clockwise about its own x-axis. - unitree_left_wrist = left_wrist_mat @ (T_to_unitree_left_wrist if left_wrist_flag else np.eye(4)) - unitree_right_wrist = right_wrist_mat @ (T_to_unitree_right_wrist if right_wrist_flag else np.eye(4)) - - # Transfer from WORLD to HEAD coordinate (translation only). - unitree_left_wrist[0:3, 3] = unitree_left_wrist[0:3, 3] - head_mat[0:3, 3] - unitree_right_wrist[0:3, 3] = unitree_right_wrist[0:3, 3] - head_mat[0:3, 3] - - # --------------------------------hand------------------------------------- - - # Homogeneous, [xyz] to [xyz1] - # p.s. 
np.concatenate([25,3]^T,(1,25)) ==> hand_vuer_mat.shape is (4,25) - # Now under (basis) OpenXR Convention, mat shape like this: - # x0 x1 x2 ··· x23 x24 - # y0 y1 y1 ··· y23 y24 - # z0 z1 z2 ··· z23 z24 - # 1 1 1 ··· 1 1 - left_hand_vuer_mat = np.concatenate([self.tv.left_landmarks.copy().T, np.ones((1, self.tv.left_landmarks.shape[0]))]) - right_hand_vuer_mat = np.concatenate([self.tv.right_landmarks.copy().T, np.ones((1, self.tv.right_landmarks.shape[0]))]) - - # Change basis convention: from (basis) OpenXR Convention to (basis) Robot Convention - # Just a change of basis for 3D points. No rotation, only translation. No need to right-multiply fast_mat_inv(T_robot_openxr). - left_hand_mat = T_robot_openxr @ left_hand_vuer_mat - right_hand_mat = T_robot_openxr @ right_hand_vuer_mat - - # Transfer from WORLD to WRIST coordinate. (this process under (basis) Robot Convention) - # p.s. HandMat_WristBased = WristMat_{wrold}_{wrist}^T * HandMat_{wrold} - # HandMat_WristBased = WristMat_{wrist}_{wrold} * HandMat_{wrold}, that is HandMat_{wrist} - left_hand_mat_wb = fast_mat_inv(left_wrist_mat) @ left_hand_mat - right_hand_mat_wb = fast_mat_inv(right_wrist_mat) @ right_hand_mat - # Change hand convention: HandMat ((Left Hand) XR/AppleVisionPro Convention) to UnitreeHandMat((Left Hand URDF) Unitree Convention) - # Reason for left multiply : T_to_unitree_hand @ left_hand_mat_wb ==> (4,4) @ (4,25) ==> (4,25), (4,25)[0:3, :] ==> (3,25), (3,25).T ==> (25,3) - # Now under (Left Hand URDF) Unitree Convention, mat shape like this: - # [x0, y0, z0] - # [x1, y1, z1] - # ··· - # [x23,y23,z23] - # [x24,y24,z24] - unitree_left_hand = (T_to_unitree_hand @ left_hand_mat_wb)[0:3, :].T - unitree_right_hand = (T_to_unitree_hand @ right_hand_mat_wb)[0:3, :].T - - # --------------------------------offset------------------------------------- - - head_rmat = head_mat[:3, :3] - # The origin of the coordinate for IK Solve is the WAIST joint motor. 
You can use teleop/robot_control/robot_arm_ik.py Unit_Test to check it. - # The origin of the coordinate of unitree_left_wrist is HEAD. So it is necessary to translate the origin of unitree_left_wrist from HEAD to WAIST. - unitree_left_wrist[0, 3] +=0.15 - unitree_right_wrist[0,3] +=0.15 - unitree_left_wrist[2, 3] +=0.45 - unitree_right_wrist[2,3] +=0.45 - - return head_rmat, unitree_left_wrist, unitree_right_wrist, unitree_left_hand, unitree_right_hand \ No newline at end of file + def __init__(self, binocular: bool, use_hand_tracking: bool, img_shape, img_shm_name, return_state_data: bool = True, return_hand_rot_data: bool = False, + cert_file = None, key_file = None): + """ + TeleVisionWrapper is a wrapper for the TeleVision class, which handles the XR device's data for robot control. + It initializes the TeleVision instance with the specified parameters and provides a method to get motion state data. + + :param binocular: A boolean indicating whether the head camera device is binocular or not. + :param use_hand_tracking: A boolean indicating whether to use hand tracking or use controller tracking. + :param img_shape: The shape of the image to be processed. + :param img_shm_name: The name of the shared memory for the image. + :param return_state_data: A boolean indicating whether to return the state of the hand or controller. + :param return_hand_rot_data: A boolean indicating whether to return the hand rotation data. + :param cert_file: The path to the certificate file for secure connection. + :param key_file: The path to the key file for secure connection. + """ + self.use_hand_tracking = use_hand_tracking + self.return_state_data = return_state_data + self.return_hand_rot_data = return_hand_rot_data + self.tvuer = TeleVision(binocular, use_hand_tracking, img_shape, img_shm_name, cert_file=cert_file, key_file=key_file) + + def get_motion_state_data(self): + """ + Get processed motion state data from the TeleVision instance. 
+ + All returned data are transformed from the OpenXR Convention to the (Robot & Unitree) Convention. + """ + # Variable Naming Convention below, + # ┌────────────┬───────────────────────────┬──────────────────────────────────┬────────────────────────────────────┬────────────────────────────────────┐ + # │left / right│ Bxr │ Brobot │ IPxr │ IPunitree │ + # │────────────│───────────────────────────│──────────────────────────────────│────────────────────────────────────│────────────────────────────────────│ + # │ side │ (basis) OpenXR Convention │ (basis) Robot Convention │ (initial pose) OpenXR Convention │ (initial pose) Unitree Convention │ + # └────────────┴───────────────────────────┴──────────────────────────────────┴────────────────────────────────────┴────────────────────────────────────┘ + # ┌───────────────────────────────────┬─────────────────────┐ + # │ world / arm / head / waist │ arm / head / hand │ + # │───────────────────────────────────│─────────────────────│ + # │ source frame │ target frame │ + # └───────────────────────────────────┴─────────────────────┘ + + # TeleVision (Vuer) obtains all raw data under the (basis) OpenXR Convention. + Bxr_world_head, head_pose_is_valid = safe_mat_update(CONST_HEAD_POSE, self.tvuer.head_pose) + + if self.use_hand_tracking: + # 'Arm' pose data follows (basis) OpenXR Convention and (initial pose) OpenXR Arm Convention. 
+ left_IPxr_Bxr_world_arm, left_arm_is_valid = safe_mat_update(CONST_LEFT_ARM_POSE, self.tvuer.left_arm_pose) + right_IPxr_Bxr_world_arm, right_arm_is_valid = safe_mat_update(CONST_RIGHT_ARM_POSE, self.tvuer.right_arm_pose) + + # Change basis convention + # From (basis) OpenXR Convention to (basis) Robot Convention: + # Brobot_Pose = T_{robot}_{openxr} * Bxr_Pose * T_{robot}_{openxr}^T ==> + # Brobot_Pose = T_{robot}_{openxr} * Bxr_Pose * T_{openxr}_{robot} + # Reason for right multiply T_OPENXR_ROBOT = fast_mat_inv(T_ROBOT_OPENXR): + # This is similarity transformation: B = PAP^{-1}, that is B ~ A + # For example: + # - For a pose data T_r under the (basis) Robot Convention, left-multiplying Brobot_Pose means: + # Brobot_Pose * T_r ==> T_{robot}_{openxr} * PoseMatrix_openxr * T_{openxr}_{robot} * T_r + # - First, transform T_r to the (basis) OpenXR Convention (The function of T_{openxr}_{robot}) + # - Then, apply the rotation PoseMatrix_openxr in the OpenXR Convention (The function of PoseMatrix_openxr) + # - Finally, transform back to the Robot Convention (The function of T_{robot}_{openxr}) + # - This results in the same rotation effect under the Robot Convention as in the OpenXR Convention. + Brobot_world_head = T_ROBOT_OPENXR @ Bxr_world_head @ T_OPENXR_ROBOT + left_IPxr_Brobot_world_arm = T_ROBOT_OPENXR @ left_IPxr_Bxr_world_arm @ T_OPENXR_ROBOT + right_IPxr_Brobot_world_arm = T_ROBOT_OPENXR @ right_IPxr_Bxr_world_arm @ T_OPENXR_ROBOT + + # Change initial pose convention + # From (initial pose) OpenXR Arm Convention to (initial pose) Unitree Humanoid Arm URDF Convention + # Reason for right multiply (T_TO_UNITREE_HUMANOID_LEFT_ARM) : Rotate 90 degrees counterclockwise about its own x-axis. + # Reason for right multiply (T_TO_UNITREE_HUMANOID_RIGHT_ARM): Rotate 90 degrees clockwise about its own x-axis. 
+ left_IPunitree_Brobot_world_arm = left_IPxr_Brobot_world_arm @ (T_TO_UNITREE_HUMANOID_LEFT_ARM if left_arm_is_valid else np.eye(4)) + right_IPunitree_Brobot_world_arm = right_IPxr_Brobot_world_arm @ (T_TO_UNITREE_HUMANOID_RIGHT_ARM if right_arm_is_valid else np.eye(4)) + + # Transfer from WORLD to HEAD coordinate (translation adjustment only) + left_IPunitree_Brobot_head_arm = left_IPunitree_Brobot_world_arm.copy() + right_IPunitree_Brobot_head_arm = right_IPunitree_Brobot_world_arm.copy() + left_IPunitree_Brobot_head_arm[0:3, 3] = left_IPunitree_Brobot_head_arm[0:3, 3] - Brobot_world_head[0:3, 3] + right_IPunitree_Brobot_head_arm[0:3, 3] = right_IPunitree_Brobot_world_arm[0:3, 3] - Brobot_world_head[0:3, 3] + + # =====coordinate origin offset===== + Brobot_world_head_rot = Brobot_world_head[:3, :3] + # The origin of the coordinate for IK Solve is near the WAIST joint motor. You can use teleop/robot_control/robot_arm_ik.py Unit_Test to visualize it. + # The origin of the coordinate of IPunitree_Brobot_head_arm is HEAD. + # So it is necessary to translate the origin of IPunitree_Brobot_head_arm from HEAD to WAIST. 
+ left_IPunitree_Brobot_waist_arm = left_IPunitree_Brobot_head_arm.copy() + right_IPunitree_Brobot_waist_arm = right_IPunitree_Brobot_head_arm.copy() + left_IPunitree_Brobot_waist_arm[0, 3] +=0.15 # x + right_IPunitree_Brobot_waist_arm[0,3] +=0.15 + left_IPunitree_Brobot_waist_arm[2, 3] +=0.45 # z + right_IPunitree_Brobot_waist_arm[2,3] +=0.45 + + # -----------------------------------hand position---------------------------------------- + if left_arm_is_valid and right_arm_is_valid: + # Homogeneous, [xyz] to [xyz1] + # np.concatenate([25,3]^T,(1,25)) ==> Bxr_world_hand_pos.shape is (4,25) + # Now under (basis) OpenXR Convention, Bxr_world_hand_pos data like this: + # [x0 x1 x2 ··· x23 x24] + # [y0 y1 y1 ··· y23 y24] + # [z0 z1 z2 ··· z23 z24] + # [ 1 1 1 ··· 1 1] + left_IPxr_Bxr_world_hand_pos = np.concatenate([self.tvuer.left_hand_positions.T, np.ones((1, self.tvuer.left_hand_positions.shape[0]))]) + right_IPxr_Bxr_world_hand_pos = np.concatenate([self.tvuer.right_hand_positions.T, np.ones((1, self.tvuer.right_hand_positions.shape[0]))]) + + # Change basis convention + # From (basis) OpenXR Convention to (basis) Robot Convention + # Just a change of basis for 3D points. No rotation, only translation. So, no need to right-multiply fast_mat_inv(T_ROBOT_OPENXR). + left_IPxr_Brobot_world_hand_pos = T_ROBOT_OPENXR @ left_IPxr_Bxr_world_hand_pos + right_IPxr_Brobot_world_hand_pos = T_ROBOT_OPENXR @ right_IPxr_Bxr_world_hand_pos + + # Transfer from WORLD to ARM frame under (basis) Robot Convention: + # Brobot_{world}_{arm}^T * Brobot_{world}_pos ==> Brobot_{arm}_{world} * Brobot_{world}_pos ==> Brobot_arm_hand_pos, Now it's based on the arm frame. 
+ left_IPxr_Brobot_arm_hand_pos = fast_mat_inv(left_IPxr_Brobot_world_arm) @ left_IPxr_Brobot_world_hand_pos + right_IPxr_Brobot_arm_hand_pos = fast_mat_inv(right_IPxr_Brobot_world_arm) @ right_IPxr_Brobot_world_hand_pos + + # Change initial pose convention + # From (initial pose) XR Hand Convention to (initial pose) Unitree Humanoid Hand URDF Convention: + # T_TO_UNITREE_HAND @ IPxr_Brobot_arm_hand_pos ==> IPunitree_Brobot_arm_hand_pos + # ((4,4) @ (4,25))[0:3, :].T ==> (4,25)[0:3, :].T ==> (3,25).T ==> (25,3) + # Now under (initial pose) Unitree Humanoid Hand URDF Convention, matrix shape like this: + # [x0, y0, z0] + # [x1, y1, z1] + # ··· + # [x23,y23,z23] + # [x24,y24,z24] + left_IPunitree_Brobot_arm_hand_pos = (T_TO_UNITREE_HAND @ left_IPxr_Brobot_arm_hand_pos)[0:3, :].T + right_IPunitree_Brobot_arm_hand_pos = (T_TO_UNITREE_HAND @ right_IPxr_Brobot_arm_hand_pos)[0:3, :].T + else: + left_IPunitree_Brobot_arm_hand_pos = np.zeros((25, 3)) + right_IPunitree_Brobot_arm_hand_pos = np.zeros((25, 3)) + + # -----------------------------------hand rotation---------------------------------------- + if self.return_hand_rot_data: + left_Bxr_world_hand_rot, left_hand_rot_is_valid = safe_rot_update(CONST_HAND_ROT, self.tvuer.left_hand_orientations) # [25, 3, 3] + right_Bxr_world_hand_rot, right_hand_rot_is_valid = safe_rot_update(CONST_HAND_ROT, self.tvuer.right_hand_orientations) + + if left_hand_rot_is_valid and right_hand_rot_is_valid: + left_Bxr_arm_hand_rot = np.einsum('ij,njk->nik', left_IPxr_Bxr_world_arm[:3, :3].T, left_Bxr_world_hand_rot) + right_Bxr_arm_hand_rot = np.einsum('ij,njk->nik', right_IPxr_Bxr_world_arm[:3, :3].T, right_Bxr_world_hand_rot) + # Change basis convention + left_Brobot_arm_hand_rot = np.einsum('ij,njk,kl->nil', R_ROBOT_OPENXR, left_Bxr_arm_hand_rot, R_OPENXR_ROBOT) + right_Brobot_arm_hand_rot = np.einsum('ij,njk,kl->nil', R_ROBOT_OPENXR, right_Bxr_arm_hand_rot, R_OPENXR_ROBOT) + else: + left_Brobot_arm_hand_rot = left_Bxr_world_hand_rot + 
right_Brobot_arm_hand_rot = right_Bxr_world_hand_rot + else: + left_Brobot_arm_hand_rot = None + right_Brobot_arm_hand_rot = None + + if self.return_state_data: + hand_state = TeleStateData( + left_pinch_state=self.tvuer.left_hand_pinch_state, + left_squeeze_state=self.tvuer.left_hand_squeeze_state, + left_squeeze_value=self.tvuer.left_hand_squeeze_value, + right_pinch_state=self.tvuer.right_hand_pinch_state, + right_squeeze_state=self.tvuer.right_hand_squeeze_state, + right_squeeze_value=self.tvuer.right_hand_squeeze_value, + ) + else: + hand_state = None + + return TeleData( + head_rotation=Brobot_world_head_rot, + left_arm_pose=left_IPunitree_Brobot_waist_arm, + right_arm_pose=right_IPunitree_Brobot_waist_arm, + left_hand_pos=left_IPunitree_Brobot_arm_hand_pos, + right_hand_pos=right_IPunitree_Brobot_arm_hand_pos, + left_hand_rot=left_Brobot_arm_hand_rot, + right_hand_rot=right_Brobot_arm_hand_rot, + left_pinch_value=self.tvuer.left_hand_pinch_value * 100.0, + right_pinch_value=self.tvuer.right_hand_pinch_value * 100.0, + tele_state=hand_state + ) + else: + # Controller pose data directly follows the (initial pose) Unitree Humanoid Arm URDF Convention (thus no transform is needed). 
+ left_IPunitree_Bxr_world_arm, left_arm_is_valid = safe_mat_update(CONST_LEFT_ARM_POSE, self.tvuer.left_arm_pose) + right_IPunitree_Bxr_world_arm, right_arm_is_valid = safe_mat_update(CONST_RIGHT_ARM_POSE, self.tvuer.right_arm_pose) + + # Change basis convention + Brobot_world_head = T_ROBOT_OPENXR @ Bxr_world_head @ T_OPENXR_ROBOT + left_IPunitree_Brobot_world_arm = T_ROBOT_OPENXR @ left_IPunitree_Bxr_world_arm @ T_OPENXR_ROBOT + right_IPunitree_Brobot_world_arm = T_ROBOT_OPENXR @ right_IPunitree_Bxr_world_arm @ T_OPENXR_ROBOT + + # Transfer from WORLD to HEAD coordinate (translation adjustment only) + left_IPunitree_Brobot_head_arm = left_IPunitree_Brobot_world_arm.copy() + right_IPunitree_Brobot_head_arm = right_IPunitree_Brobot_world_arm.copy() + left_IPunitree_Brobot_head_arm[0:3, 3] = left_IPunitree_Brobot_head_arm[0:3, 3] - Brobot_world_head[0:3, 3] + right_IPunitree_Brobot_head_arm[0:3, 3] = right_IPunitree_Brobot_head_arm[0:3, 3] - Brobot_world_head[0:3, 3] + + # =====coordinate origin offset===== + Brobot_world_head_rot = Brobot_world_head[:3, :3] + # The origin of the coordinate for IK Solve is near the WAIST joint motor. You can use teleop/robot_control/robot_arm_ik.py Unit_Test to check it. + # The origin of the coordinate of IPunitree_Brobot_head_arm is HEAD. + # So it is necessary to translate the origin of IPunitree_Brobot_head_arm from HEAD to WAIST. 
+ left_IPunitree_Brobot_waist_arm = left_IPunitree_Brobot_head_arm.copy() + right_IPunitree_Brobot_waist_arm = right_IPunitree_Brobot_head_arm.copy() + left_IPunitree_Brobot_waist_arm[0, 3] +=0.15 # x + right_IPunitree_Brobot_waist_arm[0,3] +=0.15 + left_IPunitree_Brobot_waist_arm[2, 3] +=0.45 # z + right_IPunitree_Brobot_waist_arm[2,3] +=0.45 + # left_IPunitree_Brobot_waist_arm[1, 3] +=0.02 # y + # right_IPunitree_Brobot_waist_arm[1,3] +=0.02 + + # return data + if self.return_state_data: + controller_state = TeleStateData( + left_trigger_state=self.tvuer.left_controller_trigger_state, + left_squeeze_ctrl_state=self.tvuer.left_controller_squeeze_state, + left_squeeze_ctrl_value=self.tvuer.left_controller_squeeze_value, + left_thumbstick_state=self.tvuer.left_controller_thumbstick_state, + left_thumbstick_value=self.tvuer.left_controller_thumbstick_value, + left_aButton=self.tvuer.left_controller_aButton, + left_bButton=self.tvuer.left_controller_bButton, + right_trigger_state=self.tvuer.right_controller_trigger_state, + right_squeeze_ctrl_state=self.tvuer.right_controller_squeeze_state, + right_squeeze_ctrl_value=self.tvuer.right_controller_squeeze_value, + right_thumbstick_state=self.tvuer.right_controller_thumbstick_state, + right_thumbstick_value=self.tvuer.right_controller_thumbstick_value, + right_aButton=self.tvuer.right_controller_aButton, + right_bButton=self.tvuer.right_controller_bButton, + ) + else: + controller_state = None + + return TeleData( + head_rotation=Brobot_world_head_rot, + left_arm_pose=left_IPunitree_Brobot_waist_arm, + right_arm_pose=right_IPunitree_Brobot_waist_arm, + left_trigger_value=10.0 - self.tvuer.left_controller_trigger_value * 10, + right_trigger_value=10.0 - self.tvuer.right_controller_trigger_value * 10, + tele_state=controller_state + ) \ No newline at end of file diff --git a/teleop/robot_control/robot_arm_ik.py b/teleop/robot_control/robot_arm_ik.py index 1ef1787..57c92e0 100644 --- a/teleop/robot_control/robot_arm_ik.py 
+++ b/teleop/robot_control/robot_arm_ik.py @@ -84,8 +84,9 @@ class G1_29_ArmIK: # frame = self.reduced_robot.model.frames[i] # frame_id = self.reduced_robot.model.getFrameId(frame.name) # print(f"Frame ID: {frame_id}, Name: {frame.name}") - for idx, name in enumerate(self.reduced_robot.model.names): - print(f"{idx}: {name}") + # for idx, name in enumerate(self.reduced_robot.model.names): + # print(f"{idx}: {name}") + # Creating Casadi models and data for symbolic computing self.cmodel = cpin.Model(self.reduced_robot.model) self.cdata = self.cmodel.createData() diff --git a/teleop/robot_control/robot_hand_inspire.py b/teleop/robot_control/robot_hand_inspire.py index ac57397..3c907b9 100644 --- a/teleop/robot_control/robot_hand_inspire.py +++ b/teleop/robot_control/robot_hand_inspire.py @@ -98,8 +98,10 @@ class Inspire_Controller: while self.running: start_time = time.time() # get dual hand state - left_hand_mat = np.array(left_hand_array[:]).reshape(25, 3).copy() - right_hand_mat = np.array(right_hand_array[:]).reshape(25, 3).copy() + with left_hand_array.get_lock(): + left_hand_mat = np.array(left_hand_array[:]).reshape(25, 3).copy() + with right_hand_array.get_lock(): + right_hand_mat = np.array(right_hand_array[:]).reshape(25, 3).copy() # Read left and right q_state from shared arrays state_data = np.concatenate((np.array(left_hand_state_array[:]), np.array(right_hand_state_array[:]))) diff --git a/teleop/robot_control/robot_hand_unitree.py b/teleop/robot_control/robot_hand_unitree.py index 039b759..9142ea6 100644 --- a/teleop/robot_control/robot_hand_unitree.py +++ b/teleop/robot_control/robot_hand_unitree.py @@ -30,20 +30,20 @@ kTopicDex3RightState = "rt/dex3/right/state" class Dex3_1_Controller: - def __init__(self, left_hand_array, right_hand_array, dual_hand_data_lock = None, dual_hand_state_array = None, - dual_hand_action_array = None, fps = 100.0, Unit_Test = False): + def __init__(self, left_hand_array_in, right_hand_array_in, dual_hand_data_lock = None, 
dual_hand_state_array_out = None, + dual_hand_action_array_out = None, fps = 100.0, Unit_Test = False): """ [note] A *_array type parameter requires using a multiprocessing Array, because it needs to be passed to the internal child process - left_hand_array: [input] Left hand skeleton data (required from XR device) to hand_ctrl.control_process + left_hand_array_in: [input] Left hand skeleton data (required from XR device) to hand_ctrl.control_process - right_hand_array: [input] Right hand skeleton data (required from XR device) to hand_ctrl.control_process + right_hand_array_in: [input] Right hand skeleton data (required from XR device) to hand_ctrl.control_process dual_hand_data_lock: Data synchronization lock for dual_hand_state_array and dual_hand_action_array - dual_hand_state_array: [output] Return left(7), right(7) hand motor state + dual_hand_state_array_out: [output] Return left(7), right(7) hand motor state - dual_hand_action_array: [output] Return left(7), right(7) hand motor action + dual_hand_action_array_out: [output] Return left(7), right(7) hand motor action fps: Control frequency @@ -85,8 +85,8 @@ class Dex3_1_Controller: time.sleep(0.01) print("[Dex3_1_Controller] Waiting to subscribe dds...") - hand_control_process = Process(target=self.control_process, args=(left_hand_array, right_hand_array, self.left_hand_state_array, self.right_hand_state_array, - dual_hand_data_lock, dual_hand_state_array, dual_hand_action_array)) + hand_control_process = Process(target=self.control_process, args=(left_hand_array_in, right_hand_array_in, self.left_hand_state_array, self.right_hand_state_array, + dual_hand_data_lock, dual_hand_state_array_out, dual_hand_action_array_out)) hand_control_process.daemon = True hand_control_process.start() @@ -129,8 +129,8 @@ class Dex3_1_Controller: self.RightHandCmb_publisher.Write(self.right_msg) # print("hand ctrl publish ok.") - def control_process(self, left_hand_array, right_hand_array, left_hand_state_array, 
right_hand_state_array, - dual_hand_data_lock = None, dual_hand_state_array = None, dual_hand_action_array = None): + def control_process(self, left_hand_array_in, right_hand_array_in, left_hand_state_array, right_hand_state_array, + dual_hand_data_lock = None, dual_hand_state_array_out = None, dual_hand_action_array_out = None): self.running = True left_q_target = np.full(Dex3_Num_Motors, 0) @@ -170,15 +170,17 @@ class Dex3_1_Controller: while self.running: start_time = time.time() # get dual hand state - left_hand_mat = np.array(left_hand_array[:]).reshape(25, 3).copy() - right_hand_mat = np.array(right_hand_array[:]).reshape(25, 3).copy() + with left_hand_array_in.get_lock(): + left_hand_data = np.array(left_hand_array_in[:]).reshape(25, 3).copy() + with right_hand_array_in.get_lock(): + right_hand_data = np.array(right_hand_array_in[:]).reshape(25, 3).copy() # Read left and right q_state from shared arrays state_data = np.concatenate((np.array(left_hand_state_array[:]), np.array(right_hand_state_array[:]))) - if not np.all(right_hand_mat == 0.0) and not np.all(left_hand_mat[4] == np.array([-1.13, 0.3, 0.15])): # if hand data has been initialized. - ref_left_value = left_hand_mat[unitree_tip_indices] - ref_right_value = right_hand_mat[unitree_tip_indices] + if not np.all(right_hand_data == 0.0) and not np.all(left_hand_data[4] == np.array([-1.13, 0.3, 0.15])): # if hand data has been initialized. 
+ ref_left_value = left_hand_data[unitree_tip_indices] + ref_right_value = right_hand_data[unitree_tip_indices] ref_left_value[0] = ref_left_value[0] * 1.15 ref_left_value[1] = ref_left_value[1] * 1.05 ref_left_value[2] = ref_left_value[2] * 0.95 @@ -191,10 +193,10 @@ class Dex3_1_Controller: # get dual hand action action_data = np.concatenate((left_q_target, right_q_target)) - if dual_hand_state_array and dual_hand_action_array: + if dual_hand_state_array_out and dual_hand_action_array_out: with dual_hand_data_lock: - dual_hand_state_array[:] = state_data - dual_hand_action_array[:] = action_data + dual_hand_state_array_out[:] = state_data + dual_hand_action_array_out[:] = action_data self.ctrl_dual_hand(left_q_target, right_q_target) current_time = time.time() @@ -229,20 +231,20 @@ kTopicGripperCommand = "rt/unitree_actuator/cmd" kTopicGripperState = "rt/unitree_actuator/state" class Gripper_Controller: - def __init__(self, left_hand_array, right_hand_array, dual_gripper_data_lock = None, dual_gripper_state_out = None, dual_gripper_action_out = None, + def __init__(self, left_gripper_value_in, right_gripper_value_in, dual_gripper_data_lock = None, dual_gripper_state_out = None, dual_gripper_action_out = None, filter = True, fps = 200.0, Unit_Test = False): """ [note] A *_array type parameter requires using a multiprocessing Array, because it needs to be passed to the internal child process - left_hand_array: [input] Left hand skeleton data (required from XR device) to control_thread + left_gripper_value_in: [input] Left ctrl data (required from XR device) to control_thread - right_hand_array: [input] Right hand skeleton data (required from XR device) to control_thread + right_gripper_value_in: [input] Right ctrl data (required from XR device) to control_thread dual_gripper_data_lock: Data synchronization lock for dual_gripper_state_array and dual_gripper_action_array - dual_gripper_state: [output] Return left(1), right(1) gripper motor state + 
dual_gripper_state_out: [output] Return left(1), right(1) gripper motor state - dual_gripper_action: [output] Return left(1), right(1) gripper motor action + dual_gripper_action_out: [output] Return left(1), right(1) gripper motor action fps: Control frequency @@ -281,7 +283,7 @@ class Gripper_Controller: time.sleep(0.01) print("[Gripper_Controller] Waiting to subscribe dds...") - self.gripper_control_thread = threading.Thread(target=self.control_thread, args=(left_hand_array, right_hand_array, self.dual_gripper_state, + self.gripper_control_thread = threading.Thread(target=self.control_thread, args=(left_gripper_value_in, right_gripper_value_in, self.dual_gripper_state, dual_gripper_data_lock, dual_gripper_state_out, dual_gripper_action_out)) self.gripper_control_thread.daemon = True self.gripper_control_thread.start() @@ -304,13 +306,13 @@ class Gripper_Controller: self.GripperCmb_publisher.Write(self.gripper_msg) # print("gripper ctrl publish ok.") - def control_thread(self, left_hand_array, right_hand_array, dual_gripper_state_in, dual_hand_data_lock = None, + def control_thread(self, left_gripper_value_in, right_gripper_value_in, dual_gripper_state_in, dual_hand_data_lock = None, dual_gripper_state_out = None, dual_gripper_action_out = None): self.running = True DELTA_GRIPPER_CMD = 0.18 # The motor rotates 5.4 radians, the clamping jaw slide open 9 cm, so 0.6 rad <==> 1 cm, 0.18 rad <==> 3 mm - THUMB_INDEX_DISTANCE_MIN = 0.05 # Assuming a minimum Euclidean distance is 5 cm between thumb and index. - THUMB_INDEX_DISTANCE_MAX = 0.07 # Assuming a maximum Euclidean distance is 9 cm between thumb and index. + THUMB_INDEX_DISTANCE_MIN = 5.0 + THUMB_INDEX_DISTANCE_MAX = 7.0 LEFT_MAPPED_MIN = 0.0 # The minimum initial motor position when the gripper closes at startup. RIGHT_MAPPED_MIN = 0.0 # The minimum initial motor position when the gripper closes at startup. 
# The maximum initial motor position when the gripper closes before calibration (with the rail stroke calculated as 0.6 cm/rad * 9 rad = 5.4 cm). @@ -336,15 +338,15 @@ class Gripper_Controller: while self.running: start_time = time.time() # get dual hand skeletal point state from XR device - left_hand_mat = np.array(left_hand_array[:]).reshape(25, 3).copy() - right_hand_mat = np.array(right_hand_array[:]).reshape(25, 3).copy() + with left_gripper_value_in.get_lock(): + left_gripper_value = left_gripper_value_in + with right_gripper_value_in.get_lock(): + right_gripper_value = right_gripper_value_in - if not np.all(right_hand_mat == 0.0) and not np.all(left_hand_mat[4] == np.array([-1.13, 0.3, 0.15])): # if hand data has been initialized. - left_euclidean_distance = np.linalg.norm(left_hand_mat[unitree_gripper_indices[1]] - left_hand_mat[unitree_gripper_indices[0]]) - right_euclidean_distance = np.linalg.norm(right_hand_mat[unitree_gripper_indices[1]] - right_hand_mat[unitree_gripper_indices[0]]) + if left_gripper_value != 0.0 or right_gripper_value != 0.0: # if hand data has been initialized. 
# Linear mapping from [0, THUMB_INDEX_DISTANCE_MAX] to gripper action range - left_target_action = np.interp(left_euclidean_distance, [THUMB_INDEX_DISTANCE_MIN, THUMB_INDEX_DISTANCE_MAX], [LEFT_MAPPED_MIN, LEFT_MAPPED_MAX]) - right_target_action = np.interp(right_euclidean_distance, [THUMB_INDEX_DISTANCE_MIN, THUMB_INDEX_DISTANCE_MAX], [RIGHT_MAPPED_MIN, RIGHT_MAPPED_MAX]) + left_target_action = np.interp(left_gripper_value, [THUMB_INDEX_DISTANCE_MIN, THUMB_INDEX_DISTANCE_MAX], [LEFT_MAPPED_MIN, LEFT_MAPPED_MAX]) + right_target_action = np.interp(right_gripper_value, [THUMB_INDEX_DISTANCE_MIN, THUMB_INDEX_DISTANCE_MAX], [RIGHT_MAPPED_MIN, RIGHT_MAPPED_MAX]) # else: # TEST WITHOUT XR DEVICE # current_time = time.time() # period = 2.5 diff --git a/teleop/teleop_hand_and_arm.py b/teleop/teleop_hand_and_arm.py index 0ede616..b6bb76c 100644 --- a/teleop/teleop_hand_and_arm.py +++ b/teleop/teleop_hand_and_arm.py @@ -2,7 +2,7 @@ import numpy as np import time import argparse import cv2 -from multiprocessing import shared_memory, Array, Lock +from multiprocessing import shared_memory, Value, Array, Lock import threading import os @@ -11,7 +11,7 @@ current_dir = os.path.dirname(os.path.abspath(__file__)) parent_dir = os.path.dirname(current_dir) sys.path.append(parent_dir) -from teleop.open_television.tv_wrapper import TeleVisionWrapper +from teleop.open_television import TeleVisionWrapper from teleop.robot_control.robot_arm import G1_29_ArmController, G1_23_ArmController, H1_2_ArmController, H1_ArmController from teleop.robot_control.robot_arm_ik import G1_29_ArmIK, G1_23_ArmIK, H1_2_ArmIK, H1_ArmIK from teleop.robot_control.robot_hand_unitree import Dex3_1_Controller, Gripper_Controller @@ -23,14 +23,15 @@ from teleop.utils.episode_writer import EpisodeWriter if __name__ == '__main__': parser = argparse.ArgumentParser() parser.add_argument('--task_dir', type = str, default = './utils/data', help = 'path to save data') - parser.add_argument('--frequency', type = int, 
default = 30.0, help = 'save data\'s frequency') + parser.add_argument('--frequency', type = float, default = 30.0, help = 'save data\'s frequency') parser.add_argument('--record', action = 'store_true', help = 'Save data or not') parser.add_argument('--no-record', dest = 'record', action = 'store_false', help = 'Do not save data') parser.set_defaults(record = False) + parser.add_argument('--xr_mode', type=str, choices=['hand', 'controller'], default='hand', help='Select XR device tracking source') parser.add_argument('--arm', type=str, choices=['G1_29', 'G1_23', 'H1_2', 'H1'], default='G1_29', help='Select arm controller') - parser.add_argument('--hand', type=str, choices=['dex3', 'gripper', 'inspire1'], help='Select hand controller') + parser.add_argument('--ee', type=str, choices=['dex3', 'gripper', 'inspire1'], help='Select end effector controller') args = parser.parse_args() print(f"args:{args}\n") @@ -77,7 +78,7 @@ if __name__ == '__main__': image_receive_thread.start() # television: obtain hand pose data from the XR device and transmit the robot's head camera image to the XR device. - tv_wrapper = TeleVisionWrapper(BINOCULAR, tv_img_shape, tv_img_shm.name) + tv_wrapper = TeleVisionWrapper(BINOCULAR, args.xr_mode == 'hand', tv_img_shape, tv_img_shm.name) # arm if args.arm == 'G1_29': @@ -93,28 +94,28 @@ if __name__ == '__main__': arm_ctrl = H1_ArmController() arm_ik = H1_ArmIK() - # hand - if args.hand == "dex3": - left_hand_array = Array('d', 75, lock = True) # [input] - right_hand_array = Array('d', 75, lock = True) # [input] + # end-effector + if args.ee == "dex3": + left_hand_pos_array = Array('d', 75, lock = True) # [input] + right_hand_pos_array = Array('d', 75, lock = True) # [input] dual_hand_data_lock = Lock() - dual_hand_state_array = Array('d', 14, lock = False) # [output] current left, right hand state(14) data. - dual_hand_action_array = Array('d', 14, lock = False) # [output] current left, right hand action(14) data. 
- hand_ctrl = Dex3_1_Controller(left_hand_array, right_hand_array, dual_hand_data_lock, dual_hand_state_array, dual_hand_action_array) - elif args.hand == "gripper": - left_hand_array = Array('d', 75, lock=True) - right_hand_array = Array('d', 75, lock=True) + dual_hand_state_array = Array('d', 14, lock = False) # [output] current left, right hand state(14) data. + dual_hand_action_array = Array('d', 14, lock = False) # [output] current left, right hand action(14) data. + hand_ctrl = Dex3_1_Controller(left_hand_pos_array, right_hand_pos_array, dual_hand_data_lock, dual_hand_state_array, dual_hand_action_array) + elif args.ee == "gripper": + left_gripper_value = Value('d', 0.0, lock=True) # [input] + right_gripper_value = Value('d', 0.0, lock=True) # [input] dual_gripper_data_lock = Lock() dual_gripper_state_array = Array('d', 2, lock=False) # current left, right gripper state(2) data. dual_gripper_action_array = Array('d', 2, lock=False) # current left, right gripper action(2) data. - gripper_ctrl = Gripper_Controller(left_hand_array, right_hand_array, dual_gripper_data_lock, dual_gripper_state_array, dual_gripper_action_array) - elif args.hand == "inspire1": - left_hand_array = Array('d', 75, lock = True) # [input] - right_hand_array = Array('d', 75, lock = True) # [input] + gripper_ctrl = Gripper_Controller(left_gripper_value, right_gripper_value, dual_gripper_data_lock, dual_gripper_state_array, dual_gripper_action_array) + elif args.ee == "inspire1": + left_hand_pos_array = Array('d', 75, lock = True) # [input] + right_hand_pos_array = Array('d', 75, lock = True) # [input] dual_hand_data_lock = Lock() dual_hand_state_array = Array('d', 12, lock = False) # [output] current left, right hand state(12) data. dual_hand_action_array = Array('d', 12, lock = False) # [output] current left, right hand action(12) data. 
- hand_ctrl = Inspire_Controller(left_hand_array, right_hand_array, dual_hand_data_lock, dual_hand_state_array, dual_hand_action_array) + hand_ctrl = Inspire_Controller(left_hand_pos_array, right_hand_pos_array, dual_hand_data_lock, dual_hand_state_array, dual_hand_action_array) else: pass @@ -126,16 +127,28 @@ if __name__ == '__main__': user_input = input("Please enter the start signal (enter 'r' to start the subsequent program):\n") if user_input.lower() == 'r': arm_ctrl.speed_gradual_max() - running = True while running: start_time = time.time() - head_rmat, left_wrist, right_wrist, left_hand, right_hand = tv_wrapper.get_data() + tele_data = tv_wrapper.get_motion_state_data() - # send hand skeleton data to hand_ctrl.control_process - if args.hand: - left_hand_array[:] = left_hand.flatten() - right_hand_array[:] = right_hand.flatten() + if (args.ee == 'dex3' or args.ee == 'inspire1') and args.xr_mode == 'hand': + with left_hand_pos_array.get_lock(): + left_hand_pos_array[:] = tele_data.left_hand_pos.flatten() + with right_hand_pos_array.get_lock(): + right_hand_pos_array[:] = tele_data.right_hand_pos.flatten() + elif args.ee == 'gripper' and args.xr_mode == 'controller': + with left_gripper_value.get_lock(): + left_gripper_value.value = tele_data.left_trigger_value + with right_gripper_value.get_lock(): + right_gripper_value.value = tele_data.right_trigger_value + elif args.ee == 'gripper' and args.xr_mode == 'hand': + with left_gripper_value.get_lock(): + left_gripper_value.value = tele_data.left_pinch_value + with right_gripper_value.get_lock(): + right_gripper_value.value = tele_data.right_pinch_value + else: + pass # get current state data. current_lr_arm_q = arm_ctrl.get_current_dual_arm_q() @@ -143,7 +156,7 @@ if __name__ == '__main__': # solve ik using motor data and wrist pose, then use ik results to control arms. 
time_ik_start = time.time() - sol_q, sol_tauff = arm_ik.solve_ik(left_wrist, right_wrist, current_lr_arm_q, current_lr_arm_dq) + sol_q, sol_tauff = arm_ik.solve_ik(tele_data.left_arm_pose, tele_data.right_arm_pose, current_lr_arm_q, current_lr_arm_dq) time_ik_end = time.time() # print(f"ik:\t{round(time_ik_end - time_ik_start, 6)}") arm_ctrl.ctrl_dual_arm(sol_q, sol_tauff) @@ -164,27 +177,29 @@ if __name__ == '__main__': # record data if args.record: # dex hand or gripper - if args.hand == "dex3": + if args.ee == "dex3": with dual_hand_data_lock: left_hand_state = dual_hand_state_array[:7] right_hand_state = dual_hand_state_array[-7:] left_hand_action = dual_hand_action_array[:7] right_hand_action = dual_hand_action_array[-7:] - elif args.hand == "gripper": + elif args.ee == "gripper": with dual_gripper_data_lock: left_hand_state = [dual_gripper_state_array[1]] right_hand_state = [dual_gripper_state_array[0]] left_hand_action = [dual_gripper_action_array[1]] right_hand_action = [dual_gripper_action_array[0]] - elif args.hand == "inspire1": + elif args.ee == "inspire1": with dual_hand_data_lock: left_hand_state = dual_hand_state_array[:6] right_hand_state = dual_hand_state_array[-6:] left_hand_action = dual_hand_action_array[:6] right_hand_action = dual_hand_action_array[-6:] else: - print("No dexterous hand set.") - pass + left_hand_state = [] + right_hand_state = [] + left_hand_action = [] + right_hand_action = [] # head image current_tv_image = tv_img_array.copy() # wrist image @@ -195,7 +210,6 @@ if __name__ == '__main__': right_arm_state = current_lr_arm_q[-7:] left_arm_action = sol_q[:7] right_arm_action = sol_q[-7:] - if recording: colors = {} depths = {} @@ -221,12 +235,12 @@ if __name__ == '__main__': "qvel": [], "torque": [], }, - "left_hand": { + "left_hand_pos": { "qpos": left_hand_state, "qvel": [], "torque": [], }, - "right_hand": { + "right_hand_pos": { "qpos": right_hand_state, "qvel": [], "torque": [], @@ -244,12 +258,12 @@ if __name__ == 
'__main__': "qvel": [], "torque": [], }, - "left_hand": { + "left_hand_pos": { "qpos": left_hand_action, "qvel": [], "torque": [], }, - "right_hand": { + "right_hand_pos": { "qpos": right_hand_action, "qvel": [], "torque": [], @@ -260,7 +274,7 @@ if __name__ == '__main__': current_time = time.time() time_elapsed = current_time - start_time - sleep_time = max(0, (1 / float(args.frequency)) - time_elapsed) + sleep_time = max(0, (1 / args.frequency) - time_elapsed) time.sleep(sleep_time) # print(f"main process sleep: {sleep_time}") diff --git a/teleop/utils/mat_tool.py b/teleop/utils/mat_tool.py deleted file mode 100644 index 349489f..0000000 --- a/teleop/utils/mat_tool.py +++ /dev/null @@ -1,14 +0,0 @@ -import numpy as np - -def mat_update(prev_mat, mat): - if np.linalg.det(mat) == 0: - return prev_mat, False # Return previous matrix and False flag if the new matrix is non-singular (determinant ≠ 0). - else: - return mat, True - - -def fast_mat_inv(mat): - ret = np.eye(4) - ret[:3, :3] = mat[:3, :3].T - ret[:3, 3] = -mat[:3, :3].T @ mat[:3, 3] - return ret \ No newline at end of file