"""
NativeTeleWrapper — drop-in replacement for TeleVuerWrapper that uses
the Quest 3 native app via TeleopServer instead of Vuer/browser.
Returns TeleData in the same format as the original TeleVuerWrapper, but:
- Wrist poses arrive chest-relative from body tracking (no head subtraction needed)
- Chest orientation is available for body rotation decoupling
- No SSL, no browser, no Vuer dependency
The coordinate transform pipeline is preserved:
1. Quest 3 sends poses in Godot's coordinate system (Y-up, -Z forward)
2. This wrapper converts to robot convention (X-forward, Y-left, Z-up)
3. Applies Unitree arm URDF convention transforms
4. Applies head-to-waist offset (same as original)
Usage:
# Instead of:
# tv_wrapper = TeleVuerWrapper(use_hand_tracking=True, ...)
# Use:
# tv_wrapper = NativeTeleWrapper(port=8765)
# tv_wrapper.start() # Starts WebSocket server in background
"""
import numpy as np
import time
import threading
from multiprocessing import Array, Value
from dataclasses import dataclass, field
from typing import Optional
from teleop_server import TeleopServer
# --- Coordinate Transform Constants ---
# Godot coordinate system: Y-up, -Z forward, X-right
# This is equivalent to OpenXR convention.
# Robot convention: X-forward, Y-left, Z-up
# Basis change: OpenXR (Y-up, -Z fwd) → Robot (Z-up, X fwd)
T_ROBOT_OPENXR = np.array([[ 0, 0,-1, 0],
                           [-1, 0, 0, 0],
                           [ 0, 1, 0, 0],
                           [ 0, 0, 0, 1]], dtype=np.float64)
# Inverse basis change: Robot → OpenXR (rotation part is the transpose of the above).
T_OPENXR_ROBOT = np.array([[ 0,-1, 0, 0],
                           [ 0, 0, 1, 0],
                           [-1, 0, 0, 0],
                           [ 0, 0, 0, 1]], dtype=np.float64)
# 3x3 rotation-only slices of the basis-change transforms (NumPy views, not copies).
R_ROBOT_OPENXR = T_ROBOT_OPENXR[:3, :3]
R_OPENXR_ROBOT = T_OPENXR_ROBOT[:3, :3]
# Arm initial pose convention transforms
# (left/right differ by the sign of the 90° rotation about X).
T_TO_UNITREE_HUMANOID_LEFT_ARM = np.array([[1, 0, 0, 0],
                                           [0, 0,-1, 0],
                                           [0, 1, 0, 0],
                                           [0, 0, 0, 1]], dtype=np.float64)
T_TO_UNITREE_HUMANOID_RIGHT_ARM = np.array([[1, 0, 0, 0],
                                            [0, 0, 1, 0],
                                            [0,-1, 0, 0],
                                            [0, 0, 0, 1]], dtype=np.float64)
# Hand convention transform (applied to wrist-relative hand joint positions).
T_TO_UNITREE_HAND = np.array([[0, 0, 1, 0],
                              [-1, 0, 0, 0],
                              [0, -1, 0, 0],
                              [0, 0, 0, 1]], dtype=np.float64)
# Default poses (fallback when no data).
# Expressed in the OpenXR frame (Y-up): head at ~1.5 m height, 0.2 m forward (-Z).
CONST_HEAD_POSE = np.array([[1, 0, 0, 0],
                            [0, 1, 0, 1.5],
                            [0, 0, 1, -0.2],
                            [0, 0, 0, 1]], dtype=np.float64)
# Arms: ±0.15 m lateral (X), 1.13 m height, 0.3 m forward (-Z).
CONST_LEFT_ARM_POSE = np.array([[1, 0, 0, -0.15],
                                [0, 1, 0, 1.13],
                                [0, 0, 1, -0.3],
                                [0, 0, 0, 1]], dtype=np.float64)
CONST_RIGHT_ARM_POSE = np.array([[1, 0, 0, 0.15],
                                 [0, 1, 0, 1.13],
                                 [0, 0, 1, -0.3],
                                 [0, 0, 0, 1]], dtype=np.float64)
def fast_mat_inv(mat):
    """Invert a rigid-body (SE(3)) transform without a general matrix inverse.

    For T = [R t; 0 1] with R orthonormal, the inverse is [R^T, -R^T t; 0 1],
    which is both faster and numerically cleaner than np.linalg.inv.
    """
    rot_t = mat[:3, :3].T
    inv = np.eye(4)
    inv[:3, :3] = rot_t
    inv[:3, 3] = -(rot_t @ mat[:3, 3])
    return inv
def safe_mat_update(prev_mat, mat):
    """Accept `mat` only when it is finite and non-singular.

    Returns (matrix, accepted): (`mat`, True) when its determinant is finite
    and not within 1e-6 of zero; otherwise (`prev_mat`, False) so the caller
    keeps the last known-good pose.
    """
    det = np.linalg.det(mat)
    is_usable = bool(np.isfinite(det)) and not np.isclose(det, 0.0, atol=1e-6)
    if is_usable:
        return mat, True
    return prev_mat, False
@dataclass
class TeleData:
    """Matches the TeleData from tv_wrapper.py exactly.

    Poses are 4x4 SE(3) matrices in robot convention (X-forward, Y-left, Z-up)
    as produced by NativeTeleWrapper.get_tele_data().
    """
    head_pose: np.ndarray         # (4,4) SE(3) pose of head
    left_wrist_pose: np.ndarray   # (4,4) SE(3) pose of left wrist (IK frame)
    right_wrist_pose: np.ndarray  # (4,4) SE(3) pose of right wrist (IK frame)
    chest_pose: Optional[np.ndarray] = None  # (4,4) SE(3) NEW: chest orientation
    # Hand tracking (None when unavailable)
    left_hand_pos: Optional[np.ndarray] = None   # (25,3) 3D positions of left hand joints
    right_hand_pos: Optional[np.ndarray] = None  # (25,3) 3D positions of right hand joints
    left_hand_rot: Optional[np.ndarray] = None   # (25,3,3) rotation matrices
    right_hand_rot: Optional[np.ndarray] = None  # (25,3,3) rotation matrices
    # Hand state (computed from finger positions)
    # NOTE(review): pinchValue defaults to 10.0 while _compute_pinch returns
    # values scaled by 100 — presumably matching the original Vuer convention;
    # verify against tv_wrapper.py consumers.
    left_hand_pinch: bool = False
    left_hand_pinchValue: float = 10.0
    left_hand_squeeze: bool = False
    left_hand_squeezeValue: float = 0.0
    right_hand_pinch: bool = False
    right_hand_pinchValue: float = 10.0
    right_hand_squeeze: bool = False
    right_hand_squeezeValue: float = 0.0
class NativeTeleWrapper:
    """Drop-in replacement for TeleVuerWrapper using Quest 3 native app.

    Matches the Unitree TeleVuerWrapper pipeline exactly:
    1. Wrist poses arrive chest-relative from body tracking (Godot computes chest_inv * wrist)
    2. We reconstruct world-space wrist poses: chest * chest_relative
    3. Apply basis change (OpenXR -> Robot convention)
    4. Apply Unitree arm URDF convention transforms
    5. Subtract HEAD position (translation only — rotation stays world-space)
    6. Add head-to-waist offset (matches Unitree: 0.15 x, 0.45 z)
    """

    def __init__(self, port: int = 8765, host: str = "0.0.0.0",
                 head_to_waist_x: float = 0.15,
                 head_to_waist_z: float = 0.45):
        """
        Args:
            port: WebSocket server port
            host: WebSocket bind address
            head_to_waist_x: Forward offset from head to IK solver origin (meters).
                Matches Unitree default: 0.15.
            head_to_waist_z: Vertical offset from head to IK solver origin (meters).
                Matches Unitree default: 0.45.
        """
        self.server = TeleopServer(host=host, port=port)
        self._server_thread = None  # populated by start()
        self._head_to_waist_x = head_to_waist_x
        self._head_to_waist_z = head_to_waist_z

    def start(self):
        """Start the WebSocket server in a background thread."""
        self._server_thread = self.server.start_in_thread()

    def get_tele_data(self) -> TeleData:
        """Get processed motion data, transformed to robot convention.

        Pipeline matches Unitree's TeleVuerWrapper (televuer/tv_wrapper.py):
        1. Reconstruct world-space wrist from chest-relative data
        2. Basis change (OpenXR -> Robot)
        3. Arm convention transform
        4. Head subtraction (translation only — rotation stays world-space)
        5. Head-to-waist offset

        Returns:
            TeleData with poses in robot convention; hand fields are zero
            arrays / defaults when no tracking data has arrived yet.
        """
        # Read raw poses from shared memory. The Quest app writes 16 floats
        # per pose in column-major order, hence order='F' on the reshape.
        with self.server.head_pose_shared.get_lock():
            head_raw = np.array(self.server.head_pose_shared[:]).reshape(4, 4, order='F')
        with self.server.chest_pose_shared.get_lock():
            chest_raw = np.array(self.server.chest_pose_shared[:]).reshape(4, 4, order='F')
        with self.server.left_arm_pose_shared.get_lock():
            left_wrist_chest_rel = np.array(self.server.left_arm_pose_shared[:]).reshape(4, 4, order='F')
        with self.server.right_arm_pose_shared.get_lock():
            right_wrist_chest_rel = np.array(self.server.right_arm_pose_shared[:]).reshape(4, 4, order='F')
        # Validate matrices: singular/NaN matrices (e.g. all-zero shared memory
        # before the first packet) fall back to the given defaults.
        head_raw, head_valid = safe_mat_update(CONST_HEAD_POSE, head_raw)
        chest_raw, chest_valid = safe_mat_update(np.eye(4), chest_raw)
        left_wrist_chest_rel, left_valid = safe_mat_update(np.eye(4), left_wrist_chest_rel)
        right_wrist_chest_rel, right_valid = safe_mat_update(np.eye(4), right_wrist_chest_rel)
        # --- Reconstruct world-space wrist poses ---
        # body_tracker.gd sends: left_wrist_rel = chest_inv * left_wrist_world
        # So: left_wrist_world = chest * left_wrist_rel
        left_wrist_world = chest_raw @ left_wrist_chest_rel
        right_wrist_world = chest_raw @ right_wrist_chest_rel
        # Validate reconstructed world-space poses (overwrites left_valid /
        # right_valid from above; the later hand-data gate uses these values).
        left_wrist_world, left_valid = safe_mat_update(CONST_LEFT_ARM_POSE, left_wrist_world)
        right_wrist_world, right_valid = safe_mat_update(CONST_RIGHT_ARM_POSE, right_wrist_world)
        # --- Basis change: OpenXR -> Robot convention ---
        # Similarity transform T * M * T^-1 re-expresses each pose in robot axes.
        Brobot_world_head = T_ROBOT_OPENXR @ head_raw @ T_OPENXR_ROBOT
        Brobot_world_chest = T_ROBOT_OPENXR @ chest_raw @ T_OPENXR_ROBOT
        left_Brobot_world = T_ROBOT_OPENXR @ left_wrist_world @ T_OPENXR_ROBOT
        right_Brobot_world = T_ROBOT_OPENXR @ right_wrist_world @ T_OPENXR_ROBOT
        # --- Arm URDF convention transform ---
        # Skipped (identity) when the wrist pose fell back to its default.
        left_IPunitree = left_Brobot_world @ (T_TO_UNITREE_HUMANOID_LEFT_ARM if left_valid else np.eye(4))
        right_IPunitree = right_Brobot_world @ (T_TO_UNITREE_HUMANOID_RIGHT_ARM if right_valid else np.eye(4))
        # --- Head subtraction (TRANSLATION ONLY, matching Unitree tv_wrapper.py) ---
        # The rotation stays in world-space, which is what the IK solver expects.
        # Only the position is made head-relative.
        left_wrist_final = left_IPunitree.copy()
        right_wrist_final = right_IPunitree.copy()
        left_wrist_final[0:3, 3] -= Brobot_world_head[0:3, 3]
        right_wrist_final[0:3, 3] -= Brobot_world_head[0:3, 3]
        # --- Head-to-waist offset (matching Unitree: x=0.15, z=0.45) ---
        left_wrist_final[0, 3] += self._head_to_waist_x  # x (forward)
        right_wrist_final[0, 3] += self._head_to_waist_x
        left_wrist_final[2, 3] += self._head_to_waist_z  # z (up)
        right_wrist_final[2, 3] += self._head_to_waist_z
        # --- Hand positions ---
        left_hand_pos = np.zeros((25, 3))
        right_hand_pos = np.zeros((25, 3))
        # NOTE(review): hand data is only read when BOTH wrists are valid —
        # confirm this is intended rather than gating each hand independently.
        if left_valid and right_valid:
            with self.server.left_hand_position_shared.get_lock():
                raw_left = np.array(self.server.left_hand_position_shared[:])
            with self.server.right_hand_position_shared.get_lock():
                raw_right = np.array(self.server.right_hand_position_shared[:])
            if np.any(raw_left != 0):
                # Hand positions are wrist-relative from Quest 3, in OpenXR coords.
                # Must apply basis change (OpenXR→Robot) before hand convention transform.
                # Derivation: original code does T_TO_UNITREE_HAND @ inv(arm_robot) @ (T_ROBOT_OPENXR @ world_pos)
                # which simplifies to T_TO_UNITREE_HAND @ T_ROBOT_OPENXR @ wrist_relative_openxr
                left_pos_25x3 = raw_left.reshape(25, 3)
                # Homogeneous coordinates: (4,25) so one matmul transforms all joints.
                left_hom = np.concatenate([left_pos_25x3.T, np.ones((1, 25))])
                left_hand_pos = (T_TO_UNITREE_HAND @ T_ROBOT_OPENXR @ left_hom)[0:3, :].T
            if np.any(raw_right != 0):
                right_pos_25x3 = raw_right.reshape(25, 3)
                right_hom = np.concatenate([right_pos_25x3.T, np.ones((1, 25))])
                right_hand_pos = (T_TO_UNITREE_HAND @ T_ROBOT_OPENXR @ right_hom)[0:3, :].T
        # --- Hand rotations ---
        left_hand_rot = None
        right_hand_rot = None
        # TODO: Transform hand rotations if needed for dexterous hand control
        # --- Pinch/squeeze from finger distances ---
        # Compute pinch from thumb-tip to index-tip distance
        left_pinch_val, left_pinch = _compute_pinch(left_hand_pos)
        right_pinch_val, right_pinch = _compute_pinch(right_hand_pos)
        left_squeeze_val, left_squeeze = _compute_squeeze(left_hand_pos)
        right_squeeze_val, right_squeeze = _compute_squeeze(right_hand_pos)
        return TeleData(
            head_pose=Brobot_world_head,
            left_wrist_pose=left_wrist_final,
            right_wrist_pose=right_wrist_final,
            chest_pose=Brobot_world_chest,
            left_hand_pos=left_hand_pos,
            right_hand_pos=right_hand_pos,
            left_hand_rot=left_hand_rot,
            right_hand_rot=right_hand_rot,
            left_hand_pinch=left_pinch,
            left_hand_pinchValue=left_pinch_val,
            left_hand_squeeze=left_squeeze,
            left_hand_squeezeValue=left_squeeze_val,
            right_hand_pinch=right_pinch,
            right_hand_pinchValue=right_pinch_val,
            right_hand_squeeze=right_squeeze,
            right_hand_squeezeValue=right_squeeze_val,
        )

    def render_to_xr(self, img):
        """Send a webcam frame to the Quest 3 app.

        Args:
            img: BGR numpy array (OpenCV format). Will be JPEG-encoded and sent.
        """
        # Local import keeps cv2 an optional dependency for callers
        # that never stream video.
        import cv2
        # Encode as JPEG (quality 70 trades size for latency).
        _, jpeg_buf = cv2.imencode('.jpg', img, [cv2.IMWRITE_JPEG_QUALITY, 70])
        self.server.set_webcam_frame(jpeg_buf.tobytes())

    def close(self):
        """Shutdown the server."""
        # The server thread is daemonic, it will stop when the process exits.
        pass

    @property
    def has_client(self) -> bool:
        """Check if a Quest 3 client is connected."""
        # NOTE(review): reaches into TeleopServer's private `_clients` set —
        # consider exposing a public accessor on TeleopServer instead.
        return len(self.server._clients) > 0

    @property
    def last_data_time(self) -> float:
        """Timestamp of last received tracking data."""
        with self.server.last_data_time_shared.get_lock():
            return self.server.last_data_time_shared.value
def _compute_pinch(hand_pos_25x3: np.ndarray) -> tuple:
"""Compute pinch state from thumb tip (4) and index tip (8) distance.
Returns (pinchValue, is_pinching).
pinchValue: ~15.0 (open) → 0.0 (pinched), scaled to match Vuer convention * 100.
"""
if np.all(hand_pos_25x3 == 0):
return 10.0, False
# OpenXR hand joint indices: thumb tip = 4, index tip = 8
thumb_tip = hand_pos_25x3[4]
index_tip = hand_pos_25x3[8]
dist = np.linalg.norm(thumb_tip - index_tip)
# Map distance to pinch value (meters → normalized)
# Typical range: 0.0 (touching) to 0.10 (fully open)
pinch_value = min(dist * 150.0, 15.0) # Scale to ~0-15 range
is_pinching = dist < 0.025 # 2.5cm threshold
return pinch_value * 100.0, is_pinching # Match TeleVuerWrapper scaling
def _compute_squeeze(hand_pos_25x3: np.ndarray) -> tuple:
"""Compute squeeze (fist) state from average finger curl.
Returns (squeezeValue, is_squeezing).
squeezeValue: 0.0 (open) → 1.0 (fist).
"""
if np.all(hand_pos_25x3 == 0):
return 0.0, False
# Measure distance from fingertips to palm center
# Joint indices: palm=0, index_tip=8, middle_tip=12, ring_tip=16, pinky_tip=20
palm = hand_pos_25x3[0]
tips = hand_pos_25x3[[8, 12, 16, 20]]
dists = np.linalg.norm(tips - palm, axis=1)
avg_dist = np.mean(dists)
# Map: ~0.02m (fist) → 1.0, ~0.10m (open) → 0.0
squeeze_value = np.clip(1.0 - (avg_dist - 0.02) / 0.08, 0.0, 1.0)
is_squeezing = squeeze_value > 0.7
return squeeze_value, is_squeezing