You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
392 lines
15 KiB
392 lines
15 KiB
#!/usr/bin/env python3
|
|
"""
|
|
Simple webcam recorder with optional preview window.
|
|
Usage:
|
|
webcam_recorder.py [--output-dir DIR] [--no-preview]
|
|
webcam_recorder.py --help
|
|
"""
|
|
|
|
import argparse
|
|
from datetime import datetime
|
|
import os
|
|
from pathlib import Path
|
|
import signal
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
|
|
|
|
class WebcamRecorder:
|
|
def __init__(self, debug=False):
|
|
self.process = None
|
|
self.debug = debug
|
|
|
|
def _debug_print(self, message):
|
|
"""Print debug message only if debug mode is enabled."""
|
|
if self.debug:
|
|
print(f"[webcam_recorder] {message}", file=sys.stderr)
|
|
|
|
def _info_print(self, message):
|
|
"""Print info message always."""
|
|
print(f"[webcam_recorder] {message}", file=sys.stderr)
|
|
|
|
def find_webcam(self):
|
|
"""Find available webcam device."""
|
|
# Check container status only in debug mode
|
|
if self.debug and (os.path.exists("/.dockerenv") or os.environ.get("CONTAINER")):
|
|
self._debug_print("Running in Docker container")
|
|
# Check if we have video group permissions
|
|
try:
|
|
import grp
|
|
|
|
groups = os.getgroups()
|
|
video_gid = grp.getgrnam("video").gr_gid
|
|
if video_gid in groups:
|
|
self._debug_print("Has video group permissions ✓")
|
|
else:
|
|
self._debug_print("Warning: Not in video group")
|
|
except Exception:
|
|
pass
|
|
|
|
# Check /dev/v4l/by-id for Logitech webcam first
|
|
by_id_path = Path("/dev/v4l/by-id")
|
|
if by_id_path.exists():
|
|
self._debug_print("Checking /dev/v4l/by-id for devices...")
|
|
for device in by_id_path.iterdir():
|
|
if device.is_symlink():
|
|
device_name = device.name
|
|
if "Logitech" in device_name or "046d" in device_name:
|
|
resolved_device = str(device.resolve())
|
|
self._debug_print(
|
|
f"Found Logitech device: {device_name} -> {resolved_device}"
|
|
)
|
|
if self._is_video_capture_device(resolved_device):
|
|
return resolved_device
|
|
|
|
# No Logitech found, try any webcam
|
|
for device in by_id_path.iterdir():
|
|
if device.is_symlink() and "metadata" not in device.name:
|
|
resolved_device = str(device.resolve())
|
|
self._debug_print(f"Checking device: {device.name} -> {resolved_device}")
|
|
if self._is_video_capture_device(resolved_device):
|
|
return resolved_device
|
|
|
|
# Fallback to /dev/video* - prioritize external cameras
|
|
self._debug_print("Scanning /dev/video* devices...")
|
|
external_devices = []
|
|
integrated_devices = []
|
|
|
|
for i in range(20):
|
|
device = f"/dev/video{i}"
|
|
if os.path.exists(device):
|
|
# Check if we can access the device
|
|
if not os.access(device, os.R_OK):
|
|
self._debug_print(f"Found {device} but no read access")
|
|
continue
|
|
|
|
# Check if it's a video capture device (not metadata)
|
|
if self._is_video_capture_device(device):
|
|
# Check device name to prioritize external cameras
|
|
device_name = self._get_device_name(device)
|
|
if device_name:
|
|
if any(
|
|
keyword in device_name.lower()
|
|
for keyword in ["logitech", "brio", "c920", "c930", "c925"]
|
|
):
|
|
self._info_print(f"Found external camera: {device} ({device_name})")
|
|
external_devices.append(device)
|
|
elif (
|
|
"integrated" in device_name.lower() or "internal" in device_name.lower()
|
|
):
|
|
self._debug_print(f"Found integrated camera: {device} ({device_name})")
|
|
integrated_devices.append(device)
|
|
else:
|
|
self._debug_print(f"Found unknown camera: {device} ({device_name})")
|
|
external_devices.append(device) # Assume external if unknown
|
|
else:
|
|
self._debug_print(f"Found camera: {device} (no name info)")
|
|
external_devices.append(device)
|
|
|
|
# Return external camera first, then integrated as fallback
|
|
if external_devices:
|
|
self._info_print(f"Using external camera: {external_devices[0]}")
|
|
return external_devices[0]
|
|
elif integrated_devices:
|
|
self._info_print(
|
|
"No external camera found - integrated camera available but not preferred"
|
|
)
|
|
self._info_print(
|
|
"Please connect an external camera (Logitech, Brio, etc.) for recording"
|
|
)
|
|
return None
|
|
|
|
return None
|
|
|
|
def _get_device_name(self, device):
|
|
"""Get the friendly name of a video device."""
|
|
try:
|
|
# Extract device number from /dev/videoX
|
|
device_num = device.split("video")[-1]
|
|
name_path = f"/sys/class/video4linux/video{device_num}/name"
|
|
if os.path.exists(name_path):
|
|
with open(name_path, "r") as f:
|
|
return f.read().strip()
|
|
except Exception:
|
|
pass
|
|
return None
|
|
|
|
def _is_video_capture_device(self, device):
|
|
"""Check if a device is a video capture device (not metadata)."""
|
|
try:
|
|
# Try v4l2-ctl first if available
|
|
if subprocess.run(["which", "v4l2-ctl"], capture_output=True).returncode == 0:
|
|
result = subprocess.run(
|
|
["v4l2-ctl", "--device=" + device, "--info"], capture_output=True, timeout=2
|
|
)
|
|
if result.returncode == 0:
|
|
output = result.stdout.decode()
|
|
# Look for "Video Capture" capability in Device Caps, not just general Capabilities
|
|
lines = output.split("\n")
|
|
in_device_caps = False
|
|
for line in lines:
|
|
if "Device Caps" in line:
|
|
in_device_caps = True
|
|
continue
|
|
if in_device_caps:
|
|
if line.strip().startswith("Video Capture"):
|
|
# Double-check with ffmpeg
|
|
ffmpeg_result = subprocess.run(
|
|
[
|
|
"ffmpeg",
|
|
"-f",
|
|
"v4l2",
|
|
"-i",
|
|
device,
|
|
"-frames:v",
|
|
"1",
|
|
"-f",
|
|
"null",
|
|
"-",
|
|
],
|
|
capture_output=True,
|
|
timeout=2,
|
|
)
|
|
return ffmpeg_result.returncode == 0
|
|
# Stop checking if we hit another section
|
|
elif line.strip() and not line.startswith("\t"):
|
|
break
|
|
|
|
# Fallback: try ffmpeg directly if v4l2-ctl not available
|
|
self._debug_print(f"Testing {device} with ffmpeg...")
|
|
ffmpeg_result = subprocess.run(
|
|
[
|
|
"ffmpeg",
|
|
"-f",
|
|
"v4l2",
|
|
"-i",
|
|
device,
|
|
"-frames:v",
|
|
"1",
|
|
"-f",
|
|
"null",
|
|
"-",
|
|
],
|
|
capture_output=True,
|
|
timeout=5,
|
|
)
|
|
if ffmpeg_result.returncode == 0:
|
|
self._debug_print(f"{device} is a working video capture device")
|
|
return True
|
|
else:
|
|
stderr_output = ffmpeg_result.stderr.decode()
|
|
# Check if it's just a metadata device
|
|
if (
|
|
"metadata" in stderr_output.lower()
|
|
or "not a capture device" in stderr_output.lower()
|
|
):
|
|
self._debug_print(f"{device} is metadata device, skipping")
|
|
return False
|
|
else:
|
|
self._debug_print(f"{device} failed ffmpeg test: {stderr_output[:100]}")
|
|
return False
|
|
except Exception as e:
|
|
self._debug_print(f"Error testing {device}: {e}")
|
|
pass
|
|
return False
|
|
|
|
def test_camera_format(self, device):
|
|
"""Test camera and determine best format."""
|
|
# Try default format
|
|
result = subprocess.run(
|
|
["ffmpeg", "-f", "v4l2", "-i", device, "-frames:v", "1", "-f", "null", "-"],
|
|
capture_output=True,
|
|
)
|
|
if result.returncode == 0:
|
|
return [] # Default format works
|
|
|
|
# Try MJPEG format
|
|
result = subprocess.run(
|
|
[
|
|
"ffmpeg",
|
|
"-f",
|
|
"v4l2",
|
|
"-input_format",
|
|
"mjpeg",
|
|
"-i",
|
|
device,
|
|
"-frames:v",
|
|
"1",
|
|
"-f",
|
|
"null",
|
|
"-",
|
|
],
|
|
capture_output=True,
|
|
)
|
|
if result.returncode == 0:
|
|
return ["-input_format", "mjpeg"]
|
|
|
|
raise RuntimeError(f"Cannot access camera at {device}")
|
|
|
|
def record(self, output_dir="./logs_experiment", preview=True):
|
|
"""Start recording from webcam."""
|
|
# Find webcam
|
|
device = self.find_webcam()
|
|
if not device:
|
|
self._info_print("No webcam found")
|
|
return False
|
|
|
|
self._info_print(f"Recording from: {device}")
|
|
|
|
# Test camera format
|
|
try:
|
|
format_args = self.test_camera_format(device)
|
|
except RuntimeError as e:
|
|
self._info_print(str(e))
|
|
return False
|
|
|
|
# Create output directory with date subfolder and filename
|
|
now = datetime.now()
|
|
date_folder = now.strftime("%Y_%m_%d")
|
|
output_path = Path(output_dir) / date_folder
|
|
output_path.mkdir(parents=True, exist_ok=True)
|
|
timestamp = now.strftime("%Y_%m_%d_%H_%M_%S")
|
|
output_file = output_path / f"robot_video_{timestamp}.mp4"
|
|
|
|
self._info_print(f"Saving to: {output_file}")
|
|
if self.debug:
|
|
self._debug_print(f"Absolute path: {output_file.absolute()}")
|
|
|
|
# Check if preview is possible
|
|
if preview and not os.environ.get("DISPLAY"):
|
|
self._debug_print("No DISPLAY found, disabling preview")
|
|
preview = False
|
|
|
|
# Build ffmpeg command
|
|
cmd = (
|
|
["ffmpeg", "-loglevel", "error", "-f", "v4l2"]
|
|
+ format_args
|
|
+ ["-i", device, "-c:v", "libx264", "-preset", "ultrafast"]
|
|
)
|
|
|
|
if preview:
|
|
# Try with preview first
|
|
preview_cmd = cmd + ["-f", "tee", "-map", "0:v", f"[f=mp4]{output_file}|[f=nut]pipe:"]
|
|
|
|
try:
|
|
ffmpeg_proc = subprocess.Popen(
|
|
preview_cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL
|
|
)
|
|
ffplay_proc = subprocess.Popen(
|
|
[
|
|
"ffplay",
|
|
"-loglevel",
|
|
"quiet",
|
|
"-f",
|
|
"nut",
|
|
"-i",
|
|
"pipe:",
|
|
"-window_title",
|
|
"Webcam Preview",
|
|
],
|
|
stdin=ffmpeg_proc.stdout,
|
|
stdout=subprocess.DEVNULL,
|
|
stderr=subprocess.DEVNULL,
|
|
)
|
|
|
|
# Wait a bit to see if it starts successfully
|
|
time.sleep(0.5)
|
|
if ffmpeg_proc.poll() is None and ffplay_proc.poll() is None:
|
|
self.process = ffplay_proc # Kill ffplay to stop both
|
|
self._info_print("Recording with preview started")
|
|
return True
|
|
else:
|
|
self._debug_print("Preview failed, falling back to no-preview mode")
|
|
# Clean up failed processes
|
|
try:
|
|
ffmpeg_proc.kill()
|
|
ffplay_proc.kill()
|
|
except Exception:
|
|
pass
|
|
except Exception as e:
|
|
self._debug_print(f"Preview error: {e}, using no-preview mode")
|
|
|
|
# Record without preview
|
|
cmd.append(str(output_file))
|
|
self.process = subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE)
|
|
|
|
# Verify it's running
|
|
time.sleep(0.5)
|
|
if self.process.poll() is None:
|
|
self._info_print("Recording started (no preview)")
|
|
return True
|
|
else:
|
|
stderr = self.process.stderr.read().decode() if self.process.stderr else ""
|
|
self._info_print(f"Failed to start recording: {stderr}")
|
|
return False
|
|
|
|
def stop(self):
|
|
"""Stop recording."""
|
|
if self.process:
|
|
self.process.terminate()
|
|
try:
|
|
self.process.wait(timeout=5)
|
|
except subprocess.TimeoutExpired:
|
|
self.process.kill()
|
|
self._info_print("Recording stopped")
|
|
self.process = None
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(description="Simple webcam recorder")
|
|
parser.add_argument(
|
|
"--output-dir", default="./logs_experiment", help="Output directory for recordings"
|
|
)
|
|
parser.add_argument("--no-preview", action="store_true", help="Disable preview window")
|
|
parser.add_argument("--test", action="store_true", help="Enable debug output")
|
|
args = parser.parse_args()
|
|
|
|
recorder = WebcamRecorder(debug=args.test)
|
|
|
|
# Set up signal handlers for clean shutdown
|
|
def signal_handler(signum, frame):
|
|
print("\nStopping recording...", file=sys.stderr)
|
|
recorder.stop()
|
|
sys.exit(0)
|
|
|
|
signal.signal(signal.SIGINT, signal_handler)
|
|
signal.signal(signal.SIGTERM, signal_handler)
|
|
|
|
# Start recording
|
|
if recorder.record(args.output_dir, preview=not args.no_preview):
|
|
print("Press Ctrl+C to stop recording", file=sys.stderr)
|
|
# Keep running until interrupted
|
|
try:
|
|
signal.pause()
|
|
except KeyboardInterrupt:
|
|
pass
|
|
else:
|
|
sys.exit(1)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|