Browse Source

[update] Simplify the data structure, align variable names and optim image comm.

main
silencht 5 months ago
parent
commit
1e4e6c6be3
  1. 2
      src/televuer/__init__.py
  2. 358
      src/televuer/televuer.py
  3. 206
      src/televuer/tv_wrapper.py
  4. 88
      test/_test_televuer.py
  5. 100
      test/_test_tv_wrapper.py

2
src/televuer/__init__.py

@ -1,3 +1,3 @@
# unitree_televuer/__init__.py
from .televuer import TeleVuer
from .tv_wrapper import TeleVuerWrapper, TeleData, TeleStateData
from .tv_wrapper import TeleVuerWrapper, TeleData

358
src/televuer/televuer.py

@ -3,32 +3,42 @@ from vuer.schemas import ImageBackground, Hands, MotionControllers, WebRTCVideoP
from multiprocessing import Value, Array, Process, shared_memory
import numpy as np
import asyncio
import threading
import cv2
import os
from pathlib import Path
class TeleVuer:
def __init__(self, binocular: bool, use_hand_tracking: bool, img_shape, img_shm_name, cert_file=None, key_file=None, ngrok=False, webrtc=False):
def __init__(self, binocular: bool, use_hand_tracking: bool, img_shape, cert_file=None, key_file=None, ngrok=False, webrtc=False):
"""
TeleVuer class for OpenXR-based XR teleoperate applications.
This class handles the communication with the Vuer server and manages the shared memory for image and pose data.
:param binocular: bool, whether the application is binocular (stereoscopic) or monocular.
:param use_hand_tracking: bool, whether to use hand tracking or controller tracking.
:param img_shape: tuple, shape of the image (height, width, channels).
:param img_shm_name: str, name of the shared memory for the image.
:param img_shape: tuple, shape of the image (height, width).
:param cert_file: str, path to the SSL certificate file.
:param key_file: str, path to the SSL key file.
:param ngrok: bool, whether to use ngrok for tunneling.
:param webrtc: bool, whether to use WebRTC for real-time communication.
"""
self.binocular = binocular
self.use_hand_tracking = use_hand_tracking
self.img_height = img_shape[0]
self.img_shape = (img_shape[0], img_shape[1], 3)
self.img2display_shm = shared_memory.SharedMemory(create=True, size=np.prod(self.img_shape) * np.uint8().itemsize)
self.img2display = np.ndarray(self.img_shape, dtype=np.uint8, buffer=self.img2display_shm.buf)
self.latest_frame = None
self.new_frame_event = threading.Event()
self.writer_thread = threading.Thread(target=self._img2display_writer, daemon=True)
self.writer_thread.start()
self.img_height = self.img_shape[0]
if self.binocular:
self.img_width = img_shape[1] // 2
self.img_width = self.img_shape[1] // 2
else:
self.img_width = img_shape[1]
self.img_width = self.img_shape[1]
current_module_dir = Path(__file__).resolve().parent.parent.parent
if cert_file is None:
@ -46,10 +56,6 @@ class TeleVuer:
self.vuer.add_handler("HAND_MOVE")(self.on_hand_move)
else:
self.vuer.add_handler("CONTROLLER_MOVE")(self.on_controller_move)
existing_shm = shared_memory.SharedMemory(name=img_shm_name)
self.img_array = np.ndarray(img_shape, dtype=np.uint8, buffer=existing_shm.buf)
self.webrtc = webrtc
if self.binocular and not self.webrtc:
self.vuer.spawn(start=False)(self.main_image_binocular)
@ -67,45 +73,59 @@ class TeleVuer:
self.left_hand_orientation_shared = Array('d', 25 * 9, lock=True)
self.right_hand_orientation_shared = Array('d', 25 * 9, lock=True)
self.left_pinch_state_shared = Value('b', False, lock=True)
self.left_pinch_value_shared = Value('d', 0.0, lock=True)
self.left_squeeze_state_shared = Value('b', False, lock=True)
self.left_squeeze_value_shared = Value('d', 0.0, lock=True)
self.left_hand_pinch_shared = Value('b', False, lock=True)
self.left_hand_pinchValue_shared = Value('d', 0.0, lock=True)
self.left_hand_squeeze_shared = Value('b', False, lock=True)
self.left_hand_squeezeValue_shared = Value('d', 0.0, lock=True)
self.right_pinch_state_shared = Value('b', False, lock=True)
self.right_pinch_value_shared = Value('d', 0.0, lock=True)
self.right_squeeze_state_shared = Value('b', False, lock=True)
self.right_squeeze_value_shared = Value('d', 0.0, lock=True)
self.right_hand_pinch_shared = Value('b', False, lock=True)
self.right_hand_pinchValue_shared = Value('d', 0.0, lock=True)
self.right_hand_squeeze_shared = Value('b', False, lock=True)
self.right_hand_squeezeValue_shared = Value('d', 0.0, lock=True)
else:
self.left_trigger_state_shared = Value('b', False, lock=True)
self.left_trigger_value_shared = Value('d', 0.0, lock=True)
self.left_squeeze_state_shared = Value('b', False, lock=True)
self.left_squeeze_value_shared = Value('d', 0.0, lock=True)
self.left_thumbstick_state_shared = Value('b', False, lock=True)
self.left_thumbstick_value_shared = Array('d', 2, lock=True)
self.left_aButton_shared = Value('b', False, lock=True)
self.left_bButton_shared = Value('b', False, lock=True)
self.right_trigger_state_shared = Value('b', False, lock=True)
self.right_trigger_value_shared = Value('d', 0.0, lock=True)
self.right_squeeze_state_shared = Value('b', False, lock=True)
self.right_squeeze_value_shared = Value('d', 0.0, lock=True)
self.right_thumbstick_state_shared = Value('b', False, lock=True)
self.right_thumbstick_value_shared = Array('d', 2, lock=True)
self.right_aButton_shared = Value('b', False, lock=True)
self.right_bButton_shared = Value('b', False, lock=True)
self.process = Process(target=self.vuer_run)
self.left_ctrl_trigger_shared = Value('b', False, lock=True)
self.left_ctrl_triggerValue_shared = Value('d', 0.0, lock=True)
self.left_ctrl_squeeze_shared = Value('b', False, lock=True)
self.left_ctrl_squeezeValue_shared = Value('d', 0.0, lock=True)
self.left_ctrl_thumbstick_shared = Value('b', False, lock=True)
self.left_ctrl_thumbstickValue_shared = Array('d', 2, lock=True)
self.left_ctrl_aButton_shared = Value('b', False, lock=True)
self.left_ctrl_bButton_shared = Value('b', False, lock=True)
self.right_ctrl_trigger_shared = Value('b', False, lock=True)
self.right_ctrl_triggerValue_shared = Value('d', 0.0, lock=True)
self.right_ctrl_squeeze_shared = Value('b', False, lock=True)
self.right_ctrl_squeezeValue_shared = Value('d', 0.0, lock=True)
self.right_ctrl_thumbstick_shared = Value('b', False, lock=True)
self.right_ctrl_thumbstickValue_shared = Array('d', 2, lock=True)
self.right_ctrl_aButton_shared = Value('b', False, lock=True)
self.right_ctrl_bButton_shared = Value('b', False, lock=True)
self.process = Process(target=self._vuer_run)
self.process.daemon = True
self.process.start()
def _vuer_run(self):
self.vuer.run()
def vuer_run(self):
try:
self.vuer.run()
except KeyboardInterrupt:
pass
except Exception as e:
print(f"Vuer encountered an error: {e}")
def _img2display_writer(self):
while True:
self.new_frame_event.wait()
self.new_frame_event.clear()
self.latest_frame = cv2.cvtColor(self.latest_frame, cv2.COLOR_BGR2RGB)
self.img2display[:] = self.latest_frame #.copy()
def set_display_image(self, image):
self.latest_frame = image
self.new_frame_event.set()
def close(self):
self.process.terminate()
self.process.join(timeout=0.5)
self.img2display_shm.close()
self.img2display_shm.unlink()
self.writer_thread.join(timeout=0.5)
async def on_cam_move(self, event, session, fps=60):
try:
@ -115,49 +135,53 @@ class TeleVuer:
pass
async def on_controller_move(self, event, session, fps=60):
"""https://docs.vuer.ai/en/latest/examples/20_motion_controllers.html"""
try:
# ControllerData
with self.left_arm_pose_shared.get_lock():
self.left_arm_pose_shared[:] = event.value["left"]
with self.right_arm_pose_shared.get_lock():
self.right_arm_pose_shared[:] = event.value["right"]
# ControllerState
left_controller = event.value["leftState"]
right_controller = event.value["rightState"]
left_controller_state = event.value["leftState"]
right_controller_state = event.value["rightState"]
def extract_controller_states(state_dict, prefix):
def extract_controllers(controllerState, prefix):
# trigger
with getattr(self, f"{prefix}_trigger_state_shared").get_lock():
getattr(self, f"{prefix}_trigger_state_shared").value = bool(state_dict.get("trigger", False))
with getattr(self, f"{prefix}_trigger_value_shared").get_lock():
getattr(self, f"{prefix}_trigger_value_shared").value = float(state_dict.get("triggerValue", 0.0))
with getattr(self, f"{prefix}_ctrl_trigger_shared").get_lock():
getattr(self, f"{prefix}_ctrl_trigger_shared").value = bool(controllerState.get("trigger", False))
with getattr(self, f"{prefix}_ctrl_triggerValue_shared").get_lock():
getattr(self, f"{prefix}_ctrl_triggerValue_shared").value = float(controllerState.get("triggerValue", 0.0))
# squeeze
with getattr(self, f"{prefix}_squeeze_state_shared").get_lock():
getattr(self, f"{prefix}_squeeze_state_shared").value = bool(state_dict.get("squeeze", False))
with getattr(self, f"{prefix}_squeeze_value_shared").get_lock():
getattr(self, f"{prefix}_squeeze_value_shared").value = float(state_dict.get("squeezeValue", 0.0))
with getattr(self, f"{prefix}_ctrl_squeeze_shared").get_lock():
getattr(self, f"{prefix}_ctrl_squeeze_shared").value = bool(controllerState.get("squeeze", False))
with getattr(self, f"{prefix}_ctrl_squeezeValue_shared").get_lock():
getattr(self, f"{prefix}_ctrl_squeezeValue_shared").value = float(controllerState.get("squeezeValue", 0.0))
# thumbstick
with getattr(self, f"{prefix}_thumbstick_state_shared").get_lock():
getattr(self, f"{prefix}_thumbstick_state_shared").value = bool(state_dict.get("thumbstick", False))
with getattr(self, f"{prefix}_thumbstick_value_shared").get_lock():
getattr(self, f"{prefix}_thumbstick_value_shared")[:] = state_dict.get("thumbstickValue", [0.0, 0.0])
with getattr(self, f"{prefix}_ctrl_thumbstick_shared").get_lock():
getattr(self, f"{prefix}_ctrl_thumbstick_shared").value = bool(controllerState.get("thumbstick", False))
with getattr(self, f"{prefix}_ctrl_thumbstickValue_shared").get_lock():
getattr(self, f"{prefix}_ctrl_thumbstickValue_shared")[:] = controllerState.get("thumbstickValue", [0.0, 0.0])
# buttons
with getattr(self, f"{prefix}_aButton_shared").get_lock():
getattr(self, f"{prefix}_aButton_shared").value = bool(state_dict.get("aButton", False))
with getattr(self, f"{prefix}_bButton_shared").get_lock():
getattr(self, f"{prefix}_bButton_shared").value = bool(state_dict.get("bButton", False))
with getattr(self, f"{prefix}_ctrl_aButton_shared").get_lock():
getattr(self, f"{prefix}_ctrl_aButton_shared").value = bool(controllerState.get("aButton", False))
with getattr(self, f"{prefix}_ctrl_bButton_shared").get_lock():
getattr(self, f"{prefix}_ctrl_bButton_shared").value = bool(controllerState.get("bButton", False))
extract_controller_states(left_controller_state, "left")
extract_controller_states(right_controller_state, "right")
extract_controllers(left_controller, "left")
extract_controllers(right_controller, "right")
except:
pass
async def on_hand_move(self, event, session, fps=60):
"""https://docs.vuer.ai/en/latest/examples/19_hand_tracking.html"""
try:
# HandsData
left_hand_data = event.value["left"]
right_hand_data = event.value["right"]
left_hand_state = event.value["leftState"]
right_hand_state = event.value["rightState"]
left_hand = event.value["leftState"]
right_hand = event.value["rightState"]
# HandState
def extract_hand_poses(hand_data, arm_pose_shared, hand_position_shared, hand_orientation_shared):
with arm_pose_shared.get_lock():
arm_pose_shared[:] = hand_data[0:16]
@ -176,22 +200,22 @@ class TeleVuer:
hand_data[base + 8], hand_data[base + 9], hand_data[base + 10],
]
def extract_hand_states(state_dict, prefix):
def extract_hands(handState, prefix):
# pinch
with getattr(self, f"{prefix}_pinch_state_shared").get_lock():
getattr(self, f"{prefix}_pinch_state_shared").value = bool(state_dict.get("pinch", False))
with getattr(self, f"{prefix}_pinch_value_shared").get_lock():
getattr(self, f"{prefix}_pinch_value_shared").value = float(state_dict.get("pinchValue", 0.0))
with getattr(self, f"{prefix}_hand_pinch_shared").get_lock():
getattr(self, f"{prefix}_hand_pinch_shared").value = bool(handState.get("pinch", False))
with getattr(self, f"{prefix}_hand_pinchValue_shared").get_lock():
getattr(self, f"{prefix}_hand_pinchValue_shared").value = float(handState.get("pinchValue", 0.0))
# squeeze
with getattr(self, f"{prefix}_squeeze_state_shared").get_lock():
getattr(self, f"{prefix}_squeeze_state_shared").value = bool(state_dict.get("squeeze", False))
with getattr(self, f"{prefix}_squeeze_value_shared").get_lock():
getattr(self, f"{prefix}_squeeze_value_shared").value = float(state_dict.get("squeezeValue", 0.0))
with getattr(self, f"{prefix}_hand_squeeze_shared").get_lock():
getattr(self, f"{prefix}_hand_squeeze_shared").value = bool(handState.get("squeeze", False))
with getattr(self, f"{prefix}_hand_squeezeValue_shared").get_lock():
getattr(self, f"{prefix}_hand_squeezeValue_shared").value = float(handState.get("squeezeValue", 0.0))
extract_hand_poses(left_hand_data, self.left_arm_pose_shared, self.left_hand_position_shared, self.left_hand_orientation_shared)
extract_hand_poses(right_hand_data, self.right_arm_pose_shared, self.right_hand_position_shared, self.right_hand_orientation_shared)
extract_hand_states(left_hand_state, "left")
extract_hand_states(right_hand_state, "right")
extract_hands(left_hand, "left")
extract_hands(right_hand, "right")
except:
pass
@ -217,15 +241,13 @@ class TeleVuer:
),
to="bgChildren",
)
while True:
display_image = cv2.cvtColor(self.img_array, cv2.COLOR_BGR2RGB)
# aspect_ratio = self.img_width / self.img_height
aspect_ratio = self.img_width / self.img_height
session.upsert(
[
ImageBackground(
display_image[:, :self.img_width],
aspect=1.778,
self.img2display[:, :self.img_width],
aspect=aspect_ratio,
height=1,
distanceToCamera=1,
# The underlying rendering engine supported a layer binary bitmask for both objects and the camera.
@ -233,18 +255,18 @@ class TeleVuer:
# Note that these two masks are associated with left eye’s camera and the right eye’s camera.
layers=1,
format="jpeg",
quality=100,
quality=80,
key="background-left",
interpolate=True,
),
ImageBackground(
display_image[:, self.img_width:],
aspect=1.778,
self.img2display[:, self.img_width:],
aspect=aspect_ratio,
height=1,
distanceToCamera=1,
layers=2,
format="jpeg",
quality=100,
quality=80,
key="background-right",
interpolate=True,
),
@ -252,7 +274,7 @@ class TeleVuer:
to="bgChildren",
)
# 'jpeg' encoding should give you about 30fps with a 16ms wait in-between.
await asyncio.sleep(0.016 * 2)
await asyncio.sleep(0.016)
async def main_image_monocular(self, session, fps=60):
if self.use_hand_tracking:
@ -277,17 +299,16 @@ class TeleVuer:
)
while True:
display_image = cv2.cvtColor(self.img_array, cv2.COLOR_BGR2RGB)
# aspect_ratio = self.img_width / self.img_height
aspect_ratio = self.img_width / self.img_height
session.upsert(
[
ImageBackground(
display_image,
aspect=1.778,
self.img2display,
aspect=aspect_ratio,
height=1,
distanceToCamera=1,
format="jpeg",
quality=50,
quality=80,
key="background-mono",
interpolate=True,
),
@ -297,6 +318,7 @@ class TeleVuer:
await asyncio.sleep(0.016)
async def main_image_webrtc(self, session, fps=60):
aspect_ratio = self.img_width / self.img_height
if self.use_hand_tracking:
session.upsert(
Hands(
@ -323,7 +345,7 @@ class TeleVuer:
src="https://10.0.7.49:8080/offer",
iceServer={},
key="webrtc",
aspect=1.778,
aspect=aspect_ratio,
height = 7,
),
to="bgChildren",
@ -375,146 +397,146 @@ class TeleVuer:
return np.array(self.right_hand_orientation_shared[:]).reshape(25, 9).reshape(25, 3, 3, order="F")
@property
def left_hand_pinch_state(self):
def left_hand_pinch(self):
"""bool, whether left hand is pinching."""
with self.left_pinch_state_shared.get_lock():
return self.left_pinch_state_shared.value
with self.left_hand_pinch_shared.get_lock():
return self.left_hand_pinch_shared.value
@property
def left_hand_pinch_value(self):
def left_hand_pinchValue(self):
"""float, pinch strength of left hand."""
with self.left_pinch_value_shared.get_lock():
return self.left_pinch_value_shared.value
with self.left_hand_pinchValue_shared.get_lock():
return self.left_hand_pinchValue_shared.value
@property
def left_hand_squeeze_state(self):
def left_hand_squeeze(self):
"""bool, whether left hand is squeezing."""
with self.left_squeeze_state_shared.get_lock():
return self.left_squeeze_state_shared.value
with self.left_hand_squeeze_shared.get_lock():
return self.left_hand_squeeze_shared.value
@property
def left_hand_squeeze_value(self):
def left_hand_squeezeValue(self):
"""float, squeeze strength of left hand."""
with self.left_squeeze_value_shared.get_lock():
return self.left_squeeze_value_shared.value
with self.left_hand_squeezeValue_shared.get_lock():
return self.left_hand_squeezeValue_shared.value
@property
def right_hand_pinch_state(self):
def right_hand_pinch(self):
"""bool, whether right hand is pinching."""
with self.right_pinch_state_shared.get_lock():
return self.right_pinch_state_shared.value
with self.right_hand_pinch_shared.get_lock():
return self.right_hand_pinch_shared.value
@property
def right_hand_pinch_value(self):
def right_hand_pinchValue(self):
"""float, pinch strength of right hand."""
with self.right_pinch_value_shared.get_lock():
return self.right_pinch_value_shared.value
with self.right_hand_pinchValue_shared.get_lock():
return self.right_hand_pinchValue_shared.value
@property
def right_hand_squeeze_state(self):
def right_hand_squeeze(self):
"""bool, whether right hand is squeezing."""
with self.right_squeeze_state_shared.get_lock():
return self.right_squeeze_state_shared.value
with self.right_hand_squeeze_shared.get_lock():
return self.right_hand_squeeze_shared.value
@property
def right_hand_squeeze_value(self):
def right_hand_squeezeValue(self):
"""float, squeeze strength of right hand."""
with self.right_squeeze_value_shared.get_lock():
return self.right_squeeze_value_shared.value
with self.right_hand_squeezeValue_shared.get_lock():
return self.right_hand_squeezeValue_shared.value
# ==================== Controller Data ====================
@property
def left_controller_trigger_state(self):
def left_ctrl_trigger(self):
"""bool, left controller trigger pressed or not."""
with self.left_trigger_state_shared.get_lock():
return self.left_trigger_state_shared.value
with self.left_ctrl_trigger_shared.get_lock():
return self.left_ctrl_trigger_shared.value
@property
def left_controller_trigger_value(self):
def left_ctrl_triggerValue(self):
"""float, left controller trigger analog value (0.0 ~ 1.0)."""
with self.left_trigger_value_shared.get_lock():
return self.left_trigger_value_shared.value
with self.left_ctrl_triggerValue_shared.get_lock():
return self.left_ctrl_triggerValue_shared.value
@property
def left_controller_squeeze_state(self):
def left_ctrl_squeeze(self):
"""bool, left controller squeeze pressed or not."""
with self.left_squeeze_state_shared.get_lock():
return self.left_squeeze_state_shared.value
with self.left_ctrl_squeeze_shared.get_lock():
return self.left_ctrl_squeeze_shared.value
@property
def left_controller_squeeze_value(self):
def left_ctrl_squeezeValue(self):
"""float, left controller squeeze analog value (0.0 ~ 1.0)."""
with self.left_squeeze_value_shared.get_lock():
return self.left_squeeze_value_shared.value
with self.left_ctrl_squeezeValue_shared.get_lock():
return self.left_ctrl_squeezeValue_shared.value
@property
def left_controller_thumbstick_state(self):
def left_ctrl_thumbstick(self):
"""bool, whether left thumbstick is touched or clicked."""
with self.left_thumbstick_state_shared.get_lock():
return self.left_thumbstick_state_shared.value
with self.left_ctrl_thumbstick_shared.get_lock():
return self.left_ctrl_thumbstick_shared.value
@property
def left_controller_thumbstick_value(self):
def left_ctrl_thumbstickValue(self):
"""np.ndarray, shape (2,), left thumbstick 2D axis values (x, y)."""
with self.left_thumbstick_value_shared.get_lock():
return np.array(self.left_thumbstick_value_shared[:])
with self.left_ctrl_thumbstickValue_shared.get_lock():
return np.array(self.left_ctrl_thumbstickValue_shared[:])
@property
def left_controller_aButton(self):
def left_ctrl_aButton(self):
"""bool, left controller 'A' button pressed."""
with self.left_aButton_shared.get_lock():
return self.left_aButton_shared.value
with self.left_ctrl_aButton_shared.get_lock():
return self.left_ctrl_aButton_shared.value
@property
def left_controller_bButton(self):
def left_ctrl_bButton(self):
"""bool, left controller 'B' button pressed."""
with self.left_bButton_shared.get_lock():
return self.left_bButton_shared.value
with self.left_ctrl_bButton_shared.get_lock():
return self.left_ctrl_bButton_shared.value
@property
def right_controller_trigger_state(self):
def right_ctrl_trigger(self):
"""bool, right controller trigger pressed or not."""
with self.right_trigger_state_shared.get_lock():
return self.right_trigger_state_shared.value
with self.right_ctrl_trigger_shared.get_lock():
return self.right_ctrl_trigger_shared.value
@property
def right_controller_trigger_value(self):
def right_ctrl_triggerValue(self):
"""float, right controller trigger analog value (0.0 ~ 1.0)."""
with self.right_trigger_value_shared.get_lock():
return self.right_trigger_value_shared.value
with self.right_ctrl_triggerValue_shared.get_lock():
return self.right_ctrl_triggerValue_shared.value
@property
def right_controller_squeeze_state(self):
def right_ctrl_squeeze(self):
"""bool, right controller squeeze pressed or not."""
with self.right_squeeze_state_shared.get_lock():
return self.right_squeeze_state_shared.value
with self.right_ctrl_squeeze_shared.get_lock():
return self.right_ctrl_squeeze_shared.value
@property
def right_controller_squeeze_value(self):
def right_ctrl_squeezeValue(self):
"""float, right controller squeeze analog value (0.0 ~ 1.0)."""
with self.right_squeeze_value_shared.get_lock():
return self.right_squeeze_value_shared.value
with self.right_ctrl_squeezeValue_shared.get_lock():
return self.right_ctrl_squeezeValue_shared.value
@property
def right_controller_thumbstick_state(self):
def right_ctrl_thumbstick(self):
"""bool, whether right thumbstick is touched or clicked."""
with self.right_thumbstick_state_shared.get_lock():
return self.right_thumbstick_state_shared.value
with self.right_ctrl_thumbstick_shared.get_lock():
return self.right_ctrl_thumbstick_shared.value
@property
def right_controller_thumbstick_value(self):
def right_ctrl_thumbstickValue(self):
"""np.ndarray, shape (2,), right thumbstick 2D axis values (x, y)."""
with self.right_thumbstick_value_shared.get_lock():
return np.array(self.right_thumbstick_value_shared[:])
with self.right_ctrl_thumbstickValue_shared.get_lock():
return np.array(self.right_ctrl_thumbstickValue_shared[:])
@property
def right_controller_aButton(self):
def right_ctrl_aButton(self):
"""bool, right controller 'A' button pressed."""
with self.right_aButton_shared.get_lock():
return self.right_aButton_shared.value
with self.right_ctrl_aButton_shared.get_lock():
return self.right_ctrl_aButton_shared.value
@property
def right_controller_bButton(self):
def right_ctrl_bButton(self):
"""bool, right controller 'B' button pressed."""
with self.right_bButton_shared.get_lock():
return self.right_bButton_shared.value
with self.right_ctrl_bButton_shared.get_lock():
return self.right_ctrl_bButton_shared.value

206
src/televuer/tv_wrapper.py

@ -139,49 +139,61 @@ CONST_LEFT_ARM_POSE = np.array([[1, 0, 0, -0.15],
CONST_HAND_ROT = np.tile(np.eye(3)[None, :, :], (25, 1, 1))
@dataclass
class TeleStateData:
class TeleData:
head_pose: np.ndarray # (4,4) SE(3) pose of head matrix
left_wrist_pose: np.ndarray # (4,4) SE(3) pose of left wrist of arm
right_wrist_pose: np.ndarray # (4,4) SE(3) pose of right wrist of arm
# hand tracking
left_pinch_state: bool = False # True if index and thumb are pinching
left_squeeze_state: bool = False # True if hand is making a fist
left_squeeze_value: float = 0.0 # (0.0 ~ 1.0) degree of hand squeeze
right_pinch_state: bool = False # True if index and thumb are pinching
right_squeeze_state: bool = False # True if hand is making a fist
right_squeeze_value: float = 0.0 # (0.0 ~ 1.0) degree of hand squeeze
# https://docs.vuer.ai/en/latest/examples/19_hand_tracking.html
# https://immersive-web.github.io/webxr-hand-input/
# HandsData, (25,16) SE3 => (25,3) pos + (25,3,3) rot
left_hand_pos: np.ndarray = None # (25,3) 3D positions of left hand joints
right_hand_pos: np.ndarray = None # (25,3) 3D positions of right hand joints
left_hand_rot: np.ndarray = None # (25,3,3) rotation matrices of left hand joints
right_hand_rot: np.ndarray = None # (25,3,3) rotation matrices of right hand joints
# HandState
left_hand_pinch: bool = False # True if index and thumb are pinching
left_hand_pinchValue: float = 10.0 # float (~15.0​​ → 0.0) pinch distance between index and thumb
left_hand_squeeze: bool = False # True if hand is making a fist
left_hand_squeezeValue: float = 0.0 # (0.0 → 1.0) degree of hand squeeze
right_hand_pinch: bool = False # True if index and thumb are pinching
right_hand_pinchValue: float = 10.0 # float (~15.0​​ → 0.0) pinch distance between index and thumb
right_hand_squeeze: bool = False # True if hand is making a fist
right_hand_squeezeValue: float = 0.0 # (0.0 → 1.0) degree of hand squeeze
# controller tracking
left_trigger_state: bool = False # True if trigger is actively pressed
left_squeeze_ctrl_state: bool = False # True if grip button is pressed
left_squeeze_ctrl_value: float = 0.0 # (0.0 ~ 1.0) grip pull depth
left_thumbstick_state: bool = False # True if thumbstick button is pressed
left_thumbstick_value: np.ndarray = field(default_factory=lambda: np.zeros(2)) # 2D vector (x, y), normalized
left_aButton: bool = False # True if A button is pressed
left_bButton: bool = False # True if B button is pressed
right_trigger_state: bool = False # True if trigger is actively pressed
right_squeeze_ctrl_state: bool = False # True if grip button is pressed
right_squeeze_ctrl_value: float = 0.0 # (0.0 ~ 1.0) grip pull depth
right_thumbstick_state: bool = False # True if thumbstick button is pressed
right_thumbstick_value: np.ndarray = field(default_factory=lambda: np.zeros(2)) # 2D vector (x, y), normalized
right_aButton: bool = False # True if A button is pressed
right_bButton: bool = False # True if B button is pressed
@dataclass
class TeleData:
head_pose: np.ndarray # (4,4) SE(3) pose of head matrix
left_arm_pose: np.ndarray # (4,4) SE(3) pose of left arm
right_arm_pose: np.ndarray # (4,4) SE(3) pose of right arm
left_hand_pos: np.ndarray = None # (25,3) 3D positions of left hand joints
right_hand_pos: np.ndarray = None # (25,3) 3D positions of right hand joints
left_hand_rot: np.ndarray = None # (25,3,3) rotation matrices of left hand joints
right_hand_rot: np.ndarray = None # (25,3,3) rotation matrices of right hand joints
left_pinch_value: float = None # float (1x.0 ~ 0.0) pinch distance between index and thumb
right_pinch_value: float = None # float (1x.0 ~ 0.0) pinch distance between index and thumb
left_trigger_value: float = None # float (10.0 ~ 0.0) trigger pull depth
right_trigger_value: float = None # float (10.0 ~ 0.0) trigger pull depth
tele_state: TeleStateData = field(default_factory=TeleStateData)
# https://docs.vuer.ai/en/latest/examples/20_motion_controllers.html
# https://immersive-web.github.io/webxr-gamepads-module/
left_ctrl_trigger: bool = False # True if trigger is actively pressed
left_ctrl_triggerValue: float = 10.0 # float (10.0 → 0.0) trigger pull depth, 0.0 means fully pressed (for align with hand pinch value's logic)
left_ctrl_squeeze: bool = False # True if grip button is pressed
left_ctrl_squeezeValue: float = 0.0 # (0.0 → 1.0) grip pull depth, 0.0 means no press
left_ctrl_aButton: bool = False # True if A(X) button is pressed
left_ctrl_bButton: bool = False # True if B(Y) button is pressed
left_ctrl_thumbstick: bool = False # True if thumbstick button is pressed
left_ctrl_thumbstickValue: np.ndarray = field(default_factory=lambda: np.zeros(2)) # 2D vector (x, y), normalized
""" thumbstickValue explanation:
front (0,-1)
^
|
left (-1,0) < o > right (1,0) and 'o' is at (0, 0)
|
v
back (0,1)
"""
right_ctrl_trigger: bool = False # True if trigger is actively pressed
right_ctrl_triggerValue: float = 10.0 # float (10.0 → 0.0) trigger pull depth, 0.0 means fully pressed (for align with hand pinch value's logic)
right_ctrl_squeeze: bool = False # True if grip button is pressed
right_ctrl_squeezeValue: float = 0.0 # (0.0 → 1.0) grip pull depth, 0.0 means no press
right_ctrl_aButton: bool = False # True if A button is pressed
right_ctrl_bButton: bool = False # True if B button is pressed
right_ctrl_thumbstick: bool = False # True if thumbstick button is pressed
right_ctrl_thumbstickValue: np.ndarray = field(default_factory=lambda: np.zeros(2)) # 2D vector (x, y), normalized
class TeleVuerWrapper:
def __init__(self, binocular: bool, use_hand_tracking: bool, img_shape, img_shm_name, return_state_data: bool = True, return_hand_rot_data: bool = False,
def __init__(self, binocular: bool, use_hand_tracking: bool, img_shape, return_hand_rot_data: bool = False,
cert_file = None, key_file = None, ngrok = False, webrtc = False):
"""
TeleVuerWrapper is a wrapper for the TeleVuer class, which handles XR device's data suit for robot control.
@ -190,19 +202,18 @@ class TeleVuerWrapper:
:param binocular: A boolean indicating whether the head camera device is binocular or not.
:param use_hand_tracking: A boolean indicating whether to use hand tracking or use controller tracking.
:param img_shape: The shape of the image to be processed.
:param img_shm_name: The name of the shared memory for the image.
:param return_state: A boolean indicating whether to return the state of the hand or controller.
:param return_hand_rot: A boolean indicating whether to return the hand rotation data.
:param return_hand_rot_data: A boolean indicating whether to return the hand rotation data.
:param cert_file: The path to the certificate file for secure connection.
:param key_file: The path to the key file for secure connection.
:param ngrok: A boolean indicating whether to use ngrok for remote access.
:param webrtc: A boolean indicating whether to use WebRTC for real-time communication.
"""
self.use_hand_tracking = use_hand_tracking
self.return_state_data = return_state_data
self.return_hand_rot_data = return_hand_rot_data
self.tvuer = TeleVuer(binocular, use_hand_tracking, img_shape, img_shm_name, cert_file=cert_file, key_file=key_file,
ngrok=ngrok, webrtc=webrtc)
self.tvuer = TeleVuer(binocular, use_hand_tracking, img_shape, cert_file=cert_file, key_file=key_file,
ngrok=ngrok, webrtc=webrtc)
def get_motion_state_data(self):
def get_tele_data(self):
"""
Get processed motion state data from the TeleVuer instance.
@ -223,6 +234,7 @@ class TeleVuerWrapper:
# TeleVuer (Vuer) obtains all raw data under the (basis) OpenXR Convention.
Bxr_world_head, head_pose_is_valid = safe_mat_update(CONST_HEAD_POSE, self.tvuer.head_pose)
# hand tracking
if self.use_hand_tracking:
# 'Arm' pose data follows (basis) OpenXR Convention and (initial pose) OpenXR Arm Convention.
left_IPxr_Bxr_world_arm, left_arm_is_valid = safe_mat_update(CONST_LEFT_ARM_POSE, self.tvuer.left_arm_pose)
@ -262,12 +274,12 @@ class TeleVuerWrapper:
# The origin of the coordinate for IK Solve is near the WAIST joint motor. You can use teleop/robot_control/robot_arm_ik.py Unit_Test to visualize it.
# The origin of the coordinate of IPunitree_Brobot_head_arm is HEAD.
# So it is necessary to translate the origin of IPunitree_Brobot_head_arm from HEAD to WAIST.
left_IPunitree_Brobot_waist_arm = left_IPunitree_Brobot_head_arm.copy()
right_IPunitree_Brobot_waist_arm = right_IPunitree_Brobot_head_arm.copy()
left_IPunitree_Brobot_waist_arm[0, 3] +=0.15 # x
right_IPunitree_Brobot_waist_arm[0,3] +=0.15
left_IPunitree_Brobot_waist_arm[2, 3] +=0.45 # z
right_IPunitree_Brobot_waist_arm[2,3] +=0.45
left_IPunitree_Brobot_wrist_arm = left_IPunitree_Brobot_head_arm.copy()
right_IPunitree_Brobot_wrist_arm = right_IPunitree_Brobot_head_arm.copy()
left_IPunitree_Brobot_wrist_arm[0, 3] +=0.15 # x
right_IPunitree_Brobot_wrist_arm[0,3] +=0.15
left_IPunitree_Brobot_wrist_arm[2, 3] +=0.45 # z
right_IPunitree_Brobot_wrist_arm[2,3] +=0.45
# -----------------------------------hand position----------------------------------------
if left_arm_is_valid and right_arm_is_valid:
@ -325,31 +337,24 @@ class TeleVuerWrapper:
else:
left_Brobot_arm_hand_rot = None
right_Brobot_arm_hand_rot = None
if self.return_state_data:
hand_state = TeleStateData(
left_pinch_state=self.tvuer.left_hand_pinch_state,
left_squeeze_state=self.tvuer.left_hand_squeeze_state,
left_squeeze_value=self.tvuer.left_hand_squeeze_value,
right_pinch_state=self.tvuer.right_hand_pinch_state,
right_squeeze_state=self.tvuer.right_hand_squeeze_state,
right_squeeze_value=self.tvuer.right_hand_squeeze_value,
)
else:
hand_state = None
return TeleData(
head_pose=Brobot_world_head,
left_arm_pose=left_IPunitree_Brobot_waist_arm,
right_arm_pose=right_IPunitree_Brobot_waist_arm,
left_wrist_pose=left_IPunitree_Brobot_wrist_arm,
right_wrist_pose=right_IPunitree_Brobot_wrist_arm,
left_hand_pos=left_IPunitree_Brobot_arm_hand_pos,
right_hand_pos=right_IPunitree_Brobot_arm_hand_pos,
left_hand_rot=left_Brobot_arm_hand_rot,
right_hand_rot=right_Brobot_arm_hand_rot,
left_pinch_value=self.tvuer.left_hand_pinch_value * 100.0,
right_pinch_value=self.tvuer.right_hand_pinch_value * 100.0,
tele_state=hand_state
left_hand_pinch=self.tvuer.left_hand_pinch,
left_hand_pinchValue=self.tvuer.left_hand_pinchValue,
left_hand_squeeze=self.tvuer.left_hand_squeeze,
left_hand_squeezeValue=self.tvuer.left_hand_squeezeValue,
right_hand_pinch=self.tvuer.right_hand_pinch,
right_hand_pinchValue=self.tvuer.right_hand_pinchValue,
right_hand_squeeze=self.tvuer.right_hand_squeeze,
right_hand_squeezeValue=self.tvuer.right_hand_squeezeValue,
)
# controller tracking
else:
# Controller pose data directly follows the (initial pose) Unitree Humanoid Arm URDF Convention (thus no transform is needed).
left_IPunitree_Bxr_world_arm, left_arm_is_valid = safe_mat_update(CONST_LEFT_ARM_POSE, self.tvuer.left_arm_pose)
@ -370,41 +375,38 @@ class TeleVuerWrapper:
# The origin of the coordinate for IK Solve is near the WAIST joint motor. You can use teleop/robot_control/robot_arm_ik.py Unit_Test to check it.
# The origin of the coordinate of IPunitree_Brobot_head_arm is HEAD.
# So it is necessary to translate the origin of IPunitree_Brobot_head_arm from HEAD to WAIST.
left_IPunitree_Brobot_waist_arm = left_IPunitree_Brobot_head_arm.copy()
right_IPunitree_Brobot_waist_arm = right_IPunitree_Brobot_head_arm.copy()
left_IPunitree_Brobot_waist_arm[0, 3] +=0.15 # x
right_IPunitree_Brobot_waist_arm[0,3] +=0.15
left_IPunitree_Brobot_waist_arm[2, 3] +=0.45 # z
right_IPunitree_Brobot_waist_arm[2,3] +=0.45
left_IPunitree_Brobot_wrist_arm = left_IPunitree_Brobot_head_arm.copy()
right_IPunitree_Brobot_wrist_arm = right_IPunitree_Brobot_head_arm.copy()
left_IPunitree_Brobot_wrist_arm[0, 3] +=0.15 # x
right_IPunitree_Brobot_wrist_arm[0,3] +=0.15
left_IPunitree_Brobot_wrist_arm[2, 3] +=0.45 # z
right_IPunitree_Brobot_wrist_arm[2,3] +=0.45
# left_IPunitree_Brobot_waist_arm[1, 3] +=0.02 # y
# right_IPunitree_Brobot_waist_arm[1,3] +=0.02
# return data
if self.return_state_data:
controller_state = TeleStateData(
left_trigger_state=self.tvuer.left_controller_trigger_state,
left_squeeze_ctrl_state=self.tvuer.left_controller_squeeze_state,
left_squeeze_ctrl_value=self.tvuer.left_controller_squeeze_value,
left_thumbstick_state=self.tvuer.left_controller_thumbstick_state,
left_thumbstick_value=self.tvuer.left_controller_thumbstick_value,
left_aButton=self.tvuer.left_controller_aButton,
left_bButton=self.tvuer.left_controller_bButton,
right_trigger_state=self.tvuer.right_controller_trigger_state,
right_squeeze_ctrl_state=self.tvuer.right_controller_squeeze_state,
right_squeeze_ctrl_value=self.tvuer.right_controller_squeeze_value,
right_thumbstick_state=self.tvuer.right_controller_thumbstick_state,
right_thumbstick_value=self.tvuer.right_controller_thumbstick_value,
right_aButton=self.tvuer.right_controller_aButton,
right_bButton=self.tvuer.right_controller_bButton,
)
else:
controller_state = None
return TeleData(
head_pose=Brobot_world_head,
left_arm_pose=left_IPunitree_Brobot_waist_arm,
right_arm_pose=right_IPunitree_Brobot_waist_arm,
left_trigger_value=10.0 - self.tvuer.left_controller_trigger_value * 10,
right_trigger_value=10.0 - self.tvuer.right_controller_trigger_value * 10,
tele_state=controller_state
)
left_wrist_pose=left_IPunitree_Brobot_wrist_arm,
right_wrist_pose=right_IPunitree_Brobot_wrist_arm,
left_ctrl_trigger=self.tvuer.left_ctrl_trigger,
left_ctrl_triggerValue=10.0 - self.tvuer.left_ctrl_triggerValue * 10,
left_ctrl_squeeze=self.tvuer.left_ctrl_squeeze,
left_ctrl_squeezeValue=self.tvuer.left_ctrl_squeezeValue,
left_ctrl_aButton=self.tvuer.left_ctrl_aButton,
left_ctrl_bButton=self.tvuer.left_ctrl_bButton,
left_ctrl_thumbstick=self.tvuer.left_ctrl_thumbstick,
left_ctrl_thumbstickValue=self.tvuer.left_ctrl_thumbstickValue,
right_ctrl_trigger=self.tvuer.right_ctrl_trigger,
right_ctrl_triggerValue=10.0 - self.tvuer.right_ctrl_triggerValue * 10,
right_ctrl_squeeze=self.tvuer.right_ctrl_squeeze,
right_ctrl_squeezeValue=self.tvuer.right_ctrl_squeezeValue,
right_ctrl_aButton=self.tvuer.right_ctrl_aButton,
right_ctrl_bButton=self.tvuer.right_ctrl_bButton,
right_ctrl_thumbstick=self.tvuer.right_ctrl_thumbstick,
right_ctrl_thumbstickValue=self.tvuer.right_ctrl_thumbstickValue,
)
def set_display_image(self, img):
self.tvuer.set_display_image(img)
def close(self):
self.tvuer.close()

88
test/_test_televuer.py

@ -5,33 +5,36 @@ if project_root not in sys.path:
sys.path.insert(0, project_root)
import time
import numpy as np
from multiprocessing import shared_memory
from televuer import TeleVuer
# image client. if you want to test with real image stream,
# please copy image_client.py and messaging.py from xr_teleoperate/teleop/image_server to the current directory before running this test script
# from image_client import ImageClient
import logging_mp
logger_mp = logging_mp.get_logger(__name__, level=logging_mp.INFO)
def run_test_TeleVuer():
# image
image_shape = (480, 640 * 2, 3)
image_shm = shared_memory.SharedMemory(create=True, size=np.prod(image_shape) * np.uint8().itemsize)
image_array = np.ndarray(image_shape, dtype=np.uint8, buffer=image_shm.buf)
head_binocular = True
head_img_shape = (480, 1280, 3) # default image
# from image_server.image_client import ImageClient
# import threading
# image_client = ImageClient(tv_img_shape = image_shape, tv_img_shm_name = image_shm.name, image_show=True, server_address="127.0.0.1")
# image_receive_thread = threading.Thread(target = image_client.receive_process, daemon = True)
# image_receive_thread.daemon = True
# image_receive_thread.start()
# img_client = ImageClient(host="127.0.0.1") # 127.0.0.1 is localhost, 192.168.123.164 is unitree robot's host ip
# if not img_client.has_head_cam():
# logger_mp.error("Head camera is required. Please enable head camera on the image server side.")
# head_img_shape = img_client.get_head_shape()
# head_binocular = img_client.is_binocular()
# xr-mode
use_hand_track = True
tv = TeleVuer(binocular = True, use_hand_tracking = use_hand_track, img_shape = image_shape, img_shm_name = image_shm.name, webrtc=False)
tv = TeleVuer(binocular = head_binocular, use_hand_tracking = use_hand_track, img_shape = head_img_shape, webrtc=False)
try:
input("Press Enter to start TeleVuer test...")
running = True
while running:
start_time = time.time()
# image client
# head_img, head_img_fps = img_client.get_head_frame()
# tv.set_display_image(head_img)
logger_mp.info("=" * 80)
logger_mp.info("Common Data (always available):")
logger_mp.info(f"head_pose shape: {tv.head_pose.shape}\n{tv.head_pose}\n")
@ -45,40 +48,45 @@ def run_test_TeleVuer():
logger_mp.info(f"right_hand_positions shape: {tv.right_hand_positions.shape}\n{tv.right_hand_positions}\n")
logger_mp.info(f"left_hand_orientations shape: {tv.left_hand_orientations.shape}\n{tv.left_hand_orientations}\n")
logger_mp.info(f"right_hand_orientations shape: {tv.right_hand_orientations.shape}\n{tv.right_hand_orientations}\n")
logger_mp.info(f"left_hand_pinch_state: {tv.left_hand_pinch_state}")
logger_mp.info(f"left_hand_pinch_value: {tv.left_hand_pinch_value}")
logger_mp.info(f"left_hand_squeeze_state: {tv.left_hand_squeeze_state}")
logger_mp.info(f"left_hand_squeeze_value: {tv.left_hand_squeeze_value}")
logger_mp.info(f"right_hand_pinch_state: {tv.right_hand_pinch_state}")
logger_mp.info(f"right_hand_pinch_value: {tv.right_hand_pinch_value}")
logger_mp.info(f"right_hand_squeeze_state: {tv.right_hand_squeeze_state}")
logger_mp.info(f"right_hand_squeeze_value: {tv.right_hand_squeeze_value}")
logger_mp.info(f"left_hand_pinch: {tv.left_hand_pinch}")
logger_mp.info(f"left_hand_pinchValue: {tv.left_hand_pinchValue}")
logger_mp.info(f"left_hand_squeeze: {tv.left_hand_squeeze}")
logger_mp.info(f"left_hand_squeezeValue: {tv.left_hand_squeezeValue}")
logger_mp.info(f"right_hand_pinch: {tv.right_hand_pinch}")
logger_mp.info(f"right_hand_pinchValue: {tv.right_hand_pinchValue}")
logger_mp.info(f"right_hand_squeeze: {tv.right_hand_squeeze}")
logger_mp.info(f"right_hand_squeezeValue: {tv.right_hand_squeezeValue}")
else:
logger_mp.info("Controller Data:")
logger_mp.info(f"left_controller_trigger_state: {tv.left_controller_trigger_state}")
logger_mp.info(f"left_controller_trigger_value: {tv.left_controller_trigger_value}")
logger_mp.info(f"left_controller_squeeze_state: {tv.left_controller_squeeze_state}")
logger_mp.info(f"left_controller_squeeze_value: {tv.left_controller_squeeze_value}")
logger_mp.info(f"left_controller_thumbstick_state: {tv.left_controller_thumbstick_state}")
logger_mp.info(f"left_controller_thumbstick_value: {tv.left_controller_thumbstick_value}")
logger_mp.info(f"left_controller_aButton: {tv.left_controller_aButton}")
logger_mp.info(f"left_controller_bButton: {tv.left_controller_bButton}")
logger_mp.info(f"right_controller_trigger_state: {tv.right_controller_trigger_state}")
logger_mp.info(f"right_controller_trigger_value: {tv.right_controller_trigger_value}")
logger_mp.info(f"right_controller_squeeze_state: {tv.right_controller_squeeze_state}")
logger_mp.info(f"right_controller_squeeze_value: {tv.right_controller_squeeze_value}")
logger_mp.info(f"right_controller_thumbstick_state: {tv.right_controller_thumbstick_state}")
logger_mp.info(f"right_controller_thumbstick_value: {tv.right_controller_thumbstick_value}")
logger_mp.info(f"right_controller_aButton: {tv.right_controller_aButton}")
logger_mp.info(f"right_controller_bButton: {tv.right_controller_bButton}")
logger_mp.info(f"left_ctrl_trigger: {tv.left_ctrl_trigger}")
logger_mp.info(f"left_ctrl_triggerValue: {tv.left_ctrl_triggerValue}")
logger_mp.info(f"left_ctrl_squeeze: {tv.left_ctrl_squeeze}")
logger_mp.info(f"left_ctrl_squeezeValue: {tv.left_ctrl_squeezeValue}")
logger_mp.info(f"left_ctrl_thumbstick: {tv.left_ctrl_thumbstick}")
logger_mp.info(f"left_ctrl_thumbstickValue: {tv.left_ctrl_thumbstickValue}")
logger_mp.info(f"left_ctrl_aButton: {tv.left_ctrl_aButton}")
logger_mp.info(f"left_ctrl_bButton: {tv.left_ctrl_bButton}")
logger_mp.info(f"right_ctrl_trigger: {tv.right_ctrl_trigger}")
logger_mp.info(f"right_ctrl_triggerValue: {tv.right_ctrl_triggerValue}")
logger_mp.info(f"right_ctrl_squeeze: {tv.right_ctrl_squeeze}")
logger_mp.info(f"right_ctrl_squeezeValue: {tv.right_ctrl_squeezeValue}")
logger_mp.info(f"right_ctrl_thumbstick: {tv.right_ctrl_thumbstick}")
logger_mp.info(f"right_ctrl_thumbstickValue: {tv.right_ctrl_thumbstickValue}")
logger_mp.info(f"right_ctrl_aButton: {tv.right_ctrl_aButton}")
logger_mp.info(f"right_ctrl_bButton: {tv.right_ctrl_bButton}")
logger_mp.info("=" * 80)
time.sleep(0.03)
current_time = time.time()
time_elapsed = current_time - start_time
sleep_time = max(0, 0.3 - time_elapsed)
time.sleep(sleep_time)
logger_mp.debug(f"main process sleep: {sleep_time}")
except KeyboardInterrupt:
running = False
logger_mp.warning("KeyboardInterrupt, exiting program...")
finally:
image_shm.unlink()
image_shm.close()
tv.close()
# img_client.close()
logger_mp.warning("Finally, exiting program...")
exit(0)

100
test/_test_tv_wrapper.py

@ -4,81 +4,73 @@ project_root = os.path.abspath(os.path.join(os.path.dirname(this_file), '..'))
if project_root not in sys.path:
sys.path.insert(0, project_root)
import numpy as np
import time
from multiprocessing import shared_memory
from televuer import TeleVuerWrapper
# image client. if you want to test with real image stream,
# please copy image_client.py and messaging.py from xr_teleoperate/teleop/image_server to the current directory before running this test script
# from image_client import ImageClient
import logging_mp
logger_mp = logging_mp.get_logger(__name__, level=logging_mp.INFO)
def run_test_tv_wrapper():
image_shape = (480, 640 * 2, 3)
image_shm = shared_memory.SharedMemory(create=True, size=np.prod(image_shape) * np.uint8().itemsize)
image_array = np.ndarray(image_shape, dtype=np.uint8, buffer=image_shm.buf)
head_binocular = True
head_img_shape = (480, 1280, 3) # default image shape
# from image_server.image_client import ImageClient
# import threading
# image_client = ImageClient(tv_img_shape = image_shape, tv_img_shm_name = image_shm.name, image_show=True, server_address="127.0.0.1")
# image_receive_thread = threading.Thread(target = image_client.receive_process, daemon = True)
# image_receive_thread.daemon = True
# image_receive_thread.start()
# img_client = ImageClient(host="127.0.0.1") # 127.0.0.1 is localhost, 192.168.123.164 is unitree robot's host ip
# if not img_client.has_head_cam():
# logger_mp.error("Head camera is required. Please enable head camera on the image server side.")
# head_img_shape = img_client.get_head_shape()
# head_binocular = img_client.is_binocular()
# xr-mode
use_hand_track=False
tv_wrapper = TeleVuerWrapper(binocular=True, use_hand_tracking=use_hand_track, img_shape=image_shape, img_shm_name=image_shm.name,
return_state_data=True, return_hand_rot_data = True)
tv_wrapper = TeleVuerWrapper(binocular=head_binocular, use_hand_tracking=use_hand_track, img_shape=head_img_shape, return_hand_rot_data = True)
try:
input("Press Enter to start tv_wrapper test...")
running = True
while running:
start_time = time.time()
teleData = tv_wrapper.get_motion_state_data()
# image client
# head_img, head_img_fps = img_client.get_head_frame()
# tv_wrapper.set_display_image(head_img)
logger_mp.info("---- TV Wrapper TeleData ----")
teleData = tv_wrapper.get_tele_data()
logger_mp.info("=== TeleData Snapshot ===")
logger_mp.info(f"[Head Rotation Matrix]:\n{teleData.head_pose}")
logger_mp.info(f"[Left EE Pose]:\n{teleData.left_arm_pose}")
logger_mp.info(f"[Right EE Pose]:\n{teleData.right_arm_pose}")
logger_mp.info("-------------------=== TeleData Snapshot ===-------------------")
logger_mp.info(f"[Head Pose]:\n{teleData.head_pose}")
logger_mp.info(f"[Left Wrist Pose]:\n{teleData.left_wrist_pose}")
logger_mp.info(f"[Right Wrist Pose]:\n{teleData.right_wrist_pose}")
if use_hand_track:
logger_mp.info(f"[Left Hand Position] shape {teleData.left_hand_pos.shape}:\n{teleData.left_hand_pos}")
logger_mp.info(f"[Right Hand Position] shape {teleData.right_hand_pos.shape}:\n{teleData.right_hand_pos}")
logger_mp.info(f"[Left Hand Positions] shape {teleData.left_hand_pos.shape}:\n{teleData.left_hand_pos}")
logger_mp.info(f"[Right Hand Positions] shape {teleData.right_hand_pos.shape}:\n{teleData.right_hand_pos}")
if teleData.left_hand_rot is not None:
logger_mp.info(f"[Left Hand Rotation] shape {teleData.left_hand_rot.shape}:\n{teleData.left_hand_rot}")
logger_mp.info(f"[Left Hand Rotations] shape {teleData.left_hand_rot.shape}:\n{teleData.left_hand_rot}")
if teleData.right_hand_rot is not None:
logger_mp.info(f"[Right Hand Rotation] shape {teleData.right_hand_rot.shape}:\n{teleData.right_hand_rot}")
if teleData.left_pinch_value is not None:
logger_mp.info(f"[Left Pinch Value]: {teleData.left_pinch_value:.2f}")
if teleData.right_pinch_value is not None:
logger_mp.info(f"[Right Pinch Value]: {teleData.right_pinch_value:.2f}")
if teleData.tele_state:
state = teleData.tele_state
logger_mp.info("[Hand State]:")
logger_mp.info(f" Left Pinch state: {state.left_pinch_state}")
logger_mp.info(f" Left Squeeze: {state.left_squeeze_state} ({state.left_squeeze_value:.2f})")
logger_mp.info(f" Right Pinch state: {state.right_pinch_state}")
logger_mp.info(f" Right Squeeze: {state.right_squeeze_state} ({state.right_squeeze_value:.2f})")
logger_mp.info(f"[Right Hand Rotations] shape {teleData.right_hand_rot.shape}:\n{teleData.right_hand_rot}")
logger_mp.info(f"[Left Pinch Value]: {teleData.left_hand_pinchValue:.2f}")
logger_mp.info(f"[Left Squeeze Value]: {teleData.left_hand_squeezeValue:.2f}")
logger_mp.info(f"[Right Pinch Value]: {teleData.right_hand_pinchValue:.2f}")
logger_mp.info(f"[Right Squeeze Value]: {teleData.right_hand_squeezeValue:.2f}")
else:
logger_mp.info(f"[Left Trigger Value]: {teleData.left_trigger_value:.2f}")
logger_mp.info(f"[Right Trigger Value]: {teleData.right_trigger_value:.2f}")
if teleData.tele_state:
state = teleData.tele_state
logger_mp.info("[Controller State]:")
logger_mp.info(f" Left Trigger: {state.left_trigger_state}")
logger_mp.info(f" Left Squeeze: {state.left_squeeze_ctrl_state} ({state.left_squeeze_ctrl_value:.2f})")
logger_mp.info(f" Left Thumbstick: {state.left_thumbstick_state} ({state.left_thumbstick_value})")
logger_mp.info(f" Left A/B Buttons: A={state.left_aButton}, B={state.left_bButton}")
logger_mp.info(f" Right Trigger: {state.right_trigger_state}")
logger_mp.info(f" Right Squeeze: {state.right_squeeze_ctrl_state} ({state.right_squeeze_ctrl_value:.2f})")
logger_mp.info(f" Right Thumbstick: {state.right_thumbstick_state} ({state.right_thumbstick_value})")
logger_mp.info(f" Right A/B Buttons: A={state.right_aButton}, B={state.right_bButton}")
logger_mp.info(f"[Left Trigger Value]: {teleData.left_ctrl_triggerValue:.2f}")
logger_mp.info(f"[Left Squeeze Value]: {teleData.left_ctrl_squeezeValue:.2f}")
logger_mp.info(f"[Left Thumbstick Value]: {teleData.left_ctrl_thumbstickValue}")
logger_mp.info(f"[Left A/B Buttons]: A={teleData.left_ctrl_aButton}, B={teleData.left_ctrl_bButton}")
logger_mp.info(f"[Right Trigger Value]: {teleData.right_ctrl_triggerValue:.2f}")
logger_mp.info(f"[Right Squeeze Value]: {teleData.right_ctrl_squeezeValue:.2f}")
logger_mp.info(f"[Right Thumbstick Value]: {teleData.right_ctrl_thumbstickValue}")
logger_mp.info(f"[Right A/B Buttons]: A={teleData.right_ctrl_aButton}, B={teleData.right_ctrl_bButton}")
current_time = time.time()
time_elapsed = current_time - start_time
sleep_time = max(0, 0.033 - time_elapsed)
sleep_time = max(0, 0.3 - time_elapsed)
time.sleep(sleep_time)
logger_mp.debug(f"main process sleep: {sleep_time}")
@ -86,8 +78,8 @@ def run_test_tv_wrapper():
running = False
logger_mp.warning("KeyboardInterrupt, exiting program...")
finally:
image_shm.unlink()
image_shm.close()
tv_wrapper.close()
# img_client.close()
logger_mp.warning("Finally, exiting program...")
exit(0)

Loading…
Cancel
Save