Browse Source

Migrate IPC to @ virtual address

main
silencht 4 months ago
parent
commit
46603c5ff3
  1. 37
      teleop/utils/ipc.py

37
teleop/utils/ipc.py

@ -87,29 +87,15 @@ class IPC_Server:
self._data_loop_thread = None
self._hb_loop_thread = None
rd = os.environ.get("XDG_RUNTIME_DIR") or "/tmp"
self.ctx = zmq.Context.instance()
# data IPC (REQ/REP): required
self.data_ipc = os.path.join(rd, f"xr-teleoperate-data-{os.getuid()}.ipc")
self.rep_socket = self.ctx.socket(zmq.REP)
try:
if os.path.exists(self.data_ipc):
os.unlink(self.data_ipc) # remove stale IPC file
except OSError:
pass
self.rep_socket.bind(f"ipc://{self.data_ipc}")
logger_mp.info(f"[IPC_Server] Listening to Data at ipc://{self.data_ipc}")
self.rep_socket.bind("ipc://@xr_teleoperate_data.ipc")
logger_mp.info("[IPC_Server] Listening to Data at ipc://@xr_teleoperate_data.ipc")
# heartbeat IPC (PUB/SUB)
self.hb_ipc = os.path.join(rd, f"xr-teleoperate-hb-{os.getuid()}.ipc")
self.pub_socket = self.ctx.socket(zmq.PUB)
try:
if os.path.exists(self.hb_ipc):
os.unlink(self.hb_ipc) # remove stale IPC file
except OSError:
pass
self.pub_socket.bind(f"ipc://{self.hb_ipc}")
logger_mp.info(f"[IPC_Server] Publishing HeartBeat at ipc://{self.hb_ipc}")
self.pub_socket.bind("ipc://@xr_teleoperate_hb.ipc")
logger_mp.info("[IPC_Server] Publishing HeartBeat at ipc://@xr_teleoperate_hb.ipc")
def _data_loop(self):
"""
@ -213,7 +199,6 @@ class IPC_Client:
"""
def __init__(self, hb_fps=10.0):
"""hb_fps: heartbeat subscribe frequency, should match server side."""
rd = os.environ.get("XDG_RUNTIME_DIR") or "/tmp"
self.ctx = zmq.Context.instance()
# heartbeat IPC (PUB/SUB)
@ -224,20 +209,20 @@ class IPC_Client:
self._hb_interval = 1.0 / float(hb_fps) # expected heartbeat interval
self._hb_lock = threading.Lock() # lock for heartbeat state
self._hb_timeout = 5.0 * self._hb_interval # timeout to consider offline
self.hb_ipc = os.path.join(rd, f"xr-teleoperate-hb-{os.getuid()}.ipc")
self.sub_socket = self.ctx.socket(zmq.SUB)
self.sub_socket.setsockopt(zmq.RCVHWM, 1)
self.sub_socket.connect(f"ipc://{self.hb_ipc}")
self.sub_socket.connect("ipc://@xr_teleoperate_hb.ipc")
self.sub_socket.setsockopt_string(zmq.SUBSCRIBE, "")
logger_mp.info(f"[IPC_Client] Subscribed to HeartBeat at ipc://{self.hb_ipc}")
logger_mp.info("[IPC_Client] Subscribed to HeartBeat at ipc://@xr_teleoperate_hb.ipc")
self._hb_thread = threading.Thread(target=self._hb_loop, daemon=True)
self._hb_thread.start()
# data IPC (REQ/REP)
self.data_ipc = os.path.join(rd, f"xr-teleoperate-data-{os.getuid()}.ipc")
self.req_socket = self.ctx.socket(zmq.REQ)
self.req_socket.connect(f"ipc://{self.data_ipc}")
logger_mp.info(f"[IPC_Client] Connected to Data at ipc://{self.data_ipc}")
self.req_socket.connect("ipc://@xr_teleoperate_data.ipc")
logger_mp.info("[IPC_Client] Connected to Data at ipc://@xr_teleoperate_data.ipc")
def _make_reqid(self) -> str:
import uuid
@ -322,7 +307,7 @@ class IPC_Client:
self.sub_socket.setsockopt(zmq.LINGER, 0)
self.sub_socket.close()
except Exception:
pass
pass
try:
self.ctx.term()
except Exception:

Loading…
Cancel
Save