
[feat] fov, immersive, pass-through modes.

main
silencht · 4 months ago
commit 562a9ca27d
6 changed files:
1. README.md (41)
2. pyproject.toml (2)
3. src/televuer/televuer.py (426)
4. src/televuer/tv_wrapper.py (52)
5. test/test_televuer.py (31)
6. test/test_tv_wrapper.py (39)

README.md (41)

@@ -4,13 +4,32 @@ The TeleVuer library is a specialized version of the [Vuer](https://github.com/v
 Currently, this module serves as a core component of the [xr_teleoperate](https://github.com/unitreerobotics/xr_teleoperate) library, offering advanced functionality for teleoperation tasks. It supports various XR devices, including Apple Vision Pro, Meta Quest3, Pico 4 Ultra Enterprise, etc., ensuring compatibility and ease of use for robotic teleoperation applications.
-## Release Note
-V3.0 brings updates:
+The image input of this library works in conjunction with the [teleimager](https://github.com/silencht/teleimager) library. We recommend using both libraries together.
+## 0. 🔖 Release Note
+### V4.0 🏷️ brings updates:
+1. Improved Display Modes
+Removed the old "pass_through" mode. The system now supports three modes:
+- immersive: fully immersive mode; VR shows the robot's first-person view (zmq or webrtc must be enabled).
+- pass-through: VR shows the real world through the VR headset cameras; no image from zmq or webrtc is displayed (even if enabled).
+- fov: a small window in the center shows the robot's first-person view, while the surrounding area shows the real world.
+2. Enhanced Immersion
+Adjusted the image plane height for immersive and fov modes to provide a more natural and comfortable VR experience.
+### V3.0 🏷️ brings updates:
 1. Added `pass_through` interface to enable/disable the pass-through mode.
 2. Support `webrtc` interface to enable/disable the webrtc streaming mode.
 3. Use `render_to_xr` method (adjusted from `set_display_image`) to send images to XR device.
-V2.0 brings updates:
+### V2.0 🏷️ brings updates:
 1. Image transport is now by reference instead of external shared memory.
 2. Renamed the get-data function from `get_motion_state_data` to `get_tele_data`.
@@ -19,7 +38,7 @@ V2.0 brings updates:
 5. Streamlined the data structure: removed the nested `TeleStateData` and return everything in the unified `TeleData`.
 6. Added new image-transport interfaces such as `set_display_image`.
-## 1. Diagram
+## 1. 🗺️ Diagram
 <p align="center">
 <a href="https://oss-global-cdn.unitree.com/static/5ae3c9ee9a3d40dc9fe002281e8aeac1_2975x3000.png">
@@ -27,9 +46,9 @@ V2.0 brings updates:
 </a>
 </p>
-## 2. Install
+## 2. 📦 Install
-### 2.1 Install televuer repository
+### 2.1 📥 Install televuer repository
 ```bash
 git clone https://github.com/silencht/televuer
@@ -38,7 +57,7 @@ pip install -e . # or pip install .
 ```
-### 2.2 Generate Certificate Files
+### 2.2 🔑 Generate Certificate Files
 The televuer module requires SSL certificates to allow XR devices (such as Pico / Quest / Apple Vision Pro) to connect securely via HTTPS / WebRTC.
@@ -73,13 +92,13 @@ build cert.pem key.pem LICENSE pyproject.toml README.md rootCA.key rootCA
 # Use AirDrop to copy rootCA.pem to your Apple Vision Pro device and install it manually as a trusted certificate.
 ```
-3. Allow Firewall Access
+3. 🧱 Allow Firewall Access
 ```bash
 sudo ufw allow 8012
 ```
-### 2.3 Configure Certificate Paths (Choose One Method)
+### 2.3 🔐 Configure Certificate Paths (Choose One Method)
 You can tell televuer where to find the certificate files using either environment variables or a user config directory.
@@ -105,7 +124,7 @@ cp cert.pem key.pem ~/.config/xr_teleoperate/
 If neither of the above methods is used, televuer will look for the certificate files from the function parameters or fall back to the default paths within the module.
-## 3. Test
+## 3. 🧐 Test
 ```bash
 python test_televuer.py
@@ -120,7 +139,7 @@ python test_tv_wrapper.py
 # Press Enter in the terminal to launch the program.
 ```
-## 4. Version History
+## 4. 📌 Version History
 `vuer==0.0.32rc7`
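The V4.0 mode rules described in the release note above (immersive and fov require an image source, webrtc is prioritized over zmq, pass-through ignores both) can be sketched as a small stand-alone dispatch function. This is a hypothetical illustration of the documented behavior, not code from the library:

```python
from typing import Optional

def select_image_source(display_mode: str, zmq: bool, webrtc: bool) -> Optional[str]:
    """Return the image source the headset would use, or None.

    Hypothetical sketch mirroring the documented V4.0 rules:
    - "pass-through" never streams robot images, even if sources are enabled;
    - "immersive" and "fov" need zmq or webrtc, with webrtc prioritized.
    """
    if display_mode == "pass-through":
        return None  # real-world view; sources are ignored even if enabled
    if display_mode in ("immersive", "fov"):
        if webrtc:
            return "webrtc"  # if both are enabled, webrtc is prioritized
        if zmq:
            return "zmq"
        raise ValueError(f"{display_mode} mode requires zmq=True or webrtc=True")
    raise ValueError(f"Unknown display_mode: {display_mode}")
```

The same priority order appears in the constructor diff below, where the webrtc branch is checked before the zmq branch in both the immersive and fov cases.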

pyproject.toml (2)

@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
 [project]
 name = "televuer"
-version = "3.9.0"
+version = "4.0.0"
 description = "XR vision and hand/controller teleoperate interface for unitree robotics"
 authors = [
     { name = "silencht", email = "silencht@qq.com" }

src/televuer/televuer.py (426)

@@ -7,38 +7,57 @@ import threading
 import cv2
 import os
 from pathlib import Path
+from typing import Literal
 class TeleVuer:
-    def __init__(self, use_hand_tracking: bool, pass_through: bool=False, binocular: bool=True, img_shape: tuple=None,
-                 cert_file: str=None, key_file: str=None, webrtc: bool=False, webrtc_url: str=None, display_fps: float=30.0):
+    def __init__(self, use_hand_tracking: bool, binocular: bool=True, img_shape: tuple=None, display_fps: float=30.0,
+                 display_mode: Literal["immersive", "pass-through", "fov"]="immersive", zmq: bool=False, webrtc: bool=False, webrtc_url: str=None,
+                 cert_file: str=None, key_file: str=None):
         """
         TeleVuer class for OpenXR-based XR teleoperate applications.
         This class handles the communication with the Vuer server and manages image and pose data.
         :param use_hand_tracking: bool, whether to use hand tracking or controller tracking.
-        :param pass_through: bool, controls the VR viewing mode.
-        Note:
-            - if pass_through is True, the XR user will see the real world through the VR headset cameras.
-            - if pass_through is False, the XR user will see the images provided by webrtc or the render_to_xr method:
-                - webrtc takes priority over render_to_xr. if webrtc is True, the class will use webrtc for image transmission.
-                - if webrtc is False, the class will use render_to_xr for image transmission.
         :param binocular: bool, whether the application is binocular (stereoscopic) or monocular.
         :param img_shape: tuple, shape of the head image (height, width).
+        :param display_fps: float, target frames per second for display updates (default: 30.0).
+        :param display_mode: str, controls the VR viewing mode. Options are "immersive", "pass-through", and "fov".
+        :param zmq: bool, whether to use zmq for image transmission.
+        :param webrtc: bool, whether to use webrtc for real-time communication.
+        :param webrtc_url: str, URL for the webrtc offer. Must be provided if webrtc is True.
         :param cert_file: str, path to the SSL certificate file.
         :param key_file: str, path to the SSL key file.
-        :param webrtc: bool, whether to use WebRTC for real-time communication. if False, use ImageBackground.
-        :param webrtc_url: str, URL for the WebRTC offer.
-        :param display_fps: float, target frames per second for display updates (default: 30.0).
+        Note:
+            - display_mode controls what the VR headset displays:
+                * "immersive": fully immersive mode; VR shows the robot's first-person view (zmq or webrtc must be enabled).
+                * "pass-through": VR shows the real world through the VR headset cameras; no image from zmq or webrtc is displayed (even if enabled).
+                * "fov": Field-of-View mode; a small window in the center shows the robot's first-person view, while the surrounding area shows the real world.
+            - Only one image mode is active at a time.
+            - Image transmission to VR occurs only if display_mode is "immersive" or "fov" and the corresponding zmq or webrtc option is enabled.
+            - If zmq and webrtc are simultaneously enabled, webrtc is prioritized.
+            -------------- ------------------------------ ------------- --------------- ----------------------------------------------
+            display_mode | display behavior               | image to VR | image source  | Notes
+            -------------- ------------------------------ ------------- --------------- ----------------------------------------------
+            immersive    | fully immersive view (robot)   | Yes (full)  | zmq or webrtc | if both enabled, webrtc is prioritized
+            -------------- ------------------------------ ------------- --------------- ----------------------------------------------
+            pass-through | real world view (VR)           | No          | N/A           | even if an image source is enabled, not shown
+            -------------- ------------------------------ ------------- --------------- ----------------------------------------------
+            fov          | FOV view (robot + VR)          | Yes (small) | zmq or webrtc | if both enabled, webrtc is prioritized
+            -------------- ------------------------------ ------------- --------------- ----------------------------------------------
         """
         self.use_hand_tracking = use_hand_tracking
-        self.display_fps = display_fps
-        self.pass_through = pass_through
         self.binocular = binocular
+        if img_shape is None:
+            raise ValueError("[TeleVuer] img_shape must be provided.")
         self.img_shape = (img_shape[0], img_shape[1], 3)
+        self.display_fps = display_fps
         self.img_height = self.img_shape[0]
         if self.binocular:
             self.img_width = self.img_shape[1] // 2
@@ -76,16 +95,29 @@ class TeleVuer:
         else:
             self.vuer.add_handler("CONTROLLER_MOVE")(self.on_controller_move)
+        self.display_mode = display_mode
+        self.zmq = zmq
         self.webrtc = webrtc
         self.webrtc_url = webrtc_url
-        if self.webrtc:
-            if self.binocular:
-                self.vuer.spawn(start=False)(self.main_image_binocular_webrtc)
+        if self.display_mode == "immersive":
+            if self.webrtc:
+                fn = self.main_image_binocular_webrtc if self.binocular else self.main_image_monocular_webrtc
+            elif self.zmq:
+                self.img2display_shm = shared_memory.SharedMemory(create=True, size=np.prod(self.img_shape) * np.uint8().itemsize)
+                self.img2display = np.ndarray(self.img_shape, dtype=np.uint8, buffer=self.img2display_shm.buf)
+                self.latest_frame = None
+                self.new_frame_event = threading.Event()
+                self.stop_writer_event = threading.Event()
+                self.writer_thread = threading.Thread(target=self._xr_render_loop, daemon=True)
+                self.writer_thread.start()
+                fn = self.main_image_binocular_zmq if self.binocular else self.main_image_monocular_zmq
             else:
-                self.vuer.spawn(start=False)(self.main_image_monocular_webrtc)
-        else:
-            if self.pass_through is False:
+                raise ValueError("[TeleVuer] immersive mode requires zmq=True or webrtc=True.")
+        elif self.display_mode == "fov":
+            if self.webrtc:
+                fn = self.main_image_binocular_webrtc_fov if self.binocular else self.main_image_monocular_webrtc_fov
+            elif self.zmq:
                 self.img2display_shm = shared_memory.SharedMemory(create=True, size=np.prod(self.img_shape) * np.uint8().itemsize)
                 self.img2display = np.ndarray(self.img_shape, dtype=np.uint8, buffer=self.img2display_shm.buf)
                 self.latest_frame = None
@@ -93,10 +125,15 @@ class TeleVuer:
                 self.stop_writer_event = threading.Event()
                 self.writer_thread = threading.Thread(target=self._xr_render_loop, daemon=True)
                 self.writer_thread.start()
-                if self.binocular:
-                    self.vuer.spawn(start=False)(self.main_image_binocular)
+                fn = self.main_image_binocular_zmq_fov if self.binocular else self.main_image_monocular_zmq_fov
             else:
-                self.vuer.spawn(start=False)(self.main_image_monocular)
+                raise ValueError("[TeleVuer] fov mode requires zmq=True or webrtc=True.")
+        elif self.display_mode == "pass-through":
+            fn = self.main_pass_through
+        else:
+            raise ValueError(f"[TeleVuer] Unknown display_mode: {self.display_mode}")
+        self.vuer.spawn(start=False)(fn)
         self.head_pose_shared = Array('d', 16, lock=True)
         self.left_arm_pose_shared = Array('d', 16, lock=True)
@@ -162,7 +199,7 @@ class TeleVuer:
             self.img2display[:] = latest_frame
     def render_to_xr(self, image):
-        if self.webrtc or self.pass_through:
+        if self.webrtc or self.display_mode == "pass-through":
             print("[TeleVuer] Warning: render_to_xr is ignored when webrtc is enabled or pass_through is True.")
             return
         self.latest_frame = image
@@ -171,7 +208,7 @@ class TeleVuer:
     def close(self):
         self.process.terminate()
         self.process.join(timeout=0.5)
-        if not self.webrtc and not self.pass_through:
+        if self.display_mode in ("immersive", "fov") and not self.webrtc:
            self.stop_writer_event.set()
            self.new_frame_event.set()
            self.writer_thread.join(timeout=0.5)
@@ -274,7 +311,8 @@ class TeleVuer:
         except:
             pass
-    async def main_image_binocular(self, session):
+    ## immersive MODE
+    async def main_image_binocular_zmq(self, session):
         if self.use_hand_tracking:
             session.upsert(
                 Hands(
@@ -296,41 +334,40 @@ class TeleVuer:
                 to="bgChildren",
             )
         while True:
-            if self.pass_through is False:
-                session.upsert(
-                    [
-                        ImageBackground(
-                            self.img2display[:, :self.img_width],
-                            aspect=self.aspect_ratio,
-                            height=1,
-                            distanceToCamera=1,
-                            # The underlying rendering engine supported a layer binary bitmask for both objects and the camera.
-                            # Below we set the two image planes, left and right, to layers=1 and layers=2.
-                            # Note that these two masks are associated with left eye’s camera and the right eye’s camera.
-                            layers=1,
-                            format="jpeg",
-                            quality=80,
-                            key="background-left",
-                            interpolate=True,
-                        ),
-                        ImageBackground(
-                            self.img2display[:, self.img_width:],
-                            aspect=self.aspect_ratio,
-                            height=1,
-                            distanceToCamera=1,
-                            layers=2,
-                            format="jpeg",
-                            quality=80,
-                            key="background-right",
-                            interpolate=True,
-                        ),
-                    ],
-                    to="bgChildren",
-                )
+            session.upsert(
+                [
+                    ImageBackground(
+                        self.img2display[:, :self.img_width],
+                        aspect=self.aspect_ratio,
+                        height=1,
+                        distanceToCamera=1,
+                        # The underlying rendering engine supported a layer binary bitmask for both objects and the camera.
+                        # Below we set the two image planes, left and right, to layers=1 and layers=2.
+                        # Note that these two masks are associated with left eye’s camera and the right eye’s camera.
+                        layers=1,
+                        format="jpeg",
+                        quality=80,
+                        key="background-left",
+                        interpolate=True,
+                    ),
+                    ImageBackground(
+                        self.img2display[:, self.img_width:],
+                        aspect=self.aspect_ratio,
+                        height=1,
+                        distanceToCamera=1,
+                        layers=2,
+                        format="jpeg",
+                        quality=80,
+                        key="background-right",
+                        interpolate=True,
+                    ),
+                ],
+                to="bgChildren",
+            )
             # 'jpeg' encoding should give you about 30fps with a 16ms wait in-between.
             await asyncio.sleep(1.0 / self.display_fps)
-    async def main_image_monocular(self, session):
+    async def main_image_monocular_zmq(self, session):
         if self.use_hand_tracking:
             session.upsert(
                 Hands(
@@ -353,22 +390,21 @@ class TeleVuer:
             )
         while True:
-            if self.pass_through is False:
-                session.upsert(
-                    [
-                        ImageBackground(
-                            self.img2display,
-                            aspect=self.aspect_ratio,
-                            height=1,
-                            distanceToCamera=1,
-                            format="jpeg",
-                            quality=80,
-                            key="background-mono",
-                            interpolate=True,
-                        ),
-                    ],
-                    to="bgChildren",
-                )
+            session.upsert(
+                [
+                    ImageBackground(
+                        self.img2display,
+                        aspect=self.aspect_ratio,
+                        height=1,
+                        distanceToCamera=1,
+                        format="jpeg",
+                        quality=80,
+                        key="background-mono",
+                        interpolate=True,
+                    ),
+                ],
+                to="bgChildren",
+            )
             await asyncio.sleep(1.0 / self.display_fps)
     async def main_image_binocular_webrtc(self, session):
@@ -394,23 +430,113 @@ class TeleVuer:
             )
         while True:
-            if self.pass_through is False:
-                session.upsert(
-                    WebRTCStereoVideoPlane(
-                        src=self.webrtc_url,
-                        iceServer=None,
-                        iceServers=[],
-                        key="video-quad",
-                        aspect=self.aspect_ratio,
-                        height = 7,
-                        layout="stereo-left-right"
-                    ),
-                    to="bgChildren",
-                )
+            session.upsert(
+                WebRTCStereoVideoPlane(
+                    src=self.webrtc_url,
+                    iceServer=None,
+                    iceServers=[],
+                    key="video-quad",
+                    aspect=self.aspect_ratio,
+                    height = 7,
+                    layout="stereo-left-right"
+                ),
+                to="bgChildren",
+            )
             await asyncio.sleep(1.0 / self.display_fps)
+    async def main_image_monocular_webrtc(self, session):
+        if self.use_hand_tracking:
+            session.upsert(
+                Hands(
+                    stream=True,
+                    key="hands",
+                    hideLeft=True,
+                    hideRight=True
+                ),
+                to="bgChildren",
+            )
+        else:
+            session.upsert(
+                MotionControllers(
+                    stream=True,
+                    key="motionControllers",
+                    left=True,
+                    right=True,
+                ),
+                to="bgChildren",
+            )
+        while True:
+            session.upsert(
+                WebRTCVideoPlane(
+                    src=self.webrtc_url,
+                    iceServer=None,
+                    iceServers=[],
+                    key="video-quad",
+                    aspect=self.aspect_ratio,
+                    height = 7,
+                ),
+                to="bgChildren",
+            )
+            await asyncio.sleep(1.0 / self.display_fps)
+    ## FOV MODE
+    async def main_image_binocular_zmq_fov(self, session):
+        if self.use_hand_tracking:
+            session.upsert(
+                Hands(
+                    stream=True,
+                    key="hands",
+                    hideLeft=True,
+                    hideRight=True
+                ),
+                to="bgChildren",
+            )
+        else:
+            session.upsert(
+                MotionControllers(
+                    stream=True,
+                    key="motionControllers",
+                    left=True,
+                    right=True,
+                ),
+                to="bgChildren",
+            )
+        while True:
+            session.upsert(
+                [
+                    ImageBackground(
+                        self.img2display[:, :self.img_width],
+                        aspect=self.aspect_ratio,
+                        height=0.75,
+                        distanceToCamera=2,
+                        # The underlying rendering engine supported a layer binary bitmask for both objects and the camera.
+                        # Below we set the two image planes, left and right, to layers=1 and layers=2.
+                        # Note that these two masks are associated with left eye’s camera and the right eye’s camera.
+                        layers=1,
+                        format="jpeg",
+                        quality=80,
+                        key="background-left",
+                        interpolate=True,
+                    ),
+                    ImageBackground(
+                        self.img2display[:, self.img_width:],
+                        aspect=self.aspect_ratio,
+                        height=0.75,
+                        distanceToCamera=2,
+                        layers=2,
+                        format="jpeg",
+                        quality=80,
+                        key="background-right",
+                        interpolate=True,
+                    ),
+                ],
+                to="bgChildren",
+            )
+            # 'jpeg' encoding should give you about 30fps with a 16ms wait in-between.
+            await asyncio.sleep(1.0 / self.display_fps)
-    async def main_image_monocular_webrtc(self, session):
+    async def main_image_monocular_zmq_fov(self, session):
         if self.use_hand_tracking:
             session.upsert(
                 Hands(
@@ -433,20 +559,122 @@ class TeleVuer:
             )
         while True:
-            if self.pass_through is False:
-                session.upsert(
-                    WebRTCVideoPlane(
-                        src=self.webrtc_url,
-                        iceServer=None,
-                        iceServers=[],
-                        key="video-quad",
-                        aspect=self.aspect_ratio,
-                        height = 7,
-                    ),
-                    to="bgChildren",
-                )
+            session.upsert(
+                [
+                    ImageBackground(
+                        self.img2display,
+                        aspect=self.aspect_ratio,
+                        height=0.75,
+                        distanceToCamera=2,
+                        format="jpeg",
+                        quality=80,
+                        key="background-mono",
+                        interpolate=True,
+                    ),
+                ],
+                to="bgChildren",
+            )
             await asyncio.sleep(1.0 / self.display_fps)
+    async def main_image_binocular_webrtc_fov(self, session):
+        if self.use_hand_tracking:
+            session.upsert(
+                Hands(
+                    stream=True,
+                    key="hands",
+                    hideLeft=True,
+                    hideRight=True
+                ),
+                to="bgChildren",
+            )
+        else:
+            session.upsert(
+                MotionControllers(
+                    stream=True,
+                    key="motionControllers",
+                    left=True,
+                    right=True,
+                ),
+                to="bgChildren",
+            )
+        while True:
+            session.upsert(
+                WebRTCStereoVideoPlane(
+                    src=self.webrtc_url,
+                    iceServer=None,
+                    iceServers=[],
+                    key="video-quad",
+                    aspect=self.aspect_ratio,
+                    height=3,
+                    layout="stereo-left-right"
+                ),
+                to="bgChildren",
+            )
+            await asyncio.sleep(1.0 / self.display_fps)
+    async def main_image_monocular_webrtc_fov(self, session):
+        if self.use_hand_tracking:
+            session.upsert(
+                Hands(
+                    stream=True,
+                    key="hands",
+                    hideLeft=True,
+                    hideRight=True
+                ),
+                to="bgChildren",
+            )
+        else:
+            session.upsert(
+                MotionControllers(
+                    stream=True,
+                    key="motionControllers",
+                    left=True,
+                    right=True,
+                ),
+                to="bgChildren",
+            )
+        while True:
+            session.upsert(
+                WebRTCVideoPlane(
+                    src=self.webrtc_url,
+                    iceServer=None,
+                    iceServers=[],
+                    key="video-quad",
+                    aspect=self.aspect_ratio,
+                    height=3,
+                ),
+                to="bgChildren",
+            )
+            await asyncio.sleep(1.0 / self.display_fps)
+    ## pass-through MODE
+    async def main_pass_through(self, session):
+        if self.use_hand_tracking:
+            session.upsert(
+                Hands(
+                    stream=True,
+                    key="hands",
+                    hideLeft=True,
+                    hideRight=True
+                ),
+                to="bgChildren",
+            )
+        else:
+            session.upsert(
+                MotionControllers(
+                    stream=True,
+                    key="motionControllers",
+                    left=True,
+                    right=True,
+                ),
+                to="bgChildren",
+            )
+        while True:
+            await asyncio.sleep(1.0 / self.display_fps)
     # ==================== common data ====================
     @property
     def head_pose(self):

src/televuer/tv_wrapper.py (52)

@@ -1,7 +1,7 @@
 import numpy as np
 from .televuer import TeleVuer
 from dataclasses import dataclass, field
+from typing import Literal
 """
 (basis) OpenXR Convention : y up, z back, x right.
 (basis) Robot Convention : z up, y left, x front.
@@ -193,35 +193,51 @@ class TeleData:
 class TeleVuerWrapper:
-    def __init__(self, use_hand_tracking: bool, pass_through: bool=False, binocular: bool=True, img_shape: tuple=(480, 1280),
-                 cert_file: str=None, key_file: str=None, webrtc: bool=False, webrtc_url: str=None, display_fps: float=30.0,
-                 return_hand_rot_data: bool=False):
+    def __init__(self, use_hand_tracking: bool, binocular: bool=True, img_shape: tuple=(480, 1280), display_fps: float=30.0,
+                 display_mode: Literal["immersive", "pass-through", "fov"]="immersive", zmq: bool=False, webrtc: bool=False, webrtc_url: str=None,
+                 cert_file: str=None, key_file: str=None, return_hand_rot_data: bool=False):
         """
         TeleVuerWrapper is a wrapper for the TeleVuer class, which handles the XR device's data suite for robot control.
         It initializes the TeleVuer instance with the specified parameters and provides a method to get motion state data.
         :param use_hand_tracking: bool, whether to use hand tracking or controller tracking.
-        :param pass_through: bool, controls the VR viewing mode.
-        Note:
-            - if pass_through is True, the XR user will see the real world through the VR headset cameras.
-            - if pass_through is False, the XR user will see the images provided by webrtc or the render_to_xr method:
-                - webrtc takes priority over render_to_xr. if webrtc is True, the class will use webrtc for image transmission.
-                - if webrtc is False, the class will use render_to_xr for image transmission.
         :param binocular: bool, whether the application is binocular (stereoscopic) or monocular.
         :param img_shape: tuple, shape of the head image (height, width).
+        :param display_fps: float, target frames per second for display updates (default: 30.0).
+        :param display_mode: str, controls the VR viewing mode. Options are "immersive", "pass-through", and "fov".
+        :param zmq: bool, whether to use ZMQ for image transmission.
+        :param webrtc: bool, whether to use webrtc for real-time communication.
+        :param webrtc_url: str, URL for the webrtc offer. Must be provided if webrtc is True.
         :param cert_file: str, path to the SSL certificate file.
         :param key_file: str, path to the SSL key file.
-        :param webrtc: bool, whether to use WebRTC for real-time communication. if False, use ImageBackground.
-        :param webrtc_url: str, URL for the WebRTC offer.
-        :param display_fps: float, target frames per second for display updates (default: 30.0).
+        :param return_hand_rot_data: bool, whether to return hand rotation data in TeleData.
+        Note:
+            - display_mode controls what the VR headset displays:
+                * "immersive": fully immersive mode; VR shows the robot's first-person view (zmq or webrtc must be enabled).
+                * "pass-through": VR shows the real world through the VR headset cameras; no image from zmq or webrtc is displayed (even if enabled).
+                * "fov": Field-of-View mode; a small window in the center shows the robot's first-person view, while the surrounding area shows the real world.
+            - Only one image mode is active at a time.
+            - Image transmission to VR occurs only if display_mode is "immersive" or "fov" and the corresponding zmq or webrtc option is enabled.
+            - If zmq and webrtc are simultaneously enabled, webrtc is prioritized.
+            -------------- ------------------------------ ------------- --------------- ----------------------------------------------
+            display_mode | display behavior               | image to VR | image source  | Notes
+            -------------- ------------------------------ ------------- --------------- ----------------------------------------------
+            immersive    | fully immersive view (robot)   | Yes (full)  | zmq or webrtc | if both enabled, webrtc is prioritized
+            -------------- ------------------------------ ------------- --------------- ----------------------------------------------
+            pass-through | real world view (VR)           | No          | N/A           | even if an image source is enabled, not shown
+            -------------- ------------------------------ ------------- --------------- ----------------------------------------------
+            fov          | FOV view (robot + VR)          | Yes (small) | zmq or webrtc | if both enabled, webrtc is prioritized
+            -------------- ------------------------------ ------------- --------------- ----------------------------------------------
         """
         self.use_hand_tracking = use_hand_tracking
         self.return_hand_rot_data = return_hand_rot_data
-        self.tvuer = TeleVuer(use_hand_tracking=use_hand_tracking, pass_through=pass_through, binocular=binocular, img_shape=img_shape,
-                              cert_file=cert_file, key_file=key_file, webrtc=webrtc, webrtc_url=webrtc_url, display_fps=display_fps)
+        self.tvuer = TeleVuer(use_hand_tracking=use_hand_tracking, binocular=binocular, img_shape=img_shape, display_fps=display_fps,
+                              display_mode=display_mode, zmq=zmq, webrtc=webrtc, webrtc_url=webrtc_url,
+                              cert_file=cert_file, key_file=key_file)
     def get_tele_data(self):
         """

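The convention comment at the top of tv_wrapper.py (OpenXR: x right, y up, z back; Robot: x front, y left, z up) amounts to a fixed axis permutation. As a hypothetical one-vector illustration of that basis change (the wrapper itself transforms full poses, not just direction vectors):

```python
def xr_vec_to_robot(v):
    """Map a direction vector from the OpenXR basis (x right, y up, z back)
    to the robot basis (x front, y left, z up). Hypothetical helper for
    illustration only; names and signature are not from the library."""
    x, y, z = v
    # front = -back, left = -right, up = up
    return (-z, -x, y)
```

For example, the OpenXR "up" axis (0, 1, 0) maps to the robot "up" axis (0, 0, 1), and OpenXR "forward" (0, 0, -1) maps to robot "front" (1, 0, 0).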
test/test_televuer.py (31)

@@ -10,14 +10,39 @@ import logging_mp
 logger_mp = logging_mp.get_logger(__name__, level=logging_mp.INFO)
 def run_test_TeleVuer():
+    # xr-mode
     use_hand_track = False
-    tv = TeleVuer(use_hand_tracking = use_hand_track, pass_through=True, binocular=True, img_shape=(480, 1280))
+    # teleimager: if you want to test real image streaming, make sure the teleimager server is running
+    from teleimager.image_client import ImageClient
+    img_client = ImageClient(host="192.168.123.164")
+    camera_config = img_client.get_cam_config()
+    # teleimager + televuer
+    tv = TeleVuer(use_hand_tracking=use_hand_track,
+                  binocular=camera_config['head_camera']['binocular'],
+                  img_shape=camera_config['head_camera']['image_shape'],
+                  display_fps=camera_config['head_camera']['fps'],
+                  display_mode="immersive",  # "fov" or "immersive" or "pass-through"
+                  zmq=camera_config['head_camera']['enable_zmq'],
+                  webrtc=camera_config['head_camera']['enable_webrtc'],
+                  webrtc_url=f"https://192.168.123.164:{camera_config['head_camera']['webrtc_port']}/offer"
+                  )
+    # pure televuer
+    # tv = TeleVuer(use_hand_tracking=use_hand_track,
+    #               binocular=True,
+    #               img_shape=(480, 1280),
+    #               display_fps=30.0,
+    #               display_mode="fov",  # "fov" or "immersive" or "pass-through"
+    #               zmq=False,
+    #               webrtc=True,
+    #               webrtc_url="https://192.168.123.164:60001/offer"
+    #               )
     try:
         input("Press Enter to start TeleVuer test...")
         running = True
         while running:
+            img, _ = img_client.get_head_frame()
+            tv.render_to_xr(img)
             start_time = time.time()
             logger_mp.info("=" * 80)
             logger_mp.info("Common Data (always available):")
@@ -62,7 +87,7 @@ def run_test_TeleVuer():
             current_time = time.time()
             time_elapsed = current_time - start_time
-            sleep_time = max(0, 0.3 - time_elapsed)
+            sleep_time = max(0, 0.016 - time_elapsed)
             time.sleep(sleep_time)
             logger_mp.debug(f"main process sleep: {sleep_time}")
     except KeyboardInterrupt:

test/test_tv_wrapper.py (39)

@@ -11,24 +11,43 @@ logger_mp = logging_mp.get_logger(__name__, level=logging_mp.INFO)
 def run_test_tv_wrapper():
+    # xr-mode
     use_hand_track=False
-    tv_wrapper = TeleVuerWrapper(use_hand_tracking=use_hand_track, pass_through=False,
-                                 binocular=True, img_shape=(480, 1280),
-                                 # webrtc=True, webrtc_url="https://192.168.123.164:60001/offer"
+    # teleimager: if you want to test real image streaming, make sure the teleimager server is running
+    from teleimager.image_client import ImageClient
+    img_client = ImageClient(host="192.168.123.164")
+    camera_config = img_client.get_cam_config()
+    # teleimager + televuer
+    tv_wrapper = TeleVuerWrapper(use_hand_tracking=use_hand_track,
+                                 binocular=camera_config['head_camera']['binocular'],
+                                 img_shape=camera_config['head_camera']['image_shape'],
+                                 display_mode="immersive",
+                                 display_fps=camera_config['head_camera']['fps'],
+                                 zmq=camera_config['head_camera']['enable_zmq'],
+                                 webrtc=camera_config['head_camera']['enable_webrtc'],
+                                 webrtc_url=f"https://192.168.123.164:{camera_config['head_camera']['webrtc_port']}/offer"
                                  )
+    # pure televuer
+    # tv_wrapper = TeleVuerWrapper(use_hand_tracking=use_hand_track,
+    #                              binocular=True,
+    #                              img_shape=(480, 1280),
+    #                              display_fps=30.0,
+    #                              display_mode="fov",
+    #                              zmq=True,
+    #                              webrtc=True,
+    #                              webrtc_url="https://192.168.123.164:60001/offer"
+    #                              )
     try:
         input("Press Enter to start tv_wrapper test...")
         running = True
         while running:
             start_time = time.time()
+            img, _ = img_client.get_head_frame()
+            tv_wrapper.render_to_xr(img)
             logger_mp.info("---- TV Wrapper TeleData ----")
             teleData = tv_wrapper.get_tele_data()
-            # import cv2
-            # img = cv2.videoCapture(0).read()[1]
-            # tv_wrapper.render_to_xr(img)
             logger_mp.info("-------------------=== TeleData Snapshot ===-------------------")
             logger_mp.info(f"[Head Pose]:\n{teleData.head_pose}")
             logger_mp.info(f"[Left Wrist Pose]:\n{teleData.left_wrist_pose}")
@@ -62,9 +81,9 @@ def run_test_tv_wrapper():
             current_time = time.time()
             time_elapsed = current_time - start_time
-            sleep_time = max(0, 0.16 - time_elapsed)
+            sleep_time = max(0, 0.016 - time_elapsed)
             time.sleep(sleep_time)
-            logger_mp.debug(f"main process sleep: {sleep_time}")
+            logger_mp.info(f"main process sleep: {sleep_time}")
     except KeyboardInterrupt:
         running = False
