From 1e4e6c6be3dad616640138a2e7121d5748428f00 Mon Sep 17 00:00:00 2001 From: silencht Date: Mon, 13 Oct 2025 21:14:58 +0800 Subject: [PATCH] [update] Simplify the data structure, align variable names and optim image comm. --- src/televuer/__init__.py | 2 +- src/televuer/televuer.py | 358 ++++++++++++++++++++----------------- src/televuer/tv_wrapper.py | 206 ++++++++++----------- test/_test_televuer.py | 88 ++++----- test/_test_tv_wrapper.py | 100 +++++------ 5 files changed, 389 insertions(+), 365 deletions(-) diff --git a/src/televuer/__init__.py b/src/televuer/__init__.py index 17b4fd9..9c66ed7 100644 --- a/src/televuer/__init__.py +++ b/src/televuer/__init__.py @@ -1,3 +1,3 @@ # unitree_televuer/__init__.py from .televuer import TeleVuer -from .tv_wrapper import TeleVuerWrapper, TeleData, TeleStateData \ No newline at end of file +from .tv_wrapper import TeleVuerWrapper, TeleData \ No newline at end of file diff --git a/src/televuer/televuer.py b/src/televuer/televuer.py index d8a9814..c1d3548 100644 --- a/src/televuer/televuer.py +++ b/src/televuer/televuer.py @@ -3,32 +3,42 @@ from vuer.schemas import ImageBackground, Hands, MotionControllers, WebRTCVideoP from multiprocessing import Value, Array, Process, shared_memory import numpy as np import asyncio +import threading import cv2 import os from pathlib import Path class TeleVuer: - def __init__(self, binocular: bool, use_hand_tracking: bool, img_shape, img_shm_name, cert_file=None, key_file=None, ngrok=False, webrtc=False): + def __init__(self, binocular: bool, use_hand_tracking: bool, img_shape, cert_file=None, key_file=None, ngrok=False, webrtc=False): """ TeleVuer class for OpenXR-based XR teleoperate applications. This class handles the communication with the Vuer server and manages the shared memory for image and pose data. :param binocular: bool, whether the application is binocular (stereoscopic) or monocular. :param use_hand_tracking: bool, whether to use hand tracking or controller tracking. - :param img_shape: tuple, shape of the image (height, width, channels). - :param img_shm_name: str, name of the shared memory for the image. + :param img_shape: tuple, shape of the image (height, width). :param cert_file: str, path to the SSL certificate file. :param key_file: str, path to the SSL key file. :param ngrok: bool, whether to use ngrok for tunneling. + :param webrtc: bool, whether to use WebRTC for real-time communication. """ self.binocular = binocular self.use_hand_tracking = use_hand_tracking - self.img_height = img_shape[0] + self.img_shape = (img_shape[0], img_shape[1], 3) + self.img2display_shm = shared_memory.SharedMemory(create=True, size=np.prod(self.img_shape) * np.uint8().itemsize) + self.img2display = np.ndarray(self.img_shape, dtype=np.uint8, buffer=self.img2display_shm.buf) + + self.latest_frame = None + self.new_frame_event = threading.Event() + self.writer_thread = threading.Thread(target=self._img2display_writer, daemon=True) + self.writer_thread.start() + + self.img_height = self.img_shape[0] if self.binocular: - self.img_width = img_shape[1] // 2 + self.img_width = self.img_shape[1] // 2 else: - self.img_width = img_shape[1] + self.img_width = self.img_shape[1] current_module_dir = Path(__file__).resolve().parent.parent.parent if cert_file is None: @@ -46,10 +56,6 @@ class TeleVuer: self.vuer.add_handler("HAND_MOVE")(self.on_hand_move) else: self.vuer.add_handler("CONTROLLER_MOVE")(self.on_controller_move) - - existing_shm = shared_memory.SharedMemory(name=img_shm_name) - self.img_array = np.ndarray(img_shape, dtype=np.uint8, buffer=existing_shm.buf) - self.webrtc = webrtc if self.binocular and not self.webrtc: self.vuer.spawn(start=False)(self.main_image_binocular) @@ -67,45 +73,59 @@ class TeleVuer: self.left_hand_orientation_shared = Array('d', 25 * 9, lock=True) self.right_hand_orientation_shared = Array('d', 25 * 9, lock=True) - self.left_pinch_state_shared = Value('b', False, lock=True) - self.left_pinch_value_shared = Value('d', 0.0, lock=True) - self.left_squeeze_state_shared = Value('b', False, lock=True) - self.left_squeeze_value_shared = Value('d', 0.0, lock=True) + self.left_hand_pinch_shared = Value('b', False, lock=True) + self.left_hand_pinchValue_shared = Value('d', 0.0, lock=True) + self.left_hand_squeeze_shared = Value('b', False, lock=True) + self.left_hand_squeezeValue_shared = Value('d', 0.0, lock=True) - self.right_pinch_state_shared = Value('b', False, lock=True) - self.right_pinch_value_shared = Value('d', 0.0, lock=True) - self.right_squeeze_state_shared = Value('b', False, lock=True) - self.right_squeeze_value_shared = Value('d', 0.0, lock=True) + self.right_hand_pinch_shared = Value('b', False, lock=True) + self.right_hand_pinchValue_shared = Value('d', 0.0, lock=True) + self.right_hand_squeeze_shared = Value('b', False, lock=True) + self.right_hand_squeezeValue_shared = Value('d', 0.0, lock=True) else: - self.left_trigger_state_shared = Value('b', False, lock=True) - self.left_trigger_value_shared = Value('d', 0.0, lock=True) - self.left_squeeze_state_shared = Value('b', False, lock=True) - self.left_squeeze_value_shared = Value('d', 0.0, lock=True) - self.left_thumbstick_state_shared = Value('b', False, lock=True) - self.left_thumbstick_value_shared = Array('d', 2, lock=True) - self.left_aButton_shared = Value('b', False, lock=True) - self.left_bButton_shared = Value('b', False, lock=True) - - self.right_trigger_state_shared = Value('b', False, lock=True) - self.right_trigger_value_shared = Value('d', 0.0, lock=True) - self.right_squeeze_state_shared = Value('b', False, lock=True) - self.right_squeeze_value_shared = Value('d', 0.0, lock=True) - self.right_thumbstick_state_shared = Value('b', False, lock=True) - self.right_thumbstick_value_shared = Array('d', 2, lock=True) - self.right_aButton_shared = Value('b', False, lock=True) - self.right_bButton_shared = Value('b', False, lock=True) - - self.process = Process(target=self.vuer_run) + self.left_ctrl_trigger_shared = Value('b', False, lock=True) + self.left_ctrl_triggerValue_shared = Value('d', 0.0, lock=True) + self.left_ctrl_squeeze_shared = Value('b', False, lock=True) + self.left_ctrl_squeezeValue_shared = Value('d', 0.0, lock=True) + self.left_ctrl_thumbstick_shared = Value('b', False, lock=True) + self.left_ctrl_thumbstickValue_shared = Array('d', 2, lock=True) + self.left_ctrl_aButton_shared = Value('b', False, lock=True) + self.left_ctrl_bButton_shared = Value('b', False, lock=True) + + self.right_ctrl_trigger_shared = Value('b', False, lock=True) + self.right_ctrl_triggerValue_shared = Value('d', 0.0, lock=True) + self.right_ctrl_squeeze_shared = Value('b', False, lock=True) + self.right_ctrl_squeezeValue_shared = Value('d', 0.0, lock=True) + self.right_ctrl_thumbstick_shared = Value('b', False, lock=True) + self.right_ctrl_thumbstickValue_shared = Array('d', 2, lock=True) + self.right_ctrl_aButton_shared = Value('b', False, lock=True) + self.right_ctrl_bButton_shared = Value('b', False, lock=True) + + self.process = Process(target=self._vuer_run) self.process.daemon = True self.process.start() + + def _vuer_run(self): + self.vuer.run() - def vuer_run(self): - try: - self.vuer.run() - except KeyboardInterrupt: - pass - except Exception as e: - print(f"Vuer encountered an error: {e}") + def _img2display_writer(self): + while True: + self.new_frame_event.wait() + self.new_frame_event.clear() + self.latest_frame = cv2.cvtColor(self.latest_frame, cv2.COLOR_BGR2RGB) + self.img2display[:] = self.latest_frame #.copy() + + def set_display_image(self, image): + self.latest_frame = image + self.new_frame_event.set() + + + def close(self): + self.process.terminate() + self.process.join(timeout=0.5) + self.img2display_shm.close() + self.img2display_shm.unlink() + self.writer_thread.join(timeout=0.5) async def on_cam_move(self, event, session, fps=60): try: @@ -115,49 +135,53 @@ class TeleVuer: pass async def on_controller_move(self, event, session, fps=60): + """https://docs.vuer.ai/en/latest/examples/20_motion_controllers.html""" try: + # ControllerData with self.left_arm_pose_shared.get_lock(): self.left_arm_pose_shared[:] = event.value["left"] with self.right_arm_pose_shared.get_lock(): self.right_arm_pose_shared[:] = event.value["right"] + # ControllerState + left_controller = event.value["leftState"] + right_controller = event.value["rightState"] - left_controller_state = event.value["leftState"] - right_controller_state = event.value["rightState"] - - def extract_controller_states(state_dict, prefix): + def extract_controllers(controllerState, prefix): # trigger - with getattr(self, f"{prefix}_trigger_state_shared").get_lock(): - getattr(self, f"{prefix}_trigger_state_shared").value = bool(state_dict.get("trigger", False)) - with getattr(self, f"{prefix}_trigger_value_shared").get_lock(): - getattr(self, f"{prefix}_trigger_value_shared").value = float(state_dict.get("triggerValue", 0.0)) + with getattr(self, f"{prefix}_ctrl_trigger_shared").get_lock(): + getattr(self, f"{prefix}_ctrl_trigger_shared").value = bool(controllerState.get("trigger", False)) + with getattr(self, f"{prefix}_ctrl_triggerValue_shared").get_lock(): + getattr(self, f"{prefix}_ctrl_triggerValue_shared").value = float(controllerState.get("triggerValue", 0.0)) # squeeze - with getattr(self, f"{prefix}_squeeze_state_shared").get_lock(): - getattr(self, f"{prefix}_squeeze_state_shared").value = bool(state_dict.get("squeeze", False)) - with getattr(self, f"{prefix}_squeeze_value_shared").get_lock(): - getattr(self, f"{prefix}_squeeze_value_shared").value = float(state_dict.get("squeezeValue", 0.0)) + with getattr(self, f"{prefix}_ctrl_squeeze_shared").get_lock(): + getattr(self, f"{prefix}_ctrl_squeeze_shared").value = bool(controllerState.get("squeeze", False)) + with getattr(self, f"{prefix}_ctrl_squeezeValue_shared").get_lock(): + getattr(self, f"{prefix}_ctrl_squeezeValue_shared").value = float(controllerState.get("squeezeValue", 0.0)) # thumbstick - with getattr(self, f"{prefix}_thumbstick_state_shared").get_lock(): - getattr(self, f"{prefix}_thumbstick_state_shared").value = bool(state_dict.get("thumbstick", False)) - with getattr(self, f"{prefix}_thumbstick_value_shared").get_lock(): - getattr(self, f"{prefix}_thumbstick_value_shared")[:] = state_dict.get("thumbstickValue", [0.0, 0.0]) + with getattr(self, f"{prefix}_ctrl_thumbstick_shared").get_lock(): + getattr(self, f"{prefix}_ctrl_thumbstick_shared").value = bool(controllerState.get("thumbstick", False)) + with getattr(self, f"{prefix}_ctrl_thumbstickValue_shared").get_lock(): + getattr(self, f"{prefix}_ctrl_thumbstickValue_shared")[:] = controllerState.get("thumbstickValue", [0.0, 0.0]) # buttons - with getattr(self, f"{prefix}_aButton_shared").get_lock(): - getattr(self, f"{prefix}_aButton_shared").value = bool(state_dict.get("aButton", False)) - with getattr(self, f"{prefix}_bButton_shared").get_lock(): - getattr(self, f"{prefix}_bButton_shared").value = bool(state_dict.get("bButton", False)) + with getattr(self, f"{prefix}_ctrl_aButton_shared").get_lock(): + getattr(self, f"{prefix}_ctrl_aButton_shared").value = bool(controllerState.get("aButton", False)) + with getattr(self, f"{prefix}_ctrl_bButton_shared").get_lock(): + getattr(self, f"{prefix}_ctrl_bButton_shared").value = bool(controllerState.get("bButton", False)) - extract_controller_states(left_controller_state, "left") - extract_controller_states(right_controller_state, "right") + extract_controllers(left_controller, "left") + extract_controllers(right_controller, "right") except: pass async def on_hand_move(self, event, session, fps=60): + """https://docs.vuer.ai/en/latest/examples/19_hand_tracking.html""" try: + # HandsData left_hand_data = event.value["left"] right_hand_data = event.value["right"] - left_hand_state = event.value["leftState"] - right_hand_state = event.value["rightState"] - + left_hand = event.value["leftState"] + right_hand = event.value["rightState"] + # HandState def extract_hand_poses(hand_data, arm_pose_shared, hand_position_shared, hand_orientation_shared): with arm_pose_shared.get_lock(): arm_pose_shared[:] = hand_data[0:16] @@ -176,22 +200,22 @@ class TeleVuer: hand_data[base + 8], hand_data[base + 9], hand_data[base + 10], ] - def extract_hand_states(state_dict, prefix): + def extract_hands(handState, prefix): # pinch - with getattr(self, f"{prefix}_pinch_state_shared").get_lock(): - getattr(self, f"{prefix}_pinch_state_shared").value = bool(state_dict.get("pinch", False)) - with getattr(self, f"{prefix}_pinch_value_shared").get_lock(): - getattr(self, f"{prefix}_pinch_value_shared").value = float(state_dict.get("pinchValue", 0.0)) + with getattr(self, f"{prefix}_hand_pinch_shared").get_lock(): + getattr(self, f"{prefix}_hand_pinch_shared").value = bool(handState.get("pinch", False)) + with getattr(self, f"{prefix}_hand_pinchValue_shared").get_lock(): + getattr(self, f"{prefix}_hand_pinchValue_shared").value = float(handState.get("pinchValue", 0.0)) # squeeze - with getattr(self, f"{prefix}_squeeze_state_shared").get_lock(): - getattr(self, f"{prefix}_squeeze_state_shared").value = bool(state_dict.get("squeeze", False)) - with getattr(self, f"{prefix}_squeeze_value_shared").get_lock(): - getattr(self, f"{prefix}_squeeze_value_shared").value = float(state_dict.get("squeezeValue", 0.0)) + with getattr(self, f"{prefix}_hand_squeeze_shared").get_lock(): + getattr(self, f"{prefix}_hand_squeeze_shared").value = bool(handState.get("squeeze", False)) + with getattr(self, f"{prefix}_hand_squeezeValue_shared").get_lock(): + getattr(self, f"{prefix}_hand_squeezeValue_shared").value = float(handState.get("squeezeValue", 0.0)) extract_hand_poses(left_hand_data, self.left_arm_pose_shared, self.left_hand_position_shared, self.left_hand_orientation_shared) extract_hand_poses(right_hand_data, self.right_arm_pose_shared, self.right_hand_position_shared, self.right_hand_orientation_shared) - extract_hand_states(left_hand_state, "left") - extract_hand_states(right_hand_state, "right") + extract_hands(left_hand, "left") + extract_hands(right_hand, "right") except: pass @@ -217,15 +241,13 @@ class TeleVuer: ), to="bgChildren", ) - while True: - display_image = cv2.cvtColor(self.img_array, cv2.COLOR_BGR2RGB) - # aspect_ratio = self.img_width / self.img_height + aspect_ratio = self.img_width / self.img_height session.upsert( [ ImageBackground( - display_image[:, :self.img_width], - aspect=1.778, + self.img2display[:, :self.img_width], + aspect=aspect_ratio, height=1, distanceToCamera=1, # The underlying rendering engine supported a layer binary bitmask for both objects and the camera. @@ -233,18 +255,18 @@ class TeleVuer: # Note that these two masks are associated with left eye’s camera and the right eye’s camera. layers=1, format="jpeg", - quality=100, + quality=80, key="background-left", interpolate=True, ), ImageBackground( - display_image[:, self.img_width:], - aspect=1.778, + self.img2display[:, self.img_width:], + aspect=aspect_ratio, height=1, distanceToCamera=1, layers=2, format="jpeg", - quality=100, + quality=80, key="background-right", interpolate=True, ), @@ -252,7 +274,7 @@ class TeleVuer: to="bgChildren", ) # 'jpeg' encoding should give you about 30fps with a 16ms wait in-between. - await asyncio.sleep(0.016 * 2) + await asyncio.sleep(0.016) async def main_image_monocular(self, session, fps=60): if self.use_hand_tracking: @@ -277,17 +299,16 @@ class TeleVuer: ) while True: - display_image = cv2.cvtColor(self.img_array, cv2.COLOR_BGR2RGB) - # aspect_ratio = self.img_width / self.img_height + aspect_ratio = self.img_width / self.img_height session.upsert( [ ImageBackground( - display_image, - aspect=1.778, + self.img2display, + aspect=aspect_ratio, height=1, distanceToCamera=1, format="jpeg", - quality=50, + quality=80, key="background-mono", interpolate=True, ), @@ -297,6 +318,7 @@ class TeleVuer: await asyncio.sleep(0.016) async def main_image_webrtc(self, session, fps=60): + aspect_ratio = self.img_width / self.img_height if self.use_hand_tracking: session.upsert( Hands( @@ -323,7 +345,7 @@ class TeleVuer: src="https://10.0.7.49:8080/offer", iceServer={}, key="webrtc", - aspect=1.778, + aspect=aspect_ratio, height = 7, ), to="bgChildren", @@ -375,146 +397,146 @@ class TeleVuer: return np.array(self.right_hand_orientation_shared[:]).reshape(25, 9).reshape(25, 3, 3, order="F") @property - def left_hand_pinch_state(self): + def left_hand_pinch(self): """bool, whether left hand is pinching.""" - with self.left_pinch_state_shared.get_lock(): - return self.left_pinch_state_shared.value + with self.left_hand_pinch_shared.get_lock(): + return self.left_hand_pinch_shared.value @property - def left_hand_pinch_value(self): + def left_hand_pinchValue(self): """float, pinch strength of left hand.""" - with self.left_pinch_value_shared.get_lock(): - return self.left_pinch_value_shared.value + with self.left_hand_pinchValue_shared.get_lock(): + return self.left_hand_pinchValue_shared.value @property - def left_hand_squeeze_state(self): + def left_hand_squeeze(self): """bool, whether left hand is squeezing.""" - with self.left_squeeze_state_shared.get_lock(): - return self.left_squeeze_state_shared.value + with self.left_hand_squeeze_shared.get_lock(): + return self.left_hand_squeeze_shared.value @property - def left_hand_squeeze_value(self): + def left_hand_squeezeValue(self): """float, squeeze strength of left hand.""" - with self.left_squeeze_value_shared.get_lock(): - return self.left_squeeze_value_shared.value + with self.left_hand_squeezeValue_shared.get_lock(): + return self.left_hand_squeezeValue_shared.value @property - def right_hand_pinch_state(self): + def right_hand_pinch(self): """bool, whether right hand is pinching.""" - with self.right_pinch_state_shared.get_lock(): - return self.right_pinch_state_shared.value + with self.right_hand_pinch_shared.get_lock(): + return self.right_hand_pinch_shared.value @property - def right_hand_pinch_value(self): + def right_hand_pinchValue(self): """float, pinch strength of right hand.""" - with self.right_pinch_value_shared.get_lock(): - return self.right_pinch_value_shared.value + with self.right_hand_pinchValue_shared.get_lock(): + return self.right_hand_pinchValue_shared.value @property - def right_hand_squeeze_state(self): + def right_hand_squeeze(self): """bool, whether right hand is squeezing.""" - with self.right_squeeze_state_shared.get_lock(): - return self.right_squeeze_state_shared.value + with self.right_hand_squeeze_shared.get_lock(): + return self.right_hand_squeeze_shared.value @property - def right_hand_squeeze_value(self): + def right_hand_squeezeValue(self): """float, squeeze strength of right hand.""" - with self.right_squeeze_value_shared.get_lock(): - return self.right_squeeze_value_shared.value + with self.right_hand_squeezeValue_shared.get_lock(): + return self.right_hand_squeezeValue_shared.value # ==================== Controller Data ==================== @property - def left_controller_trigger_state(self): + def left_ctrl_trigger(self): """bool, left controller trigger pressed or not.""" - with self.left_trigger_state_shared.get_lock(): - return self.left_trigger_state_shared.value + with self.left_ctrl_trigger_shared.get_lock(): + return self.left_ctrl_trigger_shared.value @property - def left_controller_trigger_value(self): + def left_ctrl_triggerValue(self): """float, left controller trigger analog value (0.0 ~ 1.0).""" - with self.left_trigger_value_shared.get_lock(): - return self.left_trigger_value_shared.value + with self.left_ctrl_triggerValue_shared.get_lock(): + return self.left_ctrl_triggerValue_shared.value @property - def left_controller_squeeze_state(self): + def left_ctrl_squeeze(self): """bool, left controller squeeze pressed or not.""" - with self.left_squeeze_state_shared.get_lock(): - return self.left_squeeze_state_shared.value + with self.left_ctrl_squeeze_shared.get_lock(): + return self.left_ctrl_squeeze_shared.value @property - def left_controller_squeeze_value(self): + def left_ctrl_squeezeValue(self): """float, left controller squeeze analog value (0.0 ~ 1.0).""" - with self.left_squeeze_value_shared.get_lock(): - return self.left_squeeze_value_shared.value + with self.left_ctrl_squeezeValue_shared.get_lock(): + return self.left_ctrl_squeezeValue_shared.value @property - def left_controller_thumbstick_state(self): + def left_ctrl_thumbstick(self): """bool, whether left thumbstick is touched or clicked.""" - with self.left_thumbstick_state_shared.get_lock(): - return self.left_thumbstick_state_shared.value + with self.left_ctrl_thumbstick_shared.get_lock(): + return self.left_ctrl_thumbstick_shared.value @property - def left_controller_thumbstick_value(self): + def left_ctrl_thumbstickValue(self): """np.ndarray, shape (2,), left thumbstick 2D axis values (x, y).""" - with self.left_thumbstick_value_shared.get_lock(): - return np.array(self.left_thumbstick_value_shared[:]) + with self.left_ctrl_thumbstickValue_shared.get_lock(): + return np.array(self.left_ctrl_thumbstickValue_shared[:]) @property - def left_controller_aButton(self): + def left_ctrl_aButton(self): """bool, left controller 'A' button pressed.""" - with self.left_aButton_shared.get_lock(): - return self.left_aButton_shared.value + with self.left_ctrl_aButton_shared.get_lock(): + return self.left_ctrl_aButton_shared.value @property - def left_controller_bButton(self): + def left_ctrl_bButton(self): """bool, left controller 'B' button pressed.""" - with self.left_bButton_shared.get_lock(): - return self.left_bButton_shared.value + with self.left_ctrl_bButton_shared.get_lock(): + return self.left_ctrl_bButton_shared.value @property - def right_controller_trigger_state(self): + def right_ctrl_trigger(self): """bool, right controller trigger pressed or not.""" - with self.right_trigger_state_shared.get_lock(): - return self.right_trigger_state_shared.value + with self.right_ctrl_trigger_shared.get_lock(): + return self.right_ctrl_trigger_shared.value @property - def right_controller_trigger_value(self): + def right_ctrl_triggerValue(self): """float, right controller trigger analog value (0.0 ~ 1.0).""" - with self.right_trigger_value_shared.get_lock(): - return self.right_trigger_value_shared.value + with self.right_ctrl_triggerValue_shared.get_lock(): + return self.right_ctrl_triggerValue_shared.value @property - def right_controller_squeeze_state(self): + def right_ctrl_squeeze(self): """bool, right controller squeeze pressed or not.""" - with self.right_squeeze_state_shared.get_lock(): - return self.right_squeeze_state_shared.value + with self.right_ctrl_squeeze_shared.get_lock(): + return self.right_ctrl_squeeze_shared.value @property - def right_controller_squeeze_value(self): + def right_ctrl_squeezeValue(self): """float, right controller squeeze analog value (0.0 ~ 1.0).""" - with self.right_squeeze_value_shared.get_lock(): - return self.right_squeeze_value_shared.value + with self.right_ctrl_squeezeValue_shared.get_lock(): + return self.right_ctrl_squeezeValue_shared.value @property - def right_controller_thumbstick_state(self): + def right_ctrl_thumbstick(self): """bool, whether right thumbstick is touched or clicked.""" - with self.right_thumbstick_state_shared.get_lock(): - return self.right_thumbstick_state_shared.value + with self.right_ctrl_thumbstick_shared.get_lock(): + return self.right_ctrl_thumbstick_shared.value @property - def right_controller_thumbstick_value(self): + def right_ctrl_thumbstickValue(self): """np.ndarray, shape (2,), right thumbstick 2D axis values (x, y).""" - with self.right_thumbstick_value_shared.get_lock(): - return np.array(self.right_thumbstick_value_shared[:]) + with self.right_ctrl_thumbstickValue_shared.get_lock(): + return np.array(self.right_ctrl_thumbstickValue_shared[:]) @property - def right_controller_aButton(self): + def right_ctrl_aButton(self): """bool, right controller 'A' button pressed.""" - with self.right_aButton_shared.get_lock(): - return self.right_aButton_shared.value + with self.right_ctrl_aButton_shared.get_lock(): + return self.right_ctrl_aButton_shared.value @property - def right_controller_bButton(self): + def right_ctrl_bButton(self): """bool, right controller 'B' button pressed.""" - with self.right_bButton_shared.get_lock(): - return self.right_bButton_shared.value + with self.right_ctrl_bButton_shared.get_lock(): + return self.right_ctrl_bButton_shared.value \ No newline at end of file diff --git a/src/televuer/tv_wrapper.py b/src/televuer/tv_wrapper.py index b875a93..d31271d 100644 --- a/src/televuer/tv_wrapper.py +++ b/src/televuer/tv_wrapper.py @@ -139,49 +139,61 @@ CONST_LEFT_ARM_POSE = np.array([[1, 0, 0, -0.15], CONST_HAND_ROT = np.tile(np.eye(3)[None, :, :], (25, 1, 1)) @dataclass -class TeleStateData: +class TeleData: + head_pose: np.ndarray # (4,4) SE(3) pose of head matrix + left_wrist_pose: np.ndarray # (4,4) SE(3) pose of left wrist of arm + right_wrist_pose: np.ndarray # (4,4) SE(3) pose of right wrist of arm # hand tracking - left_pinch_state: bool = False # True if index and thumb are pinching - left_squeeze_state: bool = False # True if hand is making a fist - left_squeeze_value: float = 0.0 # (0.0 ~ 1.0) degree of hand squeeze - right_pinch_state: bool = False # True if index and thumb are pinching - right_squeeze_state: bool = False # True if hand is making a fist - right_squeeze_value: float = 0.0 # (0.0 ~ 1.0) degree of hand squeeze + # https://docs.vuer.ai/en/latest/examples/19_hand_tracking.html + # https://immersive-web.github.io/webxr-hand-input/ + # HandsData, (25,16) SE3 => (25,3) pos + (25,3,3) rot + left_hand_pos: np.ndarray = None # (25,3) 3D positions of left hand joints + right_hand_pos: np.ndarray = None # (25,3) 3D positions of right hand joints + left_hand_rot: np.ndarray = None # (25,3,3) rotation matrices of left hand joints + right_hand_rot: np.ndarray = None # (25,3,3) rotation matrices of right hand joints + # HandState + left_hand_pinch: bool = False # True if index and thumb are pinching + left_hand_pinchValue: float = 10.0 # float (~15.0​​ → 0.0) pinch distance between index and thumb + left_hand_squeeze: bool = False # True if hand is making a fist + left_hand_squeezeValue: float = 0.0 # (0.0 → 1.0) degree of hand squeeze + + right_hand_pinch: bool = False # True if index and thumb are pinching + right_hand_pinchValue: float = 10.0 # float (~15.0​​ → 0.0) pinch distance between index and thumb + right_hand_squeeze: bool = False # True if hand is making a fist + right_hand_squeezeValue: float = 0.0 # (0.0 → 1.0) degree of hand squeeze # controller tracking - left_trigger_state: bool = False # True if trigger is actively pressed - left_squeeze_ctrl_state: bool = False # True if grip button is pressed - left_squeeze_ctrl_value: float = 0.0 # (0.0 ~ 1.0) grip pull depth - left_thumbstick_state: bool = False # True if thumbstick button is pressed - left_thumbstick_value: np.ndarray = field(default_factory=lambda: np.zeros(2)) # 2D vector (x, y), normalized - left_aButton: bool = False # True if A button is pressed - left_bButton: bool = False # True if B button is pressed - right_trigger_state: bool = False # True if trigger is actively pressed - right_squeeze_ctrl_state: bool = False # True if grip button is pressed - right_squeeze_ctrl_value: float = 0.0 # (0.0 ~ 1.0) grip pull depth - right_thumbstick_state: bool = False # True if thumbstick button is pressed - right_thumbstick_value: np.ndarray = field(default_factory=lambda: np.zeros(2)) # 2D vector (x, y), normalized - right_aButton: bool = False # True if A button is pressed - right_bButton: bool = False # True if B button is pressed - -@dataclass -class TeleData: - head_pose: np.ndarray # (4,4) SE(3) pose of head matrix - left_arm_pose: np.ndarray # (4,4) SE(3) pose of left arm - right_arm_pose: np.ndarray # (4,4) SE(3) pose of right arm - left_hand_pos: np.ndarray = None # (25,3) 3D positions of left hand joints - right_hand_pos: np.ndarray = None # (25,3) 3D positions of right hand joints - left_hand_rot: np.ndarray = None # (25,3,3) rotation matrices of left hand joints - right_hand_rot: np.ndarray = None # (25,3,3) rotation matrices of right hand joints - left_pinch_value: float = None # float (1x.0 ~ 0.0) pinch distance between index and thumb - right_pinch_value: float = None # float (1x.0 ~ 0.0) pinch distance between index and thumb - left_trigger_value: float = None # float (10.0 ~ 0.0) trigger pull depth - right_trigger_value: float = None # float (10.0 ~ 0.0) trigger pull depth - tele_state: TeleStateData = field(default_factory=TeleStateData) + # https://docs.vuer.ai/en/latest/examples/20_motion_controllers.html + # https://immersive-web.github.io/webxr-gamepads-module/ + left_ctrl_trigger: bool = False # True if trigger is actively pressed + left_ctrl_triggerValue: float = 10.0 # float (10.0 → 0.0) trigger pull depth, 0.0 means fully pressed (for align with hand pinch value's logic) + left_ctrl_squeeze: bool = False # True if grip button is pressed + left_ctrl_squeezeValue: float = 0.0 # (0.0 → 1.0) grip pull depth, 0.0 means no press + left_ctrl_aButton: bool = False # True if A(X) button is pressed + left_ctrl_bButton: bool = False # True if B(Y) button is pressed + left_ctrl_thumbstick: bool = False # True if thumbstick button is pressed + left_ctrl_thumbstickValue: np.ndarray = field(default_factory=lambda: np.zeros(2)) # 2D vector (x, y), normalized + """ thumbstickValue explanation: + front (0,-1) + ^ + | + left (-1,0) < —— o —— > right (1,0) and 'o' is at (0, 0) + | + v + back (0,1) + """ + right_ctrl_trigger: bool = False # True if trigger is actively pressed + right_ctrl_triggerValue: float = 10.0 # float (10.0 → 0.0) trigger pull depth, 0.0 means fully pressed (for align with hand pinch value's logic) + right_ctrl_squeeze: bool = False # True if grip button is pressed + right_ctrl_squeezeValue: float = 0.0 # (0.0 → 1.0) grip pull depth, 0.0 means no press + right_ctrl_aButton: bool = False # True if A button is pressed + right_ctrl_bButton: bool = False # True if B button is pressed + right_ctrl_thumbstick: bool = False # True if thumbstick button is pressed + right_ctrl_thumbstickValue: np.ndarray = field(default_factory=lambda: np.zeros(2)) # 2D vector (x, y), normalized class TeleVuerWrapper: - def __init__(self, binocular: bool, use_hand_tracking: bool, img_shape, img_shm_name, return_state_data: bool = True, return_hand_rot_data: bool = False, + def __init__(self, binocular: bool, use_hand_tracking: bool, img_shape, return_hand_rot_data: bool = False, cert_file = None, key_file = None, ngrok = False, webrtc = False): """ TeleVuerWrapper is a wrapper for the TeleVuer class, which handles XR device's data suit for robot control. @@ -190,19 +202,18 @@ class TeleVuerWrapper: :param binocular: A boolean indicating whether the head camera device is binocular or not. :param use_hand_tracking: A boolean indicating whether to use hand tracking or use controller tracking. :param img_shape: The shape of the image to be processed. - :param img_shm_name: The name of the shared memory for the image. - :param return_state: A boolean indicating whether to return the state of the hand or controller. - :param return_hand_rot: A boolean indicating whether to return the hand rotation data. + :param return_hand_rot_data: A boolean indicating whether to return the hand rotation data. :param cert_file: The path to the certificate file for secure connection. :param key_file: The path to the key file for secure connection. + :param ngrok: A boolean indicating whether to use ngrok for remote access. + :param webrtc: A boolean indicating whether to use WebRTC for real-time communication. """ self.use_hand_tracking = use_hand_tracking - self.return_state_data = return_state_data self.return_hand_rot_data = return_hand_rot_data - self.tvuer = TeleVuer(binocular, use_hand_tracking, img_shape, img_shm_name, cert_file=cert_file, key_file=key_file, - ngrok=ngrok, webrtc=webrtc) + self.tvuer = TeleVuer(binocular, use_hand_tracking, img_shape, cert_file=cert_file, key_file=key_file, + ngrok=ngrok, webrtc=webrtc) - def get_motion_state_data(self): + def get_tele_data(self): """ Get processed motion state data from the TeleVuer instance. @@ -223,6 +234,7 @@ class TeleVuerWrapper: # TeleVuer (Vuer) obtains all raw data under the (basis) OpenXR Convention. Bxr_world_head, head_pose_is_valid = safe_mat_update(CONST_HEAD_POSE, self.tvuer.head_pose) + # hand tracking if self.use_hand_tracking: # 'Arm' pose data follows (basis) OpenXR Convention and (initial pose) OpenXR Arm Convention. left_IPxr_Bxr_world_arm, left_arm_is_valid = safe_mat_update(CONST_LEFT_ARM_POSE, self.tvuer.left_arm_pose) @@ -262,12 +274,12 @@ class TeleVuerWrapper: # The origin of the coordinate for IK Solve is near the WAIST joint motor. You can use teleop/robot_control/robot_arm_ik.py Unit_Test to visualize it. # The origin of the coordinate of IPunitree_Brobot_head_arm is HEAD. # So it is necessary to translate the origin of IPunitree_Brobot_head_arm from HEAD to WAIST. - left_IPunitree_Brobot_waist_arm = left_IPunitree_Brobot_head_arm.copy() - right_IPunitree_Brobot_waist_arm = right_IPunitree_Brobot_head_arm.copy() - left_IPunitree_Brobot_waist_arm[0, 3] +=0.15 # x - right_IPunitree_Brobot_waist_arm[0,3] +=0.15 - left_IPunitree_Brobot_waist_arm[2, 3] +=0.45 # z - right_IPunitree_Brobot_waist_arm[2,3] +=0.45 + left_IPunitree_Brobot_wrist_arm = left_IPunitree_Brobot_head_arm.copy() + right_IPunitree_Brobot_wrist_arm = right_IPunitree_Brobot_head_arm.copy() + left_IPunitree_Brobot_wrist_arm[0, 3] +=0.15 # x + right_IPunitree_Brobot_wrist_arm[0,3] +=0.15 + left_IPunitree_Brobot_wrist_arm[2, 3] +=0.45 # z + right_IPunitree_Brobot_wrist_arm[2,3] +=0.45 # -----------------------------------hand position---------------------------------------- if left_arm_is_valid and right_arm_is_valid: @@ -325,31 +337,24 @@ class TeleVuerWrapper: else: left_Brobot_arm_hand_rot = None right_Brobot_arm_hand_rot = None - - if self.return_state_data: - hand_state = TeleStateData( - left_pinch_state=self.tvuer.left_hand_pinch_state, - left_squeeze_state=self.tvuer.left_hand_squeeze_state, - left_squeeze_value=self.tvuer.left_hand_squeeze_value, - right_pinch_state=self.tvuer.right_hand_pinch_state, - right_squeeze_state=self.tvuer.right_hand_squeeze_state, - right_squeeze_value=self.tvuer.right_hand_squeeze_value, - ) - else: - hand_state = None - return TeleData( head_pose=Brobot_world_head, - left_arm_pose=left_IPunitree_Brobot_waist_arm, - right_arm_pose=right_IPunitree_Brobot_waist_arm, + left_wrist_pose=left_IPunitree_Brobot_wrist_arm, + right_wrist_pose=right_IPunitree_Brobot_wrist_arm, left_hand_pos=left_IPunitree_Brobot_arm_hand_pos, right_hand_pos=right_IPunitree_Brobot_arm_hand_pos, left_hand_rot=left_Brobot_arm_hand_rot, right_hand_rot=right_Brobot_arm_hand_rot, - left_pinch_value=self.tvuer.left_hand_pinch_value * 100.0, - right_pinch_value=self.tvuer.right_hand_pinch_value * 100.0, - tele_state=hand_state + left_hand_pinch=self.tvuer.left_hand_pinch, + left_hand_pinchValue=self.tvuer.left_hand_pinchValue, + left_hand_squeeze=self.tvuer.left_hand_squeeze, + left_hand_squeezeValue=self.tvuer.left_hand_squeezeValue, + right_hand_pinch=self.tvuer.right_hand_pinch, + right_hand_pinchValue=self.tvuer.right_hand_pinchValue, + right_hand_squeeze=self.tvuer.right_hand_squeeze, + right_hand_squeezeValue=self.tvuer.right_hand_squeezeValue, ) + # controller tracking else: # Controller pose data directly follows the (initial pose) Unitree Humanoid Arm URDF Convention (thus no transform is needed). left_IPunitree_Bxr_world_arm, left_arm_is_valid = safe_mat_update(CONST_LEFT_ARM_POSE, self.tvuer.left_arm_pose) @@ -370,41 +375,38 @@ class TeleVuerWrapper: # The origin of the coordinate for IK Solve is near the WAIST joint motor. You can use teleop/robot_control/robot_arm_ik.py Unit_Test to check it. # The origin of the coordinate of IPunitree_Brobot_head_arm is HEAD. # So it is necessary to translate the origin of IPunitree_Brobot_head_arm from HEAD to WAIST. - left_IPunitree_Brobot_waist_arm = left_IPunitree_Brobot_head_arm.copy() - right_IPunitree_Brobot_waist_arm = right_IPunitree_Brobot_head_arm.copy() - left_IPunitree_Brobot_waist_arm[0, 3] +=0.15 # x - right_IPunitree_Brobot_waist_arm[0,3] +=0.15 - left_IPunitree_Brobot_waist_arm[2, 3] +=0.45 # z - right_IPunitree_Brobot_waist_arm[2,3] +=0.45 + left_IPunitree_Brobot_wrist_arm = left_IPunitree_Brobot_head_arm.copy() + right_IPunitree_Brobot_wrist_arm = right_IPunitree_Brobot_head_arm.copy() + left_IPunitree_Brobot_wrist_arm[0, 3] +=0.15 # x + right_IPunitree_Brobot_wrist_arm[0,3] +=0.15 + left_IPunitree_Brobot_wrist_arm[2, 3] +=0.45 # z + right_IPunitree_Brobot_wrist_arm[2,3] +=0.45 # left_IPunitree_Brobot_waist_arm[1, 3] +=0.02 # y # right_IPunitree_Brobot_waist_arm[1,3] +=0.02 - - # return data - if self.return_state_data: - controller_state = TeleStateData( - left_trigger_state=self.tvuer.left_controller_trigger_state, - left_squeeze_ctrl_state=self.tvuer.left_controller_squeeze_state, - left_squeeze_ctrl_value=self.tvuer.left_controller_squeeze_value, - left_thumbstick_state=self.tvuer.left_controller_thumbstick_state, - left_thumbstick_value=self.tvuer.left_controller_thumbstick_value, - left_aButton=self.tvuer.left_controller_aButton, - left_bButton=self.tvuer.left_controller_bButton, - right_trigger_state=self.tvuer.right_controller_trigger_state, - right_squeeze_ctrl_state=self.tvuer.right_controller_squeeze_state, - right_squeeze_ctrl_value=self.tvuer.right_controller_squeeze_value, - right_thumbstick_state=self.tvuer.right_controller_thumbstick_state, - right_thumbstick_value=self.tvuer.right_controller_thumbstick_value, - right_aButton=self.tvuer.right_controller_aButton, - right_bButton=self.tvuer.right_controller_bButton, - ) - else: - controller_state = None - return TeleData( head_pose=Brobot_world_head, - left_arm_pose=left_IPunitree_Brobot_waist_arm, - right_arm_pose=right_IPunitree_Brobot_waist_arm, - left_trigger_value=10.0 - self.tvuer.left_controller_trigger_value * 10, - right_trigger_value=10.0 - self.tvuer.right_controller_trigger_value * 10, - tele_state=controller_state - ) \ No newline at end of file + left_wrist_pose=left_IPunitree_Brobot_wrist_arm, + right_wrist_pose=right_IPunitree_Brobot_wrist_arm, + left_ctrl_trigger=self.tvuer.left_ctrl_trigger, + left_ctrl_triggerValue=10.0 - self.tvuer.left_ctrl_triggerValue * 10, + left_ctrl_squeeze=self.tvuer.left_ctrl_squeeze, + left_ctrl_squeezeValue=self.tvuer.left_ctrl_squeezeValue, + left_ctrl_aButton=self.tvuer.left_ctrl_aButton, + left_ctrl_bButton=self.tvuer.left_ctrl_bButton, + left_ctrl_thumbstick=self.tvuer.left_ctrl_thumbstick, + left_ctrl_thumbstickValue=self.tvuer.left_ctrl_thumbstickValue, + right_ctrl_trigger=self.tvuer.right_ctrl_trigger, + right_ctrl_triggerValue=10.0 - self.tvuer.right_ctrl_triggerValue * 10, + right_ctrl_squeeze=self.tvuer.right_ctrl_squeeze, + right_ctrl_squeezeValue=self.tvuer.right_ctrl_squeezeValue, + right_ctrl_aButton=self.tvuer.right_ctrl_aButton, + right_ctrl_bButton=self.tvuer.right_ctrl_bButton, + right_ctrl_thumbstick=self.tvuer.right_ctrl_thumbstick, + right_ctrl_thumbstickValue=self.tvuer.right_ctrl_thumbstickValue, + ) + + def set_display_image(self, img): + self.tvuer.set_display_image(img) + + def close(self): + self.tvuer.close() \ No newline at end of file diff --git a/test/_test_televuer.py b/test/_test_televuer.py index 8e8ccca..f1b7f56 100644 --- a/test/_test_televuer.py +++ b/test/_test_televuer.py @@ -5,33 +5,36 @@ if project_root not in sys.path: sys.path.insert(0, project_root) import time -import numpy as np -from multiprocessing import shared_memory from televuer import TeleVuer +# image client. if you want to test with real image stream, +# please copy image_client.py and messaging.py from xr_teleoperate/teleop/image_server to the current directory before running this test script +# from image_client import ImageClient import logging_mp logger_mp = logging_mp.get_logger(__name__, level=logging_mp.INFO) def run_test_TeleVuer(): - # image - image_shape = (480, 640 * 2, 3) - image_shm = shared_memory.SharedMemory(create=True, size=np.prod(image_shape) * np.uint8().itemsize) - image_array = np.ndarray(image_shape, dtype=np.uint8, buffer=image_shm.buf) + head_binocular = True + head_img_shape = (480, 1280, 3) # default image - # from image_server.image_client import ImageClient - # import threading - # image_client = ImageClient(tv_img_shape = image_shape, tv_img_shm_name = image_shm.name, image_show=True, server_address="127.0.0.1") - # image_receive_thread = threading.Thread(target = image_client.receive_process, daemon = True) - # image_receive_thread.daemon = True - # image_receive_thread.start() + # img_client = ImageClient(host="127.0.0.1") # 127.0.0.1 is localhost, 192.168.123.164 is unitree robot's host ip + # if not img_client.has_head_cam(): + # logger_mp.error("Head camera is required. Please enable head camera on the image server side.") + # head_img_shape = img_client.get_head_shape() + # head_binocular = img_client.is_binocular() # xr-mode use_hand_track = True - tv = TeleVuer(binocular = True, use_hand_tracking = use_hand_track, img_shape = image_shape, img_shm_name = image_shm.name, webrtc=False) + tv = TeleVuer(binocular = head_binocular, use_hand_tracking = use_hand_track, img_shape = head_img_shape, webrtc=False) try: input("Press Enter to start TeleVuer test...") running = True while running: + start_time = time.time() + # image client + # head_img, head_img_fps = img_client.get_head_frame() + # tv.set_display_image(head_img) + logger_mp.info("=" * 80) logger_mp.info("Common Data (always available):") logger_mp.info(f"head_pose shape: {tv.head_pose.shape}\n{tv.head_pose}\n") @@ -45,40 +48,45 @@ def run_test_TeleVuer(): logger_mp.info(f"right_hand_positions shape: {tv.right_hand_positions.shape}\n{tv.right_hand_positions}\n") logger_mp.info(f"left_hand_orientations shape: {tv.left_hand_orientations.shape}\n{tv.left_hand_orientations}\n") logger_mp.info(f"right_hand_orientations shape: {tv.right_hand_orientations.shape}\n{tv.right_hand_orientations}\n") - logger_mp.info(f"left_hand_pinch_state: {tv.left_hand_pinch_state}") - logger_mp.info(f"left_hand_pinch_value: {tv.left_hand_pinch_value}") - logger_mp.info(f"left_hand_squeeze_state: {tv.left_hand_squeeze_state}") - logger_mp.info(f"left_hand_squeeze_value: {tv.left_hand_squeeze_value}") - logger_mp.info(f"right_hand_pinch_state: {tv.right_hand_pinch_state}") - logger_mp.info(f"right_hand_pinch_value: {tv.right_hand_pinch_value}") - logger_mp.info(f"right_hand_squeeze_state: {tv.right_hand_squeeze_state}") - logger_mp.info(f"right_hand_squeeze_value: {tv.right_hand_squeeze_value}") + logger_mp.info(f"left_hand_pinch: {tv.left_hand_pinch}") + logger_mp.info(f"left_hand_pinchValue: {tv.left_hand_pinchValue}") + logger_mp.info(f"left_hand_squeeze: {tv.left_hand_squeeze}") + logger_mp.info(f"left_hand_squeezeValue: {tv.left_hand_squeezeValue}") + logger_mp.info(f"right_hand_pinch: {tv.right_hand_pinch}") + logger_mp.info(f"right_hand_pinchValue: {tv.right_hand_pinchValue}") + logger_mp.info(f"right_hand_squeeze: {tv.right_hand_squeeze}") + logger_mp.info(f"right_hand_squeezeValue: {tv.right_hand_squeezeValue}") else: logger_mp.info("Controller Data:") - logger_mp.info(f"left_controller_trigger_state: {tv.left_controller_trigger_state}") - logger_mp.info(f"left_controller_trigger_value: {tv.left_controller_trigger_value}") - logger_mp.info(f"left_controller_squeeze_state: {tv.left_controller_squeeze_state}") - logger_mp.info(f"left_controller_squeeze_value: {tv.left_controller_squeeze_value}") - logger_mp.info(f"left_controller_thumbstick_state: {tv.left_controller_thumbstick_state}") - logger_mp.info(f"left_controller_thumbstick_value: {tv.left_controller_thumbstick_value}") - logger_mp.info(f"left_controller_aButton: {tv.left_controller_aButton}") - logger_mp.info(f"left_controller_bButton: {tv.left_controller_bButton}") - logger_mp.info(f"right_controller_trigger_state: {tv.right_controller_trigger_state}") - logger_mp.info(f"right_controller_trigger_value: {tv.right_controller_trigger_value}") - logger_mp.info(f"right_controller_squeeze_state: {tv.right_controller_squeeze_state}") - logger_mp.info(f"right_controller_squeeze_value: {tv.right_controller_squeeze_value}") - logger_mp.info(f"right_controller_thumbstick_state: {tv.right_controller_thumbstick_state}") - logger_mp.info(f"right_controller_thumbstick_value: {tv.right_controller_thumbstick_value}") - logger_mp.info(f"right_controller_aButton: {tv.right_controller_aButton}") - logger_mp.info(f"right_controller_bButton: {tv.right_controller_bButton}") + logger_mp.info(f"left_ctrl_trigger: {tv.left_ctrl_trigger}") + logger_mp.info(f"left_ctrl_triggerValue: {tv.left_ctrl_triggerValue}") + logger_mp.info(f"left_ctrl_squeeze: {tv.left_ctrl_squeeze}") + logger_mp.info(f"left_ctrl_squeezeValue: {tv.left_ctrl_squeezeValue}") + logger_mp.info(f"left_ctrl_thumbstick: {tv.left_ctrl_thumbstick}") + logger_mp.info(f"left_ctrl_thumbstickValue: {tv.left_ctrl_thumbstickValue}") + logger_mp.info(f"left_ctrl_aButton: {tv.left_ctrl_aButton}") + logger_mp.info(f"left_ctrl_bButton: {tv.left_ctrl_bButton}") + logger_mp.info(f"right_ctrl_trigger: {tv.right_ctrl_trigger}") + logger_mp.info(f"right_ctrl_triggerValue: {tv.right_ctrl_triggerValue}") + logger_mp.info(f"right_ctrl_squeeze: {tv.right_ctrl_squeeze}") + logger_mp.info(f"right_ctrl_squeezeValue: {tv.right_ctrl_squeezeValue}") + logger_mp.info(f"right_ctrl_thumbstick: {tv.right_ctrl_thumbstick}") + logger_mp.info(f"right_ctrl_thumbstickValue: {tv.right_ctrl_thumbstickValue}") + logger_mp.info(f"right_ctrl_aButton: {tv.right_ctrl_aButton}") + logger_mp.info(f"right_ctrl_bButton: {tv.right_ctrl_bButton}") logger_mp.info("=" * 80) - time.sleep(0.03) + + current_time = time.time() + time_elapsed = current_time - start_time + sleep_time = max(0, 0.3 - time_elapsed) + time.sleep(sleep_time) + logger_mp.debug(f"main process sleep: {sleep_time}") except KeyboardInterrupt: running = False logger_mp.warning("KeyboardInterrupt, exiting program...") finally: - image_shm.unlink() - image_shm.close() + tv.close() + # img_client.close() logger_mp.warning("Finally, exiting program...") exit(0) diff --git a/test/_test_tv_wrapper.py b/test/_test_tv_wrapper.py index d0f28e7..8eb1f9b 100644 --- a/test/_test_tv_wrapper.py +++ b/test/_test_tv_wrapper.py @@ -4,81 +4,73 @@ project_root = os.path.abspath(os.path.join(os.path.dirname(this_file), '..')) if project_root not in sys.path: sys.path.insert(0, project_root) -import numpy as np import time -from multiprocessing import shared_memory from televuer import TeleVuerWrapper +# image client. if you want to test with real image stream, +# please copy image_client.py and messaging.py from xr_teleoperate/teleop/image_server to the current directory before running this test script +# from image_client import ImageClient import logging_mp logger_mp = logging_mp.get_logger(__name__, level=logging_mp.INFO) def run_test_tv_wrapper(): - image_shape = (480, 640 * 2, 3) - image_shm = shared_memory.SharedMemory(create=True, size=np.prod(image_shape) * np.uint8().itemsize) - image_array = np.ndarray(image_shape, dtype=np.uint8, buffer=image_shm.buf) + head_binocular = True + head_img_shape = (480, 1280, 3) # default image shape - # from image_server.image_client import ImageClient - # import threading - # image_client = ImageClient(tv_img_shape = image_shape, tv_img_shm_name = image_shm.name, image_show=True, server_address="127.0.0.1") - # image_receive_thread = threading.Thread(target = image_client.receive_process, daemon = True) - # image_receive_thread.daemon = True - # image_receive_thread.start() - + # img_client = ImageClient(host="127.0.0.1") # 127.0.0.1 is localhost, 192.168.123.164 is unitree robot's host ip + # if not img_client.has_head_cam(): + # logger_mp.error("Head camera is required. Please enable head camera on the image server side.") + # head_img_shape = img_client.get_head_shape() + # head_binocular = img_client.is_binocular() + + # xr-mode use_hand_track=False - tv_wrapper = TeleVuerWrapper(binocular=True, use_hand_tracking=use_hand_track, img_shape=image_shape, img_shm_name=image_shm.name, - return_state_data=True, return_hand_rot_data = True) + tv_wrapper = TeleVuerWrapper(binocular=head_binocular, use_hand_tracking=use_hand_track, img_shape=head_img_shape, return_hand_rot_data = True) try: input("Press Enter to start tv_wrapper test...") running = True while running: start_time = time.time() - teleData = tv_wrapper.get_motion_state_data() + # image client + # head_img, head_img_fps = img_client.get_head_frame() + # tv_wrapper.set_display_image(head_img) + logger_mp.info("---- TV Wrapper TeleData ----") + teleData = tv_wrapper.get_tele_data() - logger_mp.info("=== TeleData Snapshot ===") - logger_mp.info(f"[Head Rotation Matrix]:\n{teleData.head_pose}") - logger_mp.info(f"[Left EE Pose]:\n{teleData.left_arm_pose}") - logger_mp.info(f"[Right EE Pose]:\n{teleData.right_arm_pose}") + logger_mp.info("-------------------=== TeleData Snapshot ===-------------------") + logger_mp.info(f"[Head Pose]:\n{teleData.head_pose}") + logger_mp.info(f"[Left Wrist Pose]:\n{teleData.left_wrist_pose}") + logger_mp.info(f"[Right Wrist Pose]:\n{teleData.right_wrist_pose}") if use_hand_track: - logger_mp.info(f"[Left Hand Position] shape {teleData.left_hand_pos.shape}:\n{teleData.left_hand_pos}") - logger_mp.info(f"[Right Hand Position] shape {teleData.right_hand_pos.shape}:\n{teleData.right_hand_pos}") - + logger_mp.info(f"[Left Hand Positions] shape {teleData.left_hand_pos.shape}:\n{teleData.left_hand_pos}") + logger_mp.info(f"[Right Hand Positions] shape {teleData.right_hand_pos.shape}:\n{teleData.right_hand_pos}") + if teleData.left_hand_rot is not None: - logger_mp.info(f"[Left Hand Rotation] shape {teleData.left_hand_rot.shape}:\n{teleData.left_hand_rot}") + logger_mp.info(f"[Left Hand Rotations] shape {teleData.left_hand_rot.shape}:\n{teleData.left_hand_rot}") if teleData.right_hand_rot is not None: - logger_mp.info(f"[Right Hand Rotation] shape {teleData.right_hand_rot.shape}:\n{teleData.right_hand_rot}") - - if teleData.left_pinch_value is not None: - logger_mp.info(f"[Left Pinch Value]: {teleData.left_pinch_value:.2f}") - if teleData.right_pinch_value is not None: - logger_mp.info(f"[Right Pinch Value]: {teleData.right_pinch_value:.2f}") - - if teleData.tele_state: - state = teleData.tele_state - logger_mp.info("[Hand State]:") - logger_mp.info(f" Left Pinch state: {state.left_pinch_state}") - logger_mp.info(f" Left Squeeze: {state.left_squeeze_state} ({state.left_squeeze_value:.2f})") - logger_mp.info(f" Right Pinch state: {state.right_pinch_state}") - logger_mp.info(f" Right Squeeze: {state.right_squeeze_state} ({state.right_squeeze_value:.2f})") + logger_mp.info(f"[Right Hand Rotations] shape {teleData.right_hand_rot.shape}:\n{teleData.right_hand_rot}") + + logger_mp.info(f"[Left Pinch Value]: {teleData.left_hand_pinchValue:.2f}") + logger_mp.info(f"[Left Squeeze Value]: {teleData.left_hand_squeezeValue:.2f}") + logger_mp.info(f"[Right Pinch Value]: {teleData.right_hand_pinchValue:.2f}") + logger_mp.info(f"[Right Squeeze Value]: {teleData.right_hand_squeezeValue:.2f}") + else: - logger_mp.info(f"[Left Trigger Value]: {teleData.left_trigger_value:.2f}") - logger_mp.info(f"[Right Trigger Value]: {teleData.right_trigger_value:.2f}") - - if teleData.tele_state: - state = teleData.tele_state - logger_mp.info("[Controller State]:") - logger_mp.info(f" Left Trigger: {state.left_trigger_state}") - logger_mp.info(f" Left Squeeze: {state.left_squeeze_ctrl_state} ({state.left_squeeze_ctrl_value:.2f})") - logger_mp.info(f" Left Thumbstick: {state.left_thumbstick_state} ({state.left_thumbstick_value})") - logger_mp.info(f" Left A/B Buttons: A={state.left_aButton}, B={state.left_bButton}") - logger_mp.info(f" Right Trigger: {state.right_trigger_state}") - logger_mp.info(f" Right Squeeze: {state.right_squeeze_ctrl_state} ({state.right_squeeze_ctrl_value:.2f})") - logger_mp.info(f" Right Thumbstick: {state.right_thumbstick_state} ({state.right_thumbstick_value})") - logger_mp.info(f" Right A/B Buttons: A={state.right_aButton}, B={state.right_bButton}") + logger_mp.info(f"[Left Trigger Value]: {teleData.left_ctrl_triggerValue:.2f}") + logger_mp.info(f"[Left Squeeze Value]: {teleData.left_ctrl_squeezeValue:.2f}") + logger_mp.info(f"[Left Thumbstick Value]: {teleData.left_ctrl_thumbstickValue}") + logger_mp.info(f"[Left A/B Buttons]: A={teleData.left_ctrl_aButton}, B={teleData.left_ctrl_bButton}") + + logger_mp.info(f"[Right Trigger Value]: {teleData.right_ctrl_triggerValue:.2f}") + logger_mp.info(f"[Right Squeeze Value]: {teleData.right_ctrl_squeezeValue:.2f}") + logger_mp.info(f"[Right Thumbstick Value]: {teleData.right_ctrl_thumbstickValue}") + logger_mp.info(f"[Right A/B Buttons]: A={teleData.right_ctrl_aButton}, B={teleData.right_ctrl_bButton}") + current_time = time.time() time_elapsed = current_time - start_time - sleep_time = max(0, 0.033 - time_elapsed) + sleep_time = max(0, 0.3 - time_elapsed) time.sleep(sleep_time) logger_mp.debug(f"main process sleep: {sleep_time}") @@ -86,8 +78,8 @@ def run_test_tv_wrapper(): running = False logger_mp.warning("KeyboardInterrupt, exiting program...") finally: - image_shm.unlink() - image_shm.close() + tv_wrapper.close() + # img_client.close() logger_mp.warning("Finally, exiting program...") exit(0)