Browse Source

Add camera relay from Isaac Sim SHM to Quest 3 via WebSocket

Reads head camera from Isaac Sim shared memory, encodes as JPEG (quality 85),
and relays to Quest 3 over the existing WebSocket connection at ~15fps.
Falls back to left/right wrist cameras if head SHM unavailable.
Prevents Python resource_tracker from destroying SHM on bridge exit.
Also increases WebSocket write buffer and reduces send rate for stability.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
master
melancholytron 3 weeks ago
parent
commit
d96225b3a0
  1. 73
      server/retarget_bridge.py
  2. 8
      server/teleop_server.py

73
server/retarget_bridge.py

@ -35,6 +35,7 @@ import time
import argparse import argparse
import logging import logging
import signal import signal
import threading
import numpy as np import numpy as np
from scipy.spatial.transform import Rotation from scipy.spatial.transform import Rotation
@ -601,6 +602,72 @@ def retarget_hands(left_retargeting, right_retargeting,
return hand_cmd return hand_cmd
# ---------------------------------------------------------------------------
# Camera relay: teleimager ZMQ → WebSocket
# ---------------------------------------------------------------------------
def start_camera_relay(server, zmq_port=55555):
"""Read camera from Isaac Sim shared memory and relay JPEG frames to WebSocket clients."""
import cv2
sys.path.insert(0, os.path.expanduser("~/git/unitree_sim_isaaclab"))
from tools.shared_memory_utils import MultiImageReader
from multiprocessing import resource_tracker
def _untrack_shm(name):
"""Prevent Python's resource_tracker from unlinking SHM we don't own."""
try:
resource_tracker.unregister(f"/{name}", "shared_memory")
except Exception:
pass
def _relay_loop():
reader = MultiImageReader()
# Try camera names in priority order
camera_names = ["head", "left", "right"]
active_camera = None
frame_count = 0
last_ts = 0
logger.info("Camera relay: waiting for Isaac Sim shared memory...")
while active_camera is None:
for name in camera_names:
img = reader.read_single_image(name)
if img is not None:
active_camera = name
# Untrack all opened SHMs so killing the bridge won't destroy them
for shm_name in list(reader.shms.keys()):
_untrack_shm(shm_name)
logger.info(f"Camera relay: using '{name}' camera ({img.shape[1]}x{img.shape[0]})")
break
if active_camera is None:
time.sleep(1.0)
while True:
try:
img = reader.read_single_image(active_camera)
if img is not None:
cur_ts = reader.last_timestamps.get(active_camera, 0)
if cur_ts > last_ts:
last_ts = cur_ts
h, w = img.shape[:2]
ok, buf = cv2.imencode(".jpg", img, [cv2.IMWRITE_JPEG_QUALITY, 85])
if ok:
server.set_webcam_frame(buf.tobytes())
frame_count += 1
if frame_count == 1:
logger.info(f"Camera relay: first frame {w}x{h}, "
f"{len(buf)} bytes")
elif frame_count % 300 == 0:
logger.info(f"Camera relay: {frame_count} frames sent")
except Exception as e:
logger.error(f"Camera relay error: {e}")
time.sleep(1.0 / 15.0)
thread = threading.Thread(target=_relay_loop, daemon=True)
thread.start()
return thread
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Main loop # Main loop
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@ -627,6 +694,8 @@ def main():
help="Isolate a single joint for testing. Options: " help="Isolate a single joint for testing. Options: "
"l_sh_pitch, l_sh_roll, l_sh_yaw, l_elbow, " "l_sh_pitch, l_sh_roll, l_sh_yaw, l_elbow, "
"l_wr_roll, l_wr_pitch, l_wr_yaw") "l_wr_roll, l_wr_pitch, l_wr_yaw")
parser.add_argument("--camera-port", type=int, default=55555,
help="ZMQ port for teleimager camera feed (default: 55555, 0 to disable)")
args = parser.parse_args() args = parser.parse_args()
if args.solo_joint is not None: if args.solo_joint is not None:
@ -643,6 +712,10 @@ def main():
tv_wrapper = NativeTeleWrapper(port=args.port, host=args.host) tv_wrapper = NativeTeleWrapper(port=args.port, host=args.host)
tv_wrapper.start() tv_wrapper.start()
# --- Start camera relay (teleimager ZMQ → WebSocket) ---
if args.camera_port > 0:
start_camera_relay(tv_wrapper.server, zmq_port=args.camera_port)
# --- Initialize DDS --- # --- Initialize DDS ---
dds = None dds = None
for attempt in range(60): for attempt in range(60):

8
server/teleop_server.py

@ -237,8 +237,8 @@ class TeleopServer:
self.body_joints_shared[:] = body_joints self.body_joints_shared[:] = body_joints
async def _send_webcam_loop(self, websocket): async def _send_webcam_loop(self, websocket):
"""Send webcam JPEG frames to a client at ~15 fps."""
interval = 1.0 / 15.0
"""Send webcam JPEG frames to a client at ~10 fps."""
interval = 1.0 / 10.0
while True: while True:
await asyncio.sleep(interval) await asyncio.sleep(interval)
with self._webcam_lock: with self._webcam_lock:
@ -248,6 +248,9 @@ class TeleopServer:
await websocket.send(frame) await websocket.send(frame)
except websockets.exceptions.ConnectionClosed: except websockets.exceptions.ConnectionClosed:
break break
except Exception as e:
logger.warning(f"Webcam send error: {e}")
break
async def serve(self): async def serve(self):
"""Start the WebSocket server.""" """Start the WebSocket server."""
@ -257,6 +260,7 @@ class TeleopServer:
self.host, self.host,
self.port, self.port,
max_size=2**20, # 1 MB max message size max_size=2**20, # 1 MB max message size
write_limit=2**18, # 256 KB write buffer (for camera frames)
ping_interval=30, ping_interval=30,
ping_timeout=60, ping_timeout=60,
) as server: ) as server:

Loading…
Cancel
Save