You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

392 lines
15 KiB

#!/usr/bin/env python3
"""
Simple webcam recorder with optional preview window.
Usage:
webcam_recorder.py [--output-dir DIR] [--no-preview]
webcam_recorder.py --help
"""
import argparse
from datetime import datetime
import os
from pathlib import Path
import signal
import subprocess
import sys
import time
class WebcamRecorder:
def __init__(self, debug=False):
self.process = None
self.debug = debug
def _debug_print(self, message):
"""Print debug message only if debug mode is enabled."""
if self.debug:
print(f"[webcam_recorder] {message}", file=sys.stderr)
def _info_print(self, message):
"""Print info message always."""
print(f"[webcam_recorder] {message}", file=sys.stderr)
def find_webcam(self):
"""Find available webcam device."""
# Check container status only in debug mode
if self.debug and (os.path.exists("/.dockerenv") or os.environ.get("CONTAINER")):
self._debug_print("Running in Docker container")
# Check if we have video group permissions
try:
import grp
groups = os.getgroups()
video_gid = grp.getgrnam("video").gr_gid
if video_gid in groups:
self._debug_print("Has video group permissions ✓")
else:
self._debug_print("Warning: Not in video group")
except Exception:
pass
# Check /dev/v4l/by-id for Logitech webcam first
by_id_path = Path("/dev/v4l/by-id")
if by_id_path.exists():
self._debug_print("Checking /dev/v4l/by-id for devices...")
for device in by_id_path.iterdir():
if device.is_symlink():
device_name = device.name
if "Logitech" in device_name or "046d" in device_name:
resolved_device = str(device.resolve())
self._debug_print(
f"Found Logitech device: {device_name} -> {resolved_device}"
)
if self._is_video_capture_device(resolved_device):
return resolved_device
# No Logitech found, try any webcam
for device in by_id_path.iterdir():
if device.is_symlink() and "metadata" not in device.name:
resolved_device = str(device.resolve())
self._debug_print(f"Checking device: {device.name} -> {resolved_device}")
if self._is_video_capture_device(resolved_device):
return resolved_device
# Fallback to /dev/video* - prioritize external cameras
self._debug_print("Scanning /dev/video* devices...")
external_devices = []
integrated_devices = []
for i in range(20):
device = f"/dev/video{i}"
if os.path.exists(device):
# Check if we can access the device
if not os.access(device, os.R_OK):
self._debug_print(f"Found {device} but no read access")
continue
# Check if it's a video capture device (not metadata)
if self._is_video_capture_device(device):
# Check device name to prioritize external cameras
device_name = self._get_device_name(device)
if device_name:
if any(
keyword in device_name.lower()
for keyword in ["logitech", "brio", "c920", "c930", "c925"]
):
self._info_print(f"Found external camera: {device} ({device_name})")
external_devices.append(device)
elif (
"integrated" in device_name.lower() or "internal" in device_name.lower()
):
self._debug_print(f"Found integrated camera: {device} ({device_name})")
integrated_devices.append(device)
else:
self._debug_print(f"Found unknown camera: {device} ({device_name})")
external_devices.append(device) # Assume external if unknown
else:
self._debug_print(f"Found camera: {device} (no name info)")
external_devices.append(device)
# Return external camera first, then integrated as fallback
if external_devices:
self._info_print(f"Using external camera: {external_devices[0]}")
return external_devices[0]
elif integrated_devices:
self._info_print(
"No external camera found - integrated camera available but not preferred"
)
self._info_print(
"Please connect an external camera (Logitech, Brio, etc.) for recording"
)
return None
return None
def _get_device_name(self, device):
"""Get the friendly name of a video device."""
try:
# Extract device number from /dev/videoX
device_num = device.split("video")[-1]
name_path = f"/sys/class/video4linux/video{device_num}/name"
if os.path.exists(name_path):
with open(name_path, "r") as f:
return f.read().strip()
except Exception:
pass
return None
def _is_video_capture_device(self, device):
"""Check if a device is a video capture device (not metadata)."""
try:
# Try v4l2-ctl first if available
if subprocess.run(["which", "v4l2-ctl"], capture_output=True).returncode == 0:
result = subprocess.run(
["v4l2-ctl", "--device=" + device, "--info"], capture_output=True, timeout=2
)
if result.returncode == 0:
output = result.stdout.decode()
# Look for "Video Capture" capability in Device Caps, not just general Capabilities
lines = output.split("\n")
in_device_caps = False
for line in lines:
if "Device Caps" in line:
in_device_caps = True
continue
if in_device_caps:
if line.strip().startswith("Video Capture"):
# Double-check with ffmpeg
ffmpeg_result = subprocess.run(
[
"ffmpeg",
"-f",
"v4l2",
"-i",
device,
"-frames:v",
"1",
"-f",
"null",
"-",
],
capture_output=True,
timeout=2,
)
return ffmpeg_result.returncode == 0
# Stop checking if we hit another section
elif line.strip() and not line.startswith("\t"):
break
# Fallback: try ffmpeg directly if v4l2-ctl not available
self._debug_print(f"Testing {device} with ffmpeg...")
ffmpeg_result = subprocess.run(
[
"ffmpeg",
"-f",
"v4l2",
"-i",
device,
"-frames:v",
"1",
"-f",
"null",
"-",
],
capture_output=True,
timeout=5,
)
if ffmpeg_result.returncode == 0:
self._debug_print(f"{device} is a working video capture device")
return True
else:
stderr_output = ffmpeg_result.stderr.decode()
# Check if it's just a metadata device
if (
"metadata" in stderr_output.lower()
or "not a capture device" in stderr_output.lower()
):
self._debug_print(f"{device} is metadata device, skipping")
return False
else:
self._debug_print(f"{device} failed ffmpeg test: {stderr_output[:100]}")
return False
except Exception as e:
self._debug_print(f"Error testing {device}: {e}")
pass
return False
def test_camera_format(self, device):
"""Test camera and determine best format."""
# Try default format
result = subprocess.run(
["ffmpeg", "-f", "v4l2", "-i", device, "-frames:v", "1", "-f", "null", "-"],
capture_output=True,
)
if result.returncode == 0:
return [] # Default format works
# Try MJPEG format
result = subprocess.run(
[
"ffmpeg",
"-f",
"v4l2",
"-input_format",
"mjpeg",
"-i",
device,
"-frames:v",
"1",
"-f",
"null",
"-",
],
capture_output=True,
)
if result.returncode == 0:
return ["-input_format", "mjpeg"]
raise RuntimeError(f"Cannot access camera at {device}")
def record(self, output_dir="./logs_experiment", preview=True):
"""Start recording from webcam."""
# Find webcam
device = self.find_webcam()
if not device:
self._info_print("No webcam found")
return False
self._info_print(f"Recording from: {device}")
# Test camera format
try:
format_args = self.test_camera_format(device)
except RuntimeError as e:
self._info_print(str(e))
return False
# Create output directory with date subfolder and filename
now = datetime.now()
date_folder = now.strftime("%Y_%m_%d")
output_path = Path(output_dir) / date_folder
output_path.mkdir(parents=True, exist_ok=True)
timestamp = now.strftime("%Y_%m_%d_%H_%M_%S")
output_file = output_path / f"robot_video_{timestamp}.mp4"
self._info_print(f"Saving to: {output_file}")
if self.debug:
self._debug_print(f"Absolute path: {output_file.absolute()}")
# Check if preview is possible
if preview and not os.environ.get("DISPLAY"):
self._debug_print("No DISPLAY found, disabling preview")
preview = False
# Build ffmpeg command
cmd = (
["ffmpeg", "-loglevel", "error", "-f", "v4l2"]
+ format_args
+ ["-i", device, "-c:v", "libx264", "-preset", "ultrafast"]
)
if preview:
# Try with preview first
preview_cmd = cmd + ["-f", "tee", "-map", "0:v", f"[f=mp4]{output_file}|[f=nut]pipe:"]
try:
ffmpeg_proc = subprocess.Popen(
preview_cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL
)
ffplay_proc = subprocess.Popen(
[
"ffplay",
"-loglevel",
"quiet",
"-f",
"nut",
"-i",
"pipe:",
"-window_title",
"Webcam Preview",
],
stdin=ffmpeg_proc.stdout,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
# Wait a bit to see if it starts successfully
time.sleep(0.5)
if ffmpeg_proc.poll() is None and ffplay_proc.poll() is None:
self.process = ffplay_proc # Kill ffplay to stop both
self._info_print("Recording with preview started")
return True
else:
self._debug_print("Preview failed, falling back to no-preview mode")
# Clean up failed processes
try:
ffmpeg_proc.kill()
ffplay_proc.kill()
except Exception:
pass
except Exception as e:
self._debug_print(f"Preview error: {e}, using no-preview mode")
# Record without preview
cmd.append(str(output_file))
self.process = subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE)
# Verify it's running
time.sleep(0.5)
if self.process.poll() is None:
self._info_print("Recording started (no preview)")
return True
else:
stderr = self.process.stderr.read().decode() if self.process.stderr else ""
self._info_print(f"Failed to start recording: {stderr}")
return False
def stop(self):
"""Stop recording."""
if self.process:
self.process.terminate()
try:
self.process.wait(timeout=5)
except subprocess.TimeoutExpired:
self.process.kill()
self._info_print("Recording stopped")
self.process = None
def main():
parser = argparse.ArgumentParser(description="Simple webcam recorder")
parser.add_argument(
"--output-dir", default="./logs_experiment", help="Output directory for recordings"
)
parser.add_argument("--no-preview", action="store_true", help="Disable preview window")
parser.add_argument("--test", action="store_true", help="Enable debug output")
args = parser.parse_args()
recorder = WebcamRecorder(debug=args.test)
# Set up signal handlers for clean shutdown
def signal_handler(signum, frame):
print("\nStopping recording...", file=sys.stderr)
recorder.stop()
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
# Start recording
if recorder.record(args.output_dir, preview=not args.no_preview):
print("Press Ctrl+C to stop recording", file=sys.stderr)
# Keep running until interrupted
try:
signal.pause()
except KeyboardInterrupt:
pass
else:
sys.exit(1)
if __name__ == "__main__":
main()