Browse Source

[update] 1. new image server 2. support gripper 3. rerun_visualizer

main
silencht 1 year ago
committed by Your Name
parent
commit
6697411c45
  1. 1
      .gitignore
  2. 2
      teleop/image_server/image_client.py
  3. 215
      teleop/image_server/image_server.py
  4. 219
      teleop/robot_control/robot_hand_unitree.py
  5. 76
      teleop/teleop_hand_and_arm.py
  6. 13
      teleop/utils/episode_writer.py
  7. 231
      teleop/utils/rerun_visualizer.py

1
.gitignore

@ -19,3 +19,4 @@ __MACOSX/
!real_right_hand.jpg
!wrist_and_ring_mount.png
teleop/data/
teleop/utils/episode_0*

2
teleop/image_server/image_client.py

@ -160,7 +160,7 @@ class ImageClient:
np.copyto(self.tv_img_array, np.array(current_image[:, :self.tv_img_shape[1]]))
if self.wrist_enable_shm:
np.copyto(self.wrist_img_array, np.array(current_image[:, self.wrist_img_shape[1]:]))
np.copyto(self.wrist_img_array, np.array(current_image[:, -self.wrist_img_shape[1]:]))
if self._image_show:
height, width = current_image.shape[:2]

215
teleop/image_server/image_server.py

@ -8,7 +8,11 @@ import pyrealsense2 as rs
class RealSenseCamera(object):
def __init__(self, img_shape, fps, serial_number = None, enable_depth = False) -> None:
def __init__(self, img_shape, fps, serial_number=None, enable_depth=False) -> None:
"""
img_shape: [height, width]
serial_number: serial number
"""
self.img_shape = img_shape
self.fps = fps
self.serial_number = serial_number
@ -25,15 +29,15 @@ class RealSenseCamera(object):
if self.serial_number is not None:
config.enable_device(self.serial_number)
config.enable_stream(rs.stream.color, self.img_shape[1] // 2, self.img_shape[0], rs.format.bgr8, self.fps)
config.enable_stream(rs.stream.color, self.img_shape[1], self.img_shape[0], rs.format.bgr8, self.fps)
if self.enable_depth:
config.enable_stream(rs.stream.depth, self.img_shape[1] // 2, self.img_shape[0], rs.format.z16, self.fps)
config.enable_stream(rs.stream.depth, self.img_shape[1], self.img_shape[0], rs.format.z16, self.fps)
profile = self.pipeline.start(config)
self._device = profile.get_device()
if self._device is None:
print('pipe_profile.get_device() is None .')
print('[Image Server] pipe_profile.get_device() is None .')
if self.enable_depth:
assert self._device is not None
depth_sensor = self._device.first_depth_sensor()
@ -41,7 +45,6 @@ class RealSenseCamera(object):
self.intrinsics = profile.get_stream(rs.stream.color).as_video_stream_profile().get_intrinsics()
def get_frame(self):
frames = self.pipeline.wait_for_frames()
aligned_frames = self.align.process(frames)
@ -51,59 +54,133 @@ class RealSenseCamera(object):
depth_frame = aligned_frames.get_depth_frame()
if not color_frame:
return None, None
return None
color_image = np.asanyarray(color_frame.get_data())
# color_image = cv2.cvtColor(color_image, cv2.COLOR_BGR2RGB)
depth_image = np.asanyarray(depth_frame.get_data()) if self.enable_depth else None
return color_image, depth_image
def release(self):
self.pipeline.stop()
class ImageServer:
def __init__(self, img_shape = (480, 640 * 2, 3), fps = 30, enable_wrist = False, port = 5555, Unit_Test = False):
class OpenCVCamera():
def __init__(self, device_id, img_shape, fps):
"""
decive_id: /dev/video* or *
img_shape: [height, width]
"""
img_shape: User's expected camera resolution shape (H, W, C).
self.id = device_id
self.fps = fps
self.img_shape = img_shape
self.cap = cv2.VideoCapture(self.id, cv2.CAP_V4L2)
self.cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter.fourcc('M', 'J', 'P', 'G'))
self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.img_shape[0])
self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.img_shape[1])
self.cap.set(cv2.CAP_PROP_FPS, self.fps)
# Test if the camera can read frames
if not self._can_read_frame():
print(f"[Image Server] Camera {self.id} Error: Failed to initialize the camera or read frames. Exiting...")
self.release()
p.s.1: 'W' of binocular camera according to binocular width (instead of monocular).
def _can_read_frame(self):
success, _ = self.cap.read()
return success
p.s.2: User expectations are not necessarily the end result. The final img_shape value needs to be determined from \
the terminal output (i.e. "Image Resolution: width is ···")
def release(self):
self.cap.release()
fps: user's expected camera frames per second.
def get_frame(self):
ret, color_image = self.cap.read()
if not ret:
return None
return color_image
port: The port number to bind to, where you can receive messages from subscribers.
Unit_Test: When both server and client are True, it can be used to test the image transfer latency, \
network jitter, frame loss rate and other information.
class ImageServer:
def __init__(self, config, port = 5555, Unit_Test = False):
"""
self.img_shape = img_shape
self.fps = fps
self.enable_wrist = enable_wrist
self.port = port
self.enable_performance_eval = Unit_Test
config:
{
'fps':30 # frame per second
'head_camera_type': 'opencv', # opencv or realsense
'head_camera_image_shape': [480, 1280], # Head camera resolution [height, width]
'head_camera_id_numbers': [0], # '/dev/video0' (opencv)
'wrist_camera_type': 'realsense',
'wrist_camera_image_shape': [480, 640], # Wrist camera resolution [height, width]
'wrist_camera_id_numbers': ["218622271789", "241222076627"], # serial number (realsense)
}
"""
print(config)
self.fps = config.get('fps', 30)
self.head_camera_type = config.get('head_camera_type', 'opencv')
self.head_image_shape = config.get('head_camera_image_shape', [480, 640]) # (height, width)
self.head_camera_id_numbers = config.get('head_camera_id_numbers', [0])
# initiate camera
self.cap = cv2.VideoCapture(0, cv2.CAP_V4L2)
self.cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter.fourcc('M', 'J', 'P', 'G'))
self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.img_shape[1])
self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.img_shape[0])
self.cap.set(cv2.CAP_PROP_FPS, self.fps)
self.wrist_camera_type = config.get('wrist_camera_type', None)
self.wrist_image_shape = config.get('wrist_camera_image_shape', [480, 640]) # (height, width)
self.wrist_camera_id_numbers = config.get('wrist_camera_id_numbers', None)
if self.enable_wrist:
# initiate realsense camera
self.left_cam = RealSenseCamera(img_shape = self.img_shape, fps = self.fps, serial_number = "218622271789") # left wrist camera
self.right_cam = RealSenseCamera(img_shape = self.img_shape, fps = self.fps, serial_number = "218622278527") # right wrist camera
self.port = port
self.Unit_Test = Unit_Test
# Initialize head cameras
self.head_cameras = []
if self.head_camera_type == 'opencv':
for device_id in self.head_camera_id_numbers:
camera = OpenCVCamera(device_id=device_id, img_shape=self.head_image_shape, fps=self.fps)
self.head_cameras.append(camera)
elif self.head_camera_type == 'realsense':
for serial_number in self.head_camera_id_numbers:
camera = RealSenseCamera(img_shape=self.head_image_shape, fps=self.fps, serial_number=serial_number)
self.head_cameras.append(camera)
else:
print(f"[Image Server] Unsupported head_camera_type: {self.head_camera_type}")
# Initialize wrist cameras if provided
self.wrist_cameras = []
if self.wrist_camera_type and self.wrist_camera_id_numbers:
if self.wrist_camera_type == 'opencv':
for device_id in self.wrist_camera_id_numbers:
camera = OpenCVCamera(device_id=device_id, img_shape=self.wrist_image_shape, fps=self.fps)
self.wrist_cameras.append(camera)
elif self.wrist_camera_type == 'realsense':
for serial_number in self.wrist_camera_id_numbers:
camera = RealSenseCamera(img_shape=self.wrist_image_shape, fps=self.fps, serial_number=serial_number)
self.wrist_cameras.append(camera)
else:
print(f"[Image Server] Unsupported wrist_camera_type: {self.wrist_camera_type}")
# set ZeroMQ context and socket
# Set ZeroMQ context and socket
self.context = zmq.Context()
self.socket = self.context.socket(zmq.PUB)
self.socket.bind(f"tcp://*:{self.port}")
if self.enable_performance_eval:
if self.Unit_Test:
self._init_performance_metrics()
print("Image server has started, waiting for client connections...")
print(f"Image Resolution: width is {self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)}, height is {self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)}\n")
for cam in self.head_cameras:
if isinstance(cam, OpenCVCamera):
print(f"[Image Server] Head camera {cam.id} resolution: {cam.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)} x {cam.cap.get(cv2.CAP_PROP_FRAME_WIDTH)}")
elif isinstance(cam, RealSenseCamera):
print(f"[Image Server] Head camera {cam.serial_number} resolution: {cam.img_shape[0]} x {cam.img_shape[1]}")
else:
print("[Image Server] Unknown camera type in head_cameras.")
for cam in self.wrist_cameras:
if isinstance(cam, OpenCVCamera):
print(f"[Image Server] Wrist camera {cam.id} resolution: {cam.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)} x {cam.cap.get(cv2.CAP_PROP_FRAME_WIDTH)}")
elif isinstance(cam, RealSenseCamera):
print(f"[Image Server] Wrist camera {cam.serial_number} resolution: {cam.img_shape[0]} x {cam.img_shape[1]}")
else:
print("[Image Server] Unknown camera type in wrist_cameras.")
print("[Image Server] Image server has started, waiting for client connections...")
def _init_performance_metrics(self):
self.frame_count = 0 # Total frames sent
@ -127,7 +204,10 @@ class ImageServer:
print(f"[Image Server] Real-time FPS: {real_time_fps:.2f}, Total frames sent: {self.frame_count}, Elapsed time: {elapsed_time:.2f} sec")
def _close(self):
self.cap.release()
for cam in self.head_cameras:
cam.release()
for cam in self.wrist_cameras:
cam.release()
self.socket.close()
self.context.term()
print("[Image Server] The server has been closed.")
@ -135,25 +215,52 @@ class ImageServer:
def send_process(self):
try:
while True:
ret, head_color = self.cap.read()
if not ret:
print("[Image Server] Frame read is error.")
head_frames = []
for cam in self.head_cameras:
if self.head_camera_type == 'opencv':
color_image = cam.get_frame()
if color_image is None:
print("[Image Server] Head camera frame read is error.")
break
elif self.head_camera_type == 'realsense':
color_image, depth_iamge = cam.get_frame()
if color_image is None:
print("[Image Server] Head camera frame read is error.")
break
head_frames.append(color_image)
if len(head_frames) != len(self.head_cameras):
break
head_color = cv2.hconcat(head_frames)
if self.wrist_cameras:
wrist_frames = []
for cam in self.wrist_cameras:
if self.wrist_camera_type == 'opencv':
color_image = cam.get_frame()
if color_image is None:
print("[Image Server] Wrist camera frame read is error.")
break
elif self.wrist_camera_type == 'realsense':
color_image, depth_iamge = cam.get_frame()
if color_image is None:
print("[Image Server] Wrist camera frame read is error.")
break
wrist_frames.append(color_image)
wrist_color = cv2.hconcat(wrist_frames)
if self.enable_wrist:
left_wrist_color, left_wrist_depth = self.left_cam.get_frame()
right_wrist_color, right_wrist_depth = self.right_cam.get_frame()
# Concatenate images along the width
head_color = cv2.hconcat([head_color, left_wrist_color, right_wrist_color])
# Concatenate head and wrist frames
full_color = cv2.hconcat([head_color, wrist_color])
else:
full_color = head_color
ret, buffer = cv2.imencode('.jpg', head_color)
ret, buffer = cv2.imencode('.jpg', full_color)
if not ret:
print("[Image Server] Frame imencode is failed.")
continue
jpg_bytes = buffer.tobytes()
if self.enable_performance_eval:
if self.Unit_Test:
timestamp = time.time()
frame_id = self.frame_count
header = struct.pack('dI', timestamp, frame_id) # 8-byte double, 4-byte unsigned int
@ -163,7 +270,7 @@ class ImageServer:
self.socket.send(message)
if self.enable_performance_eval:
if self.Unit_Test:
current_time = time.time()
self._update_performance_metrics(current_time)
self._print_performance_metrics(current_time)
@ -173,7 +280,17 @@ class ImageServer:
finally:
self._close()
if __name__ == "__main__":
# server = ImageServer(img_shape = (720, 640 * 2, 3), Unit_Test = True) # test
server = ImageServer(img_shape = (640, 480 * 2, 3), enable_wrist = True, Unit_Test = False) # deployment
config = {
'fps': 30,
'head_camera_type': 'opencv',
'head_camera_image_shape': [480, 1280], # Head camera resolution
'head_camera_id_numbers': [0],
'wrist_camera_type': 'opencv',
'wrist_camera_image_shape': [480, 640], # Wrist camera resolution
'wrist_camera_id_numbers': [2, 4],
}
server = ImageServer(config, Unit_Test=False)
server.send_process()

219
teleop/robot_control/robot_hand_unitree.py

@ -1,6 +1,11 @@
# for dex3-1
from unitree_sdk2py.core.channel import ChannelPublisher, ChannelSubscriber, ChannelFactoryInitialize # dds
from unitree_sdk2py.idl.unitree_hg.msg.dds_ import HandCmd_, HandState_ # idl
from unitree_sdk2py.idl.default import unitree_hg_msg_dds__HandCmd_
# for gripper
from unitree_sdk2py.core.channel import ChannelPublisher, ChannelSubscriber, ChannelFactoryInitialize # dds
from unitree_sdk2py.idl.unitree_go.msg.dds_ import MotorCmds_, MotorStates_ # idl
from unitree_sdk2py.idl.default import unitree_go_msg_dds__MotorCmd_
import numpy as np
from enum import IntEnum
@ -15,8 +20,7 @@ sys.path.append(parent2_dir)
from teleop.robot_control.hand_retargeting import HandRetargeting, HandType
unitree_tip_indices = [4, 9, 14]
unitree_tip_indices = [4, 9, 14] # [thumb, index, middle] in OpenXR
Dex3_Num_Motors = 7
kTopicDex3LeftCommand = "rt/dex3/left/cmd"
kTopicDex3RightCommand = "rt/dex3/right/cmd"
@ -171,8 +175,7 @@ class Dex3_1_Controller:
# Read left and right q_state from shared arrays
state_data = np.concatenate((np.array(left_hand_state_array[:]), np.array(right_hand_state_array[:])))
is_hand_data_initialized = not np.all(left_hand_mat == 0.0)
if is_hand_data_initialized:
if not np.all(left_hand_mat == 0.0): # if hand data has been initialized.
left_q_target = self.hand_retargeting.left_retargeting.retarget(left_hand_mat[unitree_tip_indices])[[0,1,2,3,4,5,6]]
right_q_target = self.hand_retargeting.right_retargeting.retarget(right_hand_mat[unitree_tip_indices])[[0,1,2,3,4,5,6]]
@ -210,36 +213,222 @@ class Dex3_1_Right_JointIndex(IntEnum):
kRightHandSix = 6 # middle_1
unitree_gripper_indices = [4, 9] # [thumb, index]
Gripper_Num_Motors = 2
kTopicGripperCommand = "rt/unitree_actuator/cmd"
kTopicGripperState = "rt/unitree_actuator/state"
class Gripper_Controller:
def __init__(self, left_hand_array, right_hand_array, dual_gripper_data_lock = None, dual_gripper_state_out = None, dual_gripper_action_out = None,
fps = 250.0, Unit_Test = False):
"""
[note] A *_array type parameter requires using a multiprocessing Array, because it needs to be passed to the internal child process
left_hand_array: [input] Left hand skeleton data (required from XR device) to control_thread
right_hand_array: [input] Right hand skeleton data (required from XR device) to control_thread
dual_gripper_data_lock: Data synchronization lock for dual_gripper_state_array and dual_gripper_action_array
dual_gripper_state: [output] Return left(1), right(1) gripper motor state
dual_gripper_action: [output] Return left(1), right(1) gripper motor action
fps: Control frequency
Unit_Test: Whether to enable unit testing
"""
print("Initialize Gripper_Controller...")
self.fps = fps
self.Unit_Test = Unit_Test
if self.Unit_Test:
ChannelFactoryInitialize(0)
# initialize handcmd publisher and handstate subscriber
self.GripperCmb_publisher = ChannelPublisher(kTopicGripperCommand, MotorCmds_)
self.GripperCmb_publisher.Init()
self.GripperState_subscriber = ChannelSubscriber(kTopicGripperState, MotorStates_)
self.GripperState_subscriber.Init()
self.dual_gripper_state = [0.0] * len(Gripper_JointIndex)
# initialize subscribe thread
self.subscribe_state_thread = threading.Thread(target=self._subscribe_gripper_state)
self.subscribe_state_thread.daemon = True
self.subscribe_state_thread.start()
while True:
if any(state != 0.0 for state in self.dual_gripper_state):
break
time.sleep(0.01)
print("[Gripper_Controller] Waiting to subscribe dds...")
self.gripper_control_thread = threading.Thread(target=self.control_thread, args=(left_hand_array, right_hand_array, self.dual_gripper_state,
dual_gripper_data_lock, dual_gripper_state_out, dual_gripper_action_out))
self.gripper_control_thread.daemon = True
self.gripper_control_thread.start()
print("Initialize Gripper_Controller OK!\n")
def _subscribe_gripper_state(self):
while True:
gripper_msg = self.GripperState_subscriber.Read()
if gripper_msg is not None:
for idx, id in enumerate(Gripper_JointIndex):
self.dual_gripper_state[idx] = gripper_msg.states[id].q
time.sleep(0.002)
def ctrl_dual_gripper(self, gripper_q_target):
"""set current left, right gripper motor state target q"""
for idx, id in enumerate(Gripper_JointIndex):
self.gripper_msg.cmds[id].q = gripper_q_target[idx]
self.GripperCmb_publisher.Write(self.gripper_msg)
# print("gripper ctrl publish ok.")
def control_thread(self, left_hand_array, right_hand_array, dual_gripper_state_in, dual_hand_data_lock = None,
dual_gripper_state_out = None, dual_gripper_action_out = None):
self.running = True
DELTA_GRIPPER_CMD = 0.18 # The motor rotates 5.4 radians, the clamping jaw slide open 9 cm, so 0.6 rad <==> 1 cm, 0.18 rad <==> 3 mm
THUMB_INDEX_DISTANCE_MIN = 0.030 # Assuming a minimum Euclidean distance is 3 cm between thumb and index.
THUMB_INDEX_DISTANCE_MAX = 0.120 # Assuming a maximum Euclidean distance is 12 cm between thumb and index.
LEFT_MAPPED_MIN = 0.0 # The minimum initial motor position when the gripper closes at startup.
RIGHT_MAPPED_MIN = 0.0 # The minimum initial motor position when the gripper closes at startup.
# The maximum initial motor position when the gripper closes before calibration (with the rail stroke calculated as 0.6 cm/rad * 9 rad = 5.4 cm).
LEFT_MAPPED_MAX = LEFT_MAPPED_MIN + 5.40
RIGHT_MAPPED_MAX = RIGHT_MAPPED_MIN + 5.40
left_target_action = (LEFT_MAPPED_MAX - LEFT_MAPPED_MIN) / 2.0
right_target_action = (RIGHT_MAPPED_MAX - RIGHT_MAPPED_MIN) / 2.0
dq = 0.0
tau = 0.0
kp = 5.0
kd = 0.05
# initialize gripper cmd msg
self.gripper_msg = MotorCmds_()
self.gripper_msg.cmds = [unitree_go_msg_dds__MotorCmd_() for _ in range(len(Gripper_JointIndex))]
for id in Gripper_JointIndex:
self.gripper_msg.cmds[id].dq = dq
self.gripper_msg.cmds[id].tau = tau
self.gripper_msg.cmds[id].kp = kp
self.gripper_msg.cmds[id].kd = kd
try:
while self.running:
start_time = time.time()
# get dual hand skeletal point state from XR device
left_hand_mat = np.array(left_hand_array[:]).reshape(25, 3).copy()
right_hand_mat = np.array(right_hand_array[:]).reshape(25, 3).copy()
if not np.all(left_hand_mat == 0.0): # if hand data has been initialized.
left_euclidean_distance = np.linalg.norm(left_hand_mat[unitree_gripper_indices[1]] - left_hand_mat[unitree_gripper_indices[0]])
right_euclidean_distance = np.linalg.norm(right_hand_mat[unitree_gripper_indices[1]] - left_hand_mat[unitree_gripper_indices[0]])
# Linear mapping from [0, THUMB_INDEX_DISTANCE_MAX] to gripper action range
left_target_action = np.interp(left_euclidean_distance, [THUMB_INDEX_DISTANCE_MIN, THUMB_INDEX_DISTANCE_MAX], [LEFT_MAPPED_MIN, LEFT_MAPPED_MAX])
right_target_action = np.interp(right_euclidean_distance, [THUMB_INDEX_DISTANCE_MIN, THUMB_INDEX_DISTANCE_MAX], [RIGHT_MAPPED_MIN, RIGHT_MAPPED_MAX])
# else: # TEST WITHOUT APPLE VISION PRO
# current_time = time.time()
# period = 2.5
# import math
# left_euclidean_distance = THUMB_INDEX_DISTANCE_MAX * (math.sin(2 * math.pi * current_time / period) + 1) / 2
# right_euclidean_distance = THUMB_INDEX_DISTANCE_MAX * (math.sin(2 * math.pi * current_time / period) + 1) / 2
# left_target_action = np.interp(left_euclidean_distance, [THUMB_INDEX_DISTANCE_MIN, THUMB_INDEX_DISTANCE_MAX], [LEFT_MAPPED_MIN, LEFT_MAPPED_MAX])
# right_target_action = np.interp(right_euclidean_distance, [THUMB_INDEX_DISTANCE_MIN, THUMB_INDEX_DISTANCE_MAX], [RIGHT_MAPPED_MIN, RIGHT_MAPPED_MAX])
# get current dual gripper motor state
dual_gripper_state = np.array(dual_gripper_state_in[:])
# clip dual gripper action to avoid overflow
left_actual_action = np.clip(left_target_action, dual_gripper_state[1] - DELTA_GRIPPER_CMD, dual_gripper_state[1] + DELTA_GRIPPER_CMD)
right_actual_action = np.clip(right_target_action, dual_gripper_state[0] - DELTA_GRIPPER_CMD, dual_gripper_state[0] + DELTA_GRIPPER_CMD)
dual_gripper_action = np.array([right_actual_action, left_actual_action])
if dual_gripper_state_out and dual_gripper_action_out:
with dual_hand_data_lock:
dual_gripper_state_out[:] = dual_gripper_state - np.array([RIGHT_MAPPED_MIN, LEFT_MAPPED_MIN])
dual_gripper_action_out[:] = dual_gripper_action - np.array([RIGHT_MAPPED_MIN, LEFT_MAPPED_MIN])
# print(f"LEFT: euclidean:{left_euclidean_distance:.4f} \tstate:{dual_gripper_state_out[1]:.4f}\
# \ttarget_action:{right_target_action - RIGHT_MAPPED_MIN:.4f} \tactual_action:{dual_gripper_action_out[1]:.4f}")
# print(f"RIGHT:euclidean:{right_euclidean_distance:.4f} \tstate:{dual_gripper_state_out[0]:.4f}\
# \ttarget_action:{left_target_action - LEFT_MAPPED_MIN:.4f} \tactual_action:{dual_gripper_action_out[0]:.4f}")
self.ctrl_dual_gripper(dual_gripper_action)
current_time = time.time()
time_elapsed = current_time - start_time
sleep_time = max(0, (1 / self.fps) - time_elapsed)
time.sleep(sleep_time)
finally:
print("Gripper_Controller has been closed.")
class Gripper_JointIndex(IntEnum):
kLeftGripper = 0
kRightGripper = 1
if __name__ == "__main__":
import argparse
from teleop.open_television.tv_wrapper import TeleVisionWrapper
from teleop.image_server.image_client import ImageClient
parser = argparse.ArgumentParser()
parser.add_argument('--binocular', action='store_true', help='Use binocular camera')
parser.add_argument('--monocular', dest='binocular', action='store_false', help='Use monocular camera')
parser.set_defaults(binocular=True)
parser.add_argument('--dex', action='store_true', help='Use dex3-1 hand')
parser.add_argument('--gripper', dest='dex', action='store_false', help='Use gripper')
parser.set_defaults(dex=True)
args = parser.parse_args()
print(f"args:{args}\n")
# image
img_shape = (480, 1280, 3)
img_shm = shared_memory.SharedMemory(create=True, size=np.prod(img_shape) * np.uint8().itemsize)
img_array = np.ndarray(img_shape, dtype=np.uint8, buffer=img_shm.buf)
img_client = ImageClient(tv_img_shape = img_shape, tv_img_shm_name = img_shm.name)
image_receive_thread = threading.Thread(target=img_client.receive_process, daemon=True)
img_config = {
'fps': 30,
'head_camera_type': 'opencv',
'head_camera_image_shape': [480, 1280], # Head camera resolution
'head_camera_id_numbers': [0],
'wrist_camera_type': 'opencv',
'wrist_camera_image_shape': [480, 640], # Wrist camera resolution
'wrist_camera_id_numbers': [2, 4],
}
ASPECT_RATIO_THRESHOLD = 2.0 # If the aspect ratio exceeds this value, it is considered binocular
if len(img_config['head_camera_id_numbers']) > 1 or (img_config['head_camera_image_shape'][1] / img_config['head_camera_image_shape'][0] > ASPECT_RATIO_THRESHOLD):
BINOCULAR = True
else:
BINOCULAR = False
# image
if BINOCULAR and not (img_config['head_camera_image_shape'][1] / img_config['head_camera_image_shape'][0] > ASPECT_RATIO_THRESHOLD):
tv_img_shape = (img_config['head_camera_image_shape'][0], img_config['head_camera_image_shape'][1] * 2, 3)
else:
tv_img_shape = (img_config['head_camera_image_shape'][0], img_config['head_camera_image_shape'][1], 3)
img_shm = shared_memory.SharedMemory(create = True, size = np.prod(tv_img_shape) * np.uint8().itemsize)
img_array = np.ndarray(tv_img_shape, dtype = np.uint8, buffer = img_shm.buf)
img_client = ImageClient(tv_img_shape = tv_img_shape, tv_img_shm_name = img_shm.name)
image_receive_thread = threading.Thread(target = img_client.receive_process, daemon = True)
image_receive_thread.daemon = True
image_receive_thread.start()
# television and arm
tv_wrapper = TeleVisionWrapper(args.binocular, img_shape, img_shm.name)
tv_wrapper = TeleVisionWrapper(BINOCULAR, tv_img_shape, img_shm.name)
if args.dex:
left_hand_array = Array('d', 75, lock=True)
right_hand_array = Array('d', 75, lock=True)
dual_hand_data_lock = Lock()
dual_hand_state_array = Array('d', 14, lock=False) # current left, right hand state(14) data.
dual_hand_action_array = Array('d', 14, lock=False) # current left, right hand action(14) data.
hand_ctrl = Dex3_1_Controller(left_hand_array, right_hand_array, dual_hand_data_lock, dual_hand_state_array, dual_hand_action_array, Unit_Test = True)
else:
left_hand_array = Array('d', 75, lock=True)
right_hand_array = Array('d', 75, lock=True)
dual_gripper_data_lock = Lock()
dual_gripper_state_array = Array('d', 2, lock=False) # current left, right gripper state(2) data.
dual_gripper_action_array = Array('d', 2, lock=False) # current left, right gripper action(2) data.
gripper_ctrl = Gripper_Controller(left_hand_array, right_hand_array, dual_gripper_data_lock, dual_gripper_state_array, dual_gripper_action_array, Unit_Test = True)
user_input = input("Please enter the start signal (enter 's' to start the subsequent program):\n")
@ -249,5 +438,7 @@ if __name__ == "__main__":
# send hand skeleton data to hand_ctrl.control_process
left_hand_array[:] = left_hand.flatten()
right_hand_array[:] = right_hand.flatten()
print(f"Dual hand state array: {list(dual_hand_state_array)}")
# with dual_hand_data_lock:
# print(f"state : {list(dual_hand_state_array)} \naction: {list(dual_hand_action_array)} \n")
time.sleep(0.01)

76
teleop/teleop_hand_and_arm.py

@ -14,7 +14,7 @@ sys.path.append(parent_dir)
from teleop.open_television.tv_wrapper import TeleVisionWrapper
from teleop.robot_control.robot_arm import G1_29_ArmController
from teleop.robot_control.robot_arm_ik import G1_29_ArmIK
from teleop.robot_control.robot_hand_unitree import Dex3_1_Controller
from teleop.robot_control.robot_hand_unitree import Dex3_1_Controller, Gripper_Controller
from teleop.image_server.image_client import ImageClient
from teleop.utils.episode_writer import EpisodeWriter
@ -26,25 +26,44 @@ if __name__ == '__main__':
parser.add_argument('--record', action = 'store_true', help = 'Save data or not')
parser.add_argument('--no-record', dest = 'record', action = 'store_false', help = 'Do not save data')
parser.set_defaults(record = True)
parser.set_defaults(record = False)
parser.add_argument('--binocular', action = 'store_true', help = 'Use binocular camera')
parser.add_argument('--monocular', dest = 'binocular', action = 'store_false', help = 'Use monocular camera')
parser.set_defaults(binocular = True)
parser.add_argument('--wrist', action = 'store_true', help = 'Use wrist camera')
parser.add_argument('--no-wrist', dest = 'wrist', action = 'store_false', help = 'Not use wrist camera')
parser.set_defaults(wrist = False)
parser.add_argument('--dex', action='store_true', help='Use dex3-1 hand')
parser.add_argument('--gripper', dest='dex', action='store_false', help='Use gripper')
parser.set_defaults(dex = False)
args = parser.parse_args()
print(f"args:{args}\n")
# image
tv_img_shape = (480, 640 * 2, 3)
img_config = {
'fps': 30,
'head_camera_type': 'opencv',
'head_camera_image_shape': [480, 1280], # Head camera resolution
'head_camera_id_numbers': [0],
'wrist_camera_type': 'opencv',
'wrist_camera_image_shape': [480, 640], # Wrist camera resolution
'wrist_camera_id_numbers': [2, 4],
}
ASPECT_RATIO_THRESHOLD = 2.0 # If the aspect ratio exceeds this value, it is considered binocular
if len(img_config['head_camera_id_numbers']) > 1 or (img_config['head_camera_image_shape'][1] / img_config['head_camera_image_shape'][0] > ASPECT_RATIO_THRESHOLD):
BINOCULAR = True
else:
BINOCULAR = False
if 'wrist_camera_type' in img_config:
WRIST = True
else:
WRIST = False
if BINOCULAR and not (img_config['head_camera_image_shape'][1] / img_config['head_camera_image_shape'][0] > ASPECT_RATIO_THRESHOLD):
tv_img_shape = (img_config['head_camera_image_shape'][0], img_config['head_camera_image_shape'][1] * 2, 3)
else:
tv_img_shape = (img_config['head_camera_image_shape'][0], img_config['head_camera_image_shape'][1], 3)
tv_img_shm = shared_memory.SharedMemory(create = True, size = np.prod(tv_img_shape) * np.uint8().itemsize)
tv_img_array = np.ndarray(tv_img_shape, dtype = np.uint8, buffer = tv_img_shm.buf)
if args.wrist:
wrist_img_shape = tv_img_shape
if WRIST:
wrist_img_shape = (img_config['wrist_camera_image_shape'][0], img_config['wrist_camera_image_shape'][1] * 2, 3)
wrist_img_shm = shared_memory.SharedMemory(create = True, size = np.prod(wrist_img_shape) * np.uint8().itemsize)
wrist_img_array = np.ndarray(wrist_img_shape, dtype = np.uint8, buffer = wrist_img_shm.buf)
img_client = ImageClient(tv_img_shape = tv_img_shape, tv_img_shm_name = tv_img_shm.name,
@ -57,20 +76,28 @@ if __name__ == '__main__':
image_receive_thread.start()
# television and arm
tv_wrapper = TeleVisionWrapper(args.binocular, tv_img_shape, tv_img_shm.name)
tv_wrapper = TeleVisionWrapper(BINOCULAR, tv_img_shape, tv_img_shm.name)
arm_ctrl = G1_29_ArmController()
arm_ik = G1_29_ArmIK()
# hand
if args.dex:
left_hand_array = Array('d', 75, lock = True) # [input]
right_hand_array = Array('d', 75, lock = True) # [input]
dual_hand_data_lock = Lock()
dual_hand_state_array = Array('d', 14, lock = False) # [output] current left, right hand state(14) data.
dual_hand_action_array = Array('d', 14, lock = False) # [output] current left, right hand action(14) data.
hand_ctrl = Dex3_1_Controller(left_hand_array, right_hand_array, dual_hand_data_lock, dual_hand_state_array, dual_hand_action_array)
else:
left_hand_array = Array('d', 75, lock=True)
right_hand_array = Array('d', 75, lock=True)
dual_gripper_data_lock = Lock()
dual_gripper_state_array = Array('d', 2, lock=False) # current left, right gripper state(2) data.
dual_gripper_action_array = Array('d', 2, lock=False) # current left, right gripper action(2) data.
gripper_ctrl = Gripper_Controller(left_hand_array, right_hand_array, dual_gripper_data_lock, dual_gripper_state_array, dual_gripper_action_array)
if args.record:
recorder = EpisodeWriter(task_dir = args.task_dir, frequency = args.frequency)
recorder = EpisodeWriter(task_dir = args.task_dir, frequency = args.frequency, log = True)
try:
user_input = input("Please enter the start signal (enter 'r' to start the subsequent program):\n")
@ -120,14 +147,25 @@ if __name__ == '__main__':
# record data
if args.record:
# dex hand or gripper
if args.dex:
with dual_hand_data_lock:
left_hand_state = dual_hand_state_array[:7]
right_hand_state = dual_hand_state_array[-7:]
left_hand_action = dual_hand_action_array[:7]
right_hand_action = dual_hand_action_array[-7:]
else:
with dual_gripper_data_lock:
left_hand_state = dual_gripper_state_array[1]
right_hand_state = dual_gripper_state_array[0]
left_hand_action = dual_gripper_action_array[1]
right_hand_action = dual_gripper_action_array[0]
# head image
current_tv_image = tv_img_array.copy()
if args.wrist:
# wrist image
if WRIST:
current_wrist_image = wrist_img_array.copy()
# arm state and action
left_arm_state = current_lr_arm_q[:7]
right_arm_state = current_lr_arm_q[-7:]
left_arm_action = sol_q[:7]
@ -136,15 +174,15 @@ if __name__ == '__main__':
if recording:
colors = {}
depths = {}
if args.binocular:
if BINOCULAR:
colors[f"color_{0}"] = current_tv_image[:, :tv_img_shape[1]//2]
colors[f"color_{1}"] = current_tv_image[:, tv_img_shape[1]//2:]
if args.wrist:
if WRIST:
colors[f"color_{2}"] = current_wrist_image[:, :wrist_img_shape[1]//2]
colors[f"color_{3}"] = current_wrist_image[:, wrist_img_shape[1]//2:]
else:
colors[f"color_{0}"] = current_tv_image
if args.wrist:
if WRIST:
colors[f"color_{1}"] = current_wrist_image[:, :wrist_img_shape[1]//2]
colors[f"color_{2}"] = current_wrist_image[:, wrist_img_shape[1]//2:]
states = {
@ -206,7 +244,7 @@ if __name__ == '__main__':
finally:
tv_img_shm.unlink()
tv_img_shm.close()
if args.wrist:
if WRIST:
wrist_img_shm.unlink()
wrist_img_shm.close()
print("Finally, exiting program...")

13
teleop/utils/episode_writer.py

@ -4,10 +4,11 @@ import json
import datetime
import numpy as np
import time
from .rerun_visualizer import RerunLogger
class EpisodeWriter(object):
def __init__(self, task_dir, frequency=30, image_size=[640, 480]):
class EpisodeWriter():
def __init__(self, task_dir, frequency=30, image_size=[640, 480], log = False):
"""
image_size: [width, height]
"""
@ -20,6 +21,7 @@ class EpisodeWriter(object):
self.episode_data = []
self.item_id = -1
self.episode_id = -1
self.log = log
if os.path.exists(self.task_dir):
episode_dirs = [episode_dir for episode_dir in os.listdir(self.task_dir) if 'episode_' in episode_dir]
episode_last = sorted(episode_dirs)[-1] if len(episode_dirs) > 0 else None
@ -83,8 +85,10 @@ class EpisodeWriter(object):
os.makedirs(self.color_dir, exist_ok=True)
os.makedirs(self.depth_dir, exist_ok=True)
os.makedirs(self.audio_dir, exist_ok=True)
if self.log:
self.online_logger = RerunLogger(prefix="online/", IdxRangeBoundary = 60)
def add_item(self, colors, depths=None, states=None, actions=None, tactiles=None, audios=None, log = True):
def add_item(self, colors, depths=None, states=None, actions=None, tactiles=None, audios=None):
self.item_id += 1
item_data = {
'idx': self.item_id,
@ -125,9 +129,10 @@ class EpisodeWriter(object):
self.episode_data.append(item_data)
if log:
if self.log:
curent_record_time = time.time()
print(f"==> episode_id:{self.episode_id} item_id:{self.item_id} current_time:{curent_record_time}")
self.online_logger.log_item_data(item_data)
def save_episode(self):

231
teleop/utils/rerun_visualizer.py

@ -0,0 +1,231 @@
import os
import json
import cv2
import time
import rerun as rr
import rerun.blueprint as rrb
from datetime import datetime
class RerunEpisodeReader:
    """Reads a recorded teleop episode (images, states, actions, audio) from
    disk so it can be replayed through ``RerunLogger``."""

    def __init__(self, task_dir=".", json_file="data.json"):
        """
        task_dir:  directory containing ``episode_XXXX`` subdirectories.
        json_file: name of the per-episode metadata file inside each episode dir.
        """
        self.task_dir = task_dir
        self.json_file = json_file

    def return_episode_data(self, episode_idx):
        """Load episode ``episode_idx`` and return its frames.

        Returns a list of dicts (one per recorded frame) with keys:
        ``idx, colors, depths, states, actions, tactiles, audios``.
        Raises FileNotFoundError if the episode's json file is missing.
        """
        episode_dir = os.path.join(self.task_dir, f"episode_{episode_idx:04d}")
        json_path = os.path.join(episode_dir, self.json_file)

        if not os.path.exists(json_path):
            raise FileNotFoundError(f"Episode {episode_idx} data.json not found.")

        with open(json_path, 'r', encoding='utf-8') as jsonf:
            # Renamed from `json_file` to avoid shadowing the attribute name.
            meta = json.load(jsonf)

        episode_data = []
        for item_data in meta['data']:
            colors = self._process_images(item_data, 'colors', episode_dir)
            depths = self._process_images(item_data, 'depths', episode_dir)
            audios = self._process_audio(item_data, 'audios', episode_dir)
            episode_data.append(
                {
                    'idx': item_data.get('idx', 0),
                    'colors': colors,
                    'depths': depths,
                    'states': item_data.get('states', {}),
                    'actions': item_data.get('actions', {}),
                    'tactiles': item_data.get('tactiles', {}),
                    'audios': audios,
                }
            )
        return episode_data

    def _process_images(self, item_data, data_type, dir_path):
        """Read every image referenced under ``item_data[data_type]``.

        Returns {key: RGB ndarray}. Entries with an empty file name, a missing
        file, or an unreadable file are silently skipped.
        """
        images = {}
        for key, file_name in item_data.get(data_type, {}).items():
            if not file_name:
                continue
            file_path = os.path.join(dir_path, file_name)
            if not os.path.exists(file_path):
                continue
            image = cv2.imread(file_path)
            if image is None:
                # Corrupt or unsupported file: skip instead of crashing in cvtColor.
                continue
            images[key] = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        return images

    def _process_audio(self, item_data, data_type, episode_dir):
        """Placeholder: audio decoding is not implemented yet; returns {}."""
        audio_data = {}
        dir_path = os.path.join(episode_dir, data_type)
        for key, file_name in item_data.get(data_type, {}).items():
            if file_name:
                file_path = os.path.join(dir_path, file_name)
                if os.path.exists(file_path):
                    pass  # TODO: decode audio data when needed
        return audio_data
class RerunLogger:
    """Streams teleop episode frames (joint qpos series and camera images) to
    a spawned rerun viewer, optionally clipped to a sliding ``idx`` window."""

    def __init__(self, prefix = "", IdxRangeBoundary = 30, memory_limit = "200MB"):
        """
        prefix:           entity-path prefix for all logged data (e.g. "online/").
        IdxRangeBoundary: width of the visible idx window; a falsy value skips
                          sending the blueprint.
        memory_limit:     memory cap handed to the spawned viewer process.
        """
        rr.init(datetime.now().strftime("Runtime_%Y%m%d_%H%M%S"))
        rr.spawn(memory_limit = memory_limit)
        self.prefix = prefix
        self.IdxRangeBoundary = IdxRangeBoundary
        if self.IdxRangeBoundary:
            self.setup_blueprint()

    def setup_blueprint(self):
        """Send a grid blueprint: four time-series panels (arms/hands) followed
        by four image panels, each showing only the trailing idx window."""
        def trailing_window():
            # Fresh VisibleTimeRange per view, matching one-per-view construction.
            return [
                rrb.VisibleTimeRange(
                    "idx",
                    start = rrb.TimeRangeBoundary.cursor_relative(seq = -self.IdxRangeBoundary),
                    end = rrb.TimeRangeBoundary.cursor_relative(),
                )
            ]

        series_origins = [f"{self.prefix}{part}"
                          for part in ("left_arm", "right_arm", "left_hand", "right_hand")]
        image_origins = [f"{self.prefix}colors/color_{i}" for i in range(4)]

        panels = [
            rrb.TimeSeriesView(
                origin = origin,
                time_ranges = trailing_window(),
                plot_legend = rrb.PlotLegend(visible = True),
            )
            for origin in series_origins
        ]
        panels += [
            rrb.Spatial2DView(origin = origin, time_ranges = trailing_window())
            for origin in image_origins
        ]

        layout = rrb.Grid(contents = panels,
                          grid_columns = 2,
                          column_shares = [1, 1],
                          row_shares = [1, 1, 0.5],
                          )
        rr.send_blueprint(layout)

    def log_item_data(self, item_data: dict):
        """Log one frame: advance the ``idx`` timeline, then emit each qpos
        scalar (states and actions) and each color image."""
        rr.set_time_sequence("idx", item_data.get('idx', 0))

        # States and actions share the same layout, so handle them uniformly.
        for group in ('states', 'actions'):
            for part, info in (item_data.get(group, {}) or {}).items():
                if not info:
                    continue
                for i, val in enumerate(info.get('qpos', [])):
                    rr.log(f"{self.prefix}{part}/{group}/qpos/{i}", rr.Scalar(val))

        for name, img in (item_data.get('colors', {}) or {}).items():
            if img is not None:
                rr.log(f"{self.prefix}colors/{name}", rr.Image(img))

        # Depth, tactile, and audio streams are present in item_data but are
        # not visualized yet.

    def log_episode_data(self, episode_data: list):
        """Replay an entire episode frame-by-frame."""
        for frame in episode_data:
            self.log_item_data(frame)
if __name__ == "__main__":
    # Demo driver: fetch sample episodes from Google Drive (if not already
    # present) and replay them offline and online through RerunLogger.
    import gdown
    import zipfile
    import os

    url = "https://drive.google.com/file/d/1f5UuFl1z_gaByg_7jDRj1_NxfJZh2evD/view?usp=sharing"
    zip_file = "rerun_testdata.zip"

    if not os.path.exists("episode_0006"):
        if not os.path.exists(zip_file):
            # Google Drive share links embed the file id at path index 5:
            # https://drive.google.com/file/d/<id>/view?...
            file_id = url.split('/')[5]
            gdown.download(id=file_id, output=zip_file, quiet=False)
            print("download ok.")
        with zipfile.ZipFile(zip_file, 'r') as zip_ref:
            zip_ref.extractall(".")
        print("uncompress ok.")
        os.remove(zip_file)
        print("clean file ok.")
    else:
        print("rerun_testdata exists.")  # fixed typo: was "exits"

    episode_reader = RerunEpisodeReader()
    episode_data6 = episode_reader.return_episode_data(6)
    episode_data8 = episode_reader.return_episode_data(8)

    # Example 1: Offline visualization (log the whole episode at once).
    user_input = input("Please enter the start signal (enter 'off' to start the subsequent program):\n")
    if user_input.lower() == 'off':
        print("Starting offline visualization...")
        offline_logger = RerunLogger(prefix="offline/")
        offline_logger.log_episode_data(episode_data6)
        print("Offline visualization completed.")

    # Example 2: Online visualization with a fixed trailing idx window.
    user_input = input("Please enter the start signal (enter 'on' to start the subsequent program):\n")
    if user_input.lower() == 'on':
        print("Starting online visualization with fixed idx size...")
        online_logger = RerunLogger(prefix="online/", IdxRangeBoundary = 60)
        for item_data in episode_data6:
            online_logger.log_item_data(item_data)
            time.sleep(0.033)  # ~30 Hz playback
        print("Online visualization completed.")

    # Example 3: Same online replay, second sample episode.
    user_input = input("Please enter the start signal (enter 'on' to start the subsequent program):\n")
    if user_input.lower() == 'on':
        print("Starting online visualization with fixed idx size...")
        online_logger = RerunLogger(prefix="online/", IdxRangeBoundary = 60)
        for item_data in episode_data8:
            online_logger.log_item_data(item_data)
            time.sleep(0.033)  # ~30 Hz playback
        print("Online visualization completed.")
Loading…
Cancel
Save