|
|
|
@ -12,7 +12,7 @@ from typing import Literal |
|
|
|
|
|
|
|
class TeleVuer: |
|
|
|
def __init__(self, use_hand_tracking: bool, binocular: bool=True, img_shape: tuple=None, display_fps: float=30.0, |
|
|
|
display_mode: Literal["immersive", "pass-through", "fov"]="immersive", zmq: bool=False, webrtc: bool=False, webrtc_url: str=None, |
|
|
|
display_mode: Literal["immersive", "pass-through", "ego"]="immersive", zmq: bool=False, webrtc: bool=False, webrtc_url: str=None, |
|
|
|
cert_file: str=None, key_file: str=None): |
|
|
|
""" |
|
|
|
TeleVuer class for OpenXR-based XR teleoperate applications. |
|
|
|
@ -23,7 +23,7 @@ class TeleVuer: |
|
|
|
:param img_shape: tuple, shape of the head image (height, width). |
|
|
|
:param display_fps: float, target frames per second for display updates (default: 30.0). |
|
|
|
|
|
|
|
:param display_mode: str, controls the VR viewing mode. Options are "immersive", "pass-through", and "fov". |
|
|
|
:param display_mode: str, controls the VR viewing mode. Options are "immersive", "pass-through", and "ego". |
|
|
|
:param zmq: bool, whether to use zmq for image transmission. |
|
|
|
:param webrtc: bool, whether to use webrtc for real-time communication. |
|
|
|
:param webrtc_url: str, URL for the webrtc offer. must be provided if webrtc is True. |
|
|
|
@ -35,10 +35,10 @@ class TeleVuer: |
|
|
|
- display_mode controls what the VR headset displays: |
|
|
|
* "immersive": fully immersive mode; VR shows the robot's first-person view (zmq or webrtc must be enabled). |
|
|
|
* "pass-through": VR shows the real world through the VR headset cameras; no image from zmq or webrtc is displayed (even if enabled). |
|
|
|
* "fov": Field-of-View mode; a small window in the center shows the robot's first-person view, while the surrounding area shows the real world. |
|
|
|
* "ego": a small window in the center shows the robot's first-person view, while the surrounding area shows the real world. |
|
|
|
|
|
|
|
- Only one image mode is active at a time. |
|
|
|
- Image transmission to VR occurs only if display_mode is "immersive" or "fov" and the corresponding zmq or webrtc option is enabled. |
|
|
|
- Image transmission to VR occurs only if display_mode is "immersive" or "ego" and the corresponding zmq or webrtc option is enabled. |
|
|
|
- If zmq and webrtc simultaneously enabled, webrtc will be prioritized. |
|
|
|
|
|
|
|
-------------- ------------------- -------------- ----------------- ------- |
|
|
|
@ -48,7 +48,7 @@ class TeleVuer: |
|
|
|
-------------- ------------------- -------------- ----------------- ------- |
|
|
|
pass-through | Real world view (VR) | No | N/A | even if image source enabled, don't display |
|
|
|
-------------- ------------------- -------------- ----------------- ------- |
|
|
|
fov | FOV view (robot + VR) | Yes (small) | zmq or webrtc | if both enabled, webrtc prioritized |
|
|
|
ego | ego view (robot + VR) | Yes (small) | zmq or webrtc | if both enabled, webrtc prioritized |
|
|
|
-------------- ------------------- -------------- ----------------- ------- |
|
|
|
|
|
|
|
""" |
|
|
|
@ -114,9 +114,9 @@ class TeleVuer: |
|
|
|
fn = self.main_image_binocular_zmq if self.binocular else self.main_image_monocular_zmq |
|
|
|
else: |
|
|
|
raise ValueError("[TeleVuer] immersive mode requires zmq=True or webrtc=True.") |
|
|
|
elif self.display_mode == "fov": |
|
|
|
elif self.display_mode == "ego": |
|
|
|
if self.webrtc: |
|
|
|
fn = self.main_image_binocular_webrtc_fov if self.binocular else self.main_image_monocular_webrtc_fov |
|
|
|
fn = self.main_image_binocular_webrtc_ego if self.binocular else self.main_image_monocular_webrtc_ego |
|
|
|
elif self.zmq: |
|
|
|
self.img2display_shm = shared_memory.SharedMemory(create=True, size=np.prod(self.img_shape) * np.uint8().itemsize) |
|
|
|
self.img2display = np.ndarray(self.img_shape, dtype=np.uint8, buffer=self.img2display_shm.buf) |
|
|
|
@ -125,9 +125,9 @@ class TeleVuer: |
|
|
|
self.stop_writer_event = threading.Event() |
|
|
|
self.writer_thread = threading.Thread(target=self._xr_render_loop, daemon=True) |
|
|
|
self.writer_thread.start() |
|
|
|
fn = self.main_image_binocular_zmq_fov if self.binocular else self.main_image_monocular_zmq_fov |
|
|
|
fn = self.main_image_binocular_zmq_ego if self.binocular else self.main_image_monocular_zmq_ego |
|
|
|
else: |
|
|
|
raise ValueError("[TeleVuer] fov mode requires zmq=True or webrtc=True.") |
|
|
|
raise ValueError("[TeleVuer] ego mode requires zmq=True or webrtc=True.") |
|
|
|
elif self.display_mode == "pass-through": |
|
|
|
fn = self.main_pass_through |
|
|
|
else: |
|
|
|
@ -208,7 +208,7 @@ class TeleVuer: |
|
|
|
def close(self): |
|
|
|
self.process.terminate() |
|
|
|
self.process.join(timeout=0.5) |
|
|
|
if self.display_mode in ("immersive", "fov") and not self.webrtc: |
|
|
|
if self.display_mode in ("immersive", "ego") and not self.webrtc: |
|
|
|
self.stop_writer_event.set() |
|
|
|
self.new_frame_event.set() |
|
|
|
self.writer_thread.join(timeout=0.5) |
|
|
|
@ -480,8 +480,8 @@ class TeleVuer: |
|
|
|
) |
|
|
|
await asyncio.sleep(1.0 / self.display_fps) |
|
|
|
|
|
|
|
## FOV MODE |
|
|
|
async def main_image_binocular_zmq_fov(self, session): |
|
|
|
## ego MODE |
|
|
|
async def main_image_binocular_zmq_ego(self, session): |
|
|
|
if self.use_hand_tracking: |
|
|
|
session.upsert( |
|
|
|
Hands( |
|
|
|
@ -536,7 +536,7 @@ class TeleVuer: |
|
|
|
# 'jpeg' encoding should give you about 30fps with a 16ms wait in-between. |
|
|
|
await asyncio.sleep(1.0 / self.display_fps) |
|
|
|
|
|
|
|
async def main_image_monocular_zmq_fov(self, session): |
|
|
|
async def main_image_monocular_zmq_ego(self, session): |
|
|
|
if self.use_hand_tracking: |
|
|
|
session.upsert( |
|
|
|
Hands( |
|
|
|
@ -576,7 +576,7 @@ class TeleVuer: |
|
|
|
) |
|
|
|
await asyncio.sleep(1.0 / self.display_fps) |
|
|
|
|
|
|
|
async def main_image_binocular_webrtc_fov(self, session): |
|
|
|
async def main_image_binocular_webrtc_ego(self, session): |
|
|
|
if self.use_hand_tracking: |
|
|
|
session.upsert( |
|
|
|
Hands( |
|
|
|
@ -613,7 +613,7 @@ class TeleVuer: |
|
|
|
) |
|
|
|
await asyncio.sleep(1.0 / self.display_fps) |
|
|
|
|
|
|
|
async def main_image_monocular_webrtc_fov(self, session): |
|
|
|
async def main_image_monocular_webrtc_ego(self, session): |
|
|
|
if self.use_hand_tracking: |
|
|
|
session.upsert( |
|
|
|
Hands( |
|
|
|
|