Browse Source

[upgrade] image_server to teleimager

main
silencht 5 months ago
parent
commit
3780dfff60
  1. 3
      .gitmodules
  2. 197
      teleop/image_server/image_client.py
  3. 321
      teleop/image_server/image_server.py
  4. 47
      teleop/robot_control/robot_hand_unitree.py
  5. 1
      teleop/teleimager
  6. 171
      teleop/teleop_hand_and_arm.py
  7. 2
      teleop/televuer

3
.gitmodules

@ -4,3 +4,6 @@
[submodule "teleop/robot_control/dex-retargeting"]
path = teleop/robot_control/dex-retargeting
url = https://github.com/silencht/dex-retargeting
[submodule "teleop/teleimager"]
path = teleop/teleimager
url = https://github.com/silencht/teleimager.git

197
teleop/image_server/image_client.py

@ -1,197 +0,0 @@
import cv2
import zmq
import numpy as np
import time
import struct
from collections import deque
from multiprocessing import shared_memory
import logging_mp
logger_mp = logging_mp.get_logger(__name__)
class ImageClient:
    """ZeroMQ SUB client that receives JPEG-encoded camera frames published by
    ImageServer, decodes them, and optionally (a) copies the head/wrist regions
    into caller-owned shared-memory arrays and (b) displays the stream locally.

    The incoming image is assumed to be a single horizontal concatenation:
    head camera(s) on the left, wrist camera(s) on the right (see ImageServer).
    """

    def __init__(self, tv_img_shape = None, tv_img_shm_name = None, wrist_img_shape = None, wrist_img_shm_name = None,
                 image_show = False, server_address = "192.168.123.164", port = 5555, Unit_Test = False):
        """
        tv_img_shape: User's expected head camera resolution shape (H, W, C). It should match the output of the image service terminal.
        tv_img_shm_name: Shared memory is used to easily transfer images across processes to the Vuer.
        wrist_img_shape: User's expected wrist camera resolution shape (H, W, C). It should maintain the same shape as tv_img_shape.
        wrist_img_shm_name: Shared memory is used to easily transfer images.
        image_show: Whether to display received images in real time.
        server_address: The ip address to execute the image server script.
        port: The port number to bind to. It should be the same as the image server.
        Unit_Test: When both server and client are True, it can be used to test the image transfer latency, \
                   network jitter, frame loss rate and other information.
        """
        self.running = True
        self._image_show = image_show
        self._server_address = server_address
        self._port = port

        self.tv_img_shape = tv_img_shape
        self.wrist_img_shape = wrist_img_shape

        # Attach to (not create) the caller-owned shared-memory segment for the
        # head-camera image; the caller is responsible for unlinking it.
        self.tv_enable_shm = False
        if self.tv_img_shape is not None and tv_img_shm_name is not None:
            self.tv_image_shm = shared_memory.SharedMemory(name=tv_img_shm_name)
            self.tv_img_array = np.ndarray(tv_img_shape, dtype = np.uint8, buffer = self.tv_image_shm.buf)
            self.tv_enable_shm = True

        # Same for the wrist-camera image segment.
        self.wrist_enable_shm = False
        if self.wrist_img_shape is not None and wrist_img_shm_name is not None:
            self.wrist_image_shm = shared_memory.SharedMemory(name=wrist_img_shm_name)
            self.wrist_img_array = np.ndarray(wrist_img_shape, dtype = np.uint8, buffer = self.wrist_image_shm.buf)
            self.wrist_enable_shm = True

        # Performance evaluation parameters
        self._enable_performance_eval = Unit_Test
        if self._enable_performance_eval:
            self._init_performance_metrics()

    def _init_performance_metrics(self):
        """Reset all latency / FPS / frame-loss counters used in Unit_Test mode."""
        self._frame_count = 0  # Total frames received
        self._last_frame_id = -1  # Last received frame ID

        # Real-time FPS calculation using a time window
        self._time_window = 1.0  # Time window size (in seconds)
        self._frame_times = deque()  # Timestamps of frames received within the time window

        # Data transmission quality metrics
        self._latencies = deque()  # Latencies of frames within the time window
        self._lost_frames = 0  # Total lost frames
        self._total_frames = 0  # Expected total frames based on frame IDs

    def _update_performance_metrics(self, timestamp, frame_id, receive_time):
        """Fold one received frame (server send `timestamp`, its `frame_id`, and
        the local `receive_time`) into the sliding-window statistics."""
        # Update latency
        latency = receive_time - timestamp
        self._latencies.append(latency)
        # Remove latencies outside the time window
        while self._latencies and self._frame_times and self._latencies[0] < receive_time - self._time_window:
            self._latencies.popleft()

        # Update frame times
        self._frame_times.append(receive_time)
        # Remove timestamps outside the time window
        while self._frame_times and self._frame_times[0] < receive_time - self._time_window:
            self._frame_times.popleft()

        # Update frame counts for lost frame calculation.
        # A gap in consecutive frame IDs means frames were dropped in transit;
        # a negative gap means out-of-order delivery (logged, not counted as loss).
        expected_frame_id = self._last_frame_id + 1 if self._last_frame_id != -1 else frame_id
        if frame_id != expected_frame_id:
            lost = frame_id - expected_frame_id
            if lost < 0:
                logger_mp.info(f"[Image Client] Received out-of-order frame ID: {frame_id}")
            else:
                self._lost_frames += lost
                logger_mp.warning(f"[Image Client] Detected lost frames: {lost}, Expected frame ID: {expected_frame_id}, Received frame ID: {frame_id}")
        self._last_frame_id = frame_id
        self._total_frames = frame_id + 1

        self._frame_count += 1

    def _print_performance_metrics(self, receive_time):
        """Log FPS / latency / jitter / loss statistics once every 30 frames."""
        if self._frame_count % 30 == 0:
            # Calculate real-time FPS
            real_time_fps = len(self._frame_times) / self._time_window if self._time_window > 0 else 0

            # Calculate latency metrics
            if self._latencies:
                avg_latency = sum(self._latencies) / len(self._latencies)
                max_latency = max(self._latencies)
                min_latency = min(self._latencies)
                jitter = max_latency - min_latency
            else:
                avg_latency = max_latency = min_latency = jitter = 0

            # Calculate lost frame rate
            lost_frame_rate = (self._lost_frames / self._total_frames) * 100 if self._total_frames > 0 else 0

            logger_mp.info(f"[Image Client] Real-time FPS: {real_time_fps:.2f}, Avg Latency: {avg_latency*1000:.2f} ms, Max Latency: {max_latency*1000:.2f} ms, \
Min Latency: {min_latency*1000:.2f} ms, Jitter: {jitter*1000:.2f} ms, Lost Frame Rate: {lost_frame_rate:.2f}%")

    def _close(self):
        """Release the ZMQ socket/context and any OpenCV windows."""
        self._socket.close()
        self._context.term()
        if self._image_show:
            cv2.destroyAllWindows()
        logger_mp.info("Image client has been closed.")

    def receive_process(self):
        """Blocking receive loop: subscribe to the server and, for each message,
        decode the JPEG payload, copy the head/wrist slices into shared memory
        (when enabled), optionally display the frame, and update metrics.

        Runs until self.running is cleared ('q' in the display window), a
        KeyboardInterrupt, or an unrecoverable receive error.
        """
        # Set up ZeroMQ context and socket
        self._context = zmq.Context()
        self._socket = self._context.socket(zmq.SUB)
        self._socket.connect(f"tcp://{self._server_address}:{self._port}")
        self._socket.setsockopt_string(zmq.SUBSCRIBE, "")

        logger_mp.info("Image client has started, waiting to receive data...")
        try:
            while self.running:
                # Receive message
                message = self._socket.recv()
                receive_time = time.time()

                if self._enable_performance_eval:
                    # Unit_Test messages are prefixed with a packed (timestamp, frame_id) header.
                    header_size = struct.calcsize('dI')
                    try:
                        # Attempt to extract header and image data
                        header = message[:header_size]
                        jpg_bytes = message[header_size:]
                        timestamp, frame_id = struct.unpack('dI', header)
                    except struct.error as e:
                        logger_mp.warning(f"[Image Client] Error unpacking header: {e}, discarding message.")
                        continue
                else:
                    # No header, entire message is image data
                    jpg_bytes = message

                # Decode image
                np_img = np.frombuffer(jpg_bytes, dtype=np.uint8)
                current_image = cv2.imdecode(np_img, cv2.IMREAD_COLOR)
                if current_image is None:
                    logger_mp.warning("[Image Client] Failed to decode image.")
                    continue

                # Head region is the leftmost tv_img_shape[1] columns;
                # wrist region is the rightmost wrist_img_shape[1] columns.
                if self.tv_enable_shm:
                    np.copyto(self.tv_img_array, np.array(current_image[:, :self.tv_img_shape[1]]))
                if self.wrist_enable_shm:
                    np.copyto(self.wrist_img_array, np.array(current_image[:, -self.wrist_img_shape[1]:]))

                if self._image_show:
                    height, width = current_image.shape[:2]
                    resized_image = cv2.resize(current_image, (width // 2, height // 2))
                    cv2.imshow('Image Client Stream', resized_image)
                    if cv2.waitKey(1) & 0xFF == ord('q'):
                        self.running = False

                if self._enable_performance_eval:
                    self._update_performance_metrics(timestamp, frame_id, receive_time)
                    self._print_performance_metrics(receive_time)

        except KeyboardInterrupt:
            logger_mp.info("Image client interrupted by user.")
        except Exception as e:
            logger_mp.warning(f"[Image Client] An error occurred while receiving data: {e}")
        finally:
            self._close()
if __name__ == "__main__":
    # Standalone entry point: connect to the robot-side image server and display
    # the stream locally. Shared-memory usage is shown in the commented examples.

    # example1
    # tv_img_shape = (480, 1280, 3)
    # img_shm = shared_memory.SharedMemory(create=True, size=np.prod(tv_img_shape) * np.uint8().itemsize)
    # img_array = np.ndarray(tv_img_shape, dtype=np.uint8, buffer=img_shm.buf)
    # img_client = ImageClient(tv_img_shape = tv_img_shape, tv_img_shm_name = img_shm.name)
    # img_client.receive_process()

    # example2
    # Initialize the client with performance evaluation enabled
    # client = ImageClient(image_show = True, server_address='127.0.0.1', Unit_Test=True) # local test
    client = ImageClient(image_show = True, server_address='192.168.123.164', Unit_Test=False) # deployment test
    client.receive_process()

321
teleop/image_server/image_server.py

@ -1,321 +0,0 @@
import cv2
import zmq
import time
import struct
from collections import deque
import numpy as np
import pyrealsense2 as rs
import logging_mp
logger_mp = logging_mp.get_logger(__name__, level=logging_mp.DEBUG)
class RealSenseCamera(object):
    """Wraps one Intel RealSense device: streams BGR color frames (and optionally
    depth frames aligned to the color stream) at a requested resolution/FPS."""

    def __init__(self, img_shape, fps, serial_number=None, enable_depth=False) -> None:
        """
        img_shape: [height, width]
        fps: requested stream frame rate
        serial_number: serial number of the target device; None selects any device
        enable_depth: also stream depth, aligned to the color stream
        """
        self.img_shape = img_shape
        self.fps = fps
        self.serial_number = serial_number
        self.enable_depth = enable_depth

        # Align depth frames onto the color stream so pixels correspond 1:1.
        align_to = rs.stream.color
        self.align = rs.align(align_to)
        self.init_realsense()

    def init_realsense(self):
        """Configure and start the pipeline; cache the depth scale (when depth is
        enabled) and the color-stream intrinsics."""
        self.pipeline = rs.pipeline()
        config = rs.config()
        if self.serial_number is not None:
            config.enable_device(self.serial_number)

        config.enable_stream(rs.stream.color, self.img_shape[1], self.img_shape[0], rs.format.bgr8, self.fps)
        if self.enable_depth:
            config.enable_stream(rs.stream.depth, self.img_shape[1], self.img_shape[0], rs.format.z16, self.fps)

        profile = self.pipeline.start(config)
        self._device = profile.get_device()
        if self._device is None:
            logger_mp.error('[Image Server] pipe_profile.get_device() is None .')
        if self.enable_depth:
            assert self._device is not None
            depth_sensor = self._device.first_depth_sensor()
            self.g_depth_scale = depth_sensor.get_depth_scale()

        self.intrinsics = profile.get_stream(rs.stream.color).as_video_stream_profile().get_intrinsics()

    def get_frame(self):
        """Return (color_image, depth_image) as numpy arrays; depth_image is None
        when depth is disabled.

        Bug fix: on a missing color frame this now returns (None, None) instead of
        a bare None — callers unpack two values (`color, depth = cam.get_frame()`),
        so the old bare None raised TypeError before the caller's None check ran.
        """
        frames = self.pipeline.wait_for_frames()
        aligned_frames = self.align.process(frames)
        color_frame = aligned_frames.get_color_frame()
        if not color_frame:
            return None, None
        if self.enable_depth:
            depth_frame = aligned_frames.get_depth_frame()

        color_image = np.asanyarray(color_frame.get_data())
        # color_image = cv2.cvtColor(color_image, cv2.COLOR_BGR2RGB)
        depth_image = np.asanyarray(depth_frame.get_data()) if self.enable_depth else None
        return color_image, depth_image

    def release(self):
        """Stop the pipeline and free the device."""
        self.pipeline.stop()
class OpenCVCamera():
    """V4L2/OpenCV camera wrapper that requests MJPG frames at a fixed
    resolution and frame rate."""

    def __init__(self, device_id, img_shape, fps):
        """
        device_id: /dev/video* path or integer index
        img_shape: [height, width]
        fps: requested capture frame rate
        """
        self.id = device_id
        self.fps = fps
        self.img_shape = img_shape
        self.cap = cv2.VideoCapture(self.id, cv2.CAP_V4L2)

        # Request MJPG plus the desired geometry/rate; the driver may silently
        # substitute the nearest supported values.
        for prop, value in (
            (cv2.CAP_PROP_FOURCC, cv2.VideoWriter.fourcc('M', 'J', 'P', 'G')),
            (cv2.CAP_PROP_FRAME_HEIGHT, self.img_shape[0]),
            (cv2.CAP_PROP_FRAME_WIDTH, self.img_shape[1]),
            (cv2.CAP_PROP_FPS, self.fps),
        ):
            self.cap.set(prop, value)

        # Probe once; a device that cannot deliver a frame is released right away.
        if not self._can_read_frame():
            logger_mp.error(f"[Image Server] Camera {self.id} Error: Failed to initialize the camera or read frames. Exiting...")
            self.release()

    def _can_read_frame(self):
        """Return True when a single frame can be grabbed from the device."""
        ok, _ = self.cap.read()
        return ok

    def release(self):
        """Free the underlying capture handle."""
        self.cap.release()

    def get_frame(self):
        """Return the next BGR frame, or None when the read fails."""
        ok, frame = self.cap.read()
        return frame if ok else None
class ImageServer:
    """Grabs frames from head (and optional wrist) cameras, horizontally
    concatenates them into one image, JPEG-encodes it, and publishes it on a
    ZeroMQ PUB socket for ImageClient subscribers."""

    def __init__(self, config, port = 5555, Unit_Test = False):
        """
        config example1:
        {
            'fps':30                                       # frame per second
            'head_camera_type': 'opencv',                  # opencv or realsense
            'head_camera_image_shape': [480, 1280],        # Head camera resolution  [height, width]
            'head_camera_id_numbers': [0],                 # '/dev/video0' (opencv)
            'wrist_camera_type': 'realsense',
            'wrist_camera_image_shape': [480, 640],        # Wrist camera resolution  [height, width]
            'wrist_camera_id_numbers': ["218622271789", "241222076627"],  # realsense camera's serial number
        }

        config example2:
        {
            'fps':30                                       # frame per second
            'head_camera_type': 'realsense',               # opencv or realsense
            'head_camera_image_shape': [480, 640],         # Head camera resolution  [height, width]
            'head_camera_id_numbers': ["218622271739"],    # realsense camera's serial number
            'wrist_camera_type': 'opencv',
            'wrist_camera_image_shape': [480, 640],        # Wrist camera resolution  [height, width]
            'wrist_camera_id_numbers': [0,1],              # '/dev/video0' and '/dev/video1' (opencv)
        }

        If you are not using the wrist camera, you can comment out its configuration, like this below:
        config:
        {
            'fps':30                                       # frame per second
            'head_camera_type': 'opencv',                  # opencv or realsense
            'head_camera_image_shape': [480, 1280],        # Head camera resolution  [height, width]
            'head_camera_id_numbers': [0],                 # '/dev/video0' (opencv)
            #'wrist_camera_type': 'realsense',
            #'wrist_camera_image_shape': [480, 640],       # Wrist camera resolution  [height, width]
            #'wrist_camera_id_numbers': ["218622271789", "241222076627"],  # serial number (realsense)
        }

        port: TCP port the PUB socket binds to (must match the client).
        Unit_Test: prefix each message with a (timestamp, frame_id) header and
                   track send-side FPS statistics.
        """
        logger_mp.info(config)
        self.fps = config.get('fps', 30)
        self.head_camera_type = config.get('head_camera_type', 'opencv')
        self.head_image_shape = config.get('head_camera_image_shape', [480, 640])    # (height, width)
        self.head_camera_id_numbers = config.get('head_camera_id_numbers', [0])

        self.wrist_camera_type = config.get('wrist_camera_type', None)
        self.wrist_image_shape = config.get('wrist_camera_image_shape', [480, 640])  # (height, width)
        self.wrist_camera_id_numbers = config.get('wrist_camera_id_numbers', None)

        self.port = port
        self.Unit_Test = Unit_Test

        # Initialize head cameras
        self.head_cameras = []
        if self.head_camera_type == 'opencv':
            for device_id in self.head_camera_id_numbers:
                camera = OpenCVCamera(device_id=device_id, img_shape=self.head_image_shape, fps=self.fps)
                self.head_cameras.append(camera)
        elif self.head_camera_type == 'realsense':
            for serial_number in self.head_camera_id_numbers:
                camera = RealSenseCamera(img_shape=self.head_image_shape, fps=self.fps, serial_number=serial_number)
                self.head_cameras.append(camera)
        else:
            logger_mp.warning(f"[Image Server] Unsupported head_camera_type: {self.head_camera_type}")

        # Initialize wrist cameras if provided
        self.wrist_cameras = []
        if self.wrist_camera_type and self.wrist_camera_id_numbers:
            if self.wrist_camera_type == 'opencv':
                for device_id in self.wrist_camera_id_numbers:
                    camera = OpenCVCamera(device_id=device_id, img_shape=self.wrist_image_shape, fps=self.fps)
                    self.wrist_cameras.append(camera)
            elif self.wrist_camera_type == 'realsense':
                for serial_number in self.wrist_camera_id_numbers:
                    camera = RealSenseCamera(img_shape=self.wrist_image_shape, fps=self.fps, serial_number=serial_number)
                    self.wrist_cameras.append(camera)
            else:
                logger_mp.warning(f"[Image Server] Unsupported wrist_camera_type: {self.wrist_camera_type}")

        # Set ZeroMQ context and socket
        self.context = zmq.Context()
        self.socket = self.context.socket(zmq.PUB)
        self.socket.bind(f"tcp://*:{self.port}")

        if self.Unit_Test:
            self._init_performance_metrics()

        # Log the resolution each camera actually delivers (drivers may adjust it).
        for cam in self.head_cameras:
            if isinstance(cam, OpenCVCamera):
                logger_mp.info(f"[Image Server] Head camera {cam.id} resolution: {cam.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)} x {cam.cap.get(cv2.CAP_PROP_FRAME_WIDTH)}")
            elif isinstance(cam, RealSenseCamera):
                logger_mp.info(f"[Image Server] Head camera {cam.serial_number} resolution: {cam.img_shape[0]} x {cam.img_shape[1]}")
            else:
                logger_mp.warning("[Image Server] Unknown camera type in head_cameras.")
        for cam in self.wrist_cameras:
            if isinstance(cam, OpenCVCamera):
                logger_mp.info(f"[Image Server] Wrist camera {cam.id} resolution: {cam.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)} x {cam.cap.get(cv2.CAP_PROP_FRAME_WIDTH)}")
            elif isinstance(cam, RealSenseCamera):
                logger_mp.info(f"[Image Server] Wrist camera {cam.serial_number} resolution: {cam.img_shape[0]} x {cam.img_shape[1]}")
            else:
                logger_mp.warning("[Image Server] Unknown camera type in wrist_cameras.")

        logger_mp.info("[Image Server] Image server has started, waiting for client connections...")

    def _init_performance_metrics(self):
        """Reset send-side FPS statistics used in Unit_Test mode."""
        self.frame_count = 0  # Total frames sent
        self.time_window = 1.0  # Time window for FPS calculation (in seconds)
        self.frame_times = deque()  # Timestamps of frames sent within the time window
        self.start_time = time.time()  # Start time of the streaming

    def _update_performance_metrics(self, current_time):
        """Record one sent frame at `current_time` into the sliding window."""
        # Add current time to frame times deque
        self.frame_times.append(current_time)
        # Remove timestamps outside the time window
        while self.frame_times and self.frame_times[0] < current_time - self.time_window:
            self.frame_times.popleft()
        # Increment frame count
        self.frame_count += 1

    def _print_performance_metrics(self, current_time):
        """Log real-time FPS and totals once every 30 frames."""
        if self.frame_count % 30 == 0:
            elapsed_time = current_time - self.start_time
            real_time_fps = len(self.frame_times) / self.time_window
            logger_mp.info(f"[Image Server] Real-time FPS: {real_time_fps:.2f}, Total frames sent: {self.frame_count}, Elapsed time: {elapsed_time:.2f} sec")

    def _close(self):
        """Release all cameras and tear down the ZMQ socket/context."""
        for cam in self.head_cameras:
            cam.release()
        for cam in self.wrist_cameras:
            cam.release()
        self.socket.close()
        self.context.term()
        logger_mp.info("[Image Server] The server has been closed.")

    def _capture_frames(self, cameras, camera_type, label):
        """Grab one color frame from every camera in `cameras`.

        Returns the list of frames on success, or None if any camera failed to
        deliver a frame (the failure is logged)."""
        frames = []
        for cam in cameras:
            if camera_type == 'opencv':
                color_image = cam.get_frame()
            elif camera_type == 'realsense':
                color_image, _depth_image = cam.get_frame()
            else:
                color_image = None  # unreachable: unsupported types create no cameras
            if color_image is None:
                logger_mp.error(f"[Image Server] {label} camera frame read is error.")
                return None
            frames.append(color_image)
        return frames

    def send_process(self):
        """Publish loop: grab head (+ wrist) frames, hconcat, JPEG-encode, and
        send until a camera fails or the user interrupts. Cameras and the socket
        are released on exit."""
        try:
            while True:
                head_frames = self._capture_frames(self.head_cameras, self.head_camera_type, "Head")
                if head_frames is None:
                    break  # a head camera failed: shut down (matches original behavior)
                head_color = cv2.hconcat(head_frames)

                if self.wrist_cameras:
                    wrist_frames = self._capture_frames(self.wrist_cameras, self.wrist_camera_type, "Wrist")
                    if wrist_frames is None:
                        # Bug fix: the original fell through after a failed wrist
                        # read and hconcat'ed a partial/empty list, crashing here.
                        break
                    wrist_color = cv2.hconcat(wrist_frames)
                    # Concatenate head and wrist frames
                    full_color = cv2.hconcat([head_color, wrist_color])
                else:
                    full_color = head_color

                ret, buffer = cv2.imencode('.jpg', full_color)
                if not ret:
                    logger_mp.error("[Image Server] Frame imencode is failed.")
                    continue
                jpg_bytes = buffer.tobytes()

                if self.Unit_Test:
                    timestamp = time.time()
                    frame_id = self.frame_count
                    header = struct.pack('dI', timestamp, frame_id)  # 8-byte double, 4-byte unsigned int
                    message = header + jpg_bytes
                else:
                    message = jpg_bytes

                self.socket.send(message)

                if self.Unit_Test:
                    current_time = time.time()
                    self._update_performance_metrics(current_time)
                    self._print_performance_metrics(current_time)

        except KeyboardInterrupt:
            logger_mp.warning("[Image Server] Interrupted by user.")
        finally:
            self._close()
if __name__ == "__main__":
    # Standalone entry point. This config must mirror the client side's expected
    # layout: one 1280-wide (binocular) head camera plus two wrist cameras.
    config = {
        'fps': 30,
        'head_camera_type': 'opencv',
        'head_camera_image_shape': [480, 1280],  # Head camera resolution
        'head_camera_id_numbers': [0],
        'wrist_camera_type': 'opencv',
        'wrist_camera_image_shape': [480, 640],  # Wrist camera resolution
        'wrist_camera_id_numbers': [2, 4],
    }

    server = ImageServer(config, Unit_Test=False)
    server.send_process()

47
teleop/robot_control/robot_hand_unitree.py

@ -406,7 +406,7 @@ class Gripper_JointIndex(IntEnum):
if __name__ == "__main__":
import argparse
from televuer import TeleVuerWrapper
from teleop.image_server.image_client import ImageClient
from teleimager import ImageClient
parser = argparse.ArgumentParser()
parser.add_argument('--xr-mode', type=str, choices=['hand', 'controller'], default='hand', help='Select XR device tracking source')
@ -414,34 +414,15 @@ if __name__ == "__main__":
args = parser.parse_args()
logger_mp.info(f"args:{args}\n")
# image
img_config = {
'fps': 30,
'head_camera_type': 'opencv',
'head_camera_image_shape': [480, 1280], # Head camera resolution
'head_camera_id_numbers': [0],
}
ASPECT_RATIO_THRESHOLD = 2.0 # If the aspect ratio exceeds this value, it is considered binocular
if len(img_config['head_camera_id_numbers']) > 1 or (img_config['head_camera_image_shape'][1] / img_config['head_camera_image_shape'][0] > ASPECT_RATIO_THRESHOLD):
BINOCULAR = True
else:
BINOCULAR = False
# image
if BINOCULAR and not (img_config['head_camera_image_shape'][1] / img_config['head_camera_image_shape'][0] > ASPECT_RATIO_THRESHOLD):
tv_img_shape = (img_config['head_camera_image_shape'][0], img_config['head_camera_image_shape'][1] * 2, 3)
else:
tv_img_shape = (img_config['head_camera_image_shape'][0], img_config['head_camera_image_shape'][1], 3)
tv_img_shm = shared_memory.SharedMemory(create = True, size = np.prod(tv_img_shape) * np.uint8().itemsize)
tv_img_array = np.ndarray(tv_img_shape, dtype = np.uint8, buffer = tv_img_shm.buf)
img_client = ImageClient(tv_img_shape = tv_img_shape, tv_img_shm_name = tv_img_shm.name)
image_receive_thread = threading.Thread(target = img_client.receive_process, daemon = True)
image_receive_thread.daemon = True
image_receive_thread.start()
# image client
img_client = ImageClient(host='127.0.0.1') #host='192.168.123.164'
if not img_client.has_head_cam():
logger_mp.error("Head camera is required. Please enable head camera on the image server side.")
head_img_shape = img_client.get_head_shape()
tv_binocular = img_client.is_binocular()
# television: obtain hand pose data from the XR device and transmit the robot's head camera image to the XR device.
tv_wrapper = TeleVuerWrapper(binocular=BINOCULAR, use_hand_tracking=args.xr_mode == "hand", img_shape=tv_img_shape, img_shm_name=tv_img_shm.name,
return_state_data=True, return_hand_rot_data = False)
tv_wrapper = TeleVuerWrapper(binocular=tv_binocular, use_hand_tracking=args.xr_mode == "hand", img_shape=head_img_shape, return_hand_rot_data = False)
# end-effector
if args.ee == "dex3":
@ -462,7 +443,9 @@ if __name__ == "__main__":
user_input = input("Please enter the start signal (enter 's' to start the subsequent program):\n")
if user_input.lower() == 's':
while True:
tele_data = tv_wrapper.get_motion_state_data()
head_img, head_img_fps = img_client.get_head_frame()
tv_wrapper.set_display_image(head_img)
tele_data = tv_wrapper.get_tele_data()
if args.ee == "dex3" and args.xr_mode == "hand":
with left_hand_pos_array.get_lock():
left_hand_pos_array[:] = tele_data.left_hand_pos.flatten()
@ -470,14 +453,14 @@ if __name__ == "__main__":
right_hand_pos_array[:] = tele_data.right_hand_pos.flatten()
elif args.ee == "dex1" and args.xr_mode == "controller":
with left_gripper_value.get_lock():
left_gripper_value.value = tele_data.left_trigger_value
left_gripper_value.value = tele_data.left_ctrl_triggerValue
with right_gripper_value.get_lock():
right_gripper_value.value = tele_data.right_trigger_value
right_gripper_value.value = tele_data.right_ctrl_triggerValue
elif args.ee == "dex1" and args.xr_mode == "hand":
with left_gripper_value.get_lock():
left_gripper_value.value = tele_data.left_pinch_value
left_gripper_value.value = tele_data.left_hand_pinchValue
with right_gripper_value.get_lock():
right_gripper_value.value = tele_data.right_pinch_value
right_gripper_value.value = tele_data.right_hand_pinchValue
else:
pass

1
teleop/teleimager

@ -0,0 +1 @@
Subproject commit 8e31bbbceb45ed0eeeae274eac0aa928959307e4

171
teleop/teleop_hand_and_arm.py

@ -1,8 +1,7 @@
import numpy as np
import time
import argparse
import cv2
from multiprocessing import shared_memory, Value, Array, Lock
from multiprocessing import Value, Array, Lock
import threading
import logging_mp
logging_mp.basic_config(level=logging_mp.INFO)
@ -20,7 +19,7 @@ from teleop.robot_control.robot_arm_ik import G1_29_ArmIK, G1_23_ArmIK, H1_2_Arm
from teleop.robot_control.robot_hand_unitree import Dex3_1_Controller, Dex1_1_Gripper_Controller
from teleop.robot_control.robot_hand_inspire import Inspire_Controller
from teleop.robot_control.robot_hand_brainco import Brainco_Controller
from teleop.image_server.image_client import ImageClient
from teleimager import ImageClient
from teleop.utils.episode_writer import EpisodeWriter
from teleop.utils.ipc import IPC_Server
from sshkeyboard import listen_keyboard, stop_listening
@ -38,7 +37,7 @@ START = False # Enable to start robot following VR user motion
STOP = False # Enable to begin system exit procedure
RECORD_TOGGLE = False # [Ready] ⇄ [Recording] ⟶ [AutoSave] ⟶ [Ready] (⇄ manual) (⟶ auto)
RECORD_RUNNING = False # True if [Recording]
RECORD_READY = True # True if [Ready], False if [Recording] / [AutoSave]
RECORD_READY = False # True if [Ready], False if [Recording] or [AutoSave]
# task info
TASK_NAME = None
TASK_DESC = None
@ -96,78 +95,25 @@ if __name__ == '__main__':
logger_mp.info(f"args: {args}")
try:
# ipc communication. client usage: see utils/ipc.py
# ipc communication mode. client usage: see utils/ipc.py
if args.ipc:
ipc_server = IPC_Server(on_press=on_press, on_info=on_info, get_state=get_state)
ipc_server.start()
# sshkeyboard communication
# sshkeyboard communication mode
else:
listen_keyboard_thread = threading.Thread(target=listen_keyboard, kwargs={"on_press": on_press, "until": None, "sequential": False,}, daemon=True)
listen_keyboard_thread.start()
# image client: img_config should be the same as the configuration in image_server.py (of Robot's development computing unit)
if args.sim:
img_config = {
'fps': 30,
'head_camera_type': 'opencv',
'head_camera_image_shape': [480, 640], # Head camera resolution
'head_camera_id_numbers': [0],
'wrist_camera_type': 'opencv',
'wrist_camera_image_shape': [480, 640], # Wrist camera resolution
'wrist_camera_id_numbers': [2, 4],
}
else:
img_config = {
'fps': 30,
'head_camera_type': 'opencv',
'head_camera_image_shape': [480, 1280], # Head camera resolution
'head_camera_id_numbers': [0],
'wrist_camera_type': 'opencv',
'wrist_camera_image_shape': [480, 640], # Wrist camera resolution
'wrist_camera_id_numbers': [2, 4],
}
ASPECT_RATIO_THRESHOLD = 2.0 # If the aspect ratio exceeds this value, it is considered binocular
if len(img_config['head_camera_id_numbers']) > 1 or (img_config['head_camera_image_shape'][1] / img_config['head_camera_image_shape'][0] > ASPECT_RATIO_THRESHOLD):
BINOCULAR = True
else:
BINOCULAR = False
if 'wrist_camera_type' in img_config:
WRIST = True
else:
WRIST = False
# image client
img_client = ImageClient(host='127.0.0.1')#host='192.168.123.164'
if not img_client.has_head_cam():
logger_mp.error("Head camera is required. Please enable head camera on the image server side.")
if BINOCULAR and not (img_config['head_camera_image_shape'][1] / img_config['head_camera_image_shape'][0] > ASPECT_RATIO_THRESHOLD):
tv_img_shape = (img_config['head_camera_image_shape'][0], img_config['head_camera_image_shape'][1] * 2, 3)
else:
tv_img_shape = (img_config['head_camera_image_shape'][0], img_config['head_camera_image_shape'][1], 3)
tv_img_shm = shared_memory.SharedMemory(create = True, size = np.prod(tv_img_shape) * np.uint8().itemsize)
tv_img_array = np.ndarray(tv_img_shape, dtype = np.uint8, buffer = tv_img_shm.buf)
if WRIST and args.sim:
wrist_img_shape = (img_config['wrist_camera_image_shape'][0], img_config['wrist_camera_image_shape'][1] * 2, 3)
wrist_img_shm = shared_memory.SharedMemory(create = True, size = np.prod(wrist_img_shape) * np.uint8().itemsize)
wrist_img_array = np.ndarray(wrist_img_shape, dtype = np.uint8, buffer = wrist_img_shm.buf)
img_client = ImageClient(tv_img_shape = tv_img_shape, tv_img_shm_name = tv_img_shm.name,
wrist_img_shape = wrist_img_shape, wrist_img_shm_name = wrist_img_shm.name, server_address="127.0.0.1")
elif WRIST and not args.sim:
wrist_img_shape = (img_config['wrist_camera_image_shape'][0], img_config['wrist_camera_image_shape'][1] * 2, 3)
wrist_img_shm = shared_memory.SharedMemory(create = True, size = np.prod(wrist_img_shape) * np.uint8().itemsize)
wrist_img_array = np.ndarray(wrist_img_shape, dtype = np.uint8, buffer = wrist_img_shm.buf)
img_client = ImageClient(tv_img_shape = tv_img_shape, tv_img_shm_name = tv_img_shm.name,
wrist_img_shape = wrist_img_shape, wrist_img_shm_name = wrist_img_shm.name)
else:
img_client = ImageClient(tv_img_shape = tv_img_shape, tv_img_shm_name = tv_img_shm.name)
image_receive_thread = threading.Thread(target = img_client.receive_process, daemon = True)
image_receive_thread.daemon = True
image_receive_thread.start()
head_img_shape = img_client.get_head_shape()
tv_binocular = img_client.is_binocular()
# television: obtain hand pose data from the XR device and transmit the robot's head camera image to the XR device.
tv_wrapper = TeleVuerWrapper(binocular=BINOCULAR, use_hand_tracking=args.xr_mode == "hand", img_shape=tv_img_shape, img_shm_name=tv_img_shm.name,
return_state_data=True, return_hand_rot_data = False)
tv_wrapper = TeleVuerWrapper(binocular=tv_binocular, use_hand_tracking=args.xr_mode == "hand", img_shape=head_img_shape, return_hand_rot_data = False)
# arm
if args.arm == "G1_29":
@ -248,7 +194,7 @@ if __name__ == '__main__':
sport_client.SetTimeout(0.0001)
sport_client.Init()
# record + headless mode
# record + headless / non-headless mode
if args.record and args.headless:
recorder = EpisodeWriter(task_dir = args.task_dir + args.task_name, task_goal = args.task_desc, frequency = args.frequency, rerun_log = False)
elif args.record and not args.headless:
@ -257,15 +203,29 @@ if __name__ == '__main__':
logger_mp.info("Please enter the start signal (enter 'r' to start the subsequent program)")
while not START and not STOP:
time.sleep(0.01)
# wait for start signal. by the way, get image and send it to xr
head_img, head_img_fps = img_client.get_head_frame()
tv_wrapper.set_display_image(head_img)
time.sleep(0.033)
logger_mp.info("start program.")
arm_ctrl.speed_gradual_max()
# main loop. robot start to follow VR user's motion
while not STOP:
start_time = time.time()
# get image
head_img, head_img_fps = img_client.get_head_frame()
if img_client.has_left_wrist_cam():
left_wrist_img, _ = img_client.get_left_wrist_frame()
if img_client.has_right_wrist_cam():
right_wrist_img, _ = img_client.get_right_wrist_frame()
# send head image to xr
tv_wrapper.set_display_image(head_img)
# non-headless mode
if not args.headless:
tv_resized_image = cv2.resize(tv_img_array, (tv_img_shape[1] // 2, tv_img_shape[0] // 2))
cv2.imshow("record image", tv_resized_image)
resized_image = cv2.resize(head_img, (head_img_shape[1] // 2, head_img_shape[0] // 2))
cv2.imshow("record image", resized_image)
# opencv GUI communication
key = cv2.waitKey(1) & 0xFF
if key == ord('q'):
@ -279,6 +239,7 @@ if __name__ == '__main__':
if args.sim:
publish_reset_category(2, reset_pose_publisher)
# record mode
if args.record and RECORD_TOGGLE:
RECORD_TOGGLE = False
if not RECORD_RUNNING:
@ -291,8 +252,9 @@ if __name__ == '__main__':
recorder.save_episode()
if args.sim:
publish_reset_category(1, reset_pose_publisher)
# get input data
tele_data = tv_wrapper.get_motion_state_data()
# get xr's tele data
tele_data = tv_wrapper.get_tele_data()
if (args.ee == "dex3" or args.ee == "inspire1" or args.ee == "brainco") and args.xr_mode == "hand":
with left_hand_pos_array.get_lock():
left_hand_pos_array[:] = tele_data.left_hand_pos.flatten()
@ -300,30 +262,30 @@ if __name__ == '__main__':
right_hand_pos_array[:] = tele_data.right_hand_pos.flatten()
elif args.ee == "dex1" and args.xr_mode == "controller":
with left_gripper_value.get_lock():
left_gripper_value.value = tele_data.left_trigger_value
left_gripper_value.value = tele_data.left_ctrl_triggerValue
with right_gripper_value.get_lock():
right_gripper_value.value = tele_data.right_trigger_value
right_gripper_value.value = tele_data.right_ctrl_triggerValue
elif args.ee == "dex1" and args.xr_mode == "hand":
with left_gripper_value.get_lock():
left_gripper_value.value = tele_data.left_pinch_value
left_gripper_value.value = tele_data.left_hand_pinchValue
with right_gripper_value.get_lock():
right_gripper_value.value = tele_data.right_pinch_value
right_gripper_value.value = tele_data.right_hand_pinchValue
else:
pass
# high level control
if args.xr_mode == "controller" and args.motion:
# quit teleoperate
if tele_data.tele_state.right_aButton:
if tele_data.right_ctrl_aButton:
START = False
STOP = True
# command robot to enter damping mode. soft emergency stop function
if tele_data.tele_state.left_thumbstick_state and tele_data.tele_state.right_thumbstick_state:
if tele_data.left_ctrl_thumbstick and tele_data.right_ctrl_thumbstick:
sport_client.Damp()
# control, limit velocity to within 0.3
sport_client.Move(-tele_data.tele_state.left_thumbstick_value[1] * 0.3,
-tele_data.tele_state.left_thumbstick_value[0] * 0.3,
-tele_data.tele_state.right_thumbstick_value[0] * 0.3)
# https://github.com/unitreerobotics/xr_teleoperate/issues/135, control, limit velocity to within 0.3
sport_client.Move(-tele_data.left_ctrl_thumbstickValue[1] * 0.3,
-tele_data.left_ctrl_thumbstickValue[0] * 0.3,
-tele_data.right_ctrl_thumbstickValue[0]* 0.3)
# get current robot state data.
current_lr_arm_q = arm_ctrl.get_current_dual_arm_q()
@ -331,7 +293,7 @@ if __name__ == '__main__':
# solve ik using motor data and wrist pose, then use ik results to control arms.
time_ik_start = time.time()
sol_q, sol_tauff = arm_ik.solve_ik(tele_data.left_arm_pose, tele_data.right_arm_pose, current_lr_arm_q, current_lr_arm_dq)
sol_q, sol_tauff = arm_ik.solve_ik(tele_data.left_wrist_pose, tele_data.right_wrist_pose, current_lr_arm_q, current_lr_arm_dq)
time_ik_end = time.time()
logger_mp.debug(f"ik:\t{round(time_ik_end - time_ik_start, 6)}")
arm_ctrl.ctrl_dual_arm(sol_q, sol_tauff)
@ -363,9 +325,9 @@ if __name__ == '__main__':
left_hand_action = [dual_gripper_action_array[0]]
right_hand_action = [dual_gripper_action_array[1]]
current_body_state = arm_ctrl.get_current_motor_q().tolist()
current_body_action = [-tele_data.tele_state.left_thumbstick_value[1] * 0.3,
-tele_data.tele_state.left_thumbstick_value[0] * 0.3,
-tele_data.tele_state.right_thumbstick_value[0] * 0.3]
current_body_action = [-tele_data.left_ctrl_thumbstickValue[1] * 0.3,
-tele_data.left_ctrl_thumbstickValue[0] * 0.3,
-tele_data.right_ctrl_thumbstickValue[0]* 0.3]
elif (args.ee == "inspire1" or args.ee == "brainco") and args.xr_mode == "hand":
with dual_hand_data_lock:
left_ee_state = dual_hand_state_array[:6]
@ -381,11 +343,7 @@ if __name__ == '__main__':
right_hand_action = []
current_body_state = []
current_body_action = []
# head image
current_tv_image = tv_img_array.copy()
# wrist image
if WRIST:
current_wrist_image = wrist_img_array.copy()
# arm state and action
left_arm_state = current_lr_arm_q[:7]
right_arm_state = current_lr_arm_q[-7:]
@ -394,17 +352,19 @@ if __name__ == '__main__':
if RECORD_RUNNING:
colors = {}
depths = {}
if BINOCULAR:
colors[f"color_{0}"] = current_tv_image[:, :tv_img_shape[1]//2]
colors[f"color_{1}"] = current_tv_image[:, tv_img_shape[1]//2:]
if WRIST:
colors[f"color_{2}"] = current_wrist_image[:, :wrist_img_shape[1]//2]
colors[f"color_{3}"] = current_wrist_image[:, wrist_img_shape[1]//2:]
if tv_binocular:
colors[f"color_{0}"] = head_img[:, :head_img_shape[1]//2]
colors[f"color_{1}"] = head_img[:, head_img_shape[1]//2:]
if img_client.has_left_wrist_cam():
colors[f"color_{2}"] = left_wrist_img
if img_client.has_right_wrist_cam():
colors[f"color_{3}"] = right_wrist_img
else:
colors[f"color_{0}"] = current_tv_image
if WRIST:
colors[f"color_{1}"] = current_wrist_image[:, :wrist_img_shape[1]//2]
colors[f"color_{2}"] = current_wrist_image[:, wrist_img_shape[1]//2:]
colors[f"color_{0}"] = head_img
if img_client.has_left_wrist_cam():
colors[f"color_{1}"] = left_wrist_img
if img_client.has_right_wrist_cam():
colors[f"color_{2}"] = right_wrist_img
states = {
"left_arm": {
"qpos": left_arm_state.tolist(), # numpy.array -> list
@ -471,6 +431,8 @@ if __name__ == '__main__':
logger_mp.info("KeyboardInterrupt, exiting program...")
finally:
arm_ctrl.ctrl_dual_arm_go_home()
img_client.close()
tv_wrapper.close()
if args.ipc:
ipc_server.stop()
@ -480,11 +442,6 @@ if __name__ == '__main__':
if args.sim:
sim_state_subscriber.stop_subscribe()
tv_img_shm.close()
tv_img_shm.unlink()
if WRIST:
wrist_img_shm.close()
wrist_img_shm.unlink()
if args.record:
recorder.close()

2
teleop/televuer

@ -1 +1 @@
Subproject commit 86367f8c9ba248f4065b959bfc53e5f36339bf6d
Subproject commit cb38fa12763ab989d7c1705b9d9a4318f5391170
Loading…
Cancel
Save