#!/usr/bin/env python3 """Quick DDS connectivity test for G1 robot. Run this on the GB10 after connecting the robot to the network. Usage: python3 dds_test.py [interface_name] """ import sys import time import os import subprocess import socket import struct def main(): # Auto-detect or use provided interface iface = sys.argv[1] if len(sys.argv) > 1 else None if iface is None: result = subprocess.run(['ip', '-o', '-4', 'addr', 'show'], capture_output=True, text=True) for line in result.stdout.splitlines(): if '192.168.123' in line: iface = line.split()[1] break if iface is None: print('ERROR: No interface with 192.168.123.x found. Add one with:') print(' sudo ip addr add 192.168.123.100/24 dev ') sys.exit(1) # Get our IP on that interface result = subprocess.run(['ip', '-o', '-4', 'addr', 'show', iface], capture_output=True, text=True) our_ip = None for word in result.stdout.split(): if '192.168.123' in word: our_ip = word.split('/')[0] break print(f'[1/4] Interface: {iface} ({our_ip})', flush=True) # Step 1: Ping test print('[2/4] Ping test...', flush=True) for ip in ['192.168.123.161', '192.168.123.164']: r = subprocess.run(['ping', '-c', '1', '-W', '1', ip], capture_output=True, text=True) status = 'OK' if r.returncode == 0 else 'FAIL' print(f' {ip}: {status}', flush=True) # Step 2: Multicast receive test print('[3/4] DDS multicast discovery (5s)...', flush=True) sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.bind(('', 7400)) mreq = struct.pack('4s4s', socket.inet_aton('239.255.0.1'), socket.inet_aton(our_ip)) sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq) sock.settimeout(1.0) count = 0 sources = set() start = time.time() while time.time() - start < 5: try: data, addr = sock.recvfrom(4096) count += 1 sources.add(addr[0]) except socket.timeout: pass sock.close() robot_sources = {s for s in sources if s.startswith('192.168.123')} print(f' {count} packets from {sources}', flush=True) if robot_sources: print(f' Robot DDS discovered: {robot_sources}', flush=True) else: print(' WARNING: No DDS multicast from robot!', flush=True) print(' Check: sudo ufw disable OR sudo ufw allow from 192.168.123.0/24', flush=True) sys.exit(1) # Step 3: SDK subscriber test print('[4/4] Unitree SDK rt/lowstate test (8s)...', flush=True) try: sys.path.insert(0, '/home/mitchaiet/GR00T-WholeBodyControl/.venv/lib/python3.12/site-packages') from unitree_sdk2py.core.channel import ChannelFactoryInitialize, ChannelSubscriber from unitree_sdk2py.idl.unitree_hg.msg.dds_ import LowState_ ChannelFactoryInitialize(0, iface) sub = ChannelSubscriber('rt/lowstate', LowState_) sub.Init() for i in range(16): msg = sub.Read() if msg is not None: gyro = msg.imu_state.gyroscope print(f' GOT rt/lowstate! IMU gyro: [{gyro[0]:.4f}, {gyro[1]:.4f}, {gyro[2]:.4f}]', flush=True) print(f' Motor states: {len(msg.motor_state)} joints', flush=True) print('\n=== ALL TESTS PASSED ===', flush=True) os._exit(0) time.sleep(0.5) print(' FAIL: No data on rt/lowstate after 8s', flush=True) print(' DDS discovery works but topic data not received', flush=True) except Exception as e: print(f' SDK error: {e}', flush=True) os._exit(1) if __name__ == '__main__': main()