Browse Source

[update] pass-through mode, webrtc mode, fix bugs, fix ipc mode, etc.

main
silencht 5 months ago
parent
commit
66c2c7c6eb
  1. 2
      teleop/robot_control/robot_hand_unitree.py
  2. 2
      teleop/teleimager
  3. 150
      teleop/teleop_hand_and_arm.py
  4. 2
      teleop/televuer
  5. 8
      teleop/utils/episode_writer.py
  6. 45
      teleop/utils/ipc.py

2
teleop/robot_control/robot_hand_unitree.py

@ -419,7 +419,7 @@ if __name__ == "__main__":
if not img_client.has_head_cam():
logger_mp.error("Head camera is required. Please enable head camera on the image server side.")
head_img_shape = img_client.get_head_shape()
tv_binocular = img_client.is_binocular()
tv_binocular = img_client.head_is_binocular()
# television: obtain hand pose data from the XR device and transmit the robot's head camera image to the XR device.
tv_wrapper = TeleVuerWrapper(binocular=tv_binocular, use_hand_tracking=args.xr_mode == "hand", img_shape=head_img_shape, return_hand_rot_data = False)

2
teleop/teleimager

@ -1 +1 @@
Subproject commit 8e31bbbceb45ed0eeeae274eac0aa928959307e4
Subproject commit f81e0c6b234698661831134c6b8aba4f1a3ee709

150
teleop/teleop_hand_and_arm.py

@ -35,13 +35,21 @@ def publish_reset_category(category: int,publisher): # Scene Reset signal
# state transition
START = False # Enable to start robot following VR user motion
STOP = False # Enable to begin system exit procedure
RECORD_TOGGLE = False # [Ready] ⇄ [Recording] ⟶ [AutoSave] ⟶ [Ready] (⇄ manual) (⟶ auto)
READY = False # Ready to (1) enter START state, (2) enter RECORD_RUNNING state
RECORD_RUNNING = False # True if [Recording]
RECORD_READY = False # True if [Ready], False if [Recording] or [AutoSave]
# task info
TASK_NAME = None
TASK_DESC = None
ITEM_ID = None
RECORD_TOGGLE = False # Toggle recording state
# ------- --------- ----------- ----------- ---------
# state [Ready] ==> [Recording] ==> [AutoSave] --> [Ready]
# ------- --------- | ----------- | ----------- | ---------
# START True |manual True |manual True | True
# READY True |set False |set False |auto True
# RECORD_RUNNING False |to True |to False | False
# ∨ ∨ ∨
# RECORD_TOGGLE False True False True False False
# ------- --------- ----------- ----------- ---------
# ==> manual: when READY is True, set RECORD_TOGGLE=True to transition.
# --> auto : Auto-transition after saving data.
def on_press(key):
global STOP, START, RECORD_TOGGLE
if key == 'r':
@ -54,27 +62,19 @@ def on_press(key):
else:
logger_mp.warning(f"[on_press] {key} was pressed, but no action is defined for this key.")
def on_info(info):
"""Only handle CMD_TOGGLE_RECORD's task info"""
global TASK_NAME, TASK_DESC, ITEM_ID
TASK_NAME = info.get("task_name")
TASK_DESC = info.get("task_desc")
ITEM_ID = info.get("item_id")
logger_mp.debug(f"[on_info] Updated globals: {TASK_NAME}, {TASK_DESC}, {ITEM_ID}")
def get_state() -> dict:
"""Return current heartbeat state"""
global START, STOP, RECORD_RUNNING, RECORD_READY
global START, STOP, RECORD_RUNNING, READY
return {
"START": START,
"STOP": STOP,
"READY": READY,
"RECORD_RUNNING": RECORD_RUNNING,
"RECORD_READY": RECORD_READY,
}
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--frequency', type = float, default = 30.0, help = 'save data\'s frequency')
parser.add_argument('--frequency', type = float, default = 30.0, help = 'save and control \'s frequency')
# basic control parameters
parser.add_argument('--xr-mode', type=str, choices=['hand', 'controller'], default='hand', help='Select XR device tracking source')
@ -84,12 +84,17 @@ if __name__ == '__main__':
parser.add_argument('--motion', action = 'store_true', help = 'Enable motion control mode')
parser.add_argument('--headless', action='store_true', help='Enable headless mode (no display)')
parser.add_argument('--sim', action = 'store_true', help = 'Enable isaac simulation mode')
parser.add_argument('--affinity', action = 'store_true', help = 'Enable high priority and set CPU affinity')
parser.add_argument('--affinity', action = 'store_true', help = 'Enable high priority and set CPU affinity mode')
parser.add_argument('--ipc', action = 'store_true', help = 'Enable IPC server to handle input; otherwise enable sshkeyboard')
parser.add_argument('--record', action = 'store_true', help = 'Enable data recording')
parser.add_argument('--pass-through', action='store_true', help='Enable passthrough mode (use real-world view in XR device)')
parser.add_argument('--img-server-ip', type=str, default='127.0.0.1', help='IP address of image server')
# record mode and task info
parser.add_argument('--record', action = 'store_true', help = 'Enable data recording mode')
parser.add_argument('--task-dir', type = str, default = './utils/data/', help = 'path to save data')
parser.add_argument('--task-name', type = str, default = 'pick cube', help = 'task name for recording')
parser.add_argument('--task-desc', type = str, default = 'e.g. pick the red cube on the table.', help = 'task goal for recording')
parser.add_argument('--task-name', type = str, default = 'pick cube', help = 'task file name for recording')
parser.add_argument('--task-goal', type = str, default = 'pick up cube.', help = 'task goal for recording at json file')
parser.add_argument('--task-desc', type = str, default = 'task description', help = 'task description for recording at json file')
parser.add_argument('--task-steps', type = str, default = 'step1: do this; step2: do that;', help = 'task steps for recording at json file')
args = parser.parse_args()
logger_mp.info(f"args: {args}")
@ -97,23 +102,26 @@ if __name__ == '__main__':
try:
# ipc communication mode. client usage: see utils/ipc.py
if args.ipc:
ipc_server = IPC_Server(on_press=on_press, on_info=on_info, get_state=get_state)
ipc_server = IPC_Server(on_press=on_press,get_state=get_state)
ipc_server.start()
# sshkeyboard communication mode
else:
listen_keyboard_thread = threading.Thread(target=listen_keyboard, kwargs={"on_press": on_press, "until": None, "sequential": False,}, daemon=True)
listen_keyboard_thread = threading.Thread(target=listen_keyboard,
kwargs={"on_press": on_press, "until": None, "sequential": False,},
daemon=True)
listen_keyboard_thread.start()
# image client
img_client = ImageClient(host='127.0.0.1')#host='192.168.123.164'
if not img_client.has_head_cam():
logger_mp.error("Head camera is required. Please enable head camera on the image server side.")
img_client = ImageClient(host=args.img_server_ip)
xr_need_local_img = not (args.pass_through or img_client.enable_head_webrtc())
head_img_shape = img_client.get_head_shape()
tv_binocular = img_client.is_binocular()
# television: obtain hand pose data from the XR device and transmit the robot's head camera image to the XR device.
tv_wrapper = TeleVuerWrapper(binocular=tv_binocular, use_hand_tracking=args.xr_mode == "hand", img_shape=head_img_shape, return_hand_rot_data = False)
# televuer_wrapper: obtain hand pose data from the XR device and transmit the robot's head camera image to the XR device.
tv_wrapper = TeleVuerWrapper(use_hand_tracking=args.xr_mode == "hand",
pass_through=args.pass_through,
binocular=img_client.head_is_binocular(),
img_shape=img_client.get_head_shape(),
webrtc=img_client.enable_head_webrtc(),
webrtc_url=img_client.head_webrtc_url())
# arm
if args.arm == "G1_29":
@ -136,28 +144,32 @@ if __name__ == '__main__':
dual_hand_data_lock = Lock()
dual_hand_state_array = Array('d', 14, lock = False) # [output] current left, right hand state(14) data.
dual_hand_action_array = Array('d', 14, lock = False) # [output] current left, right hand action(14) data.
hand_ctrl = Dex3_1_Controller(left_hand_pos_array, right_hand_pos_array, dual_hand_data_lock, dual_hand_state_array, dual_hand_action_array, simulation_mode=args.sim)
hand_ctrl = Dex3_1_Controller(left_hand_pos_array, right_hand_pos_array, dual_hand_data_lock,
dual_hand_state_array, dual_hand_action_array, simulation_mode=args.sim)
elif args.ee == "dex1":
left_gripper_value = Value('d', 0.0, lock=True) # [input]
right_gripper_value = Value('d', 0.0, lock=True) # [input]
dual_gripper_data_lock = Lock()
dual_gripper_state_array = Array('d', 2, lock=False) # current left, right gripper state(2) data.
dual_gripper_action_array = Array('d', 2, lock=False) # current left, right gripper action(2) data.
gripper_ctrl = Dex1_1_Gripper_Controller(left_gripper_value, right_gripper_value, dual_gripper_data_lock, dual_gripper_state_array, dual_gripper_action_array, simulation_mode=args.sim)
gripper_ctrl = Dex1_1_Gripper_Controller(left_gripper_value, right_gripper_value, dual_gripper_data_lock,
dual_gripper_state_array, dual_gripper_action_array, simulation_mode=args.sim)
elif args.ee == "inspire1":
left_hand_pos_array = Array('d', 75, lock = True) # [input]
right_hand_pos_array = Array('d', 75, lock = True) # [input]
dual_hand_data_lock = Lock()
dual_hand_state_array = Array('d', 12, lock = False) # [output] current left, right hand state(12) data.
dual_hand_action_array = Array('d', 12, lock = False) # [output] current left, right hand action(12) data.
hand_ctrl = Inspire_Controller(left_hand_pos_array, right_hand_pos_array, dual_hand_data_lock, dual_hand_state_array, dual_hand_action_array, simulation_mode=args.sim)
hand_ctrl = Inspire_Controller(left_hand_pos_array, right_hand_pos_array, dual_hand_data_lock,
dual_hand_state_array, dual_hand_action_array, simulation_mode=args.sim)
elif args.ee == "brainco":
left_hand_pos_array = Array('d', 75, lock = True) # [input]
right_hand_pos_array = Array('d', 75, lock = True) # [input]
dual_hand_data_lock = Lock()
dual_hand_state_array = Array('d', 12, lock = False) # [output] current left, right hand state(12) data.
dual_hand_action_array = Array('d', 12, lock = False) # [output] current left, right hand action(12) data.
hand_ctrl = Brainco_Controller(left_hand_pos_array, right_hand_pos_array, dual_hand_data_lock, dual_hand_state_array, dual_hand_action_array, simulation_mode=args.sim)
hand_ctrl = Brainco_Controller(left_hand_pos_array, right_hand_pos_array, dual_hand_data_lock,
dual_hand_state_array, dual_hand_action_array, simulation_mode=args.sim)
else:
pass
@ -195,49 +207,39 @@ if __name__ == '__main__':
sport_client.Init()
# record + headless / non-headless mode
if args.record and args.headless:
recorder = EpisodeWriter(task_dir = args.task_dir + args.task_name, task_goal = args.task_desc, frequency = args.frequency, rerun_log = False)
elif args.record and not args.headless:
recorder = EpisodeWriter(task_dir = args.task_dir + args.task_name, task_goal = args.task_desc, frequency = args.frequency, rerun_log = True)
if args.record:
recorder = EpisodeWriter(task_dir = os.path.join(args.task_dir, args.task_name),
task_goal = args.task_goal,
task_desc = args.task_desc,
task_steps = args.task_steps,
frequency = args.frequency,
rerun_log = not args.headless)
logger_mp.info("Please enter the start signal (enter 'r' to start the subsequent program)")
while not START and not STOP:
# wait for start signal. by the way, get image and send it to xr
head_img, head_img_fps = img_client.get_head_frame()
tv_wrapper.set_display_image(head_img)
READY = True # now ready to (1) enter START state
while not START and not STOP: # wait for start or stop signal.
time.sleep(0.033)
if img_client.enable_head_zmq() and xr_need_local_img:
head_img, _ = img_client.get_head_frame()
tv_wrapper.render_to_xr(head_img)
logger_mp.info("start program.")
arm_ctrl.speed_gradual_max()
# main loop. robot start to follow VR user's motion
while not STOP:
start_time = time.time()
# get image
if img_client.enable_head_zmq():
if args.record or xr_need_local_img:
head_img, head_img_fps = img_client.get_head_frame()
if img_client.has_left_wrist_cam():
if xr_need_local_img:
tv_wrapper.render_to_xr(head_img)
if img_client.enable_left_wrist_zmq():
if args.record:
left_wrist_img, _ = img_client.get_left_wrist_frame()
if img_client.has_right_wrist_cam():
if img_client.enable_right_wrist_zmq():
if args.record:
right_wrist_img, _ = img_client.get_right_wrist_frame()
# send head rimage to xr
tv_wrapper.set_display_image(head_img)
# non-headless mode
if not args.headless:
resized_image = cv2.resize(head_img, (head_img_shape[1] // 2, head_img_shape[0] // 2))
cv2.imshow("record image", resized_image)
# opencv GUI communication
key = cv2.waitKey(1) & 0xFF
if key == ord('q'):
START = False
STOP = True
if args.sim:
publish_reset_category(2, reset_pose_publisher)
elif key == ord('s'):
RECORD_TOGGLE = True
elif key == ord('a'):
if args.sim:
publish_reset_category(2, reset_pose_publisher)
# record mode
if args.record and RECORD_TOGGLE:
@ -300,7 +302,7 @@ if __name__ == '__main__':
# record data
if args.record:
RECORD_READY = recorder.is_ready()
READY = recorder.is_ready() # now ready to (2) enter RECORD_RUNNING state
# dex hand or gripper
if args.ee == "dex3" and args.xr_mode == "hand":
with dual_hand_data_lock:
@ -352,18 +354,18 @@ if __name__ == '__main__':
if RECORD_RUNNING:
colors = {}
depths = {}
if tv_binocular:
colors[f"color_{0}"] = head_img[:, :head_img_shape[1]//2]
colors[f"color_{1}"] = head_img[:, head_img_shape[1]//2:]
if img_client.has_left_wrist_cam():
if img_client.head_is_binocular():
colors[f"color_{0}"] = head_img[:, :img_client.get_head_shape()[1]//2]
colors[f"color_{1}"] = head_img[:, img_client.get_head_shape()[1]//2:]
if img_client.enable_left_wrist_zmq():
colors[f"color_{2}"] = left_wrist_img
if img_client.has_right_wrist_cam():
if img_client.enable_right_wrist_zmq():
colors[f"color_{3}"] = right_wrist_img
else:
colors[f"color_{0}"] = head_img
if img_client.has_left_wrist_cam():
if img_client.enable_left_wrist_zmq():
colors[f"color_{1}"] = left_wrist_img
if img_client.has_right_wrist_cam():
if img_client.enable_right_wrist_zmq():
colors[f"color_{2}"] = right_wrist_img
states = {
"left_arm": {

2
teleop/televuer

@ -1 +1 @@
Subproject commit cb38fa12763ab989d7c1705b9d9a4318f5391170
Subproject commit 3bc0cad0ce4407c75d0550b5cc61cea9254e0390

8
teleop/utils/episode_writer.py

@ -11,7 +11,7 @@ import logging_mp
logger_mp = logging_mp.get_logger(__name__)
class EpisodeWriter():
def __init__(self, task_dir, task_goal=None, frequency=30, image_size=[640, 480], rerun_log = True):
def __init__(self, task_dir, task_goal=None, task_desc = None, task_steps = None, frequency=30, image_size=[640, 480], rerun_log = True):
"""
image_size: [width, height]
"""
@ -24,6 +24,10 @@ class EpisodeWriter():
}
if task_goal is not None:
self.text['goal'] = task_goal
if task_desc is not None:
self.text['desc'] = task_desc
if task_steps is not None:
self.text['steps'] = task_steps
self.frequency = frequency
self.image_size = image_size
@ -37,7 +41,7 @@ class EpisodeWriter():
self.item_id = -1
self.episode_id = -1
if os.path.exists(self.task_dir):
episode_dirs = [episode_dir for episode_dir in os.listdir(self.task_dir) if 'episode_' in episode_dir]
episode_dirs = [episode_dir for episode_dir in os.listdir(self.task_dir) if 'episode_' in episode_dir and not episode_dir.endswith('.zip')]
episode_last = sorted(episode_dirs)[-1] if len(episode_dirs) > 0 else None
self.episode_id = 0 if episode_last is None else int(episode_last.split('_')[-1])
logger_mp.info(f"==> task_dir directory already exist, now self.episode_id is:{self.episode_id}\n")

45
teleop/utils/ipc.py

@ -22,12 +22,7 @@ logger_mp = logging_mp.get_logger(__name__, level=logging_mp.INFO)
3) start or stop (record toggle)
{
"reqid": unique id,
"cmd": "CMD_RECORD_TOGGLE",
"info": { # optional
"task_name": "T001",
"task_desc": "pick and place apple to basket",
"item_id": 1
}
"cmd": "CMD_RECORD_TOGGLE"
}
# Server → Client (Reply)
@ -44,7 +39,6 @@ logger_mp = logging_mp.get_logger(__name__, level=logging_mp.INFO)
"msg": "reqid not provided"
| "cmd not provided"
| "cmd not supported: {cmd}"
| "info missing keys: {missing_keys}"
| "internal error msg"
}
@ -72,19 +66,18 @@ class IPC_Server:
"CMD_RECORD_TOGGLE": "s", # start & stop (toggle record)
}
def __init__(self, on_press=None, on_info=None, get_state=None, hb_fps=10.0):
def __init__(self, on_press=None, get_state=None, hb_fps=10.0):
"""
Args:
on_press : callback(cmd:str), called for every command
on_info : callback(data:dict), only handle CMD_RECORD_TOGGLE's task info
hb_fps : heartbeat publish frequency
get_state : callback() -> dict, provides current heartbeat state
hb_fps : heartbeat publish frequency
"""
if callable(on_press):
self.on_press = on_press
else:
raise ValueError("[IPC_Server] on_press callback function must be provided")
self.on_info = on_info
if callable(get_state):
self.get_state = get_state
else:
@ -172,23 +165,6 @@ class IPC_Server:
if cmd not in self.cmd_map:
return {"repid": reqid, "status": "error", "msg": f"cmd not supported: {cmd}"}
# CMD_RECORD_TOGGLE: optional info
if cmd == "CMD_RECORD_TOGGLE":
info = msg.get("info", None)
if info:
required_keys = ["task_name", "task_desc", "item_id"]
missing_keys = [key for key in required_keys if key not in info]
if missing_keys:
return {"repid": reqid, "status": "error", "msg": f"info missing keys: {missing_keys}"}
else:
if self.on_info:
self.on_info(info)
logger_mp.debug(f"[IPC_Server] on_info called with info: {info}")
else:
logger_mp.warning("[IPC_Server] No on_info provided")
else:
logger_mp.warning("[IPC_Server] No info provided with cmd: CMD_RECORD_TOGGLE")
# supported cmd path
self.on_press(self.cmd_map[cmd])
return {"repid": reqid, "status": "ok", "msg": "ok"}
@ -232,7 +208,7 @@ class IPC_Server:
class IPC_Client:
"""
Inter - Process Communication Client:
- Send command/info via REQ
- Send command via REQ
- Subscribe heartbeat via SUB
"""
def __init__(self, hb_fps=10.0):
@ -300,7 +276,7 @@ class IPC_Client:
# ---------------------------
# Public API
# ---------------------------
def send_data(self, cmd: str, info: dict = None) -> dict:
def send_data(self, cmd: str) -> dict:
"""Send command to server and wait reply"""
reqid = self._make_reqid()
if not self.is_online():
@ -308,8 +284,6 @@ class IPC_Client:
return {"repid": reqid, "status": "error", "msg": "server offline (no heartbeat)"}
msg = {"reqid": reqid, "cmd": cmd}
if cmd == "CMD_RECORD_TOGGLE" and info:
msg["info"] = info
try:
self.req_socket.send_json(msg)
# wait up to 1s for reply
@ -374,13 +348,8 @@ if __name__ == "__main__":
logger_mp.info("Reply: %s", rep)
elif key == "s":
info = {
"task_name": "T003",
"task_desc": "pick and place pear.",
"item_id": 1,
}
logger_mp.info("⏺️ Sending record toggle command...")
rep = client.send_data("CMD_RECORD_TOGGLE", info=info) # optional info
rep = client.send_data("CMD_RECORD_TOGGLE")
logger_mp.info("Reply: %s", rep)

Loading…
Cancel
Save