From 66e1b7a51a17bb420503e9f86d9f46c993b295f9 Mon Sep 17 00:00:00 2001 From: silencht Date: Fri, 9 Aug 2024 11:48:28 +0800 Subject: [PATCH] [detele] legacy:robot_arm_tem [fix] dds:robot_arm --- teleop/robot_control/robot_arm.py | 144 +++++++----- teleop/robot_control/robot_arm_tem.py | 316 -------------------------- teleop/teleop_hand_and_arm.py | 6 +- 3 files changed, 87 insertions(+), 379 deletions(-) delete mode 100644 teleop/robot_control/robot_arm_tem.py diff --git a/teleop/robot_control/robot_arm.py b/teleop/robot_control/robot_arm.py index 3b8fb02..6751601 100644 --- a/teleop/robot_control/robot_arm.py +++ b/teleop/robot_control/robot_arm.py @@ -2,7 +2,6 @@ import numpy as np import threading import time -# from unitree_dds_wrapper.idl import unitree_hx as unitree_hg # use to test from unitree_dds_wrapper.idl import unitree_hg from unitree_dds_wrapper.publisher import Publisher from unitree_dds_wrapper.subscription import Subscription @@ -14,9 +13,7 @@ import copy kTopicLowCommand = "rt/lowcmd" kTopicLowState = "rt/lowstate" -# kTopicLowCommand = "rt/lowcmd_hx" # use to test -# kTopicLowState = "rt/lowstate_hx" # use to test -kNumMotors = 30 +kNumMotors = 35 @@ -59,9 +56,7 @@ class H1ArmController: self.q_desList = np.zeros(kNumMotors) self.q_tau_ff = np.zeros(kNumMotors) self.msg = unitree_hg.msg.dds_.LowCmd_() - self.__packCRCformat = '<2B2x2I' \ - + 'B3x5f3I' * len(self.msg.motor_cmd) \ - + '41B104B3x3I' + self.__packFmtHGLowCmd = '<2B2x' + 'B3x5fI' * 35 + '5I' self.msg.head = [0xFE, 0xEF] self.lowcmd_publisher = Publisher(unitree_hg.msg.dds_.LowCmd_, kTopicLowCommand) @@ -98,7 +93,6 @@ class H1ArmController: self.msg.motor_cmd[id].q = self.lowstate_subscriber.msg.motor_state[id].q self.q_target.append(self.msg.motor_cmd[id].q) print(f"Init q_pose is :{self.q_target}") - duration = 1000 init_q = np.array([self.lowstate_subscriber.msg.motor_state[id].q for id in JointIndex]) print("Lock Leg...") @@ -135,42 +129,65 @@ class H1ArmController: self.q_desList = q_desList self.q_tau_ff = q_tau_ff + def __Trans(self, packData): + calcData = [] + calcLen = ((len(packData)>>2)-1) + + for i in range(calcLen): + d = ((packData[i*4+3] << 24) | (packData[i*4+2] << 16) | (packData[i*4+1] << 8) | (packData[i*4])) + calcData.append(d) + + return calcData + + def __Crc32(self, data): + bit = 0 + crc = 0xFFFFFFFF + polynomial = 0x04c11db7 + + for i in range(len(data)): + bit = 1 << 31 + current = data[i] + + for b in range(32): + if crc & 0x80000000: + crc = (crc << 1) & 0xFFFFFFFF + crc ^= polynomial + else: + crc = (crc << 1) & 0xFFFFFFFF + + if current & bit: + crc ^= polynomial + + bit >>= 1 + + return crc + def pre_communication(self): self.__pack_crc() + def __pack_crc(self): - rawdata = [] - rawdata.extend(self.msg.head) - rawdata.extend(self.msg.version) - for i in range(len(self.msg.motor_cmd)): - rawdata.append(self.msg.motor_cmd[i].mode) - rawdata.append(self.msg.motor_cmd[i].q) - rawdata.append(self.msg.motor_cmd[i].dq) - rawdata.append(self.msg.motor_cmd[i].tau) - rawdata.append(self.msg.motor_cmd[i].kp) - rawdata.append(self.msg.motor_cmd[i].kd) - rawdata.extend(self.msg.motor_cmd[i].reserve) - rawdata.append(self.msg.bms_cmd.cmd) - rawdata.extend(self.msg.bms_cmd.reserve) - rawdata.extend(self.msg.led_cmd) - rawdata.extend(self.msg.fan_cmd) - rawdata.extend(self.msg.cmd) - rawdata.extend(self.msg.data) - rawdata.extend(self.msg.reserve) - rawdata.append(self.msg.crc) - - packdata = struct.pack(self.__packCRCformat, *rawdata) - calcdata = [] - calclen = (len(packdata)>>2)-1 - for i in range(calclen): - d = ((packdata[i*4+3] << 24) | (packdata[i*4+2] << 16) | (packdata[i*4+1] << 8) | (packdata[i*4])) - calcdata.append(d) - - self.msg.crc = crc32(calcdata) + origData = [] + origData.append(self.msg.mode_pr) + origData.append(self.msg.mode_machine) + + for i in range(35): + origData.append(self.msg.motor_cmd[i].mode) + origData.append(self.msg.motor_cmd[i].q) + origData.append(self.msg.motor_cmd[i].dq) + origData.append(self.msg.motor_cmd[i].tau) + origData.append(self.msg.motor_cmd[i].kp) + origData.append(self.msg.motor_cmd[i].kd) + origData.append(self.msg.motor_cmd[i].reserve) + + origData.extend(self.msg.reserve) + origData.append(self.msg.crc) + calcdata = struct.pack(self.__packFmtHGLowCmd, *origData) + calcdata = self.__Trans(calcdata) + self.msg.crc = self.__Crc32(calcdata) def LowCommandWriter(self): while True: mc_tmp_ptr = self.motor_command_buffer.GetData() - # rawdata.extend(self.msg.version) if mc_tmp_ptr: for i in JointArmIndex: self.msg.motor_cmd[i].tau = mc_tmp_ptr.tau_ff[i] @@ -242,46 +259,48 @@ class H1ArmController: weak_motors = [ JointIndex.kLeftAnkle, JointIndex.kRightAnkle, - JointIndex.kRightShoulderPitch, - JointIndex.kRightShoulderRoll, - JointIndex.kRightShoulderYaw, - JointIndex.kRightElbow, + # Left arm JointIndex.kLeftShoulderPitch, JointIndex.kLeftShoulderRoll, JointIndex.kLeftShoulderYaw, - JointIndex.kLeftElbow + JointIndex.kLeftElbowPitch, + # Right arm + JointIndex.kRightShoulderPitch, + JointIndex.kRightShoulderRoll, + JointIndex.kRightShoulderYaw, + JointIndex.kRightElbowPitch, ] return motor_index in weak_motors def IsWristMotor(self, motor_index): - weak_motors = [ - JointIndex.kLeftWristYaw, + wrist_motors = [ + JointIndex.kLeftElbowRoll, JointIndex.kLeftWristPitch, - JointIndex.kLeftWristRoll, - JointIndex.kRightWristYaw, + JointIndex.kLeftWristyaw, + JointIndex.kRightElbowRoll, JointIndex.kRightWristPitch, - JointIndex.kRightWristRoll + JointIndex.kRightWristYaw, ] - return motor_index in weak_motors + return motor_index in wrist_motors class JointArmIndex(IntEnum): # Left arm kLeftShoulderPitch = 13 kLeftShoulderRoll = 14 kLeftShoulderYaw = 15 - kLeftElbow = 16 - kLeftWristYaw = 17 + kLeftElbowPitch = 16 + kLeftElbowRoll = 17 kLeftWristPitch = 18 - kLeftWristRoll = 19 + kLeftWristyaw = 19 # Right arm kRightShoulderPitch = 20 kRightShoulderRoll = 21 kRightShoulderYaw = 22 - kRightElbow = 23 - kRightWristYaw = 24 + kRightElbowPitch = 23 + kRightElbowRoll = 24 kRightWristPitch = 25 - kRightWristRoll = 26 + kRightWristYaw = 26 class JointIndex(IntEnum): # Left leg @@ -306,21 +325,26 @@ class JointIndex(IntEnum): kLeftShoulderPitch = 13 kLeftShoulderRoll = 14 kLeftShoulderYaw = 15 - kLeftElbow = 16 - kLeftWristYaw = 17 + kLeftElbowPitch = 16 + kLeftElbowRoll = 17 kLeftWristPitch = 18 - kLeftWristRoll = 19 + kLeftWristyaw = 19 # Right arm kRightShoulderPitch = 20 kRightShoulderRoll = 21 kRightShoulderYaw = 22 - kRightElbow = 23 - kRightWristYaw = 24 + kRightElbowPitch = 23 + kRightElbowRoll = 24 kRightWristPitch = 25 - kRightWristRoll = 26 + kRightWristYaw = 26 kNotUsedJoint = 27 kNotUsedJoint1 = 28 kNotUsedJoint2 = 29 + kNotUsedJoint3 = 30 + kNotUsedJoint4 = 31 + kNotUsedJoint5 = 32 + kNotUsedJoint6 = 33 + kNotUsedJoint7 = 34 diff --git a/teleop/robot_control/robot_arm_tem.py b/teleop/robot_control/robot_arm_tem.py deleted file mode 100644 index f7585a0..0000000 --- a/teleop/robot_control/robot_arm_tem.py +++ /dev/null @@ -1,316 +0,0 @@ -import numpy as np -import threading -import time - -from unitree_dds_wrapper.idl import unitree_hx -from unitree_dds_wrapper.publisher import Publisher -from unitree_dds_wrapper.subscription import Subscription -from unitree_dds_wrapper.utils.crc import crc32 -import struct -from enum import IntEnum -import copy -kTopicLowCommand = "rt/arm_sdk" -kTopicLowState = "rt/lowstate_hx" -kNumMotors = 30 - - -# 定义电机命令类 -class MotorCommand: - def __init__(self): - self.q_ref = np.zeros(kNumMotors) - self.dq_ref = np.zeros(kNumMotors) - self.tau_ff = np.zeros(kNumMotors) - self.kp = np.zeros(kNumMotors) - self.kd = np.zeros(kNumMotors) - - -class MotorState: - def __init__(self): - self.q = np.zeros(kNumMotors) - self.dq = np.zeros(kNumMotors) - -class BaseState: - def __init__(self): - self.omega = np.zeros(3) - self.rpy = np.zeros(3) - -class DataBuffer: - def __init__(self): - self.data = None - self.lock = threading.Lock() - - def GetData(self): - with self.lock: - return self.data - - def SetData(self, data): - with self.lock: - self.data = data - -np.set_printoptions(linewidth=240) - -class H1ArmController: - def __init__(self): - print("Initialize channel factory.") - self.q_desList = np.zeros(kNumMotors) - self.q_tau_ff = np.zeros(kNumMotors) - self.msg = unitree_hx.msg.dds_.LowCmd_() - self.msg.head = [0xFE, 0xEF] # 设置消息头 - self.lowcmd_publisher = Publisher(unitree_hx.msg.dds_.LowCmd_, kTopicLowCommand) - self.lowstate_subscriber = Subscription(unitree_hx.msg.dds_.LowState_, kTopicLowState) - - self.motor_state_buffer = DataBuffer() - self.motor_command_buffer = DataBuffer() - self.base_state_buffer = DataBuffer() - - self.kp_low = 140.0 - self.kd_low = 7.5 - - self.kp_high = 200.0 - self.kd_high = 5.0 - - self.kp_wrist = 35.0 - self.kd_wrist = 6.0 - - self.control_dt = 0.01 - self.hip_pitch_init_pos = -0.5 - self.knee_init_pos = 1.0 - self.ankle_init_pos = -0.5 - self.shoulder_pitch_init_pos = -1.4 - self.time = 0.0 - self.init_duration = 10.0 - self.report_dt = 0.1 - self.ratio = 0.0 - self.q_target = [] - while not self.lowstate_subscriber.msg: - time.sleep(0.01) - - for id in JointIndex: - self.msg.motor_cmd[id].q = self.lowstate_subscriber.msg.motor_state[id].q - self.q_target.append(self.msg.motor_cmd[id].q) - - duration = 1000 - init_q = np.array([self.lowstate_subscriber.msg.motor_state[id].q for id in JointIndex]) - print("Lock Leg...") - for i in range(duration): - time.sleep(0.001) - q_t = init_q + (self.q_target - init_q) * i / duration - for i, id in enumerate(JointIndex): - self.msg.motor_cmd[id].mode = 1 - if id not in JointArmIndex: - self.msg.motor_cmd[id].kp = 200 - self.msg.motor_cmd[id].kd = 5 - self.msg.motor_cmd[id].q = q_t[i] - self.pre_communication() - self.lowcmd_publisher.msg = self.msg - self.lowcmd_publisher.write() # 发布命令 - - - self.report_rpy_thread = threading.Thread(target=self.SubscribeState) - self.report_rpy_thread.start() - - self.control_thread = threading.Thread(target=self.Control) - self.control_thread.start() - - self.command_writer_thread = threading.Thread(target=self.LowCommandWriter) - self.command_writer_thread.start() - - - def LowStateHandler(self, message): - low_state = message - self.RecordMotorState(low_state) - self.RecordBaseState(low_state) - - def SetMotorPose(self,q_desList,q_tau_ff): - self.q_desList = q_desList - self.q_tau_ff = q_tau_ff - - def pre_communication(self): - self.__pack_crc() - def __pack_crc(self): - rawdata = [] - rawdata.extend(self.msg.head) - rawdata.extend(self.msg.version) - for i in range(len(self.msg.motor_cmd)): - rawdata.append(self.msg.motor_cmd[i].mode) - rawdata.append(self.msg.motor_cmd[i].q) - rawdata.append(self.msg.motor_cmd[i].dq) - rawdata.append(self.msg.motor_cmd[i].tau) - rawdata.append(self.msg.motor_cmd[i].kp) - rawdata.append(self.msg.motor_cmd[i].kd) - rawdata.extend(self.msg.motor_cmd[i].reserve) - rawdata.append(self.msg.bms_cmd.cmd) - rawdata.extend(self.msg.bms_cmd.reserve) - rawdata.extend(self.msg.led_cmd) - rawdata.extend(self.msg.fan_cmd) - rawdata.extend(self.msg.cmd) - rawdata.extend(self.msg.data) - rawdata.extend(self.msg.reserve) - rawdata.append(self.msg.crc) - - packdata = struct.pack(self.__packCRCformat, *rawdata) - calcdata = [] - calclen = (len(packdata)>>2)-1 - for i in range(calclen): - d = ((packdata[i*4+3] << 24) | (packdata[i*4+2] << 16) | (packdata[i*4+1] << 8) | (packdata[i*4])) - calcdata.append(d) - - self.msg.crc = crc32(calcdata) - - def LowCommandWriter(self): - while True: - mc_tmp_ptr = self.motor_command_buffer.GetData() - # rawdata.extend(self.msg.version) - if mc_tmp_ptr: - for i in JointArmIndex: - self.msg.motor_cmd[i].tau = mc_tmp_ptr.tau_ff[i] - self.msg.motor_cmd[i].q = mc_tmp_ptr.q_ref[i] - self.msg.motor_cmd[i].dq = mc_tmp_ptr.dq_ref[i] - self.msg.motor_cmd[i].kp = mc_tmp_ptr.kp[i] - self.msg.motor_cmd[i].kd = mc_tmp_ptr.kd[i] - self.pre_communication() - self.lowcmd_publisher.msg = self.msg - self.lowcmd_publisher.write() # 发布命令 - time.sleep(0.002) # 控制循环周期 - - def Control(self): - while True: - ms_tmp_ptr_0 = self.motor_state_buffer.GetData() - if ms_tmp_ptr_0: - tem_q_desList = copy.deepcopy(self.q_desList) - tem_q_tau_ff = copy.deepcopy(self.q_tau_ff) - motor_command_tmp = MotorCommand() - ms_tmp_ptr = self.motor_state_buffer.GetData() - self.time += self.control_dt - self.time = min(max(self.time, 0.0), self.init_duration) - self.ratio = self.time / self.init_duration - for i in range(kNumMotors): - if self.IsWeakMotor(i): - motor_command_tmp.kp[i] = self.kp_low - motor_command_tmp.kd[i] = self.kd_low - elif self.IsWristMotor(i): - motor_command_tmp.kp[i] = self.kp_wrist - motor_command_tmp.kd[i] = self.kd_wrist - else: - motor_command_tmp.kp[i] = self.kp_high - motor_command_tmp.kd[i] = self.kd_high - motor_command_tmp.dq_ref[i] = 0.0 - motor_command_tmp.tau_ff[i] = tem_q_tau_ff[i] - q_des = tem_q_desList[i] - - q_des = (q_des - ms_tmp_ptr.q[i]) * self.ratio + ms_tmp_ptr.q[i] - motor_command_tmp.q_ref[i] = q_des - self.motor_command_buffer.SetData(motor_command_tmp) - time.sleep(0.002) - - def GetMotorState(self): - ms_tmp_ptr = self.motor_state_buffer.GetData() - if ms_tmp_ptr: - return ms_tmp_ptr.q[13:27],ms_tmp_ptr.dq[13:27] - else: - return None,None - - def SubscribeState(self): - while True: - if self.lowstate_subscriber.msg: - self.LowStateHandler(self.lowstate_subscriber.msg) - time.sleep(0.002) - - def RecordMotorState(self, msg): - ms_tmp = MotorState() - for i in range(kNumMotors): - ms_tmp.q[i] = msg.motor_state[i].q - ms_tmp.dq[i] = msg.motor_state[i].dq - self.motor_state_buffer.SetData(ms_tmp) - - def RecordBaseState(self, msg): - bs_tmp = BaseState() - bs_tmp.omega = msg.imu_state.gyroscope - bs_tmp.rpy = msg.imu_state.rpy - self.base_state_buffer.SetData(bs_tmp) - - def IsWeakMotor(self, motor_index): - weak_motors = [ - JointIndex.kLeftAnkle, - JointIndex.kRightAnkle, - JointIndex.kRightShoulderPitch, - JointIndex.kRightShoulderRoll, - JointIndex.kRightShoulderYaw, - JointIndex.kRightElbow, - JointIndex.kLeftShoulderPitch, - JointIndex.kLeftShoulderRoll, - JointIndex.kLeftShoulderYaw, - JointIndex.kLeftElbow - ] - return motor_index in weak_motors - - def IsWristMotor(self, motor_index): - weak_motors = [ - JointIndex.kLeftWristYaw, - JointIndex.kLeftWristPitch, - JointIndex.kLeftWristRoll, - JointIndex.kRightWristYaw, - JointIndex.kRightWristPitch, - JointIndex.kRightWristRoll - ] - return motor_index in weak_motors - -class JointArmIndex(IntEnum): - # Left arm - kLeftShoulderPitch = 13 - kLeftShoulderRoll = 14 - kLeftShoulderYaw = 15 - kLeftElbow = 16 - kLeftWristYaw = 17 - kLeftWristPitch = 18 - kLeftWristRoll = 19 - - # Right arm - kRightShoulderPitch = 20 - kRightShoulderRoll = 21 - kRightShoulderYaw = 22 - kRightElbow = 23 - kRightWristYaw = 24 - kRightWristPitch = 25 - kRightWristRoll = 26 - -class JointIndex: - # Left leg - kLeftHipYaw = 0 - kLeftHipRoll = 1 - kLeftHipPitch = 2 - kLeftKnee = 3 - kLeftAnkle = 4 - kLeftAnkleRoll = 5 - - # Right leg - kRightHipYaw = 6 - kRightHipRoll = 7 - kRightHipPitch = 8 - kRightKnee = 9 - kRightAnkle = 10 - kRightAnkleRoll = 11 - - kWaistYaw = 12 - - # Left arm - kLeftShoulderPitch = 13 - kLeftShoulderRoll = 14 - kLeftShoulderYaw = 15 - kLeftElbow = 16 - kLeftWristYaw = 17 - kLeftWristPitch = 18 - kLeftWristRoll = 19 - - # Right arm - kRightShoulderPitch = 20 - kRightShoulderRoll = 21 - kRightShoulderYaw = 22 - kRightElbow = 23 - kRightWristYaw = 24 - kRightWristPitch = 25 - kRightWristRoll = 26 - - kNotUsedJoint = 27 - kNotUsedJoint1 = 28 - kNotUsedJoint2 = 29 diff --git a/teleop/teleop_hand_and_arm.py b/teleop/teleop_hand_and_arm.py index 487db25..293f64a 100644 --- a/teleop/teleop_hand_and_arm.py +++ b/teleop/teleop_hand_and_arm.py @@ -135,8 +135,8 @@ if __name__ == '__main__': np.copyto(teleoperator.img_array, np.array(frame)) handstate = h1hand.get_hand_state() - q_poseList=np.zeros(30) - q_tau_ff=np.zeros(30) + q_poseList=np.zeros(35) + q_tau_ff=np.zeros(35) armstate,armv = h1arm.GetMotorState() head_rmat, left_pose, right_pose, left_qpos, right_qpos = teleoperator.step() @@ -147,7 +147,7 @@ if __name__ == '__main__': q_tau_ff[13:27] = tau_ff else: q_poseList[13:27] = armstate - q_tau_ff = np.zeros(30) + q_tau_ff = np.zeros(35) h1arm.SetMotorPose(q_poseList, q_tau_ff)