Browse Source

Route R3 controller buttons to teleop commands

Map physical R3 buttons (A/B/X/Y/Start) to teleop actions so the
operator can control tracking, arm reset, I-term toggle, and recording
without needing a separate SSH keyboard session.

- Store wireless_remote bytes from rt/lowstate in G1_29_LowState
- Add get_wireless_remote() accessor to arm controller
- Parse button bitmasks with edge detection (fire on 0→1 only)
- A=start, B=stop, X=reset arms, Y=toggle I-term, Start=toggle recording
- Buttons work in both pre-tracking wait loop and main control loop
- Keyboard shortcuts (r/q/h/i/s) still work alongside R3 buttons

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
main
Joe DiPrima 1 month ago
parent
commit
47b5c2d957
  1. 11
      teleop/robot_control/robot_arm.py
  2. 65
      teleop/teleop_hand_and_arm.py

11
teleop/robot_control/robot_arm.py

@ -32,6 +32,7 @@ class MotorState:
class G1_29_LowState: class G1_29_LowState:
def __init__(self): def __init__(self):
self.motor_state = [MotorState() for _ in range(G1_29_Num_Motors)] self.motor_state = [MotorState() for _ in range(G1_29_Num_Motors)]
self.wireless_remote = bytes(40)
class G1_23_LowState: class G1_23_LowState:
def __init__(self): def __init__(self):
@ -146,6 +147,7 @@ class G1_29_ArmController:
for id in range(G1_29_Num_Motors): for id in range(G1_29_Num_Motors):
lowstate.motor_state[id].q = msg.motor_state[id].q lowstate.motor_state[id].q = msg.motor_state[id].q
lowstate.motor_state[id].dq = msg.motor_state[id].dq lowstate.motor_state[id].dq = msg.motor_state[id].dq
lowstate.wireless_remote = bytes(msg.wireless_remote)
self.lowstate_buffer.SetData(lowstate) self.lowstate_buffer.SetData(lowstate)
time.sleep(0.002) time.sleep(0.002)
@ -212,7 +214,14 @@ class G1_29_ArmController:
def get_current_dual_arm_dq(self): def get_current_dual_arm_dq(self):
'''Return current state dq of the left and right arm motors.''' '''Return current state dq of the left and right arm motors.'''
return np.array([self.lowstate_buffer.GetData().motor_state[id].dq for id in G1_29_JointArmIndex]) return np.array([self.lowstate_buffer.GetData().motor_state[id].dq for id in G1_29_JointArmIndex])
def get_wireless_remote(self):
'''Return raw wireless_remote bytes (40 bytes) from latest lowstate.'''
data = self.lowstate_buffer.GetData()
if data is not None:
return data.wireless_remote
return bytes(40)
def ctrl_dual_arm_go_home(self): def ctrl_dual_arm_go_home(self):
'''Move both the left and right arms of the robot to their home position by setting the target joint angles (q) and torques (tau) to zero.''' '''Move both the left and right arms of the robot to their home position by setting the target joint angles (q) and torques (tau) to zero.'''
logger_mp.info("[G1_29_ArmController] ctrl_dual_arm_go_home start...") logger_mp.info("[G1_29_ArmController] ctrl_dual_arm_go_home start...")

65
teleop/teleop_hand_and_arm.py

@ -1,5 +1,6 @@
import time import time
import argparse import argparse
import struct
import numpy as np import numpy as np
from multiprocessing import Value, Array, Lock from multiprocessing import Value, Array, Lock
import threading import threading
@ -51,6 +52,23 @@ RESET_ARMS = False # Trigger arm reset via 'h' key
# ==> manual: when READY is True, set RECORD_TOGGLE=True to transition. # ==> manual: when READY is True, set RECORD_TOGGLE=True to transition.
# --> auto : Auto-transition after saving data. # --> auto : Auto-transition after saving data.
def parse_r3_buttons(data):
"""Parse R3 controller button bytes from wireless_remote. Returns dict of button states."""
if len(data) < 4:
return {}
btn1 = data[2] # R1=0x01, L1=0x02, Start=0x04, Select=0x08, R2=0x10, L2=0x20
btn2 = data[3] # A=0x01, B=0x02, X=0x04, Y=0x08, Up=0x10, Right=0x20, Down=0x40, Left=0x80
return {
'A': bool(btn2 & 0x01),
'B': bool(btn2 & 0x02),
'X': bool(btn2 & 0x04),
'Y': bool(btn2 & 0x08),
'Start': bool(btn1 & 0x04),
}
# Previous button state for edge detection
_r3_prev_buttons = {}
def on_press(key): def on_press(key):
global STOP, START, RECORD_TOGGLE, INTEGRAL_ENABLED, RESET_ARMS global STOP, START, RECORD_TOGGLE, INTEGRAL_ENABLED, RESET_ARMS
if key == 'r': if key == 'r':
@ -253,21 +271,28 @@ if __name__ == '__main__':
rerun_log = not args.headless) rerun_log = not args.headless)
logger_mp.info("----------------------------------------------------------------") logger_mp.info("----------------------------------------------------------------")
logger_mp.info("🟢 Press [r] to start syncing the robot with your movements.")
logger_mp.info(" Keyboard | R3 Controller | Action")
logger_mp.info(" --------- --------------- ------")
logger_mp.info(" [r] | [A] | Start arm tracking")
logger_mp.info(" [q] | [B] | Stop and exit")
logger_mp.info(" [h] | [X] | Reset arms to home")
logger_mp.info(" [i] | [Y] | Toggle I-term correction")
if args.record: if args.record:
logger_mp.info("🟡 Press [s] to START or SAVE recording (toggle cycle).")
else:
logger_mp.info("🔵 Recording is DISABLED (run with --record to enable).")
logger_mp.info("🏠 Press [h] to reset arms to home position (without exiting).")
logger_mp.info(" [s] | [Start] | Toggle recording")
if args.ki > 0.0: if args.ki > 0.0:
logger_mp.info(f"🔧 Press [i] to toggle integral drift correction (Ki={args.ki}, clamp={args.i_clamp}m, decay={args.i_decay})")
else:
logger_mp.info("⚪ Integral correction DISABLED (--ki=0.0)")
logger_mp.info("🔴 Press [q] to stop and exit the program.")
logger_mp.info(f" I-term: Ki={args.ki}, clamp={args.i_clamp}m, decay={args.i_decay}")
logger_mp.info("----------------------------------------------------------------")
logger_mp.info("⚠️ IMPORTANT: Please keep your distance and stay safe.") logger_mp.info("⚠️ IMPORTANT: Please keep your distance and stay safe.")
READY = True # now ready to (1) enter START state READY = True # now ready to (1) enter START state
while not START and not STOP: # wait for start or stop signal. while not START and not STOP: # wait for start or stop signal.
time.sleep(0.033) time.sleep(0.033)
# Poll R3 A button in wait loop (edge-detected)
r3_data = arm_ctrl.get_wireless_remote()
r3_btns = parse_r3_buttons(r3_data)
if r3_btns.get('A') and not _r3_prev_buttons.get('A'):
START = True
logger_mp.info("[R3] A pressed → START tracking")
_r3_prev_buttons = r3_btns
if camera_config['head_camera']['enable_zmq'] and xr_need_local_img: if camera_config['head_camera']['enable_zmq'] and xr_need_local_img:
head_img = img_client.get_head_frame() head_img = img_client.get_head_frame()
tv_wrapper.render_to_xr(head_img) tv_wrapper.render_to_xr(head_img)
@ -302,6 +327,28 @@ if __name__ == '__main__':
last_loop_time = time.time() last_loop_time = time.time()
continue continue
# Poll R3 controller buttons (edge-detected)
r3_data = arm_ctrl.get_wireless_remote()
r3_btns = parse_r3_buttons(r3_data)
if r3_btns.get('A') and not _r3_prev_buttons.get('A'):
if not START:
START = True
logger_mp.info("[R3] A pressed → START tracking")
if r3_btns.get('B') and not _r3_prev_buttons.get('B'):
START = False
STOP = True
logger_mp.info("[R3] B pressed → STOP")
if r3_btns.get('X') and not _r3_prev_buttons.get('X') and START:
RESET_ARMS = True
logger_mp.info("[R3] X pressed → Reset arms")
if r3_btns.get('Y') and not _r3_prev_buttons.get('Y'):
INTEGRAL_ENABLED = not INTEGRAL_ENABLED
logger_mp.info(f"[R3] Y pressed → Integral {'ENABLED' if INTEGRAL_ENABLED else 'DISABLED'}")
if r3_btns.get('Start') and not _r3_prev_buttons.get('Start') and START:
RECORD_TOGGLE = True
logger_mp.info("[R3] Start pressed → Toggle recording")
_r3_prev_buttons = r3_btns
# get image # get image
if camera_config['head_camera']['enable_zmq']: if camera_config['head_camera']['enable_zmq']:
if args.record or xr_need_local_img: if args.record or xr_need_local_img:

Loading…
Cancel
Save