Browse Source

Add integral drift correction, arm reset key, and I-term toggle

Adds a closed-loop integral (I) term to compensate for arm drift caused
by the 250Hz velocity clipper. When the user moves faster than the clip
limit, the robot falls behind; the I-term accumulates the task-space
position error (FK vs VP target) and biases future IK targets to catch up.

New features:
- --ki/--i-clamp/--i-decay CLI args for tunable gain, anti-windup, decay
- 'h' key resets arms to home position without exiting the program
- 'i' key toggles integral correction on/off mid-session
- Per-second logging of error magnitude, I accumulator, and correction
- compute_fk() method on G1_29_ArmIK for Pinocchio forward kinematics

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
main
Joe DiPrima 1 month ago
parent
commit
1a3c2d77d9
  1. 19
      teleop/robot_control/robot_arm_ik.py
  2. 87
      teleop/teleop_hand_and_arm.py

19
teleop/robot_control/robot_arm_ik.py

@ -308,7 +308,24 @@ class G1_29_ArmIK:
# return sol_q, sol_tauff # return sol_q, sol_tauff
return current_lr_arm_motor_q, np.zeros(self.reduced_robot.model.nv) return current_lr_arm_motor_q, np.zeros(self.reduced_robot.model.nv)
def compute_fk(self, current_lr_arm_q):
"""Compute forward kinematics for both end-effectors.
Args:
current_lr_arm_q: 14-dim numpy array of current joint angles
Returns:
left_ee_pos: 3-dim numpy array, position of L_ee in waist frame
right_ee_pos: 3-dim numpy array, position of R_ee in waist frame
"""
q = np.array(current_lr_arm_q)
pin.forwardKinematics(self.reduced_robot.model, self.reduced_robot.data, q)
pin.updateFramePlacements(self.reduced_robot.model, self.reduced_robot.data)
left_ee_pos = self.reduced_robot.data.oMf[self.L_hand_id].translation.copy()
right_ee_pos = self.reduced_robot.data.oMf[self.R_hand_id].translation.copy()
return left_ee_pos, right_ee_pos
class G1_23_ArmIK: class G1_23_ArmIK:
def __init__(self, Unit_Test = False, Visualization = False): def __init__(self, Unit_Test = False, Visualization = False):
np.set_printoptions(precision=5, suppress=True, linewidth=200) np.set_printoptions(precision=5, suppress=True, linewidth=200)

87
teleop/teleop_hand_and_arm.py

@ -1,5 +1,6 @@
import time import time
import argparse import argparse
import numpy as np
from multiprocessing import Value, Array, Lock from multiprocessing import Value, Array, Lock
import threading import threading
import logging_mp import logging_mp
@ -36,6 +37,8 @@ STOP = False # Enable to begin system exit procedure
READY = False # Ready to (1) enter START state, (2) enter RECORD_RUNNING state READY = False # Ready to (1) enter START state, (2) enter RECORD_RUNNING state
RECORD_RUNNING = False # True if [Recording] RECORD_RUNNING = False # True if [Recording]
RECORD_TOGGLE = False # Toggle recording state RECORD_TOGGLE = False # Toggle recording state
INTEGRAL_ENABLED = True # Toggle integral drift correction via 'i' key
RESET_ARMS = False # Trigger arm reset via 'h' key
# ------- --------- ----------- ----------- --------- # ------- --------- ----------- ----------- ---------
# state [Ready] ==> [Recording] ==> [AutoSave] --> [Ready] # state [Ready] ==> [Recording] ==> [AutoSave] --> [Ready]
# ------- --------- | ----------- | ----------- | --------- # ------- --------- | ----------- | ----------- | ---------
@ -49,7 +52,7 @@ RECORD_TOGGLE = False # Toggle recording state
# --> auto : Auto-transition after saving data. # --> auto : Auto-transition after saving data.
def on_press(key): def on_press(key):
global STOP, START, RECORD_TOGGLE
global STOP, START, RECORD_TOGGLE, INTEGRAL_ENABLED, RESET_ARMS
if key == 'r': if key == 'r':
START = True START = True
elif key == 'q': elif key == 'q':
@ -57,6 +60,12 @@ def on_press(key):
STOP = True STOP = True
elif key == 's' and START == True: elif key == 's' and START == True:
RECORD_TOGGLE = True RECORD_TOGGLE = True
elif key == 'i':
INTEGRAL_ENABLED = not INTEGRAL_ENABLED
logger_mp.info(f"[on_press] Integral correction {'ENABLED' if INTEGRAL_ENABLED else 'DISABLED'}")
elif key == 'h' and START == True:
RESET_ARMS = True
logger_mp.info("[on_press] Arm reset requested (return to home)")
else: else:
logger_mp.warning(f"[on_press] {key} was pressed, but no action is defined for this key.") logger_mp.warning(f"[on_press] {key} was pressed, but no action is defined for this key.")
@ -86,6 +95,10 @@ if __name__ == '__main__':
parser.add_argument('--sim', action = 'store_true', help = 'Enable isaac simulation mode') parser.add_argument('--sim', action = 'store_true', help = 'Enable isaac simulation mode')
parser.add_argument('--ipc', action = 'store_true', help = 'Enable IPC server to handle input; otherwise enable sshkeyboard') parser.add_argument('--ipc', action = 'store_true', help = 'Enable IPC server to handle input; otherwise enable sshkeyboard')
parser.add_argument('--affinity', action = 'store_true', help = 'Enable high priority and set CPU affinity mode') parser.add_argument('--affinity', action = 'store_true', help = 'Enable high priority and set CPU affinity mode')
# integral drift correction
parser.add_argument('--ki', type=float, default=0.3, help='Integral gain for drift correction (0.0 = disabled)')
parser.add_argument('--i-clamp', type=float, default=0.10, help='Anti-windup clamp for I term in meters (default: 0.10)')
parser.add_argument('--i-decay', type=float, default=0.995, help='Per-frame decay factor for I term (default: 0.995)')
# record mode and task info # record mode and task info
parser.add_argument('--record', action = 'store_true', help = 'Enable data recording mode') parser.add_argument('--record', action = 'store_true', help = 'Enable data recording mode')
parser.add_argument('--task-dir', type = str, default = './utils/data/', help = 'path to save data') parser.add_argument('--task-dir', type = str, default = './utils/data/', help = 'path to save data')
@ -245,6 +258,11 @@ if __name__ == '__main__':
logger_mp.info("🟡 Press [s] to START or SAVE recording (toggle cycle).") logger_mp.info("🟡 Press [s] to START or SAVE recording (toggle cycle).")
else: else:
logger_mp.info("🔵 Recording is DISABLED (run with --record to enable).") logger_mp.info("🔵 Recording is DISABLED (run with --record to enable).")
logger_mp.info("🏠 Press [h] to reset arms to home position (without exiting).")
if args.ki > 0.0:
logger_mp.info(f"🔧 Press [i] to toggle integral drift correction (Ki={args.ki}, clamp={args.i_clamp}m, decay={args.i_decay})")
else:
logger_mp.info("⚪ Integral correction DISABLED (--ki=0.0)")
logger_mp.info("🔴 Press [q] to stop and exit the program.") logger_mp.info("🔴 Press [q] to stop and exit the program.")
logger_mp.info("⚠️ IMPORTANT: Please keep your distance and stay safe.") logger_mp.info("⚠️ IMPORTANT: Please keep your distance and stay safe.")
READY = True # now ready to (1) enter START state READY = True # now ready to (1) enter START state
@ -256,9 +274,34 @@ if __name__ == '__main__':
logger_mp.info("---------------------🚀start Tracking🚀-------------------------") logger_mp.info("---------------------🚀start Tracking🚀-------------------------")
arm_ctrl.speed_gradual_max() arm_ctrl.speed_gradual_max()
# Integral drift correction state
i_error_left = np.zeros(3) # Accumulated task-space position error (meters)
i_error_right = np.zeros(3)
last_loop_time = time.time()
i_frame_count = 0
# main loop. robot start to follow VR user's motion # main loop. robot start to follow VR user's motion
while not STOP: while not STOP:
start_time = time.time() start_time = time.time()
# handle arm reset request
if RESET_ARMS:
RESET_ARMS = False
logger_mp.info("[main] Resetting arms to home position...")
i_error_left = np.zeros(3)
i_error_right = np.zeros(3)
arm_ctrl.ctrl_dual_arm(np.zeros(14), np.zeros(14))
reset_start = time.time()
while time.time() - reset_start < 3.0:
if np.all(np.abs(arm_ctrl.get_current_dual_arm_q()) < 0.1):
break
time.sleep(0.05)
logger_mp.info("[main] Arms reset complete, resuming tracking")
arm_ctrl.speed_gradual_max()
last_loop_time = time.time()
continue
# get image # get image
if camera_config['head_camera']['enable_zmq']: if camera_config['head_camera']['enable_zmq']:
if args.record or xr_need_local_img: if args.record or xr_need_local_img:
@ -324,9 +367,47 @@ if __name__ == '__main__':
current_lr_arm_q = arm_ctrl.get_current_dual_arm_q() current_lr_arm_q = arm_ctrl.get_current_dual_arm_q()
current_lr_arm_dq = arm_ctrl.get_current_dual_arm_dq() current_lr_arm_dq = arm_ctrl.get_current_dual_arm_dq()
# solve ik using motor data and wrist pose, then use ik results to control arms.
# --- Integral drift correction ---
now = time.time()
dt = min(now - last_loop_time, 0.1) # clamp to prevent huge jumps after stalls
last_loop_time = now
left_wrist_adjusted = tele_data.left_wrist_pose.copy()
right_wrist_adjusted = tele_data.right_wrist_pose.copy()
if args.ki > 0.0 and INTEGRAL_ENABLED:
# FK: where are the robot hands actually?
left_ee_pos, right_ee_pos = arm_ik.compute_fk(current_lr_arm_q)
# Error: target - actual (task-space position)
left_error = tele_data.left_wrist_pose[:3, 3] - left_ee_pos
right_error = tele_data.right_wrist_pose[:3, 3] - right_ee_pos
# Leaky integrator with anti-windup
i_error_left = i_error_left * args.i_decay + left_error * dt
i_error_right = i_error_right * args.i_decay + right_error * dt
i_error_left = np.clip(i_error_left, -args.i_clamp, args.i_clamp)
i_error_right = np.clip(i_error_right, -args.i_clamp, args.i_clamp)
# Bias IK target position (rotation unchanged)
left_wrist_adjusted[:3, 3] += args.ki * i_error_left
right_wrist_adjusted[:3, 3] += args.ki * i_error_right
# Log every ~1 second (every 30 frames at 30Hz)
i_frame_count += 1
if i_frame_count % 30 == 0:
logger_mp.info(
f"[I-term] L_err={np.linalg.norm(left_error):.4f}m "
f"R_err={np.linalg.norm(right_error):.4f}m | "
f"L_I={np.linalg.norm(i_error_left):.4f} "
f"R_I={np.linalg.norm(i_error_right):.4f} | "
f"L_corr={np.linalg.norm(args.ki * i_error_left):.4f}m "
f"R_corr={np.linalg.norm(args.ki * i_error_right):.4f}m"
)
# solve ik using motor data and adjusted wrist pose
time_ik_start = time.time() time_ik_start = time.time()
sol_q, sol_tauff = arm_ik.solve_ik(tele_data.left_wrist_pose, tele_data.right_wrist_pose, current_lr_arm_q, current_lr_arm_dq)
sol_q, sol_tauff = arm_ik.solve_ik(left_wrist_adjusted, right_wrist_adjusted, current_lr_arm_q, current_lr_arm_dq)
time_ik_end = time.time() time_ik_end = time.time()
logger_mp.debug(f"ik:\t{round(time_ik_end - time_ik_start, 6)}") logger_mp.debug(f"ik:\t{round(time_ik_end - time_ik_start, 6)}")
arm_ctrl.ctrl_dual_arm(sol_q, sol_tauff) arm_ctrl.ctrl_dual_arm(sol_q, sol_tauff)

Loading…
Cancel
Save