From 0ad5732966e8c40efd142caf0aa3c35ee9aa6a48 Mon Sep 17 00:00:00 2001 From: "wei.li" Date: Mon, 23 Jun 2025 20:55:38 +0800 Subject: [PATCH] [update] Add DDS communication support for simulation object position resetting --- teleop/teleop_hand_and_arm.py | 30 ++++++++++++++++++++++++++---- 1 file changed, 26 insertions(+), 4 deletions(-) diff --git a/teleop/teleop_hand_and_arm.py b/teleop/teleop_hand_and_arm.py index 461b9f9..e6449f0 100644 --- a/teleop/teleop_hand_and_arm.py +++ b/teleop/teleop_hand_and_arm.py @@ -21,8 +21,22 @@ from teleop.robot_control.robot_hand_unitree import Dex3_1_Controller, Gripper_C from teleop.robot_control.robot_hand_inspire import Inspire_Controller from teleop.image_server.image_client import ImageClient from teleop.utils.episode_writer import EpisodeWriter + +from unitree_sdk2py.core.channel import ChannelPublisher, ChannelFactoryInitialize +from unitree_sdk2py.idl.std_msgs.msg.dds_ import String_ + + from sshkeyboard import listen_keyboard, stop_listening +def publish_reset_category(category: int,publisher): + # construct message + msg = String_(data=str(category)) # pass data parameter directly during initialization + + # create publisher + + # publish message + publisher.Write(msg) + print(f"published reset category: {category}") start_signal = False running = True @@ -47,14 +61,14 @@ if __name__ == '__main__': parser.add_argument('--xr-mode', type=str, choices=['hand', 'controller'], default='hand', help='Select XR device tracking source') parser.add_argument('--arm', type=str, choices=['G1_29', 'G1_23', 'H1_2', 'H1'], default='G1_29', help='Select arm controller') - parser.add_argument('--ee', type=str, choices=['dex3', 'gripper', 'inspire1'], help='Select end effector controller') + parser.add_argument('--ee', type=str, choices=['dex3', 'gripper', 'inspire1'],default='gripper', help='Select end effector controller') parser.add_argument('--record', action = 'store_true', help = 'Enable data recording') parser.add_argument('--debug', action = 'store_false', help = 'Enable debug mode') parser.add_argument('--headless', action='store_true', help='Run in headless mode (no display)') parser.add_argument('--is_use_sim', action = 'store_true', help = 'Use simulation or not') - parser.set_defaults(is_use_sim = False) + parser.set_defaults(is_use_sim = True) args = parser.parse_args() logger_mp.info(f"args: {args}") @@ -155,7 +169,9 @@ if __name__ == '__main__': hand_ctrl = Inspire_Controller(left_hand_pos_array, right_hand_pos_array, dual_hand_data_lock, dual_hand_state_array, dual_hand_action_array) else: pass - + if args.is_use_sim: + reset_pose_publisher = ChannelPublisher("rt/reset_pose/cmd", String_) + reset_pose_publisher.Init() # xr mode if args.xr_mode == 'controller' and not args.debug: from unitree_sdk2py.g1.loco.g1_loco_client import LocoClient @@ -183,8 +199,13 @@ if __name__ == '__main__': if key == ord('q'): stop_listening() running = False + if args.is_use_sim: + publish_reset_category(2, reset_pose_publisher) elif key == ord('s'): should_toggle_recording = True + elif key == ord('a'): + if args.is_use_sim: + publish_reset_category(2, reset_pose_publisher) if args.record and should_toggle_recording: should_toggle_recording = False @@ -196,7 +217,8 @@ if __name__ == '__main__': else: is_recording = False recorder.save_episode() - + if args.is_use_sim: + publish_reset_category(1, reset_pose_publisher) # get input data tele_data = tv_wrapper.get_motion_state_data() if (args.ee == 'dex3' or args.ee == 'inspire1') and args.xr_mode == 'hand':