30 changed files with 750 additions and 5452 deletions
-
6.gitmodules
-
7LICENSE
-
455README.md
-
556README_ja-JP.md
-
431README_zh-CN.md
-
20requirements.txt
-
4teleop/open_television/__init__.py
-
86teleop/open_television/_test_television.py
-
95teleop/open_television/_test_tv_wrapper.py
-
48teleop/open_television/setup.py
-
514teleop/open_television/television.py
-
410teleop/open_television/tv_wrapper.py
-
1teleop/robot_control/dex-retargeting
-
22teleop/robot_control/dex_retargeting/CITATION.cff
-
21teleop/robot_control/dex_retargeting/LICENSE
-
1teleop/robot_control/dex_retargeting/__init__.py
-
85teleop/robot_control/dex_retargeting/constants.py
-
102teleop/robot_control/dex_retargeting/kinematics_adaptor.py
-
516teleop/robot_control/dex_retargeting/optimizer.py
-
17teleop/robot_control/dex_retargeting/optimizer_utils.py
-
255teleop/robot_control/dex_retargeting/retargeting_config.py
-
93teleop/robot_control/dex_retargeting/robot_wrapper.py
-
151teleop/robot_control/dex_retargeting/seq_retarget.py
-
2237teleop/robot_control/dex_retargeting/yourdfpy.py
-
2teleop/robot_control/hand_retargeting.py
-
4teleop/robot_control/robot_hand_unitree.py
-
8teleop/teleop_hand_and_arm.py
-
1teleop/televuer
-
1teleop/utils/data/-home-unitree-Code-unitree-opject-unitree_sim_isaaclab-.txt
-
53teleop/utils/sim_state_topic.py
@ -0,0 +1,6 @@ |
|||
[submodule "teleop/televuer"] |
|||
path = teleop/televuer |
|||
url = https://github.com/silencht/televuer |
|||
[submodule "teleop/robot_control/dex-retargeting"] |
|||
path = teleop/robot_control/dex-retargeting |
|||
url = https://github.com/silencht/dex-retargeting |
|||
@ -1,20 +1,4 @@ |
|||
einops==0.8.0 |
|||
h5py==3.11.0 |
|||
ipython==8.12.3 |
|||
matplotlib==3.7.5 |
|||
packaging==24.1 |
|||
pandas==2.0.3 |
|||
params_proto==2.12.1 |
|||
pytransform3d==3.5.0 |
|||
PyYAML==6.0.1 |
|||
scikit_learn==1.3.2 |
|||
scipy==1.10.1 |
|||
seaborn==0.13.2 |
|||
setuptools==69.5.1 |
|||
torch==2.3.0 |
|||
torchvision==0.18.0 |
|||
rerun-sdk==0.20.1 |
|||
nlopt>=2.6.1,<2.8.0 |
|||
trimesh>=4.4.0 |
|||
anytree>=2.12.0 |
|||
logging-mp==0.1.5 |
|||
meshcat==0.3.2 |
|||
sshkeyboard==2.3.1 |
|||
@ -1,4 +0,0 @@ |
|||
# open_television/__init__.py |
|||
# openssl req -x509 -nodes -days 365 -newkey rsa:2048 -keyout key.pem -out cert.pem |
|||
from .television import TeleVision |
|||
from .tv_wrapper import TeleVisionWrapper, TeleData, TeleStateData |
|||
@ -1,86 +0,0 @@ |
|||
import os, sys |
|||
this_file = os.path.abspath(__file__) |
|||
project_root = os.path.abspath(os.path.join(os.path.dirname(this_file), '..')) |
|||
if project_root not in sys.path: |
|||
sys.path.insert(0, project_root) |
|||
|
|||
import time |
|||
import numpy as np |
|||
from multiprocessing import shared_memory |
|||
from open_television import TeleVision |
|||
import logging_mp |
|||
logger_mp = logging_mp.get_logger(__name__, level=logging_mp.INFO) |
|||
|
|||
def run_test_television(): |
|||
# image |
|||
image_shape = (480, 640 * 2, 3) |
|||
image_shm = shared_memory.SharedMemory(create=True, size=np.prod(image_shape) * np.uint8().itemsize) |
|||
image_array = np.ndarray(image_shape, dtype=np.uint8, buffer=image_shm.buf) |
|||
|
|||
# from image_server.image_client import ImageClient |
|||
# import threading |
|||
# image_client = ImageClient(tv_img_shape = image_shape, tv_img_shm_name = image_shm.name, image_show=True, server_address="127.0.0.1") |
|||
# image_receive_thread = threading.Thread(target = image_client.receive_process, daemon = True) |
|||
# image_receive_thread.daemon = True |
|||
# image_receive_thread.start() |
|||
|
|||
# television |
|||
use_hand_track = True |
|||
tv = TeleVision(binocular = True, use_hand_tracking = use_hand_track, img_shape = image_shape, img_shm_name = image_shm.name, webrtc=False) |
|||
|
|||
try: |
|||
input("Press Enter to start television test...") |
|||
running = True |
|||
while running: |
|||
logger_mp.info("=" * 80) |
|||
logger_mp.info("Common Data (always available):") |
|||
logger_mp.info(f"head_pose shape: {tv.head_pose.shape}\n{tv.head_pose}\n") |
|||
logger_mp.info(f"left_arm_pose shape: {tv.left_arm_pose.shape}\n{tv.left_arm_pose}\n") |
|||
logger_mp.info(f"right_arm_pose shape: {tv.right_arm_pose.shape}\n{tv.right_arm_pose}\n") |
|||
logger_mp.info("=" * 80) |
|||
|
|||
if use_hand_track: |
|||
logger_mp.info("Hand Tracking Data:") |
|||
logger_mp.info(f"left_hand_positions shape: {tv.left_hand_positions.shape}\n{tv.left_hand_positions}\n") |
|||
logger_mp.info(f"right_hand_positions shape: {tv.right_hand_positions.shape}\n{tv.right_hand_positions}\n") |
|||
logger_mp.info(f"left_hand_orientations shape: {tv.left_hand_orientations.shape}\n{tv.left_hand_orientations}\n") |
|||
logger_mp.info(f"right_hand_orientations shape: {tv.right_hand_orientations.shape}\n{tv.right_hand_orientations}\n") |
|||
logger_mp.info(f"left_hand_pinch_state: {tv.left_hand_pinch_state}") |
|||
logger_mp.info(f"left_hand_pinch_value: {tv.left_hand_pinch_value}") |
|||
logger_mp.info(f"left_hand_squeeze_state: {tv.left_hand_squeeze_state}") |
|||
logger_mp.info(f"left_hand_squeeze_value: {tv.left_hand_squeeze_value}") |
|||
logger_mp.info(f"right_hand_pinch_state: {tv.right_hand_pinch_state}") |
|||
logger_mp.info(f"right_hand_pinch_value: {tv.right_hand_pinch_value}") |
|||
logger_mp.info(f"right_hand_squeeze_state: {tv.right_hand_squeeze_state}") |
|||
logger_mp.info(f"right_hand_squeeze_value: {tv.right_hand_squeeze_value}") |
|||
else: |
|||
logger_mp.info("Controller Data:") |
|||
logger_mp.info(f"left_controller_trigger_state: {tv.left_controller_trigger_state}") |
|||
logger_mp.info(f"left_controller_trigger_value: {tv.left_controller_trigger_value}") |
|||
logger_mp.info(f"left_controller_squeeze_state: {tv.left_controller_squeeze_state}") |
|||
logger_mp.info(f"left_controller_squeeze_value: {tv.left_controller_squeeze_value}") |
|||
logger_mp.info(f"left_controller_thumbstick_state: {tv.left_controller_thumbstick_state}") |
|||
logger_mp.info(f"left_controller_thumbstick_value: {tv.left_controller_thumbstick_value}") |
|||
logger_mp.info(f"left_controller_aButton: {tv.left_controller_aButton}") |
|||
logger_mp.info(f"left_controller_bButton: {tv.left_controller_bButton}") |
|||
logger_mp.info(f"right_controller_trigger_state: {tv.right_controller_trigger_state}") |
|||
logger_mp.info(f"right_controller_trigger_value: {tv.right_controller_trigger_value}") |
|||
logger_mp.info(f"right_controller_squeeze_state: {tv.right_controller_squeeze_state}") |
|||
logger_mp.info(f"right_controller_squeeze_value: {tv.right_controller_squeeze_value}") |
|||
logger_mp.info(f"right_controller_thumbstick_state: {tv.right_controller_thumbstick_state}") |
|||
logger_mp.info(f"right_controller_thumbstick_value: {tv.right_controller_thumbstick_value}") |
|||
logger_mp.info(f"right_controller_aButton: {tv.right_controller_aButton}") |
|||
logger_mp.info(f"right_controller_bButton: {tv.right_controller_bButton}") |
|||
logger_mp.info("=" * 80) |
|||
time.sleep(0.03) |
|||
except KeyboardInterrupt: |
|||
running = False |
|||
logger_mp.warning("KeyboardInterrupt, exiting program...") |
|||
finally: |
|||
image_shm.unlink() |
|||
image_shm.close() |
|||
logger_mp.warning("Finally, exiting program...") |
|||
exit(0) |
|||
|
|||
if __name__ == '__main__': |
|||
run_test_television() |
|||
@ -1,95 +0,0 @@ |
|||
import os, sys |
|||
this_file = os.path.abspath(__file__) |
|||
project_root = os.path.abspath(os.path.join(os.path.dirname(this_file), '..')) |
|||
if project_root not in sys.path: |
|||
sys.path.insert(0, project_root) |
|||
|
|||
import numpy as np |
|||
import time |
|||
from multiprocessing import shared_memory |
|||
from open_television import TeleVisionWrapper |
|||
import logging_mp |
|||
logger_mp = logging_mp.get_logger(__name__, level=logging_mp.INFO) |
|||
|
|||
|
|||
def run_test_tv_wrapper(): |
|||
image_shape = (480, 640 * 2, 3) |
|||
image_shm = shared_memory.SharedMemory(create=True, size=np.prod(image_shape) * np.uint8().itemsize) |
|||
image_array = np.ndarray(image_shape, dtype=np.uint8, buffer=image_shm.buf) |
|||
|
|||
# from image_server.image_client import ImageClient |
|||
# import threading |
|||
# image_client = ImageClient(tv_img_shape = image_shape, tv_img_shm_name = image_shm.name, image_show=True, server_address="127.0.0.1") |
|||
# image_receive_thread = threading.Thread(target = image_client.receive_process, daemon = True) |
|||
# image_receive_thread.daemon = True |
|||
# image_receive_thread.start() |
|||
|
|||
use_hand_track=True |
|||
tv_wrapper = TeleVisionWrapper(binocular=True, use_hand_tracking=use_hand_track, img_shape=image_shape, img_shm_name=image_shm.name, |
|||
return_state_data=True, return_hand_rot_data = True) |
|||
try: |
|||
input("Press Enter to start tv_wrapper test...") |
|||
running = True |
|||
while running: |
|||
start_time = time.time() |
|||
teleData = tv_wrapper.get_motion_state_data() |
|||
|
|||
logger_mp.info("=== TeleData Snapshot ===") |
|||
logger_mp.info(f"[Head Rotation Matrix]:\n{teleData.head_pose}") |
|||
logger_mp.info(f"[Left EE Pose]:\n{teleData.left_arm_pose}") |
|||
logger_mp.info(f"[Right EE Pose]:\n{teleData.right_arm_pose}") |
|||
|
|||
if use_hand_track: |
|||
logger_mp.info(f"[Left Hand Position] shape {teleData.left_hand_pos.shape}:\n{teleData.left_hand_pos}") |
|||
logger_mp.info(f"[Right Hand Position] shape {teleData.right_hand_pos.shape}:\n{teleData.right_hand_pos}") |
|||
|
|||
if teleData.left_hand_rot is not None: |
|||
logger_mp.info(f"[Left Hand Rotation] shape {teleData.left_hand_rot.shape}:\n{teleData.left_hand_rot}") |
|||
if teleData.right_hand_rot is not None: |
|||
logger_mp.info(f"[Right Hand Rotation] shape {teleData.right_hand_rot.shape}:\n{teleData.right_hand_rot}") |
|||
|
|||
if teleData.left_pinch_value is not None: |
|||
logger_mp.info(f"[Left Pinch Value]: {teleData.left_pinch_value:.2f}") |
|||
if teleData.right_pinch_value is not None: |
|||
logger_mp.info(f"[Right Pinch Value]: {teleData.right_pinch_value:.2f}") |
|||
|
|||
if teleData.tele_state: |
|||
state = teleData.tele_state |
|||
logger_mp.info("[Hand State]:") |
|||
logger_mp.info(f" Left Pinch state: {state.left_pinch_state}") |
|||
logger_mp.info(f" Left Squeeze: {state.left_squeeze_state} ({state.left_squeeze_value:.2f})") |
|||
logger_mp.info(f" Right Pinch state: {state.right_pinch_state}") |
|||
logger_mp.info(f" Right Squeeze: {state.right_squeeze_state} ({state.right_squeeze_value:.2f})") |
|||
else: |
|||
logger_mp.info(f"[Left Trigger Value]: {teleData.left_trigger_value:.2f}") |
|||
logger_mp.info(f"[Right Trigger Value]: {teleData.right_trigger_value:.2f}") |
|||
|
|||
if teleData.tele_state: |
|||
state = teleData.tele_state |
|||
logger_mp.info("[Controller State]:") |
|||
logger_mp.info(f" Left Trigger: {state.left_trigger_state}") |
|||
logger_mp.info(f" Left Squeeze: {state.left_squeeze_ctrl_state} ({state.left_squeeze_ctrl_value:.2f})") |
|||
logger_mp.info(f" Left Thumbstick: {state.left_thumbstick_state} ({state.left_thumbstick_value})") |
|||
logger_mp.info(f" Left A/B Buttons: A={state.left_aButton}, B={state.left_bButton}") |
|||
logger_mp.info(f" Right Trigger: {state.right_trigger_state}") |
|||
logger_mp.info(f" Right Squeeze: {state.right_squeeze_ctrl_state} ({state.right_squeeze_ctrl_value:.2f})") |
|||
logger_mp.info(f" Right Thumbstick: {state.right_thumbstick_state} ({state.right_thumbstick_value})") |
|||
logger_mp.info(f" Right A/B Buttons: A={state.right_aButton}, B={state.right_bButton}") |
|||
|
|||
current_time = time.time() |
|||
time_elapsed = current_time - start_time |
|||
sleep_time = max(0, 0.033 - time_elapsed) |
|||
time.sleep(sleep_time) |
|||
logger_mp.debug(f"main process sleep: {sleep_time}") |
|||
|
|||
except KeyboardInterrupt: |
|||
running = False |
|||
logger_mp.warning("KeyboardInterrupt, exiting program...") |
|||
finally: |
|||
image_shm.unlink() |
|||
image_shm.close() |
|||
logger_mp.warning("Finally, exiting program...") |
|||
exit(0) |
|||
|
|||
if __name__ == '__main__': |
|||
run_test_tv_wrapper() |
|||
@ -1,48 +0,0 @@ |
|||
from setuptools import setup, find_packages |
|||
|
|||
setup( |
|||
name='open_television', |
|||
version='0.0.1', |
|||
description='XR vision and hand/controller interface for unitree robotics', |
|||
author='silencht', |
|||
packages=find_packages(), |
|||
install_requires=[ |
|||
'numpy==1.23.0', |
|||
'opencv_contrib_python==4.10.0.82', |
|||
'opencv_python==4.9.0.80', |
|||
'aiohttp==3.9.5', |
|||
'aiohttp_cors==0.7.0', |
|||
'aiortc==1.8.0', |
|||
'av==11.0.0', |
|||
# 'vuer==0.0.32rc7', # avp \ pico, hand tracking all work fine. but it not support controller tracking. |
|||
|
|||
# 'vuer==0.0.35', # avp hand tracking works fine. pico is fine too, but the right eye occasionally goes black for a short time at start. |
|||
|
|||
# from 'vuer==0.0.36rc1', # avp hand tracking only shows a flat RGB image — there's no stereo view. Pico (hand / controller tracking) is the same, |
|||
# and sometimes the right eye occasionally goes black for a short time at start. |
|||
# Both avp / pico can display the hand or controller marker, which looks like a black box. |
|||
|
|||
# to 'vuer==0.0.42rc16', # avp hand tracking only shows a flat RGB image — there's no stereo view. Pico (hand / controller tracking) is the same, |
|||
# and sometimes the right eye occasionally goes black for a short time at start. |
|||
# Both avp / pico can display the hand or controller marker, which looks like RGB three-axis color coordinates. |
|||
|
|||
# from 'vuer==0.0.42', # avp hand tracking only shows a flat RGB image — there's no stereo view. |
|||
# to 'vuer==0.0.45', pico hand tracking also only shows a flat RGB image — there's no stereo view. Unable to retrieve hand tracking data. |
|||
# pico controller tracking also only shows a flat RGB image — there's no stereo view. Controller data can be retrieved. |
|||
|
|||
# from 'vuer==0.0.46' # avp hand tracking work fine. |
|||
# to |
|||
# 'vuer==0.0.56', # In pico hand tracking mode, using a hand gesture to click the "Virtual Reality" button |
|||
# causes the screen to stay black and stuck loading. But if the button is clicked with the controller instead, everything works fine. |
|||
# In pico controller tracking mode, using controller to click the "Virtual Reality" button |
|||
# causes the screen to stay black and stuck loading. But if the button is clicked with a hand gesture instead, everything works fine. |
|||
# avp \ pico all have hand marker visualization (RGB three-axis color coordinates). |
|||
|
|||
'vuer==0.0.60', # a good version |
|||
# https://github.com/unitreerobotics/avp_teleoperate/issues/53 |
|||
# https://github.com/vuer-ai/vuer/issues/45 |
|||
# https://github.com/vuer-ai/vuer/issues/65 |
|||
|
|||
], |
|||
python_requires='>=3.8', |
|||
) |
|||
@ -1,514 +0,0 @@ |
|||
from vuer import Vuer |
|||
from vuer.schemas import ImageBackground, Hands, MotionControllers, WebRTCVideoPlane, WebRTCStereoVideoPlane |
|||
from multiprocessing import Value, Array, Process, shared_memory |
|||
import numpy as np |
|||
import asyncio |
|||
import cv2 |
|||
import os |
|||
|
|||
|
|||
class TeleVision: |
|||
def __init__(self, binocular: bool, use_hand_tracking: bool, img_shape, img_shm_name, cert_file=None, key_file=None, ngrok=False, webrtc=False): |
|||
""" |
|||
TeleVision class for OpenXR-based VR/AR applications. |
|||
This class handles the communication with the Vuer server and manages the shared memory for image and pose data. |
|||
|
|||
:param binocular: bool, whether the application is binocular (stereoscopic) or monocular. |
|||
:param use_hand_tracking: bool, whether to use hand tracking or controller tracking. |
|||
:param img_shape: tuple, shape of the image (height, width, channels). |
|||
:param img_shm_name: str, name of the shared memory for the image. |
|||
:param cert_file: str, path to the SSL certificate file. |
|||
:param key_file: str, path to the SSL key file. |
|||
:param ngrok: bool, whether to use ngrok for tunneling. |
|||
""" |
|||
self.binocular = binocular |
|||
self.use_hand_tracking = use_hand_tracking |
|||
self.img_height = img_shape[0] |
|||
if self.binocular: |
|||
self.img_width = img_shape[1] // 2 |
|||
else: |
|||
self.img_width = img_shape[1] |
|||
|
|||
current_module_dir = os.path.dirname(os.path.abspath(__file__)) |
|||
if cert_file is None: |
|||
cert_file = os.path.join(current_module_dir, "cert.pem") |
|||
if key_file is None: |
|||
key_file = os.path.join(current_module_dir, "key.pem") |
|||
|
|||
if ngrok: |
|||
self.vuer = Vuer(host='0.0.0.0', queries=dict(grid=False), queue_len=3) |
|||
else: |
|||
self.vuer = Vuer(host='0.0.0.0', cert=cert_file, key=key_file, queries=dict(grid=False), queue_len=3) |
|||
|
|||
self.vuer.add_handler("CAMERA_MOVE")(self.on_cam_move) |
|||
if self.use_hand_tracking: |
|||
self.vuer.add_handler("HAND_MOVE")(self.on_hand_move) |
|||
else: |
|||
self.vuer.add_handler("CONTROLLER_MOVE")(self.on_controller_move) |
|||
|
|||
existing_shm = shared_memory.SharedMemory(name=img_shm_name) |
|||
self.img_array = np.ndarray(img_shape, dtype=np.uint8, buffer=existing_shm.buf) |
|||
|
|||
self.webrtc = webrtc |
|||
if self.binocular and not self.webrtc: |
|||
self.vuer.spawn(start=False)(self.main_image_binocular) |
|||
elif not self.binocular and not self.webrtc: |
|||
self.vuer.spawn(start=False)(self.main_image_monocular) |
|||
elif self.webrtc: |
|||
self.vuer.spawn(start=False)(self.main_image_webrtc) |
|||
|
|||
self.head_pose_shared = Array('d', 16, lock=True) |
|||
self.left_arm_pose_shared = Array('d', 16, lock=True) |
|||
self.right_arm_pose_shared = Array('d', 16, lock=True) |
|||
if self.use_hand_tracking: |
|||
self.left_hand_position_shared = Array('d', 75, lock=True) |
|||
self.right_hand_position_shared = Array('d', 75, lock=True) |
|||
self.left_hand_orientation_shared = Array('d', 25 * 9, lock=True) |
|||
self.right_hand_orientation_shared = Array('d', 25 * 9, lock=True) |
|||
|
|||
self.left_pinch_state_shared = Value('b', False, lock=True) |
|||
self.left_pinch_value_shared = Value('d', 0.0, lock=True) |
|||
self.left_squeeze_state_shared = Value('b', False, lock=True) |
|||
self.left_squeeze_value_shared = Value('d', 0.0, lock=True) |
|||
|
|||
self.right_pinch_state_shared = Value('b', False, lock=True) |
|||
self.right_pinch_value_shared = Value('d', 0.0, lock=True) |
|||
self.right_squeeze_state_shared = Value('b', False, lock=True) |
|||
self.right_squeeze_value_shared = Value('d', 0.0, lock=True) |
|||
else: |
|||
self.left_trigger_state_shared = Value('b', False, lock=True) |
|||
self.left_trigger_value_shared = Value('d', 0.0, lock=True) |
|||
self.left_squeeze_state_shared = Value('b', False, lock=True) |
|||
self.left_squeeze_value_shared = Value('d', 0.0, lock=True) |
|||
self.left_thumbstick_state_shared = Value('b', False, lock=True) |
|||
self.left_thumbstick_value_shared = Array('d', 2, lock=True) |
|||
self.left_aButton_shared = Value('b', False, lock=True) |
|||
self.left_bButton_shared = Value('b', False, lock=True) |
|||
|
|||
self.right_trigger_state_shared = Value('b', False, lock=True) |
|||
self.right_trigger_value_shared = Value('d', 0.0, lock=True) |
|||
self.right_squeeze_state_shared = Value('b', False, lock=True) |
|||
self.right_squeeze_value_shared = Value('d', 0.0, lock=True) |
|||
self.right_thumbstick_state_shared = Value('b', False, lock=True) |
|||
self.right_thumbstick_value_shared = Array('d', 2, lock=True) |
|||
self.right_aButton_shared = Value('b', False, lock=True) |
|||
self.right_bButton_shared = Value('b', False, lock=True) |
|||
|
|||
self.process = Process(target=self.vuer_run) |
|||
self.process.daemon = True |
|||
self.process.start() |
|||
|
|||
def vuer_run(self): |
|||
self.vuer.run() |
|||
|
|||
async def on_cam_move(self, event, session, fps=60): |
|||
try: |
|||
with self.head_pose_shared.get_lock(): |
|||
self.head_pose_shared[:] = event.value["camera"]["matrix"] |
|||
except: |
|||
pass |
|||
|
|||
async def on_controller_move(self, event, session, fps=60): |
|||
try: |
|||
with self.left_arm_pose_shared.get_lock(): |
|||
self.left_arm_pose_shared[:] = event.value["left"] |
|||
with self.right_arm_pose_shared.get_lock(): |
|||
self.right_arm_pose_shared[:] = event.value["right"] |
|||
|
|||
left_controller_state = event.value["leftState"] |
|||
right_controller_state = event.value["rightState"] |
|||
|
|||
def extract_controller_states(state_dict, prefix): |
|||
# trigger |
|||
with getattr(self, f"{prefix}_trigger_state_shared").get_lock(): |
|||
getattr(self, f"{prefix}_trigger_state_shared").value = bool(state_dict.get("trigger", False)) |
|||
with getattr(self, f"{prefix}_trigger_value_shared").get_lock(): |
|||
getattr(self, f"{prefix}_trigger_value_shared").value = float(state_dict.get("triggerValue", 0.0)) |
|||
# squeeze |
|||
with getattr(self, f"{prefix}_squeeze_state_shared").get_lock(): |
|||
getattr(self, f"{prefix}_squeeze_state_shared").value = bool(state_dict.get("squeeze", False)) |
|||
with getattr(self, f"{prefix}_squeeze_value_shared").get_lock(): |
|||
getattr(self, f"{prefix}_squeeze_value_shared").value = float(state_dict.get("squeezeValue", 0.0)) |
|||
# thumbstick |
|||
with getattr(self, f"{prefix}_thumbstick_state_shared").get_lock(): |
|||
getattr(self, f"{prefix}_thumbstick_state_shared").value = bool(state_dict.get("thumbstick", False)) |
|||
with getattr(self, f"{prefix}_thumbstick_value_shared").get_lock(): |
|||
getattr(self, f"{prefix}_thumbstick_value_shared")[:] = state_dict.get("thumbstickValue", [0.0, 0.0]) |
|||
# buttons |
|||
with getattr(self, f"{prefix}_aButton_shared").get_lock(): |
|||
getattr(self, f"{prefix}_aButton_shared").value = bool(state_dict.get("aButton", False)) |
|||
with getattr(self, f"{prefix}_bButton_shared").get_lock(): |
|||
getattr(self, f"{prefix}_bButton_shared").value = bool(state_dict.get("bButton", False)) |
|||
|
|||
extract_controller_states(left_controller_state, "left") |
|||
extract_controller_states(right_controller_state, "right") |
|||
except: |
|||
pass |
|||
|
|||
async def on_hand_move(self, event, session, fps=60): |
|||
try: |
|||
left_hand_data = event.value["left"] |
|||
right_hand_data = event.value["right"] |
|||
left_hand_state = event.value["leftState"] |
|||
right_hand_state = event.value["rightState"] |
|||
|
|||
def extract_hand_poses(hand_data, arm_pose_shared, hand_position_shared, hand_orientation_shared): |
|||
with arm_pose_shared.get_lock(): |
|||
arm_pose_shared[:] = hand_data[0:16] |
|||
|
|||
with hand_position_shared.get_lock(): |
|||
for i in range(25): |
|||
base = i * 16 |
|||
hand_position_shared[i * 3: i * 3 + 3] = [hand_data[base + 12], hand_data[base + 13], hand_data[base + 14]] |
|||
|
|||
with hand_orientation_shared.get_lock(): |
|||
for i in range(25): |
|||
base = i * 16 |
|||
hand_orientation_shared[i * 9: i * 9 + 9] = [ |
|||
hand_data[base + 0], hand_data[base + 1], hand_data[base + 2], |
|||
hand_data[base + 4], hand_data[base + 5], hand_data[base + 6], |
|||
hand_data[base + 8], hand_data[base + 9], hand_data[base + 10], |
|||
] |
|||
|
|||
def extract_hand_states(state_dict, prefix): |
|||
# pinch |
|||
with getattr(self, f"{prefix}_pinch_state_shared").get_lock(): |
|||
getattr(self, f"{prefix}_pinch_state_shared").value = bool(state_dict.get("pinch", False)) |
|||
with getattr(self, f"{prefix}_pinch_value_shared").get_lock(): |
|||
getattr(self, f"{prefix}_pinch_value_shared").value = float(state_dict.get("pinchValue", 0.0)) |
|||
# squeeze |
|||
with getattr(self, f"{prefix}_squeeze_state_shared").get_lock(): |
|||
getattr(self, f"{prefix}_squeeze_state_shared").value = bool(state_dict.get("squeeze", False)) |
|||
with getattr(self, f"{prefix}_squeeze_value_shared").get_lock(): |
|||
getattr(self, f"{prefix}_squeeze_value_shared").value = float(state_dict.get("squeezeValue", 0.0)) |
|||
|
|||
extract_hand_poses(left_hand_data, self.left_arm_pose_shared, self.left_hand_position_shared, self.left_hand_orientation_shared) |
|||
extract_hand_poses(right_hand_data, self.right_arm_pose_shared, self.right_hand_position_shared, self.right_hand_orientation_shared) |
|||
extract_hand_states(left_hand_state, "left") |
|||
extract_hand_states(right_hand_state, "right") |
|||
|
|||
except: |
|||
pass |
|||
|
|||
async def main_image_binocular(self, session, fps=60): |
|||
if self.use_hand_tracking: |
|||
session.upsert( |
|||
Hands( |
|||
stream=True, |
|||
key="hands", |
|||
hideLeft=True, |
|||
hideRight=True |
|||
), |
|||
to="bgChildren", |
|||
) |
|||
else: |
|||
session.upsert( |
|||
MotionControllers( |
|||
stream=True, |
|||
key="motionControllers", |
|||
left=True, |
|||
right=True, |
|||
), |
|||
to="bgChildren", |
|||
) |
|||
|
|||
while True: |
|||
display_image = cv2.cvtColor(self.img_array, cv2.COLOR_BGR2RGB) |
|||
# aspect_ratio = self.img_width / self.img_height |
|||
session.upsert( |
|||
[ |
|||
ImageBackground( |
|||
display_image[:, :self.img_width], |
|||
aspect=1.778, |
|||
height=1, |
|||
distanceToCamera=1, |
|||
# The underlying rendering engine supported a layer binary bitmask for both objects and the camera. |
|||
# Below we set the two image planes, left and right, to layers=1 and layers=2. |
|||
# Note that these two masks are associated with left eye’s camera and the right eye’s camera. |
|||
layers=1, |
|||
format="jpeg", |
|||
quality=100, |
|||
key="background-left", |
|||
interpolate=True, |
|||
), |
|||
ImageBackground( |
|||
display_image[:, self.img_width:], |
|||
aspect=1.778, |
|||
height=1, |
|||
distanceToCamera=1, |
|||
layers=2, |
|||
format="jpeg", |
|||
quality=100, |
|||
key="background-right", |
|||
interpolate=True, |
|||
), |
|||
], |
|||
to="bgChildren", |
|||
) |
|||
# 'jpeg' encoding should give you about 30fps with a 16ms wait in-between. |
|||
await asyncio.sleep(0.016 * 2) |
|||
|
|||
async def main_image_monocular(self, session, fps=60): |
|||
if self.use_hand_tracking: |
|||
session.upsert( |
|||
Hands( |
|||
stream=True, |
|||
key="hands", |
|||
hideLeft=True, |
|||
hideRight=True |
|||
), |
|||
to="bgChildren", |
|||
) |
|||
else: |
|||
session.upsert( |
|||
MotionControllers( |
|||
stream=True, |
|||
key="motionControllers", |
|||
left=True, |
|||
right=True, |
|||
), |
|||
to="bgChildren", |
|||
) |
|||
|
|||
while True: |
|||
display_image = cv2.cvtColor(self.img_array, cv2.COLOR_BGR2RGB) |
|||
# aspect_ratio = self.img_width / self.img_height |
|||
session.upsert( |
|||
[ |
|||
ImageBackground( |
|||
display_image, |
|||
aspect=1.778, |
|||
height=1, |
|||
distanceToCamera=1, |
|||
format="jpeg", |
|||
quality=50, |
|||
key="background-mono", |
|||
interpolate=True, |
|||
), |
|||
], |
|||
to="bgChildren", |
|||
) |
|||
await asyncio.sleep(0.016) |
|||
|
|||
async def main_image_webrtc(self, session, fps=60): |
|||
if self.use_hand_tracking: |
|||
session.upsert( |
|||
Hands( |
|||
stream=True, |
|||
key="hands", |
|||
showLeft=False, |
|||
showRight=False |
|||
), |
|||
to="bgChildren", |
|||
) |
|||
else: |
|||
session.upsert( |
|||
MotionControllers( |
|||
stream=True, |
|||
key="motionControllers", |
|||
showLeft=False, |
|||
showRight=False, |
|||
) |
|||
) |
|||
|
|||
session.upsert( |
|||
WebRTCVideoPlane( |
|||
# WebRTCStereoVideoPlane( |
|||
src="https://10.0.7.49:8080/offer", |
|||
iceServer={}, |
|||
key="webrtc", |
|||
aspect=1.778, |
|||
height = 7, |
|||
), |
|||
to="bgChildren", |
|||
) |
|||
while True: |
|||
await asyncio.sleep(1) |
|||
# ==================== common data ==================== |
|||
@property |
|||
def head_pose(self): |
|||
"""np.ndarray, shape (4, 4), head SE(3) pose matrix from Vuer (basis OpenXR Convention).""" |
|||
with self.head_pose_shared.get_lock(): |
|||
return np.array(self.head_pose_shared[:]).reshape(4, 4, order="F") |
|||
|
|||
@property |
|||
def left_arm_pose(self): |
|||
"""np.ndarray, shape (4, 4), left arm SE(3) pose matrix from Vuer (basis OpenXR Convention).""" |
|||
with self.left_arm_pose_shared.get_lock(): |
|||
return np.array(self.left_arm_pose_shared[:]).reshape(4, 4, order="F") |
|||
|
|||
@property |
|||
def right_arm_pose(self): |
|||
"""np.ndarray, shape (4, 4), right arm SE(3) pose matrix from Vuer (basis OpenXR Convention).""" |
|||
with self.right_arm_pose_shared.get_lock(): |
|||
return np.array(self.right_arm_pose_shared[:]).reshape(4, 4, order="F") |
|||
|
|||
# ==================== Hand Tracking Data ==================== |
|||
@property |
|||
def left_hand_positions(self): |
|||
"""np.ndarray, shape (25, 3), left hand 25 landmarks' 3D positions.""" |
|||
with self.left_hand_position_shared.get_lock(): |
|||
return np.array(self.left_hand_position_shared[:]).reshape(25, 3) |
|||
|
|||
@property |
|||
def right_hand_positions(self): |
|||
"""np.ndarray, shape (25, 3), right hand 25 landmarks' 3D positions.""" |
|||
with self.right_hand_position_shared.get_lock(): |
|||
return np.array(self.right_hand_position_shared[:]).reshape(25, 3) |
|||
|
|||
@property |
|||
def left_hand_orientations(self): |
|||
"""np.ndarray, shape (25, 3, 3), left hand 25 landmarks' orientations (flattened 3x3 matrices, column-major).""" |
|||
with self.left_hand_orientation_shared.get_lock(): |
|||
return np.array(self.left_hand_orientation_shared[:]).reshape(25, 9).reshape(25, 3, 3, order="F") |
|||
|
|||
@property |
|||
def right_hand_orientations(self): |
|||
"""np.ndarray, shape (25, 3, 3), right hand 25 landmarks' orientations (flattened 3x3 matrices, column-major).""" |
|||
with self.right_hand_orientation_shared.get_lock(): |
|||
return np.array(self.right_hand_orientation_shared[:]).reshape(25, 9).reshape(25, 3, 3, order="F") |
|||
|
|||
@property |
|||
def left_hand_pinch_state(self): |
|||
"""bool, whether left hand is pinching.""" |
|||
with self.left_pinch_state_shared.get_lock(): |
|||
return self.left_pinch_state_shared.value |
|||
|
|||
@property |
|||
def left_hand_pinch_value(self): |
|||
"""float, pinch strength of left hand.""" |
|||
with self.left_pinch_value_shared.get_lock(): |
|||
return self.left_pinch_value_shared.value |
|||
|
|||
@property |
|||
def left_hand_squeeze_state(self): |
|||
"""bool, whether left hand is squeezing.""" |
|||
with self.left_squeeze_state_shared.get_lock(): |
|||
return self.left_squeeze_state_shared.value |
|||
|
|||
@property |
|||
def left_hand_squeeze_value(self): |
|||
"""float, squeeze strength of left hand.""" |
|||
with self.left_squeeze_value_shared.get_lock(): |
|||
return self.left_squeeze_value_shared.value |
|||
|
|||
@property |
|||
def right_hand_pinch_state(self): |
|||
"""bool, whether right hand is pinching.""" |
|||
with self.right_pinch_state_shared.get_lock(): |
|||
return self.right_pinch_state_shared.value |
|||
|
|||
@property |
|||
def right_hand_pinch_value(self): |
|||
"""float, pinch strength of right hand.""" |
|||
with self.right_pinch_value_shared.get_lock(): |
|||
return self.right_pinch_value_shared.value |
|||
|
|||
@property |
|||
def right_hand_squeeze_state(self): |
|||
"""bool, whether right hand is squeezing.""" |
|||
with self.right_squeeze_state_shared.get_lock(): |
|||
return self.right_squeeze_state_shared.value |
|||
|
|||
@property |
|||
def right_hand_squeeze_value(self): |
|||
"""float, squeeze strength of right hand.""" |
|||
with self.right_squeeze_value_shared.get_lock(): |
|||
return self.right_squeeze_value_shared.value |
|||
|
|||
# ==================== Controller Data ==================== |
|||
@property |
|||
def left_controller_trigger_state(self): |
|||
"""bool, left controller trigger pressed or not.""" |
|||
with self.left_trigger_state_shared.get_lock(): |
|||
return self.left_trigger_state_shared.value |
|||
|
|||
@property |
|||
def left_controller_trigger_value(self): |
|||
"""float, left controller trigger analog value (0.0 ~ 1.0).""" |
|||
with self.left_trigger_value_shared.get_lock(): |
|||
return self.left_trigger_value_shared.value |
|||
|
|||
@property |
|||
def left_controller_squeeze_state(self): |
|||
"""bool, left controller squeeze pressed or not.""" |
|||
with self.left_squeeze_state_shared.get_lock(): |
|||
return self.left_squeeze_state_shared.value |
|||
|
|||
@property |
|||
def left_controller_squeeze_value(self): |
|||
"""float, left controller squeeze analog value (0.0 ~ 1.0).""" |
|||
with self.left_squeeze_value_shared.get_lock(): |
|||
return self.left_squeeze_value_shared.value |
|||
|
|||
@property |
|||
def left_controller_thumbstick_state(self): |
|||
"""bool, whether left thumbstick is touched or clicked.""" |
|||
with self.left_thumbstick_state_shared.get_lock(): |
|||
return self.left_thumbstick_state_shared.value |
|||
|
|||
@property |
|||
def left_controller_thumbstick_value(self): |
|||
"""np.ndarray, shape (2,), left thumbstick 2D axis values (x, y).""" |
|||
with self.left_thumbstick_value_shared.get_lock(): |
|||
return np.array(self.left_thumbstick_value_shared[:]) |
|||
|
|||
@property |
|||
def left_controller_aButton(self): |
|||
"""bool, left controller 'A' button pressed.""" |
|||
with self.left_aButton_shared.get_lock(): |
|||
return self.left_aButton_shared.value |
|||
|
|||
@property |
|||
def left_controller_bButton(self): |
|||
"""bool, left controller 'B' button pressed.""" |
|||
with self.left_bButton_shared.get_lock(): |
|||
return self.left_bButton_shared.value |
|||
|
|||
@property |
|||
def right_controller_trigger_state(self): |
|||
"""bool, right controller trigger pressed or not.""" |
|||
with self.right_trigger_state_shared.get_lock(): |
|||
return self.right_trigger_state_shared.value |
|||
|
|||
@property |
|||
def right_controller_trigger_value(self): |
|||
"""float, right controller trigger analog value (0.0 ~ 1.0).""" |
|||
with self.right_trigger_value_shared.get_lock(): |
|||
return self.right_trigger_value_shared.value |
|||
|
|||
@property |
|||
def right_controller_squeeze_state(self): |
|||
"""bool, right controller squeeze pressed or not.""" |
|||
with self.right_squeeze_state_shared.get_lock(): |
|||
return self.right_squeeze_state_shared.value |
|||
|
|||
@property |
|||
def right_controller_squeeze_value(self): |
|||
"""float, right controller squeeze analog value (0.0 ~ 1.0).""" |
|||
with self.right_squeeze_value_shared.get_lock(): |
|||
return self.right_squeeze_value_shared.value |
|||
|
|||
@property |
|||
def right_controller_thumbstick_state(self): |
|||
"""bool, whether right thumbstick is touched or clicked.""" |
|||
with self.right_thumbstick_state_shared.get_lock(): |
|||
return self.right_thumbstick_state_shared.value |
|||
|
|||
@property |
|||
def right_controller_thumbstick_value(self): |
|||
"""np.ndarray, shape (2,), right thumbstick 2D axis values (x, y).""" |
|||
with self.right_thumbstick_value_shared.get_lock(): |
|||
return np.array(self.right_thumbstick_value_shared[:]) |
|||
|
|||
@property |
|||
def right_controller_aButton(self): |
|||
"""bool, right controller 'A' button pressed.""" |
|||
with self.right_aButton_shared.get_lock(): |
|||
return self.right_aButton_shared.value |
|||
|
|||
@property |
|||
def right_controller_bButton(self): |
|||
"""bool, right controller 'B' button pressed.""" |
|||
with self.right_bButton_shared.get_lock(): |
|||
return self.right_bButton_shared.value |
|||
@ -1,410 +0,0 @@ |
|||
import numpy as np |
|||
from television import TeleVision |
|||
from dataclasses import dataclass, field |
|||
|
|||
""" |
|||
(basis) OpenXR Convention : y up, z back, x right. |
|||
(basis) Robot Convention : z up, y left, x front. |
|||
|
|||
under (basis) Robot Convention, humanoid arm's initial pose convention: |
|||
|
|||
# (initial pose) OpenXR Left Arm Pose Convention (hand tracking): |
|||
- the x-axis pointing from wrist toward middle. |
|||
- the y-axis pointing from index toward pinky. |
|||
- the z-axis pointing from palm toward back of the hand. |
|||
|
|||
# (initial pose) OpenXR Right Arm Pose Convention (hand tracking): |
|||
- the x-axis pointing from wrist toward middle. |
|||
- the y-axis pointing from pinky toward index. |
|||
- the z-axis pointing from palm toward back of the hand. |
|||
|
|||
# (initial pose) Unitree Humanoid Left Arm URDF Convention: |
|||
- the x-axis pointing from wrist toward middle. |
|||
- the y-axis pointing from palm toward back of the hand. |
|||
- the z-axis pointing from pinky toward index. |
|||
|
|||
# (initial pose) Unitree Humanoid Right Arm URDF Convention: |
|||
- the x-axis pointing from wrist toward middle. |
|||
- the y-axis pointing from back of the hand toward palm. |
|||
- the z-axis pointing from pinky toward index. |
|||
|
|||
under (basis) Robot Convention, humanoid hand's initial pose convention: |
|||
|
|||
# (initial pose) OpenXR Left Hand Pose Convention (hand tracking): |
|||
- the x-axis pointing from wrist toward middle. |
|||
- the y-axis pointing from index toward pinky. |
|||
- the z-axis pointing from palm toward back of the hand. |
|||
|
|||
# (initial pose) OpenXR Right Hand Pose Convention (hand tracking): |
|||
- the x-axis pointing from wrist toward middle. |
|||
- the y-axis pointing from pinky toward index. |
|||
- the z-axis pointing from palm toward back of the hand. |
|||
|
|||
# (initial pose) Unitree Humanoid Left Hand URDF Convention: |
|||
- The x-axis pointing from palm toward back of the hand. |
|||
- The y-axis pointing from middle toward wrist. |
|||
- The z-axis pointing from pinky toward index. |
|||
|
|||
# (initial pose) Unitree Humanoid Right Hand URDF Convention: |
|||
- The x-axis pointing from palm toward back of the hand. |
|||
- The y-axis pointing from middle toward wrist. |
|||
- The z-axis pointing from index toward pinky. |
|||
|
|||
p.s. TeleVision (Vuer) obtains all raw data under the (basis) OpenXR Convention. |
|||
In addition, arm pose data (hand tracking) follows the (initial pose) OpenXR Arm Pose Convention, |
|||
while arm pose data (controller tracking) directly follows the (initial pose) Unitree Humanoid Arm URDF Convention (thus no transform is needed). |
|||
Meanwhile, all raw data is in the WORLD frame defined by XR device odometry. |
|||
|
|||
p.s. From website: https://registry.khronos.org/OpenXR/specs/1.1/man/html/openxr.html. |
|||
You can find **(initial pose) OpenXR Left/Right Arm Pose Convention** related information like this below: |
|||
"The wrist joint is located at the pivot point of the wrist, which is location invariant when twisting the hand without moving the forearm. |
|||
The backward (+Z) direction is parallel to the line from wrist joint to middle finger metacarpal joint, and points away from the finger tips. |
|||
The up (+Y) direction points out towards back of the hand and perpendicular to the skin at wrist. |
|||
The X direction is perpendicular to the Y and Z directions and follows the right hand rule." |
|||
Note: The above context is of course under **(basis) OpenXR Convention**. |
|||
|
|||
p.s. **Unitree Arm/Hand URDF initial pose Convention** information come from URDF files. |
|||
""" |
|||
|
|||
|
|||
def safe_mat_update(prev_mat, mat): |
|||
# Return previous matrix and False flag if the new matrix is non-singular (determinant ≠ 0). |
|||
det = np.linalg.det(mat) |
|||
if not np.isfinite(det) or np.isclose(det, 0.0, atol=1e-6): |
|||
return prev_mat, False |
|||
return mat, True |
|||
|
|||
def fast_mat_inv(mat): |
|||
ret = np.eye(4) |
|||
ret[:3, :3] = mat[:3, :3].T |
|||
ret[:3, 3] = -mat[:3, :3].T @ mat[:3, 3] |
|||
return ret |
|||
|
|||
def safe_rot_update(prev_rot_array, rot_array): |
|||
dets = np.linalg.det(rot_array) |
|||
if not np.all(np.isfinite(dets)) or np.any(np.isclose(dets, 0.0, atol=1e-6)): |
|||
return prev_rot_array, False |
|||
return rot_array, True |
|||
|
|||
# constants variable |
|||
T_TO_UNITREE_HUMANOID_LEFT_ARM = np.array([[1, 0, 0, 0], |
|||
[0, 0,-1, 0], |
|||
[0, 1, 0, 0], |
|||
[0, 0, 0, 1]]) |
|||
|
|||
T_TO_UNITREE_HUMANOID_RIGHT_ARM = np.array([[1, 0, 0, 0], |
|||
[0, 0, 1, 0], |
|||
[0,-1, 0, 0], |
|||
[0, 0, 0, 1]]) |
|||
|
|||
T_TO_UNITREE_HAND = np.array([[0, 0, 1, 0], |
|||
[-1, 0, 0, 0], |
|||
[0, -1, 0, 0], |
|||
[0, 0, 0, 1]]) |
|||
|
|||
T_ROBOT_OPENXR = np.array([[ 0, 0,-1, 0], |
|||
[-1, 0, 0, 0], |
|||
[ 0, 1, 0, 0], |
|||
[ 0, 0, 0, 1]]) |
|||
|
|||
T_OPENXR_ROBOT = np.array([[ 0,-1, 0, 0], |
|||
[ 0, 0, 1, 0], |
|||
[-1, 0, 0, 0], |
|||
[ 0, 0, 0, 1]]) |
|||
|
|||
R_ROBOT_OPENXR = np.array([[ 0, 0,-1], |
|||
[-1, 0, 0], |
|||
[ 0, 1, 0]]) |
|||
|
|||
R_OPENXR_ROBOT = np.array([[ 0,-1, 0], |
|||
[ 0, 0, 1], |
|||
[-1, 0, 0]]) |
|||
|
|||
CONST_HEAD_POSE = np.array([[1, 0, 0, 0], |
|||
[0, 1, 0, 1.5], |
|||
[0, 0, 1, -0.2], |
|||
[0, 0, 0, 1]]) |
|||
|
|||
# For Robot initial position |
|||
CONST_RIGHT_ARM_POSE = np.array([[1, 0, 0, 0.15], |
|||
[0, 1, 0, 1.13], |
|||
[0, 0, 1, -0.3], |
|||
[0, 0, 0, 1]]) |
|||
|
|||
CONST_LEFT_ARM_POSE = np.array([[1, 0, 0, -0.15], |
|||
[0, 1, 0, 1.13], |
|||
[0, 0, 1, -0.3], |
|||
[0, 0, 0, 1]]) |
|||
|
|||
CONST_HAND_ROT = np.tile(np.eye(3)[None, :, :], (25, 1, 1)) |
|||
|
|||
@dataclass |
|||
class TeleStateData: |
|||
# hand tracking |
|||
left_pinch_state: bool = False # True if index and thumb are pinching |
|||
left_squeeze_state: bool = False # True if hand is making a fist |
|||
left_squeeze_value: float = 0.0 # (0.0 ~ 1.0) degree of hand squeeze |
|||
right_pinch_state: bool = False # True if index and thumb are pinching |
|||
right_squeeze_state: bool = False # True if hand is making a fist |
|||
right_squeeze_value: float = 0.0 # (0.0 ~ 1.0) degree of hand squeeze |
|||
|
|||
# controller tracking |
|||
left_trigger_state: bool = False # True if trigger is actively pressed |
|||
left_squeeze_ctrl_state: bool = False # True if grip button is pressed |
|||
left_squeeze_ctrl_value: float = 0.0 # (0.0 ~ 1.0) grip pull depth |
|||
left_thumbstick_state: bool = False # True if thumbstick button is pressed |
|||
left_thumbstick_value: np.ndarray = field(default_factory=lambda: np.zeros(2)) # 2D vector (x, y), normalized |
|||
left_aButton: bool = False # True if A button is pressed |
|||
left_bButton: bool = False # True if B button is pressed |
|||
right_trigger_state: bool = False # True if trigger is actively pressed |
|||
right_squeeze_ctrl_state: bool = False # True if grip button is pressed |
|||
right_squeeze_ctrl_value: float = 0.0 # (0.0 ~ 1.0) grip pull depth |
|||
right_thumbstick_state: bool = False # True if thumbstick button is pressed |
|||
right_thumbstick_value: np.ndarray = field(default_factory=lambda: np.zeros(2)) # 2D vector (x, y), normalized |
|||
right_aButton: bool = False # True if A button is pressed |
|||
right_bButton: bool = False # True if B button is pressed |
|||
|
|||
@dataclass |
|||
class TeleData: |
|||
head_pose: np.ndarray # (4,4) SE(3) pose of head matrix |
|||
left_arm_pose: np.ndarray # (4,4) SE(3) pose of left arm |
|||
right_arm_pose: np.ndarray # (4,4) SE(3) pose of right arm |
|||
left_hand_pos: np.ndarray = None # (25,3) 3D positions of left hand joints |
|||
right_hand_pos: np.ndarray = None # (25,3) 3D positions of right hand joints |
|||
left_hand_rot: np.ndarray = None # (25,3,3) rotation matrices of left hand joints |
|||
right_hand_rot: np.ndarray = None # (25,3,3) rotation matrices of right hand joints |
|||
left_pinch_value: float = None # float (1x.0 ~ 0.0) pinch distance between index and thumb |
|||
right_pinch_value: float = None # float (1x.0 ~ 0.0) pinch distance between index and thumb |
|||
left_trigger_value: float = None # float (10.0 ~ 0.0) trigger pull depth |
|||
right_trigger_value: float = None # float (10.0 ~ 0.0) trigger pull depth |
|||
tele_state: TeleStateData = field(default_factory=TeleStateData) |
|||
|
|||
|
|||
class TeleVisionWrapper: |
|||
def __init__(self, binocular: bool, use_hand_tracking: bool, img_shape, img_shm_name, return_state_data: bool = True, return_hand_rot_data: bool = False, |
|||
cert_file = None, key_file = None, ngrok = False, webrtc = False): |
|||
""" |
|||
TeleVisionWrapper is a wrapper for the TeleVision class, which handles XR device's data suit for robot control. |
|||
It initializes the TeleVision instance with the specified parameters and provides a method to get motion state data. |
|||
|
|||
:param binocular: A boolean indicating whether the head camera device is binocular or not. |
|||
:param use_hand_tracking: A boolean indicating whether to use hand tracking or use controller tracking. |
|||
:param img_shape: The shape of the image to be processed. |
|||
:param img_shm_name: The name of the shared memory for the image. |
|||
:param return_state: A boolean indicating whether to return the state of the hand or controller. |
|||
:param return_hand_rot: A boolean indicating whether to return the hand rotation data. |
|||
:param cert_file: The path to the certificate file for secure connection. |
|||
:param key_file: The path to the key file for secure connection. |
|||
""" |
|||
self.use_hand_tracking = use_hand_tracking |
|||
self.return_state_data = return_state_data |
|||
self.return_hand_rot_data = return_hand_rot_data |
|||
self.tvuer = TeleVision(binocular, use_hand_tracking, img_shape, img_shm_name, cert_file=cert_file, key_file=key_file, |
|||
ngrok=ngrok, webrtc=webrtc) |
|||
|
|||
def get_motion_state_data(self): |
|||
""" |
|||
Get processed motion state data from the TeleVision instance. |
|||
|
|||
All returned data are transformed from the OpenXR Convention to the (Robot & Unitree) Convention. |
|||
""" |
|||
# Variable Naming Convention below, |
|||
# ┌────────────┬───────────────────────────┬──────────────────────────────────┬────────────────────────────────────┬────────────────────────────────────┐ |
|||
# │left / right│ Bxr │ Brobot │ IPxr │ IPunitree │ |
|||
# │────────────│───────────────────────────│──────────────────────────────────│────────────────────────────────────│────────────────────────────────────│ |
|||
# │ side │ (basis) OpenXR Convention │ (basis) Robot Convention │ (initial pose) OpenXR Convention │ (initial pose) Unitree Convention │ |
|||
# └────────────┴───────────────────────────┴──────────────────────────────────┴────────────────────────────────────┴────────────────────────────────────┘ |
|||
# ┌───────────────────────────────────┬─────────────────────┐ |
|||
# │ world / arm / head / waist │ arm / head / hand │ |
|||
# │───────────────────────────────────│─────────────────────│ |
|||
# │ source frame │ target frame │ |
|||
# └───────────────────────────────────┴─────────────────────┘ |
|||
|
|||
# TeleVision (Vuer) obtains all raw data under the (basis) OpenXR Convention. |
|||
Bxr_world_head, head_pose_is_valid = safe_mat_update(CONST_HEAD_POSE, self.tvuer.head_pose) |
|||
|
|||
if self.use_hand_tracking: |
|||
# 'Arm' pose data follows (basis) OpenXR Convention and (initial pose) OpenXR Arm Convention. |
|||
left_IPxr_Bxr_world_arm, left_arm_is_valid = safe_mat_update(CONST_LEFT_ARM_POSE, self.tvuer.left_arm_pose) |
|||
right_IPxr_Bxr_world_arm, right_arm_is_valid = safe_mat_update(CONST_RIGHT_ARM_POSE, self.tvuer.right_arm_pose) |
|||
|
|||
# Change basis convention |
|||
# From (basis) OpenXR Convention to (basis) Robot Convention: |
|||
# Brobot_Pose = T_{robot}_{openxr} * Bxr_Pose * T_{robot}_{openxr}^T ==> |
|||
# Brobot_Pose = T_{robot}_{openxr} * Bxr_Pose * T_{openxr}_{robot} |
|||
# Reason for right multiply T_OPENXR_ROBOT = fast_mat_inv(T_ROBOT_OPENXR): |
|||
# This is similarity transformation: B = PAP^{-1}, that is B ~ A |
|||
# For example: |
|||
# - For a pose data T_r under the (basis) Robot Convention, left-multiplying Brobot_Pose means: |
|||
# Brobot_Pose * T_r ==> T_{robot}_{openxr} * PoseMatrix_openxr * T_{openxr}_{robot} * T_r |
|||
# - First, transform T_r to the (basis) OpenXR Convention (The function of T_{openxr}_{robot}) |
|||
# - Then, apply the rotation PoseMatrix_openxr in the OpenXR Convention (The function of PoseMatrix_openxr) |
|||
# - Finally, transform back to the Robot Convention (The function of T_{robot}_{openxr}) |
|||
# - This results in the same rotation effect under the Robot Convention as in the OpenXR Convention. |
|||
Brobot_world_head = T_ROBOT_OPENXR @ Bxr_world_head @ T_OPENXR_ROBOT |
|||
left_IPxr_Brobot_world_arm = T_ROBOT_OPENXR @ left_IPxr_Bxr_world_arm @ T_OPENXR_ROBOT |
|||
right_IPxr_Brobot_world_arm = T_ROBOT_OPENXR @ right_IPxr_Bxr_world_arm @ T_OPENXR_ROBOT |
|||
|
|||
# Change initial pose convention |
|||
# From (initial pose) OpenXR Arm Convention to (initial pose) Unitree Humanoid Arm URDF Convention |
|||
# Reason for right multiply (T_TO_UNITREE_HUMANOID_LEFT_ARM) : Rotate 90 degrees counterclockwise about its own x-axis. |
|||
# Reason for right multiply (T_TO_UNITREE_HUMANOID_RIGHT_ARM): Rotate 90 degrees clockwise about its own x-axis. |
|||
left_IPunitree_Brobot_world_arm = left_IPxr_Brobot_world_arm @ (T_TO_UNITREE_HUMANOID_LEFT_ARM if left_arm_is_valid else np.eye(4)) |
|||
right_IPunitree_Brobot_world_arm = right_IPxr_Brobot_world_arm @ (T_TO_UNITREE_HUMANOID_RIGHT_ARM if right_arm_is_valid else np.eye(4)) |
|||
|
|||
# Transfer from WORLD to HEAD coordinate (translation adjustment only) |
|||
left_IPunitree_Brobot_head_arm = left_IPunitree_Brobot_world_arm.copy() |
|||
right_IPunitree_Brobot_head_arm = right_IPunitree_Brobot_world_arm.copy() |
|||
left_IPunitree_Brobot_head_arm[0:3, 3] = left_IPunitree_Brobot_head_arm[0:3, 3] - Brobot_world_head[0:3, 3] |
|||
right_IPunitree_Brobot_head_arm[0:3, 3] = right_IPunitree_Brobot_world_arm[0:3, 3] - Brobot_world_head[0:3, 3] |
|||
|
|||
# =====coordinate origin offset===== |
|||
# The origin of the coordinate for IK Solve is near the WAIST joint motor. You can use teleop/robot_control/robot_arm_ik.py Unit_Test to visualize it. |
|||
# The origin of the coordinate of IPunitree_Brobot_head_arm is HEAD. |
|||
# So it is necessary to translate the origin of IPunitree_Brobot_head_arm from HEAD to WAIST. |
|||
left_IPunitree_Brobot_waist_arm = left_IPunitree_Brobot_head_arm.copy() |
|||
right_IPunitree_Brobot_waist_arm = right_IPunitree_Brobot_head_arm.copy() |
|||
left_IPunitree_Brobot_waist_arm[0, 3] +=0.15 # x |
|||
right_IPunitree_Brobot_waist_arm[0,3] +=0.15 |
|||
left_IPunitree_Brobot_waist_arm[2, 3] +=0.45 # z |
|||
right_IPunitree_Brobot_waist_arm[2,3] +=0.45 |
|||
|
|||
# -----------------------------------hand position---------------------------------------- |
|||
if left_arm_is_valid and right_arm_is_valid: |
|||
# Homogeneous, [xyz] to [xyz1] |
|||
# np.concatenate([25,3]^T,(1,25)) ==> Bxr_world_hand_pos.shape is (4,25) |
|||
# Now under (basis) OpenXR Convention, Bxr_world_hand_pos data like this: |
|||
# [x0 x1 x2 ··· x23 x24] |
|||
# [y0 y1 y1 ··· y23 y24] |
|||
# [z0 z1 z2 ··· z23 z24] |
|||
# [ 1 1 1 ··· 1 1] |
|||
left_IPxr_Bxr_world_hand_pos = np.concatenate([self.tvuer.left_hand_positions.T, np.ones((1, self.tvuer.left_hand_positions.shape[0]))]) |
|||
right_IPxr_Bxr_world_hand_pos = np.concatenate([self.tvuer.right_hand_positions.T, np.ones((1, self.tvuer.right_hand_positions.shape[0]))]) |
|||
|
|||
# Change basis convention |
|||
# From (basis) OpenXR Convention to (basis) Robot Convention |
|||
# Just a change of basis for 3D points. No rotation, only translation. So, no need to right-multiply fast_mat_inv(T_ROBOT_OPENXR). |
|||
left_IPxr_Brobot_world_hand_pos = T_ROBOT_OPENXR @ left_IPxr_Bxr_world_hand_pos |
|||
right_IPxr_Brobot_world_hand_pos = T_ROBOT_OPENXR @ right_IPxr_Bxr_world_hand_pos |
|||
|
|||
# Transfer from WORLD to ARM frame under (basis) Robot Convention: |
|||
# Brobot_{world}_{arm}^T * Brobot_{world}_pos ==> Brobot_{arm}_{world} * Brobot_{world}_pos ==> Brobot_arm_hand_pos, Now it's based on the arm frame. |
|||
left_IPxr_Brobot_arm_hand_pos = fast_mat_inv(left_IPxr_Brobot_world_arm) @ left_IPxr_Brobot_world_hand_pos |
|||
right_IPxr_Brobot_arm_hand_pos = fast_mat_inv(right_IPxr_Brobot_world_arm) @ right_IPxr_Brobot_world_hand_pos |
|||
|
|||
# Change initial pose convention |
|||
# From (initial pose) XR Hand Convention to (initial pose) Unitree Humanoid Hand URDF Convention: |
|||
# T_TO_UNITREE_HAND @ IPxr_Brobot_arm_hand_pos ==> IPunitree_Brobot_arm_hand_pos |
|||
# ((4,4) @ (4,25))[0:3, :].T ==> (4,25)[0:3, :].T ==> (3,25).T ==> (25,3) |
|||
# Now under (initial pose) Unitree Humanoid Hand URDF Convention, matrix shape like this: |
|||
# [x0, y0, z0] |
|||
# [x1, y1, z1] |
|||
# ··· |
|||
# [x23,y23,z23] |
|||
# [x24,y24,z24] |
|||
left_IPunitree_Brobot_arm_hand_pos = (T_TO_UNITREE_HAND @ left_IPxr_Brobot_arm_hand_pos)[0:3, :].T |
|||
right_IPunitree_Brobot_arm_hand_pos = (T_TO_UNITREE_HAND @ right_IPxr_Brobot_arm_hand_pos)[0:3, :].T |
|||
else: |
|||
left_IPunitree_Brobot_arm_hand_pos = np.zeros((25, 3)) |
|||
right_IPunitree_Brobot_arm_hand_pos = np.zeros((25, 3)) |
|||
|
|||
# -----------------------------------hand rotation---------------------------------------- |
|||
if self.return_hand_rot_data: |
|||
left_Bxr_world_hand_rot, left_hand_rot_is_valid = safe_rot_update(CONST_HAND_ROT, self.tvuer.left_hand_orientations) # [25, 3, 3] |
|||
right_Bxr_world_hand_rot, right_hand_rot_is_valid = safe_rot_update(CONST_HAND_ROT, self.tvuer.right_hand_orientations) |
|||
|
|||
if left_hand_rot_is_valid and right_hand_rot_is_valid: |
|||
left_Bxr_arm_hand_rot = np.einsum('ij,njk->nik', left_IPxr_Bxr_world_arm[:3, :3].T, left_Bxr_world_hand_rot) |
|||
right_Bxr_arm_hand_rot = np.einsum('ij,njk->nik', right_IPxr_Bxr_world_arm[:3, :3].T, right_Bxr_world_hand_rot) |
|||
# Change basis convention |
|||
left_Brobot_arm_hand_rot = np.einsum('ij,njk,kl->nil', R_ROBOT_OPENXR, left_Bxr_arm_hand_rot, R_OPENXR_ROBOT) |
|||
right_Brobot_arm_hand_rot = np.einsum('ij,njk,kl->nil', R_ROBOT_OPENXR, right_Bxr_arm_hand_rot, R_OPENXR_ROBOT) |
|||
else: |
|||
left_Brobot_arm_hand_rot = left_Bxr_world_hand_rot |
|||
right_Brobot_arm_hand_rot = right_Bxr_world_hand_rot |
|||
else: |
|||
left_Brobot_arm_hand_rot = None |
|||
right_Brobot_arm_hand_rot = None |
|||
|
|||
if self.return_state_data: |
|||
hand_state = TeleStateData( |
|||
left_pinch_state=self.tvuer.left_hand_pinch_state, |
|||
left_squeeze_state=self.tvuer.left_hand_squeeze_state, |
|||
left_squeeze_value=self.tvuer.left_hand_squeeze_value, |
|||
right_pinch_state=self.tvuer.right_hand_pinch_state, |
|||
right_squeeze_state=self.tvuer.right_hand_squeeze_state, |
|||
right_squeeze_value=self.tvuer.right_hand_squeeze_value, |
|||
) |
|||
else: |
|||
hand_state = None |
|||
|
|||
return TeleData( |
|||
head_pose=Brobot_world_head, |
|||
left_arm_pose=left_IPunitree_Brobot_waist_arm, |
|||
right_arm_pose=right_IPunitree_Brobot_waist_arm, |
|||
left_hand_pos=left_IPunitree_Brobot_arm_hand_pos, |
|||
right_hand_pos=right_IPunitree_Brobot_arm_hand_pos, |
|||
left_hand_rot=left_Brobot_arm_hand_rot, |
|||
right_hand_rot=right_Brobot_arm_hand_rot, |
|||
left_pinch_value=self.tvuer.left_hand_pinch_value * 100.0, |
|||
right_pinch_value=self.tvuer.right_hand_pinch_value * 100.0, |
|||
tele_state=hand_state |
|||
) |
|||
else: |
|||
# Controller pose data directly follows the (initial pose) Unitree Humanoid Arm URDF Convention (thus no transform is needed). |
|||
left_IPunitree_Bxr_world_arm, left_arm_is_valid = safe_mat_update(CONST_LEFT_ARM_POSE, self.tvuer.left_arm_pose) |
|||
right_IPunitree_Bxr_world_arm, right_arm_is_valid = safe_mat_update(CONST_RIGHT_ARM_POSE, self.tvuer.right_arm_pose) |
|||
|
|||
# Change basis convention |
|||
Brobot_world_head = T_ROBOT_OPENXR @ Bxr_world_head @ T_OPENXR_ROBOT |
|||
left_IPunitree_Brobot_world_arm = T_ROBOT_OPENXR @ left_IPunitree_Bxr_world_arm @ T_OPENXR_ROBOT |
|||
right_IPunitree_Brobot_world_arm = T_ROBOT_OPENXR @ right_IPunitree_Bxr_world_arm @ T_OPENXR_ROBOT |
|||
|
|||
# Transfer from WORLD to HEAD coordinate (translation adjustment only) |
|||
left_IPunitree_Brobot_head_arm = left_IPunitree_Brobot_world_arm.copy() |
|||
right_IPunitree_Brobot_head_arm = right_IPunitree_Brobot_world_arm.copy() |
|||
left_IPunitree_Brobot_head_arm[0:3, 3] = left_IPunitree_Brobot_head_arm[0:3, 3] - Brobot_world_head[0:3, 3] |
|||
right_IPunitree_Brobot_head_arm[0:3, 3] = right_IPunitree_Brobot_head_arm[0:3, 3] - Brobot_world_head[0:3, 3] |
|||
|
|||
# =====coordinate origin offset===== |
|||
# The origin of the coordinate for IK Solve is near the WAIST joint motor. You can use teleop/robot_control/robot_arm_ik.py Unit_Test to check it. |
|||
# The origin of the coordinate of IPunitree_Brobot_head_arm is HEAD. |
|||
# So it is necessary to translate the origin of IPunitree_Brobot_head_arm from HEAD to WAIST. |
|||
left_IPunitree_Brobot_waist_arm = left_IPunitree_Brobot_head_arm.copy() |
|||
right_IPunitree_Brobot_waist_arm = right_IPunitree_Brobot_head_arm.copy() |
|||
left_IPunitree_Brobot_waist_arm[0, 3] +=0.15 # x |
|||
right_IPunitree_Brobot_waist_arm[0,3] +=0.15 |
|||
left_IPunitree_Brobot_waist_arm[2, 3] +=0.45 # z |
|||
right_IPunitree_Brobot_waist_arm[2,3] +=0.45 |
|||
# left_IPunitree_Brobot_waist_arm[1, 3] +=0.02 # y |
|||
# right_IPunitree_Brobot_waist_arm[1,3] +=0.02 |
|||
|
|||
# return data |
|||
if self.return_state_data: |
|||
controller_state = TeleStateData( |
|||
left_trigger_state=self.tvuer.left_controller_trigger_state, |
|||
left_squeeze_ctrl_state=self.tvuer.left_controller_squeeze_state, |
|||
left_squeeze_ctrl_value=self.tvuer.left_controller_squeeze_value, |
|||
left_thumbstick_state=self.tvuer.left_controller_thumbstick_state, |
|||
left_thumbstick_value=self.tvuer.left_controller_thumbstick_value, |
|||
left_aButton=self.tvuer.left_controller_aButton, |
|||
left_bButton=self.tvuer.left_controller_bButton, |
|||
right_trigger_state=self.tvuer.right_controller_trigger_state, |
|||
right_squeeze_ctrl_state=self.tvuer.right_controller_squeeze_state, |
|||
right_squeeze_ctrl_value=self.tvuer.right_controller_squeeze_value, |
|||
right_thumbstick_state=self.tvuer.right_controller_thumbstick_state, |
|||
right_thumbstick_value=self.tvuer.right_controller_thumbstick_value, |
|||
right_aButton=self.tvuer.right_controller_aButton, |
|||
right_bButton=self.tvuer.right_controller_bButton, |
|||
) |
|||
else: |
|||
controller_state = None |
|||
|
|||
return TeleData( |
|||
head_pose=Brobot_world_head, |
|||
left_arm_pose=left_IPunitree_Brobot_waist_arm, |
|||
right_arm_pose=right_IPunitree_Brobot_waist_arm, |
|||
left_trigger_value=10.0 - self.tvuer.left_controller_trigger_value * 10, |
|||
right_trigger_value=10.0 - self.tvuer.right_controller_trigger_value * 10, |
|||
tele_state=controller_state |
|||
) |
|||
@ -0,0 +1 @@ |
|||
Subproject commit d7753d38c9ff11f80bafea6cd168351fd3db9b0e |
|||
@ -1,22 +0,0 @@ |
|||
cff-version: 1.2.0 |
|||
authors: |
|||
- family-names: "Qin" |
|||
given-names: "Yuzhe" |
|||
- family-names: "Yang" |
|||
given-names: "Wei" |
|||
- family-names: "Huang" |
|||
given-names: "Binghao" |
|||
- family-names: "Van Wyk" |
|||
given-names: "Karl" |
|||
- family-names: "Su" |
|||
given-names: "Hao" |
|||
- family-names: "Wang" |
|||
given-names: "Xiaolong" |
|||
- family-names: "Chao" |
|||
given-names: "Yu-Wei" |
|||
- family-names: "Fox" |
|||
given-names: "Dieter" |
|||
date-released: 2023-10-25 |
|||
title: "AnyTeleop" |
|||
message: "Thanks for using dex-retargeting. If you use this software, please cite it as below." |
|||
url: "https://github.com/dexsuite/dex-retargeting" |
|||
@ -1,21 +0,0 @@ |
|||
The MIT License (MIT) |
|||
|
|||
Copyright (c) 2023 Yuzhe Qin |
|||
|
|||
Permission is hereby granted, free of charge, to any person obtaining a copy |
|||
of this software and associated documentation files (the "Software"), to deal |
|||
in the Software without restriction, including without limitation the rights |
|||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell |
|||
copies of the Software, and to permit persons to whom the Software is |
|||
furnished to do so, subject to the following conditions: |
|||
|
|||
The above copyright notice and this permission notice shall be included in all |
|||
copies or substantial portions of the Software. |
|||
|
|||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
|||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, |
|||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE |
|||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER |
|||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
|||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE |
|||
SOFTWARE. |
|||
@ -1 +0,0 @@ |
|||
__version__ = "0.4.6" |
|||
@ -1,85 +0,0 @@ |
|||
import enum |
|||
from pathlib import Path |
|||
from typing import Optional |
|||
|
|||
import numpy as np |
|||
|
|||
OPERATOR2MANO_RIGHT = np.array( |
|||
[ |
|||
[0, 0, -1], |
|||
[-1, 0, 0], |
|||
[0, 1, 0], |
|||
] |
|||
) |
|||
|
|||
OPERATOR2MANO_LEFT = np.array( |
|||
[ |
|||
[0, 0, -1], |
|||
[1, 0, 0], |
|||
[0, -1, 0], |
|||
] |
|||
) |
|||
|
|||
|
|||
class RobotName(enum.Enum): |
|||
allegro = enum.auto() |
|||
shadow = enum.auto() |
|||
svh = enum.auto() |
|||
leap = enum.auto() |
|||
ability = enum.auto() |
|||
inspire = enum.auto() |
|||
panda = enum.auto() |
|||
|
|||
|
|||
class RetargetingType(enum.Enum): |
|||
vector = enum.auto() # For teleoperation, no finger closing prior |
|||
position = enum.auto() # For offline data processing, especially hand-object interaction data |
|||
dexpilot = enum.auto() # For teleoperation, with finger closing prior |
|||
|
|||
|
|||
class HandType(enum.Enum): |
|||
right = enum.auto() |
|||
left = enum.auto() |
|||
|
|||
|
|||
ROBOT_NAME_MAP = { |
|||
RobotName.allegro: "allegro_hand", |
|||
RobotName.shadow: "shadow_hand", |
|||
RobotName.svh: "schunk_svh_hand", |
|||
RobotName.leap: "leap_hand", |
|||
RobotName.ability: "ability_hand", |
|||
RobotName.inspire: "inspire_hand", |
|||
RobotName.panda: "panda_gripper", |
|||
} |
|||
|
|||
ROBOT_NAMES = list(ROBOT_NAME_MAP.keys()) |
|||
|
|||
|
|||
def get_default_config_path( |
|||
robot_name: RobotName, retargeting_type: RetargetingType, hand_type: HandType |
|||
) -> Optional[Path]: |
|||
config_path = Path(__file__).parent / "configs" |
|||
if retargeting_type is RetargetingType.position: |
|||
config_path = config_path / "offline" |
|||
else: |
|||
config_path = config_path / "teleop" |
|||
|
|||
robot_name_str = ROBOT_NAME_MAP[robot_name] |
|||
hand_type_str = hand_type.name |
|||
if "gripper" in robot_name_str: # For gripper robots, only use gripper config file. |
|||
if retargeting_type == RetargetingType.dexpilot: |
|||
config_name = f"{robot_name_str}_dexpilot.yml" |
|||
else: |
|||
config_name = f"{robot_name_str}.yml" |
|||
else: |
|||
if retargeting_type == RetargetingType.dexpilot: |
|||
config_name = f"{robot_name_str}_{hand_type_str}_dexpilot.yml" |
|||
else: |
|||
config_name = f"{robot_name_str}_{hand_type_str}.yml" |
|||
return config_path / config_name |
|||
|
|||
|
|||
OPERATOR2MANO = { |
|||
HandType.right: OPERATOR2MANO_RIGHT, |
|||
HandType.left: OPERATOR2MANO_LEFT, |
|||
} |
|||
@ -1,102 +0,0 @@ |
|||
from abc import abstractmethod |
|||
from typing import List |
|||
|
|||
import numpy as np |
|||
|
|||
from .robot_wrapper import RobotWrapper |
|||
|
|||
|
|||
class KinematicAdaptor: |
|||
def __init__(self, robot: RobotWrapper, target_joint_names: List[str]): |
|||
self.robot = robot |
|||
self.target_joint_names = target_joint_names |
|||
|
|||
# Index mapping |
|||
self.idx_pin2target = np.array([robot.get_joint_index(n) for n in target_joint_names]) |
|||
|
|||
@abstractmethod |
|||
def forward_qpos(self, qpos: np.ndarray) -> np.ndarray: |
|||
""" |
|||
Adapt the joint position for different kinematics constraints. |
|||
Note that the joint order of this qpos is consistent with pinocchio |
|||
Args: |
|||
qpos: the pinocchio qpos |
|||
|
|||
Returns: the adapted qpos with the same shape as input |
|||
|
|||
""" |
|||
pass |
|||
|
|||
@abstractmethod |
|||
def backward_jacobian(self, jacobian: np.ndarray) -> np.ndarray: |
|||
""" |
|||
Adapt the jacobian for different kinematics applications. |
|||
Note that the joint order of this Jacobian is consistent with pinocchio |
|||
Args: |
|||
jacobian: the original jacobian |
|||
|
|||
Returns: the adapted jacobian with the same shape as input |
|||
|
|||
""" |
|||
pass |
|||
|
|||
|
|||
class MimicJointKinematicAdaptor(KinematicAdaptor): |
|||
def __init__( |
|||
self, |
|||
robot: RobotWrapper, |
|||
target_joint_names: List[str], |
|||
source_joint_names: List[str], |
|||
mimic_joint_names: List[str], |
|||
multipliers: List[float], |
|||
offsets: List[float], |
|||
): |
|||
super().__init__(robot, target_joint_names) |
|||
|
|||
self.multipliers = np.array(multipliers) |
|||
self.offsets = np.array(offsets) |
|||
|
|||
# Joint name check |
|||
union_set = set(mimic_joint_names).intersection(set(target_joint_names)) |
|||
if len(union_set) > 0: |
|||
raise ValueError( |
|||
f"Mimic joint should not be one of the target joints.\n" |
|||
f"Mimic joints: {mimic_joint_names}.\n" |
|||
f"Target joints: {target_joint_names}\n" |
|||
f"You need to specify the target joint names explicitly in your retargeting config" |
|||
f" for robot with mimic joint constraints: {target_joint_names}" |
|||
) |
|||
|
|||
# Indices in the pinocchio |
|||
self.idx_pin2source = np.array([robot.get_joint_index(name) for name in source_joint_names]) |
|||
self.idx_pin2mimic = np.array([robot.get_joint_index(name) for name in mimic_joint_names]) |
|||
|
|||
# Indices in the output results |
|||
self.idx_target2source = np.array([self.target_joint_names.index(n) for n in source_joint_names]) |
|||
|
|||
# Dimension check |
|||
len_source, len_mimic = self.idx_target2source.shape[0], self.idx_pin2mimic.shape[0] |
|||
len_mul, len_offset = self.multipliers.shape[0], self.offsets.shape[0] |
|||
if not (len_mimic == len_source == len_mul == len_offset): |
|||
raise ValueError( |
|||
f"Mimic joints setting dimension mismatch.\n" |
|||
f"Source joints: {len_source}, mimic joints: {len_mimic}, multiplier: {len_mul}, offset: {len_offset}" |
|||
) |
|||
self.num_active_joints = len(robot.dof_joint_names) - len_mimic |
|||
|
|||
# Uniqueness check |
|||
if len(mimic_joint_names) != len(np.unique(mimic_joint_names)): |
|||
raise ValueError(f"Redundant mimic joint names: {mimic_joint_names}") |
|||
|
|||
def forward_qpos(self, pin_qpos: np.ndarray) -> np.ndarray: |
|||
mimic_qpos = pin_qpos[self.idx_pin2source] * self.multipliers + self.offsets |
|||
pin_qpos[self.idx_pin2mimic] = mimic_qpos |
|||
return pin_qpos |
|||
|
|||
def backward_jacobian(self, jacobian: np.ndarray) -> np.ndarray: |
|||
target_jacobian = jacobian[..., self.idx_pin2target] |
|||
mimic_joint_jacobian = jacobian[..., self.idx_pin2mimic] * self.multipliers |
|||
|
|||
for i, index in enumerate(self.idx_target2source): |
|||
target_jacobian[..., index] += mimic_joint_jacobian[..., i] |
|||
return target_jacobian |
|||
@ -1,516 +0,0 @@ |
|||
from abc import abstractmethod |
|||
from typing import List, Optional |
|||
|
|||
import nlopt |
|||
import numpy as np |
|||
import torch |
|||
|
|||
from .kinematics_adaptor import KinematicAdaptor, MimicJointKinematicAdaptor |
|||
from .robot_wrapper import RobotWrapper |
|||
|
|||
|
|||
class Optimizer: |
|||
retargeting_type = "BASE" |
|||
|
|||
def __init__( |
|||
self, |
|||
robot: RobotWrapper, |
|||
target_joint_names: List[str], |
|||
target_link_human_indices: np.ndarray, |
|||
): |
|||
self.robot = robot |
|||
self.num_joints = robot.dof |
|||
|
|||
joint_names = robot.dof_joint_names |
|||
idx_pin2target = [] |
|||
for target_joint_name in target_joint_names: |
|||
if target_joint_name not in joint_names: |
|||
raise ValueError(f"Joint {target_joint_name} given does not appear to be in robot XML.") |
|||
idx_pin2target.append(joint_names.index(target_joint_name)) |
|||
self.target_joint_names = target_joint_names |
|||
self.idx_pin2target = np.array(idx_pin2target) |
|||
|
|||
self.idx_pin2fixed = np.array([i for i in range(robot.dof) if i not in idx_pin2target], dtype=int) |
|||
self.opt = nlopt.opt(nlopt.LD_SLSQP, len(idx_pin2target)) |
|||
self.opt_dof = len(idx_pin2target) # This dof includes the mimic joints |
|||
|
|||
# Target |
|||
self.target_link_human_indices = target_link_human_indices |
|||
|
|||
# Free joint |
|||
link_names = robot.link_names |
|||
self.has_free_joint = len([name for name in link_names if "dummy" in name]) >= 6 |
|||
|
|||
# Kinematics adaptor |
|||
self.adaptor: Optional[KinematicAdaptor] = None |
|||
|
|||
def set_joint_limit(self, joint_limits: np.ndarray, epsilon=1e-3): |
|||
if joint_limits.shape != (self.opt_dof, 2): |
|||
raise ValueError(f"Expect joint limits have shape: {(self.opt_dof, 2)}, but get {joint_limits.shape}") |
|||
self.opt.set_lower_bounds((joint_limits[:, 0] - epsilon).tolist()) |
|||
self.opt.set_upper_bounds((joint_limits[:, 1] + epsilon).tolist()) |
|||
|
|||
def get_link_indices(self, target_link_names): |
|||
return [self.robot.get_link_index(link_name) for link_name in target_link_names] |
|||
|
|||
def set_kinematic_adaptor(self, adaptor: KinematicAdaptor): |
|||
self.adaptor = adaptor |
|||
|
|||
# Remove mimic joints from fixed joint list |
|||
if isinstance(adaptor, MimicJointKinematicAdaptor): |
|||
fixed_idx = self.idx_pin2fixed |
|||
mimic_idx = adaptor.idx_pin2mimic |
|||
new_fixed_id = np.array([x for x in fixed_idx if x not in mimic_idx], dtype=int) |
|||
self.idx_pin2fixed = new_fixed_id |
|||
|
|||
def retarget(self, ref_value, fixed_qpos, last_qpos): |
|||
""" |
|||
Compute the retargeting results using non-linear optimization |
|||
Args: |
|||
ref_value: the reference value in cartesian space as input, different optimizer has different reference |
|||
fixed_qpos: the fixed value (not optimized) in retargeting, consistent with self.fixed_joint_names |
|||
last_qpos: the last retargeting results or initial value, consistent with function return |
|||
|
|||
Returns: joint position of robot, the joint order and dim is consistent with self.target_joint_names |
|||
|
|||
""" |
|||
if len(fixed_qpos) != len(self.idx_pin2fixed): |
|||
raise ValueError( |
|||
f"Optimizer has {len(self.idx_pin2fixed)} joints but non_target_qpos {fixed_qpos} is given" |
|||
) |
|||
objective_fn = self.get_objective_function(ref_value, fixed_qpos, np.array(last_qpos).astype(np.float32)) |
|||
|
|||
self.opt.set_min_objective(objective_fn) |
|||
try: |
|||
qpos = self.opt.optimize(last_qpos) |
|||
return np.array(qpos, dtype=np.float32) |
|||
except RuntimeError as e: |
|||
print(e) |
|||
return np.array(last_qpos, dtype=np.float32) |
|||
|
|||
@abstractmethod |
|||
def get_objective_function(self, ref_value: np.ndarray, fixed_qpos: np.ndarray, last_qpos: np.ndarray): |
|||
pass |
|||
|
|||
@property |
|||
def fixed_joint_names(self): |
|||
joint_names = self.robot.dof_joint_names |
|||
return [joint_names[i] for i in self.idx_pin2fixed] |
|||
|
|||
|
|||
class PositionOptimizer(Optimizer): |
|||
retargeting_type = "POSITION" |
|||
|
|||
def __init__( |
|||
self, |
|||
robot: RobotWrapper, |
|||
target_joint_names: List[str], |
|||
target_link_names: List[str], |
|||
target_link_human_indices: np.ndarray, |
|||
huber_delta=0.02, |
|||
norm_delta=4e-3, |
|||
): |
|||
super().__init__(robot, target_joint_names, target_link_human_indices) |
|||
self.body_names = target_link_names |
|||
self.huber_loss = torch.nn.SmoothL1Loss(beta=huber_delta) |
|||
self.norm_delta = norm_delta |
|||
|
|||
# Sanity check and cache link indices |
|||
self.target_link_indices = self.get_link_indices(target_link_names) |
|||
|
|||
self.opt.set_ftol_abs(1e-5) |
|||
|
|||
def get_objective_function(self, target_pos: np.ndarray, fixed_qpos: np.ndarray, last_qpos: np.ndarray): |
|||
qpos = np.zeros(self.num_joints) |
|||
qpos[self.idx_pin2fixed] = fixed_qpos |
|||
torch_target_pos = torch.as_tensor(target_pos) |
|||
torch_target_pos.requires_grad_(False) |
|||
|
|||
def objective(x: np.ndarray, grad: np.ndarray) -> float: |
|||
qpos[self.idx_pin2target] = x |
|||
|
|||
# Kinematics forwarding for qpos |
|||
if self.adaptor is not None: |
|||
qpos[:] = self.adaptor.forward_qpos(qpos)[:] |
|||
|
|||
self.robot.compute_forward_kinematics(qpos) |
|||
target_link_poses = [self.robot.get_link_pose(index) for index in self.target_link_indices] |
|||
body_pos = np.stack([pose[:3, 3] for pose in target_link_poses], axis=0) # (n ,3) |
|||
|
|||
# Torch computation for accurate loss and grad |
|||
torch_body_pos = torch.as_tensor(body_pos) |
|||
torch_body_pos.requires_grad_() |
|||
|
|||
# Loss term for kinematics retargeting based on 3D position error |
|||
huber_distance = self.huber_loss(torch_body_pos, torch_target_pos) |
|||
result = huber_distance.cpu().detach().item() |
|||
|
|||
if grad.size > 0: |
|||
jacobians = [] |
|||
for i, index in enumerate(self.target_link_indices): |
|||
link_body_jacobian = self.robot.compute_single_link_local_jacobian(qpos, index)[:3, ...] |
|||
link_pose = target_link_poses[i] |
|||
link_rot = link_pose[:3, :3] |
|||
link_kinematics_jacobian = link_rot @ link_body_jacobian |
|||
jacobians.append(link_kinematics_jacobian) |
|||
|
|||
# Note: the joint order in this jacobian is consistent pinocchio |
|||
jacobians = np.stack(jacobians, axis=0) |
|||
huber_distance.backward() |
|||
grad_pos = torch_body_pos.grad.cpu().numpy()[:, None, :] |
|||
|
|||
# Convert the jacobian from pinocchio order to target order |
|||
if self.adaptor is not None: |
|||
jacobians = self.adaptor.backward_jacobian(jacobians) |
|||
else: |
|||
jacobians = jacobians[..., self.idx_pin2target] |
|||
|
|||
# Compute the gradient to the qpos |
|||
grad_qpos = np.matmul(grad_pos, jacobians) |
|||
grad_qpos = grad_qpos.mean(1).sum(0) |
|||
grad_qpos += 2 * self.norm_delta * (x - last_qpos) |
|||
|
|||
grad[:] = grad_qpos[:] |
|||
|
|||
return result |
|||
|
|||
return objective |
|||
|
|||
|
|||
class VectorOptimizer(Optimizer): |
|||
retargeting_type = "VECTOR" |
|||
|
|||
def __init__( |
|||
self, |
|||
robot: RobotWrapper, |
|||
target_joint_names: List[str], |
|||
target_origin_link_names: List[str], |
|||
target_task_link_names: List[str], |
|||
target_link_human_indices: np.ndarray, |
|||
huber_delta=0.02, |
|||
norm_delta=4e-3, |
|||
scaling=1.0, |
|||
): |
|||
super().__init__(robot, target_joint_names, target_link_human_indices) |
|||
self.origin_link_names = target_origin_link_names |
|||
self.task_link_names = target_task_link_names |
|||
self.huber_loss = torch.nn.SmoothL1Loss(beta=huber_delta, reduction="mean") |
|||
self.norm_delta = norm_delta |
|||
self.scaling = scaling |
|||
|
|||
# Computation cache for better performance |
|||
# For one link used in multiple vectors, e.g. hand palm, we do not want to compute it multiple times |
|||
self.computed_link_names = list(set(target_origin_link_names).union(set(target_task_link_names))) |
|||
self.origin_link_indices = torch.tensor( |
|||
[self.computed_link_names.index(name) for name in target_origin_link_names] |
|||
) |
|||
self.task_link_indices = torch.tensor([self.computed_link_names.index(name) for name in target_task_link_names]) |
|||
|
|||
# Cache link indices that will involve in kinematics computation |
|||
self.computed_link_indices = self.get_link_indices(self.computed_link_names) |
|||
|
|||
self.opt.set_ftol_abs(1e-6) |
|||
|
|||
def get_objective_function(self, target_vector: np.ndarray, fixed_qpos: np.ndarray, last_qpos: np.ndarray): |
|||
qpos = np.zeros(self.num_joints) |
|||
qpos[self.idx_pin2fixed] = fixed_qpos |
|||
torch_target_vec = torch.as_tensor(target_vector) * self.scaling |
|||
torch_target_vec.requires_grad_(False) |
|||
|
|||
def objective(x: np.ndarray, grad: np.ndarray) -> float: |
|||
qpos[self.idx_pin2target] = x |
|||
|
|||
# Kinematics forwarding for qpos |
|||
if self.adaptor is not None: |
|||
qpos[:] = self.adaptor.forward_qpos(qpos)[:] |
|||
|
|||
self.robot.compute_forward_kinematics(qpos) |
|||
target_link_poses = [self.robot.get_link_pose(index) for index in self.computed_link_indices] |
|||
body_pos = np.array([pose[:3, 3] for pose in target_link_poses]) |
|||
|
|||
# Torch computation for accurate loss and grad |
|||
torch_body_pos = torch.as_tensor(body_pos) |
|||
torch_body_pos.requires_grad_() |
|||
|
|||
# Index link for computation |
|||
origin_link_pos = torch_body_pos[self.origin_link_indices, :] |
|||
task_link_pos = torch_body_pos[self.task_link_indices, :] |
|||
robot_vec = task_link_pos - origin_link_pos |
|||
|
|||
# Loss term for kinematics retargeting based on 3D position error |
|||
vec_dist = torch.norm(robot_vec - torch_target_vec, dim=1, keepdim=False) |
|||
huber_distance = self.huber_loss(vec_dist, torch.zeros_like(vec_dist)) |
|||
result = huber_distance.cpu().detach().item() |
|||
|
|||
if grad.size > 0: |
|||
jacobians = [] |
|||
for i, index in enumerate(self.computed_link_indices): |
|||
link_body_jacobian = self.robot.compute_single_link_local_jacobian(qpos, index)[:3, ...] |
|||
link_pose = target_link_poses[i] |
|||
link_rot = link_pose[:3, :3] |
|||
link_kinematics_jacobian = link_rot @ link_body_jacobian |
|||
jacobians.append(link_kinematics_jacobian) |
|||
|
|||
# Note: the joint order in this jacobian is consistent pinocchio |
|||
jacobians = np.stack(jacobians, axis=0) |
|||
huber_distance.backward() |
|||
grad_pos = torch_body_pos.grad.cpu().numpy()[:, None, :] |
|||
|
|||
# Convert the jacobian from pinocchio order to target order |
|||
if self.adaptor is not None: |
|||
jacobians = self.adaptor.backward_jacobian(jacobians) |
|||
else: |
|||
jacobians = jacobians[..., self.idx_pin2target] |
|||
|
|||
grad_qpos = np.matmul(grad_pos, np.array(jacobians)) |
|||
grad_qpos = grad_qpos.mean(1).sum(0) |
|||
grad_qpos += 2 * self.norm_delta * (x - last_qpos) |
|||
|
|||
grad[:] = grad_qpos[:] |
|||
|
|||
return result |
|||
|
|||
return objective |
|||
|
|||
|
|||
class DexPilotOptimizer(Optimizer): |
|||
"""Retargeting optimizer using the method proposed in DexPilot |
|||
|
|||
This is a broader adaptation of the original optimizer delineated in the DexPilot paper. |
|||
While the initial DexPilot study focused solely on the four-fingered Allegro Hand, this version of the optimizer |
|||
embraces the same principles for both four-fingered and five-fingered hands. It projects the distance between the |
|||
thumb and the other fingers to facilitate more stable grasping. |
|||
Reference: https://arxiv.org/abs/1910.03135 |
|||
|
|||
Args: |
|||
robot: |
|||
target_joint_names: |
|||
finger_tip_link_names: |
|||
wrist_link_name: |
|||
gamma: |
|||
project_dist: |
|||
escape_dist: |
|||
eta1: |
|||
eta2: |
|||
scaling: |
|||
""" |
|||
|
|||
retargeting_type = "DEXPILOT" |
|||
|
|||
def __init__( |
|||
self, |
|||
robot: RobotWrapper, |
|||
target_joint_names: List[str], |
|||
finger_tip_link_names: List[str], |
|||
wrist_link_name: str, |
|||
target_link_human_indices: Optional[np.ndarray] = None, |
|||
huber_delta=0.03, |
|||
norm_delta=4e-3, |
|||
# DexPilot parameters |
|||
# gamma=2.5e-3, |
|||
project_dist=0.03, |
|||
escape_dist=0.05, |
|||
eta1=1e-4, |
|||
eta2=3e-2, |
|||
scaling=1.0, |
|||
): |
|||
if len(finger_tip_link_names) < 2 or len(finger_tip_link_names) > 5: |
|||
raise ValueError( |
|||
f"DexPilot optimizer can only be applied to hands with 2 to 5 fingers, but got " |
|||
f"{len(finger_tip_link_names)} fingers." |
|||
) |
|||
self.num_fingers = len(finger_tip_link_names) |
|||
|
|||
origin_link_index, task_link_index = self.generate_link_indices(self.num_fingers) |
|||
|
|||
if target_link_human_indices is None: |
|||
logical_indices = np.stack([origin_link_index, task_link_index], axis=0) |
|||
target_link_human_indices = np.where( |
|||
logical_indices > 0, |
|||
logical_indices * 5 - 1, |
|||
0 |
|||
).astype(int) |
|||
link_names = [wrist_link_name] + finger_tip_link_names |
|||
target_origin_link_names = [link_names[index] for index in origin_link_index] |
|||
target_task_link_names = [link_names[index] for index in task_link_index] |
|||
|
|||
super().__init__(robot, target_joint_names, target_link_human_indices) |
|||
self.origin_link_names = target_origin_link_names |
|||
self.task_link_names = target_task_link_names |
|||
self.scaling = scaling |
|||
self.huber_loss = torch.nn.SmoothL1Loss(beta=huber_delta, reduction="none") |
|||
self.norm_delta = norm_delta |
|||
|
|||
# DexPilot parameters |
|||
self.project_dist = project_dist |
|||
self.escape_dist = escape_dist |
|||
self.eta1 = eta1 |
|||
self.eta2 = eta2 |
|||
|
|||
# Computation cache for better performance |
|||
# For one link used in multiple vectors, e.g. hand palm, we do not want to compute it multiple times |
|||
self.computed_link_names = list(set(target_origin_link_names).union(set(target_task_link_names))) |
|||
self.origin_link_indices = torch.tensor( |
|||
[self.computed_link_names.index(name) for name in target_origin_link_names] |
|||
) |
|||
self.task_link_indices = torch.tensor([self.computed_link_names.index(name) for name in target_task_link_names]) |
|||
|
|||
# Sanity check and cache link indices |
|||
self.computed_link_indices = self.get_link_indices(self.computed_link_names) |
|||
|
|||
self.opt.set_ftol_abs(1e-6) |
|||
|
|||
# DexPilot cache |
|||
self.projected, self.s2_project_index_origin, self.s2_project_index_task, self.projected_dist = ( |
|||
self.set_dexpilot_cache(self.num_fingers, eta1, eta2) |
|||
) |
|||
|
|||
@staticmethod |
|||
def generate_link_indices(num_fingers): |
|||
""" |
|||
Example: |
|||
>>> generate_link_indices(4) |
|||
([2, 3, 4, 3, 4, 4, 0, 0, 0, 0], [1, 1, 1, 2, 2, 3, 1, 2, 3, 4]) |
|||
""" |
|||
origin_link_index = [] |
|||
task_link_index = [] |
|||
|
|||
# S1:Add indices for connections between fingers |
|||
for i in range(1, num_fingers): |
|||
for j in range(i + 1, num_fingers + 1): |
|||
origin_link_index.append(j) |
|||
task_link_index.append(i) |
|||
|
|||
# S2:Add indices for connections to the base (0) |
|||
for i in range(1, num_fingers + 1): |
|||
origin_link_index.append(0) |
|||
task_link_index.append(i) |
|||
|
|||
return origin_link_index, task_link_index |
|||
|
|||
@staticmethod |
|||
def set_dexpilot_cache(num_fingers, eta1, eta2): |
|||
""" |
|||
Example: |
|||
>>> set_dexpilot_cache(4, 0.1, 0.2) |
|||
(array([False, False, False, False, False, False]), |
|||
[1, 2, 2], |
|||
[0, 0, 1], |
|||
array([0.1, 0.1, 0.1, 0.2, 0.2, 0.2])) |
|||
""" |
|||
projected = np.zeros(num_fingers * (num_fingers - 1) // 2, dtype=bool) |
|||
|
|||
s2_project_index_origin = [] |
|||
s2_project_index_task = [] |
|||
for i in range(0, num_fingers - 2): |
|||
for j in range(i + 1, num_fingers - 1): |
|||
s2_project_index_origin.append(j) |
|||
s2_project_index_task.append(i) |
|||
|
|||
projected_dist = np.array([eta1] * (num_fingers - 1) + [eta2] * ((num_fingers - 1) * (num_fingers - 2) // 2)) |
|||
|
|||
return projected, s2_project_index_origin, s2_project_index_task, projected_dist |
|||
|
|||
def get_objective_function(self, target_vector: np.ndarray, fixed_qpos: np.ndarray, last_qpos: np.ndarray): |
|||
qpos = np.zeros(self.num_joints) |
|||
qpos[self.idx_pin2fixed] = fixed_qpos |
|||
|
|||
len_proj = len(self.projected) |
|||
len_s2 = len(self.s2_project_index_task) |
|||
len_s1 = len_proj - len_s2 |
|||
|
|||
# Update projection indicator |
|||
target_vec_dist = np.linalg.norm(target_vector[:len_proj], axis=1) |
|||
self.projected[:len_s1][target_vec_dist[0:len_s1] < self.project_dist] = True |
|||
self.projected[:len_s1][target_vec_dist[0:len_s1] > self.escape_dist] = False |
|||
self.projected[len_s1:len_proj] = np.logical_and( |
|||
self.projected[:len_s1][self.s2_project_index_origin], self.projected[:len_s1][self.s2_project_index_task] |
|||
) |
|||
self.projected[len_s1:len_proj] = np.logical_and( |
|||
self.projected[len_s1:len_proj], target_vec_dist[len_s1:len_proj] <= 0.03 |
|||
) |
|||
|
|||
# Update weight vector |
|||
normal_weight = np.ones(len_proj, dtype=np.float32) * 1 |
|||
high_weight = np.array([200] * len_s1 + [400] * len_s2, dtype=np.float32) |
|||
weight = np.where(self.projected, high_weight, normal_weight) |
|||
|
|||
# We change the weight to 10 instead of 1 here, for vector originate from wrist to fingertips |
|||
# This ensures better intuitive mapping due wrong pose detection |
|||
weight = torch.from_numpy( |
|||
np.concatenate([weight, np.ones(self.num_fingers, dtype=np.float32) * len_proj + self.num_fingers]) |
|||
) |
|||
|
|||
# Compute reference distance vector |
|||
normal_vec = target_vector * self.scaling # (10, 3) |
|||
dir_vec = target_vector[:len_proj] / (target_vec_dist[:, None] + 1e-6) # (6, 3) |
|||
projected_vec = dir_vec * self.projected_dist[:, None] # (6, 3) |
|||
|
|||
# Compute final reference vector |
|||
reference_vec = np.where(self.projected[:, None], projected_vec, normal_vec[:len_proj]) # (6, 3) |
|||
reference_vec = np.concatenate([reference_vec, normal_vec[len_proj:]], axis=0) # (10, 3) |
|||
torch_target_vec = torch.as_tensor(reference_vec, dtype=torch.float32) |
|||
torch_target_vec.requires_grad_(False) |
|||
|
|||
def objective(x: np.ndarray, grad: np.ndarray) -> float: |
|||
qpos[self.idx_pin2target] = x |
|||
|
|||
# Kinematics forwarding for qpos |
|||
if self.adaptor is not None: |
|||
qpos[:] = self.adaptor.forward_qpos(qpos)[:] |
|||
|
|||
self.robot.compute_forward_kinematics(qpos) |
|||
target_link_poses = [self.robot.get_link_pose(index) for index in self.computed_link_indices] |
|||
body_pos = np.array([pose[:3, 3] for pose in target_link_poses]) |
|||
|
|||
# Torch computation for accurate loss and grad |
|||
torch_body_pos = torch.as_tensor(body_pos) |
|||
torch_body_pos.requires_grad_() |
|||
|
|||
# Index link for computation |
|||
origin_link_pos = torch_body_pos[self.origin_link_indices, :] |
|||
task_link_pos = torch_body_pos[self.task_link_indices, :] |
|||
robot_vec = task_link_pos - origin_link_pos |
|||
|
|||
# Loss term for kinematics retargeting based on 3D position error |
|||
# Different from the original DexPilot, we use huber loss here instead of the squared dist |
|||
vec_dist = torch.norm(robot_vec - torch_target_vec, dim=1, keepdim=False) |
|||
huber_distance = ( |
|||
self.huber_loss(vec_dist, torch.zeros_like(vec_dist)) * weight / (robot_vec.shape[0]) |
|||
).sum() |
|||
huber_distance = huber_distance.sum() |
|||
result = huber_distance.cpu().detach().item() |
|||
|
|||
if grad.size > 0: |
|||
jacobians = [] |
|||
for i, index in enumerate(self.computed_link_indices): |
|||
link_body_jacobian = self.robot.compute_single_link_local_jacobian(qpos, index)[:3, ...] |
|||
link_pose = target_link_poses[i] |
|||
link_rot = link_pose[:3, :3] |
|||
link_kinematics_jacobian = link_rot @ link_body_jacobian |
|||
jacobians.append(link_kinematics_jacobian) |
|||
|
|||
# Note: the joint order in this jacobian is consistent pinocchio |
|||
jacobians = np.stack(jacobians, axis=0) |
|||
huber_distance.backward() |
|||
grad_pos = torch_body_pos.grad.cpu().numpy()[:, None, :] |
|||
|
|||
# Convert the jacobian from pinocchio order to target order |
|||
if self.adaptor is not None: |
|||
jacobians = self.adaptor.backward_jacobian(jacobians) |
|||
else: |
|||
jacobians = jacobians[..., self.idx_pin2target] |
|||
|
|||
grad_qpos = np.matmul(grad_pos, np.array(jacobians)) |
|||
grad_qpos = grad_qpos.mean(1).sum(0) |
|||
|
|||
# In the original DexPilot, γ = 2.5 × 10−3 is a weight on regularizing the Allegro angles to zero |
|||
# which is equivalent to fully opened the hand |
|||
# In our implementation, we regularize the joint angles to the previous joint angles |
|||
grad_qpos += 2 * self.norm_delta * (x - last_qpos) |
|||
|
|||
grad[:] = grad_qpos[:] |
|||
|
|||
return result |
|||
|
|||
return objective |
|||
@ -1,17 +0,0 @@ |
|||
class LPFilter: |
|||
def __init__(self, alpha): |
|||
self.alpha = alpha |
|||
self.y = None |
|||
self.is_init = False |
|||
|
|||
def next(self, x): |
|||
if not self.is_init: |
|||
self.y = x |
|||
self.is_init = True |
|||
return self.y.copy() |
|||
self.y = self.y + self.alpha * (x - self.y) |
|||
return self.y.copy() |
|||
|
|||
def reset(self): |
|||
self.y = None |
|||
self.is_init = False |
|||
@ -1,255 +0,0 @@ |
|||
from dataclasses import dataclass |
|||
from pathlib import Path |
|||
from typing import List, Optional, Dict, Any, Tuple |
|||
from typing import Union |
|||
|
|||
import numpy as np |
|||
import yaml |
|||
import os |
|||
|
|||
from . import yourdfpy as urdf |
|||
from .kinematics_adaptor import MimicJointKinematicAdaptor |
|||
from .optimizer_utils import LPFilter |
|||
from .robot_wrapper import RobotWrapper |
|||
from .seq_retarget import SeqRetargeting |
|||
from .yourdfpy import DUMMY_JOINT_NAMES |
|||
|
|||
|
|||
@dataclass |
|||
class RetargetingConfig: |
|||
type: str |
|||
urdf_path: str |
|||
target_joint_names: Optional[List[str]] = None |
|||
|
|||
# Whether to add free joint to the root of the robot. Free joint enable the robot hand move freely in the space |
|||
add_dummy_free_joint: bool = False |
|||
|
|||
# DexPilot retargeting related |
|||
# The link on the robot hand which corresponding to the wrist of human hand |
|||
wrist_link_name: Optional[str] = None |
|||
# DexPilot retargeting link names |
|||
finger_tip_link_names: Optional[List[str]] = None |
|||
target_link_human_indices_dexpilot: Optional[np.ndarray] = None |
|||
|
|||
# Position retargeting link names |
|||
target_link_names: Optional[List[str]] = None |
|||
target_link_human_indices_position: Optional[np.ndarray] = None |
|||
|
|||
# Vector retargeting link names |
|||
target_origin_link_names: Optional[List[str]] = None |
|||
target_task_link_names: Optional[List[str]] = None |
|||
target_link_human_indices_vector: Optional[np.ndarray] = None |
|||
|
|||
# Scaling factor for vector retargeting only |
|||
# For example, Allegro is 1.6 times larger than normal human hand, then this scaling factor should be 1.6 |
|||
scaling_factor: float = 1.0 |
|||
|
|||
# Low pass filter |
|||
low_pass_alpha: float = 0.1 |
|||
|
|||
# Optimization parameters |
|||
normal_delta: float = 4e-3 |
|||
huber_delta: float = 2e-2 |
|||
|
|||
# DexPilot optimizer parameters |
|||
project_dist: float = 0.03 |
|||
escape_dist: float = 0.05 |
|||
|
|||
# Joint limit tag |
|||
has_joint_limits: bool = True |
|||
|
|||
# Mimic joint tag |
|||
ignore_mimic_joint: bool = False |
|||
|
|||
_TYPE = ["vector", "position", "dexpilot"] |
|||
_DEFAULT_URDF_DIR = "./" |
|||
|
|||
def __post_init__(self): |
|||
# Retargeting type check |
|||
self.type = self.type.lower() |
|||
if self.type not in self._TYPE: |
|||
raise ValueError(f"Retargeting type must be one of {self._TYPE}") |
|||
|
|||
# Vector retargeting requires: target_origin_link_names + target_task_link_names |
|||
# Position retargeting requires: target_link_names |
|||
if self.type == "vector": |
|||
if self.target_origin_link_names is None or self.target_task_link_names is None: |
|||
raise ValueError(f"Vector retargeting requires: target_origin_link_names + target_task_link_names") |
|||
if len(self.target_task_link_names) != len(self.target_origin_link_names): |
|||
raise ValueError(f"Vector retargeting origin and task links dim mismatch") |
|||
if self.target_link_human_indices_vector.shape != (2, len(self.target_origin_link_names)): |
|||
raise ValueError(f"Vector retargeting link names and link indices dim mismatch") |
|||
if self.target_link_human_indices_vector is None: |
|||
raise ValueError(f"Vector retargeting requires: target_link_human_indices_vector") |
|||
|
|||
elif self.type == "position": |
|||
if self.target_link_names is None: |
|||
raise ValueError(f"Position retargeting requires: target_link_names") |
|||
self.target_link_human_indices_position = self.target_link_human_indices_position.squeeze() |
|||
if self.target_link_human_indices_position.shape != (len(self.target_link_names),): |
|||
raise ValueError(f"Position retargeting link names and link indices dim mismatch") |
|||
if self.target_link_human_indices_position is None: |
|||
raise ValueError(f"Position retargeting requires: target_link_human_indices_position") |
|||
|
|||
elif self.type == "dexpilot": |
|||
if self.finger_tip_link_names is None or self.wrist_link_name is None: |
|||
raise ValueError(f"Position retargeting requires: finger_tip_link_names + wrist_link_name") |
|||
if self.target_link_human_indices_dexpilot is not None: |
|||
print( |
|||
"\033[33m", |
|||
"Target link human indices is provided in the DexPilot retargeting config, which is uncommon.\n" |
|||
"If you do not know exactly how it is used, please leave it to None for default.\n" |
|||
"\033[00m", |
|||
) |
|||
|
|||
# URDF path check |
|||
urdf_path = Path(self.urdf_path) |
|||
if not urdf_path.is_absolute(): |
|||
urdf_path = self._DEFAULT_URDF_DIR / urdf_path |
|||
urdf_path = urdf_path.absolute() |
|||
if not urdf_path.exists(): |
|||
raise ValueError(f"URDF path {urdf_path} does not exist") |
|||
self.urdf_path = str(urdf_path) |
|||
|
|||
@classmethod |
|||
def set_default_urdf_dir(cls, urdf_dir: Union[str, Path]): |
|||
path = Path(urdf_dir) |
|||
if not path.exists(): |
|||
raise ValueError(f"URDF dir {urdf_dir} not exists.") |
|||
cls._DEFAULT_URDF_DIR = urdf_dir |
|||
|
|||
@classmethod |
|||
def load_from_file(cls, config_path: Union[str, Path], override: Optional[Dict] = None): |
|||
path = Path(config_path) |
|||
if not path.is_absolute(): |
|||
path = path.absolute() |
|||
|
|||
with path.open("r") as f: |
|||
yaml_config = yaml.load(f, Loader=yaml.FullLoader) |
|||
cfg = yaml_config["retargeting"] |
|||
return cls.from_dict(cfg, override) |
|||
|
|||
@classmethod |
|||
def from_dict(cls, cfg: Dict[str, Any], override: Optional[Dict] = None): |
|||
if "target_link_human_indices_position" in cfg: |
|||
cfg["target_link_human_indices_position"] = np.array(cfg["target_link_human_indices_position"]) |
|||
if "target_link_human_indices_vector" in cfg: |
|||
cfg["target_link_human_indices_vector"] = np.array(cfg["target_link_human_indices_vector"]) |
|||
if "target_link_human_indices_dexpilot" in cfg: |
|||
cfg["target_link_human_indices_dexpilot"] = np.array(cfg["target_link_human_indices_dexpilot"]) |
|||
|
|||
if override is not None: |
|||
for key, value in override.items(): |
|||
cfg[key] = value |
|||
config = RetargetingConfig(**cfg) |
|||
return config |
|||
|
|||
def build(self) -> SeqRetargeting: |
|||
from .optimizer import ( |
|||
VectorOptimizer, |
|||
PositionOptimizer, |
|||
DexPilotOptimizer, |
|||
) |
|||
import tempfile |
|||
|
|||
# Process the URDF with yourdfpy to better find file path |
|||
robot_urdf = urdf.URDF.load( |
|||
self.urdf_path, add_dummy_free_joints=self.add_dummy_free_joint, build_scene_graph=False |
|||
) |
|||
urdf_name = self.urdf_path.split(os.path.sep)[-1] |
|||
temp_dir = tempfile.mkdtemp(prefix="dex_retargeting-") |
|||
temp_path = f"{temp_dir}/{urdf_name}" |
|||
robot_urdf.write_xml_file(temp_path) |
|||
|
|||
# Load pinocchio model |
|||
robot = RobotWrapper(temp_path) |
|||
|
|||
# Add 6D dummy joint to target joint names so that it will also be optimized |
|||
if self.add_dummy_free_joint and self.target_joint_names is not None: |
|||
self.target_joint_names = DUMMY_JOINT_NAMES + self.target_joint_names |
|||
joint_names = self.target_joint_names if self.target_joint_names is not None else robot.dof_joint_names |
|||
|
|||
if self.type == "position": |
|||
optimizer = PositionOptimizer( |
|||
robot, |
|||
joint_names, |
|||
target_link_names=self.target_link_names, |
|||
target_link_human_indices=self.target_link_human_indices_position, |
|||
norm_delta=self.normal_delta, |
|||
huber_delta=self.huber_delta, |
|||
) |
|||
elif self.type == "vector": |
|||
optimizer = VectorOptimizer( |
|||
robot, |
|||
joint_names, |
|||
target_origin_link_names=self.target_origin_link_names, |
|||
target_task_link_names=self.target_task_link_names, |
|||
target_link_human_indices=self.target_link_human_indices_vector, |
|||
scaling=self.scaling_factor, |
|||
norm_delta=self.normal_delta, |
|||
huber_delta=self.huber_delta, |
|||
) |
|||
elif self.type == "dexpilot": |
|||
optimizer = DexPilotOptimizer( |
|||
robot, |
|||
joint_names, |
|||
finger_tip_link_names=self.finger_tip_link_names, |
|||
wrist_link_name=self.wrist_link_name, |
|||
target_link_human_indices=self.target_link_human_indices_dexpilot, |
|||
scaling=self.scaling_factor, |
|||
project_dist=self.project_dist, |
|||
escape_dist=self.escape_dist, |
|||
) |
|||
else: |
|||
raise RuntimeError() |
|||
|
|||
if 0 <= self.low_pass_alpha <= 1: |
|||
lp_filter = LPFilter(self.low_pass_alpha) |
|||
else: |
|||
lp_filter = None |
|||
|
|||
# Parse mimic joints and set kinematics adaptor for optimizer |
|||
has_mimic_joints, source_names, mimic_names, multipliers, offsets = parse_mimic_joint(robot_urdf) |
|||
if has_mimic_joints and not self.ignore_mimic_joint: |
|||
adaptor = MimicJointKinematicAdaptor( |
|||
robot, |
|||
target_joint_names=joint_names, |
|||
source_joint_names=source_names, |
|||
mimic_joint_names=mimic_names, |
|||
multipliers=multipliers, |
|||
offsets=offsets, |
|||
) |
|||
optimizer.set_kinematic_adaptor(adaptor) |
|||
print( |
|||
"\033[34m", |
|||
"Mimic joint adaptor enabled. The mimic joint tags in the URDF will be considered during retargeting.\n" |
|||
"To disable mimic joint adaptor, consider setting ignore_mimic_joint=True in the configuration.", |
|||
"\033[39m", |
|||
) |
|||
|
|||
retargeting = SeqRetargeting( |
|||
optimizer, |
|||
has_joint_limits=self.has_joint_limits, |
|||
lp_filter=lp_filter, |
|||
) |
|||
return retargeting |
|||
|
|||
|
|||
def get_retargeting_config(config_path: Union[str, Path]) -> RetargetingConfig: |
|||
config = RetargetingConfig.load_from_file(config_path) |
|||
return config |
|||
|
|||
|
|||
def parse_mimic_joint(robot_urdf: urdf.URDF) -> Tuple[bool, List[str], List[str], List[float], List[float]]: |
|||
mimic_joint_names = [] |
|||
source_joint_names = [] |
|||
multipliers = [] |
|||
offsets = [] |
|||
for name, joint in robot_urdf.joint_map.items(): |
|||
if joint.mimic is not None: |
|||
mimic_joint_names.append(name) |
|||
source_joint_names.append(joint.mimic.joint) |
|||
multipliers.append(joint.mimic.multiplier) |
|||
offsets.append(joint.mimic.offset) |
|||
|
|||
return len(mimic_joint_names) > 0, source_joint_names, mimic_joint_names, multipliers, offsets |
|||
@ -1,93 +0,0 @@ |
|||
from typing import List |
|||
|
|||
import numpy as np |
|||
import numpy.typing as npt |
|||
import pinocchio as pin |
|||
|
|||
|
|||
class RobotWrapper: |
|||
""" |
|||
This class does not take mimic joint into consideration |
|||
""" |
|||
|
|||
def __init__(self, urdf_path: str, use_collision=False, use_visual=False): |
|||
# Create robot model and data |
|||
self.model: pin.Model = pin.buildModelFromUrdf(urdf_path) |
|||
self.data: pin.Data = self.model.createData() |
|||
|
|||
if use_visual or use_collision: |
|||
raise NotImplementedError |
|||
|
|||
self.q0 = pin.neutral(self.model) |
|||
if self.model.nv != self.model.nq: |
|||
raise NotImplementedError(f"Can not handle robot with special joint.") |
|||
|
|||
# -------------------------------------------------------------------------- # |
|||
# Robot property |
|||
# -------------------------------------------------------------------------- # |
|||
@property |
|||
def joint_names(self) -> List[str]: |
|||
return list(self.model.names) |
|||
|
|||
@property |
|||
def dof_joint_names(self) -> List[str]: |
|||
nqs = self.model.nqs |
|||
return [name for i, name in enumerate(self.model.names) if nqs[i] > 0] |
|||
|
|||
@property |
|||
def dof(self) -> int: |
|||
return self.model.nq |
|||
|
|||
@property |
|||
def link_names(self) -> List[str]: |
|||
link_names = [] |
|||
for i, frame in enumerate(self.model.frames): |
|||
link_names.append(frame.name) |
|||
return link_names |
|||
|
|||
@property |
|||
def joint_limits(self): |
|||
lower = self.model.lowerPositionLimit |
|||
upper = self.model.upperPositionLimit |
|||
return np.stack([lower, upper], axis=1) |
|||
|
|||
# -------------------------------------------------------------------------- # |
|||
# Query function |
|||
# -------------------------------------------------------------------------- # |
|||
def get_joint_index(self, name: str): |
|||
return self.dof_joint_names.index(name) |
|||
|
|||
def get_link_index(self, name: str): |
|||
if name not in self.link_names: |
|||
raise ValueError(f"{name} is not a link name. Valid link names: \n{self.link_names}") |
|||
return self.model.getFrameId(name, pin.BODY) |
|||
|
|||
def get_joint_parent_child_frames(self, joint_name: str): |
|||
joint_id = self.model.getFrameId(joint_name) |
|||
parent_id = self.model.frames[joint_id].parent |
|||
child_id = -1 |
|||
for idx, frame in enumerate(self.model.frames): |
|||
if frame.previousFrame == joint_id: |
|||
child_id = idx |
|||
if child_id == -1: |
|||
raise ValueError(f"Can not find child link of {joint_name}") |
|||
|
|||
return parent_id, child_id |
|||
|
|||
# -------------------------------------------------------------------------- # |
|||
# Kinematics function |
|||
# -------------------------------------------------------------------------- # |
|||
def compute_forward_kinematics(self, qpos: npt.NDArray): |
|||
pin.forwardKinematics(self.model, self.data, qpos) |
|||
|
|||
def get_link_pose(self, link_id: int) -> npt.NDArray: |
|||
pose: pin.SE3 = pin.updateFramePlacement(self.model, self.data, link_id) |
|||
return pose.homogeneous |
|||
|
|||
def get_link_pose_inv(self, link_id: int) -> npt.NDArray: |
|||
pose: pin.SE3 = pin.updateFramePlacement(self.model, self.data, link_id) |
|||
return pose.inverse().homogeneous |
|||
|
|||
def compute_single_link_local_jacobian(self, qpos, link_id: int) -> npt.NDArray: |
|||
J = pin.computeFrameJacobian(self.model, self.data, qpos, link_id) |
|||
return J |
|||
@ -1,151 +0,0 @@ |
|||
import time |
|||
from typing import Optional |
|||
|
|||
import numpy as np |
|||
from pytransform3d import rotations |
|||
|
|||
from .constants import OPERATOR2MANO, HandType |
|||
from .optimizer import Optimizer |
|||
from .optimizer_utils import LPFilter |
|||
|
|||
|
|||
class SeqRetargeting: |
|||
def __init__( |
|||
self, |
|||
optimizer: Optimizer, |
|||
has_joint_limits=True, |
|||
lp_filter: Optional[LPFilter] = None, |
|||
): |
|||
self.optimizer = optimizer |
|||
robot = self.optimizer.robot |
|||
|
|||
# Joint limit |
|||
self.has_joint_limits = has_joint_limits |
|||
joint_limits = np.ones_like(robot.joint_limits) |
|||
joint_limits[:, 0] = -1e4 # a large value is equivalent to no limit |
|||
joint_limits[:, 1] = 1e4 |
|||
if has_joint_limits: |
|||
joint_limits[:] = robot.joint_limits[:] |
|||
self.optimizer.set_joint_limit(joint_limits[self.optimizer.idx_pin2target]) |
|||
self.joint_limits = joint_limits[self.optimizer.idx_pin2target] |
|||
|
|||
# Temporal information |
|||
self.last_qpos = joint_limits.mean(1)[self.optimizer.idx_pin2target].astype(np.float32) |
|||
self.accumulated_time = 0 |
|||
self.num_retargeting = 0 |
|||
|
|||
# Filter |
|||
self.filter = lp_filter |
|||
|
|||
# Warm started |
|||
self.is_warm_started = False |
|||
|
|||
def warm_start( |
|||
self, |
|||
wrist_pos: np.ndarray, |
|||
wrist_quat: np.ndarray, |
|||
hand_type: HandType = HandType.right, |
|||
is_mano_convention: bool = False, |
|||
): |
|||
""" |
|||
Initialize the wrist joint pose using analytical computation instead of retargeting optimization. |
|||
This function is specifically for position retargeting with the flying robot hand, i.e. has 6D free joint |
|||
You are not expected to use this function for vector retargeting, e.g. when you are working on teleoperation |
|||
|
|||
Args: |
|||
wrist_pos: position of the hand wrist, typically from human hand pose |
|||
wrist_quat: quaternion of the hand wrist, the same convention as the operator frame definition if not is_mano_convention |
|||
hand_type: hand type, used to determine the operator2mano matrix |
|||
is_mano_convention: whether the wrist_quat is in mano convention |
|||
""" |
|||
# This function can only be used when the first joints of robot are free joints |
|||
|
|||
if len(wrist_pos) != 3: |
|||
raise ValueError(f"Wrist pos: {wrist_pos} is not a 3-dim vector.") |
|||
if len(wrist_quat) != 4: |
|||
raise ValueError(f"Wrist quat: {wrist_quat} is not a 4-dim vector.") |
|||
|
|||
operator2mano = OPERATOR2MANO[hand_type] if is_mano_convention else np.eye(3) |
|||
robot = self.optimizer.robot |
|||
target_wrist_pose = np.eye(4) |
|||
target_wrist_pose[:3, :3] = rotations.matrix_from_quaternion(wrist_quat) @ operator2mano.T |
|||
target_wrist_pose[:3, 3] = wrist_pos |
|||
|
|||
name_list = [ |
|||
"dummy_x_translation_joint", |
|||
"dummy_y_translation_joint", |
|||
"dummy_z_translation_joint", |
|||
"dummy_x_rotation_joint", |
|||
"dummy_y_rotation_joint", |
|||
"dummy_z_rotation_joint", |
|||
] |
|||
wrist_link_id = robot.get_joint_parent_child_frames(name_list[5])[1] |
|||
|
|||
# Set the dummy joints angles to zero |
|||
old_qpos = robot.q0 |
|||
new_qpos = old_qpos.copy() |
|||
for num, joint_name in enumerate(self.optimizer.target_joint_names): |
|||
if joint_name in name_list: |
|||
new_qpos[num] = 0 |
|||
|
|||
robot.compute_forward_kinematics(new_qpos) |
|||
root2wrist = robot.get_link_pose_inv(wrist_link_id) |
|||
target_root_pose = target_wrist_pose @ root2wrist |
|||
|
|||
euler = rotations.euler_from_matrix(target_root_pose[:3, :3], 0, 1, 2, extrinsic=False) |
|||
pose_vec = np.concatenate([target_root_pose[:3, 3], euler]) |
|||
|
|||
# Find the dummy joints |
|||
for num, joint_name in enumerate(self.optimizer.target_joint_names): |
|||
if joint_name in name_list: |
|||
index = name_list.index(joint_name) |
|||
self.last_qpos[num] = pose_vec[index] |
|||
|
|||
self.is_warm_started = True |
|||
|
|||
def retarget(self, ref_value, fixed_qpos=np.array([])): |
|||
tic = time.perf_counter() |
|||
|
|||
qpos = self.optimizer.retarget( |
|||
ref_value=ref_value.astype(np.float32), |
|||
fixed_qpos=fixed_qpos.astype(np.float32), |
|||
last_qpos=np.clip(self.last_qpos, self.joint_limits[:, 0], self.joint_limits[:, 1]), |
|||
) |
|||
self.accumulated_time += time.perf_counter() - tic |
|||
self.num_retargeting += 1 |
|||
self.last_qpos = qpos |
|||
robot_qpos = np.zeros(self.optimizer.robot.dof) |
|||
robot_qpos[self.optimizer.idx_pin2fixed] = fixed_qpos |
|||
robot_qpos[self.optimizer.idx_pin2target] = qpos |
|||
|
|||
if self.optimizer.adaptor is not None: |
|||
robot_qpos = self.optimizer.adaptor.forward_qpos(robot_qpos) |
|||
|
|||
if self.filter is not None: |
|||
robot_qpos = self.filter.next(robot_qpos) |
|||
return robot_qpos |
|||
|
|||
def set_qpos(self, robot_qpos: np.ndarray): |
|||
target_qpos = robot_qpos[self.optimizer.idx_pin2target] |
|||
self.last_qpos = target_qpos |
|||
|
|||
def get_qpos(self, fixed_qpos: Optional[np.ndarray] = None): |
|||
robot_qpos = np.zeros(self.optimizer.robot.dof) |
|||
robot_qpos[self.optimizer.idx_pin2target] = self.last_qpos |
|||
if fixed_qpos is not None: |
|||
robot_qpos[self.optimizer.idx_pin2fixed] = fixed_qpos |
|||
return robot_qpos |
|||
|
|||
def verbose(self): |
|||
min_value = self.optimizer.opt.last_optimum_value() |
|||
print(f"Retargeting {self.num_retargeting} times takes: {self.accumulated_time}s") |
|||
print(f"Last distance: {min_value}") |
|||
|
|||
def reset(self): |
|||
self.last_qpos = self.joint_limits.mean(1).astype(np.float32) |
|||
self.num_retargeting = 0 |
|||
self.accumulated_time = 0 |
|||
|
|||
@property |
|||
def joint_names(self): |
|||
return self.optimizer.robot.dof_joint_names |
|||
2237
teleop/robot_control/dex_retargeting/yourdfpy.py
File diff suppressed because it is too large
View File
File diff suppressed because it is too large
View File
@ -1 +0,0 @@ |
|||
/home/unitree/Code/unitree-opject/unitree_sim_isaaclab/data |
|||
Write
Preview
Loading…
Cancel
Save
Reference in new issue