You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
112 lines
3.8 KiB
112 lines
3.8 KiB
"""Diagnose R3 controller button bitmask positions via wireless_remote in rt/lowstate.
|
|
|
|
Tries different network interfaces and LowState types to find where
|
|
wireless_remote data flows.
|
|
"""
|
|
import paramiko, sys, time
|
|
sys.stdout.reconfigure(encoding='utf-8', errors='replace')
|
|
|
|
ssh = paramiko.SSHClient()
|
|
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
|
|
ssh.connect('10.0.0.64', username='unitree', password='123',
|
|
timeout=15, look_for_keys=False, allow_agent=False)
|
|
|
|
# Source of the diagnostic that is uploaded to and executed on the robot.
# It is a raw string so that escapes such as '\n' reach the remote file
# verbatim instead of being interpreted while this module is parsed.
script = r'''
import subprocess
import sys
import time

sys.path.insert(0, "/home/unitree/miniforge3/envs/tv/lib/python3.11/site-packages")
from unitree_sdk2py.core.channel import ChannelSubscriber, ChannelFactoryInitialize
from unitree_sdk2py.idl.unitree_hg.msg.dds_ import LowState_ as hg_LowState

# Show the available interfaces so a wrong interface choice is easy to spot
result = subprocess.run(['ip', 'link', 'show'], capture_output=True, text=True)
print("=== Network interfaces ===")
for line in result.stdout.split('\n'):
    if ': ' in line and 'state' in line:
        print(f" {line.strip()}")

# Use eth0 which connects to the internal bus
iface = "eth0"
print(f"\nUsing interface: {iface}")

ChannelFactoryInitialize(0, iface)

# (mask, label) pairs for the two button bytes of wireless_remote
BTN1_BITS = (
    (0x01, "R1"), (0x02, "L1"), (0x04, "Start"), (0x08, "Select"),
    (0x10, "R2"), (0x20, "L2"), (0x40, "F1"), (0x80, "F3"),
)
BTN2_BITS = (
    (0x01, "A"), (0x02, "B"), (0x04, "X"), (0x08, "Y"),
    (0x10, "Up"), (0x20, "Right"), (0x40, "Down"), (0x80, "Left"),
)

# One-element lists so the callback can update them without 'global'
count = [0]
last_wr = [None]


def pressed(value, table):
    """Return the labels in table whose mask bit is set in value."""
    return [name for mask, name in table if value & mask]


def handler(msg):
    """Print the decoded button bytes of one LowState message."""
    count[0] += 1
    # Check if wireless_remote exists and has data
    try:
        data = bytes(msg.wireless_remote)

        # Only print if changed or first time or periodic
        if data != last_wr[0] or count[0] % 200 == 1:
            btn1 = data[2] if len(data) > 2 else 0
            btn2 = data[3] if len(data) > 3 else 0
            btn1_names = pressed(btn1, BTN1_BITS)
            btn2_names = pressed(btn2, BTN2_BITS)

            any_nonzero = any(b != 0 for b in data[:4])
            marker = "*** BUTTON ***" if any_nonzero else ""
            ts = time.strftime("%H:%M:%S")
            print(f"[{ts}] #{count[0]:4d} len={len(data)} "
                  f"btn1=0x{btn1:02x}[{','.join(btn1_names) or '-'}] "
                  f"btn2=0x{btn2:02x}[{','.join(btn2_names) or '-'}] "
                  f"raw={data[:8].hex()} "
                  f"{marker}", flush=True)
            last_wr[0] = data
    except Exception as e:
        # Report only the first few failures so a persistent error does
        # not flood the output at the message rate.
        if count[0] <= 3:
            print(f"Error reading wireless_remote: {e}", flush=True)


sub = ChannelSubscriber("rt/lowstate", hg_LowState)
sub.Init(handler, 10)

print("\n=== R3 Button Diagnostic (hg_LowState on eth0) ===", flush=True)
print("Press buttons on R3 controller. Running 15 seconds...\n", flush=True)

time.sleep(15)
print(f"\nTotal messages received: {count[0]}", flush=True)
'''

# Upload the diagnostic, run it with the robot's conda interpreter, and show
# what it printed. The SSH connection is closed in `finally` and the SFTP
# session by its `with` block, so neither leaks if the upload or the remote
# command raises.
try:
    print("Deploying R3 button diagnostic to robot...")
    with ssh.open_sftp() as sftp:
        with sftp.file('/tmp/r3_diag.py', 'w') as f:
            f.write(script)
    print("Deployed.")

    print("\n=== Running diagnostic (15 seconds) ===")
    print("Press each R3 button now!\n")

    # The remote script sleeps for 15 s; the 30 s channel timeout leaves
    # headroom for interpreter start-up and DDS initialisation.
    _, o, e = ssh.exec_command(
        '/home/unitree/miniforge3/envs/tv/bin/python /tmp/r3_diag.py',
        timeout=30)

    # read() blocks until the remote process closes the stream, so the
    # output appears only once the diagnostic has finished.
    output = o.read().decode('utf-8', errors='replace')
    print(output)
    err = e.read().decode('utf-8', errors='replace')
    if err:
        print(f"Stderr: {err}")
finally:
    ssh.close()