You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
295 lines
10 KiB
295 lines
10 KiB
#!/usr/bin/env python3
|
|
"""
|
|
Stream video to ESP32 Channel3 RF Broadcast
|
|
|
|
This script takes RGB24 frames from ffmpeg via stdin, dithers them to 16 colors,
|
|
packs them as 4bpp, and streams them to the ESP32 over TCP.
|
|
|
|
Usage:
|
|
ffmpeg -i video.mp4 -vf "scale=116:220" -f rawvideo -pix_fmt rgb24 - | python stream_video.py <ESP32_IP>
|
|
|
|
Options:
|
|
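    -p, --port          Port number (default: 5000)
    -f, --fps           Target frame rate (default: 30)
    --no-dither         Disable Floyd-Steinberg dithering (faster but lower quality)
    --grayscale, --bw   Map pixels to 16 luminance levels for B&W TVs (no dithering; takes precedence over --no-dither)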
|
|
"""
|
|
|
|
import sys
|
|
import socket
|
|
import argparse
|
|
import time
|
|
import numpy as np
|
|
|
|
# Image dimensions
WIDTH = 116
HEIGHT = 220
FRAME_SIZE_RGB = WIDTH * HEIGHT * 3    # bytes in one raw RGB24 frame (3 bytes per pixel)
FRAME_SIZE_4BPP = WIDTH * HEIGHT // 2  # bytes in one packed frame (2 pixels per byte)

# CGA-like 16 color palette (RGB values). The position of a row is its palette index.
_PALETTE_RGB = (
    (0, 0, 0),        # 0: Black
    (0, 0, 170),      # 1: Blue
    (0, 170, 0),      # 2: Green
    (0, 170, 170),    # 3: Cyan
    (170, 0, 0),      # 4: Red
    (170, 0, 170),    # 5: Magenta
    (170, 85, 0),     # 6: Brown
    (170, 170, 170),  # 7: Light Gray
    (85, 85, 85),     # 8: Dark Gray
    (85, 85, 255),    # 9: Light Blue
    (85, 255, 85),    # 10: Light Green
    (85, 255, 255),   # 11: Light Cyan
    (255, 85, 85),    # 12: Light Red
    (255, 85, 255),   # 13: Light Magenta
    (255, 255, 85),   # 14: Yellow
    (255, 255, 255),  # 15: White
)
PALETTE = np.array(_PALETTE_RGB, dtype=np.float32)

# Luminance values for each palette color (ITU-R BT.601)
# Y = 0.299*R + 0.587*G + 0.114*B
# Computed from the palette rows rather than retyped, so the two tables
# cannot drift apart. Approximate values by index:
#   0: 0.0    1: 19.4    2: 99.8    3: 119.2   4: 50.8    5: 70.2
#   6: 100.7  7: 170.0   8: 85.0    9: 104.4  10: 184.8  11: 204.2
#  12: 135.8 13: 155.2  14: 235.6  15: 255.0
PALETTE_LUMINANCE = np.array(
    [0.299 * r + 0.587 * g + 0.114 * b for r, g, b in _PALETTE_RGB],
    dtype=np.float32,
)

# Palette indices sorted by luminance (darkest to brightest)
GRAYSCALE_ORDER = np.argsort(PALETTE_LUMINANCE)  # [0,1,4,5,8,2,6,9,3,12,13,7,10,11,14,15]
SORTED_LUMINANCE = PALETTE_LUMINANCE[GRAYSCALE_ORDER]
|
|
|
|
|
|
def find_nearest_grayscale_fast(img):
    """
    Map every pixel to the palette entry whose luminance is closest.

    The image is reduced to BT.601 luma (0.299 R + 0.587 G + 0.114 B) and each
    luma value is matched against the palette luminances in SORTED_LUMINANCE.

    Args:
        img: array indexed as [row, column, channel] with channels R, G, B

    Returns:
        uint8 array of palette indices, one per pixel
    """
    red, green, blue = img[:, :, 0], img[:, :, 1], img[:, :, 2]
    luma = 0.299 * red + 0.587 * green + 0.114 * blue

    # Broadcast (H, W, 1) against the 16 sorted levels -> (H, W, 16) gaps
    gaps = np.abs(luma[..., np.newaxis] - SORTED_LUMINANCE)
    rank = gaps.argmin(axis=-1)

    # rank is a position in the luminance-sorted table; translate it back
    # to the real palette index
    return GRAYSCALE_ORDER[rank].astype(np.uint8)
|
|
|
|
|
|
def find_nearest_colors_fast(img):
    """
    Pick, for each pixel, the palette entry with the smallest squared RGB distance.

    Args:
        img: numpy array of shape (H, W, 3) with RGB values (float32)

    Returns:
        numpy array of shape (H, W), dtype uint8, holding indices into PALETTE
    """
    # (H, W, 1, 3) minus (16, 3) broadcasts to one RGB offset per
    # pixel/palette-entry pair: (H, W, 16, 3)
    offsets = img[:, :, np.newaxis, :] - PALETTE

    # Collapse the channel axis into a squared Euclidean distance: (H, W, 16)
    squared = (offsets ** 2).sum(axis=-1)

    best = squared.argmin(axis=-1)
    return best.astype(np.uint8)
|
|
|
|
|
|
def dither_frame_fast(frame):
    """
    Convert an RGB frame to 16-color indexed using a row-vectorized variant
    of Floyd-Steinberg dithering.

    Each row is quantised in a single vectorized step, and its quantisation
    error is pushed into the row below with the Floyd-Steinberg weights
    (3/16 bottom-left, 5/16 bottom, 1/16 bottom-right).

    The 7/16 right-neighbour term is not applied. Because the whole row is
    matched at once, a same-row carry cannot influence the result without a
    per-pixel loop, so adding it to the already-quantised row would be dead
    work. Only 9/16 of each pixel's error is therefore propagated.

    Args:
        frame: numpy array of shape (H, W, 3) with RGB values; the streamer
            passes (HEIGHT, WIDTH, 3) uint8 frames

    Returns:
        numpy array of shape (H, W), dtype uint8, with palette indices 0-15
    """
    # Float working copy: accumulated error can leave the 0-255 range
    work = frame.astype(np.float32)
    height, width = work.shape[:2]
    output = np.zeros((height, width), dtype=np.uint8)

    for y in range(height):
        # Clamp accumulated error before matching so distances stay meaningful
        row = np.clip(work[y], 0, 255)

        # Nearest palette entry for every pixel of the row: (W, 1, 3) - (16, 3)
        distances = np.sum((row[:, np.newaxis, :] - PALETTE) ** 2, axis=2)  # (W, 16)
        indices = np.argmin(distances, axis=1).astype(np.uint8)
        output[y] = indices

        if y + 1 == height:
            break  # last row: there is no row below to receive the error

        # Error between what we wanted and the palette color we got: (W, 3)
        errors = row - PALETTE[indices]

        below = work[y + 1]  # view, so the += below updates `work` in place
        # Bottom-left: 3/16 (pixel x feeds x - 1)
        below[:-1] += errors[1:] * (3.0 / 16.0)
        # Bottom: 5/16
        below += errors * (5.0 / 16.0)
        # Bottom-right: 1/16 (pixel x feeds x + 1)
        below[1:] += errors[:-1] * (1.0 / 16.0)

    return output
|
|
|
|
|
|
def dither_frame_none(frame):
    """
    Quantise an RGB frame to palette indices by plain nearest-color matching.

    No error diffusion is performed; the frame is converted to float32 and
    handed to find_nearest_colors_fast.

    Args:
        frame: numpy array of shape (HEIGHT, WIDTH, 3) with RGB values

    Returns:
        numpy array of shape (HEIGHT, WIDTH) with palette indices 0-15
    """
    as_float = frame.astype(np.float32)
    return find_nearest_colors_fast(as_float)
|
|
|
|
|
|
def pack_4bpp_fast(indexed_frame):
    """
    Pack an indexed frame into 4bpp format using vectorized operations.

    Two pixels per byte: high nibble = first pixel, low nibble = second pixel.
    Pixels are taken in row-major order.

    Args:
        indexed_frame: numpy array of palette indices (normally shape
            (HEIGHT, WIDTH) with values 0-15). Only the low 4 bits of each
            value are used, so an out-of-range index cannot bleed into the
            neighbouring pixel's nibble.

    Returns:
        bytes object with ceil(pixel_count / 2) bytes; FRAME_SIZE_4BPP for a
        full HEIGHT x WIDTH frame. If the pixel count is odd, the final low
        nibble is zero.
    """
    nibbles = np.asarray(indexed_frame).astype(np.uint8).reshape(-1) & 0x0F

    # An odd pixel count would leave the last high nibble without a partner
    # (and make the two slices below different lengths); pad with index 0.
    if nibbles.size % 2:
        nibbles = np.append(nibbles, np.uint8(0))

    # Even positions become the high nibble, odd positions the low nibble
    packed = (nibbles[0::2] << 4) | nibbles[1::2]
    return packed.tobytes()
|
|
|
|
|
|
def main():
    """
    Command-line entry point: read RGB24 frames from stdin and stream them
    to the ESP32 as packed 4bpp frames over TCP.

    Steps:
      1. Parse arguments (host, port, fps, dithering mode).
      2. Open a TCP connection with Nagle's algorithm disabled.
      3. For every FRAME_SIZE_RGB bytes on stdin: quantise to palette
         indices, pack two pixels per byte, send, then sleep to hold the
         target frame rate.

    Exits with status 1 if the connection cannot be established and with
    argparse's usage error (status 2) if --fps is not a positive number.
    The loop ends on end of input (including a short final frame), on a
    send error, or on Ctrl+C; the socket is always closed and the average
    frame rate is printed.
    """
    parser = argparse.ArgumentParser(
        description='Stream video to ESP32 Channel3 RF Broadcast',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  Basic usage (with PAR correction for wide TV pixels):
    ffmpeg -f lavfi -i color=black:293x220 -i video.mp4 -filter_complex "[1:v]scale=293:220:force_original_aspect_ratio=decrease[vid];[0:v][vid]overlay=(W-w)/2:(H-h)/2,scale=116:220" -f rawvideo -pix_fmt rgb24 -shortest - | python stream_video.py 192.168.1.100

  Stream webcam:
    ffmpeg -f dshow -i video="Your Webcam" -vf "scale=116:220" -f rawvideo -pix_fmt rgb24 - | python stream_video.py 192.168.1.100

  Fast mode (no dithering):
    ffmpeg -i video.mp4 -vf "scale=116:220" -f rawvideo -pix_fmt rgb24 - | python stream_video.py 192.168.1.100 --no-dither -f 60
"""
    )
    parser.add_argument('host', help='ESP32 IP address')
    parser.add_argument('-p', '--port', type=int, default=5000, help='Port number (default: 5000)')
    parser.add_argument('-f', '--fps', type=float, default=30, help='Target frame rate (default: 30)')
    parser.add_argument('--no-dither', action='store_true', help='Disable dithering (faster)')
    parser.add_argument('--grayscale', '--bw', action='store_true', help='Grayscale mode for B&W TVs (16 distinct gray levels)')

    args = parser.parse_args()

    # 1.0 / fps would raise ZeroDivisionError for 0, and a negative or NaN
    # value would silently disable pacing. `not x > 0` also rejects NaN.
    if not args.fps > 0:
        parser.error(f"--fps must be greater than 0 (got {args.fps})")

    # Calculate frame timing
    frame_interval = 1.0 / args.fps

    print(f"Connecting to {args.host}:{args.port}...")

    sock = None
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)  # Disable Nagle's algorithm
        sock.connect((args.host, args.port))
    except (OSError, OverflowError) as e:
        # OSError covers refused/unreachable/DNS failures; OverflowError is
        # what connect() raises for a port outside 0-65535.
        if sock is not None:
            sock.close()
        print(f"Connection failed: {e}")
        sys.exit(1)

    print(f"Connected! Streaming at {args.fps} fps target...")
    print("Press Ctrl+C to stop")

    frame_count = 0
    # perf_counter is monotonic, so pacing and FPS figures are not disturbed
    # by wall-clock adjustments.
    start_time = time.perf_counter()

    # Select processing function based on mode
    if args.grayscale:
        # Grayscale mode: map by luminance for 16 distinct gray levels on B&W TV
        def dither_func(f):
            return find_nearest_grayscale_fast(f.astype(np.float32))
        print("Grayscale mode: mapping to 16 luminance levels")
    elif args.no_dither:
        dither_func = dither_frame_none
    else:
        dither_func = dither_frame_fast

    stdin = sys.stdin.buffer  # binary view of stdin; frames are raw bytes

    try:
        while True:
            frame_start = time.perf_counter()

            # Read one RGB24 frame from stdin; a short read means EOF
            raw_data = stdin.read(FRAME_SIZE_RGB)
            if len(raw_data) < FRAME_SIZE_RGB:
                print(f"\nEnd of stream after {frame_count} frames")
                break

            # Convert to numpy array (read-only view over the bytes object)
            frame = np.frombuffer(raw_data, dtype=np.uint8).reshape((HEIGHT, WIDTH, 3))

            # Dither to 16 colors
            indexed = dither_func(frame)

            # Pack to 4bpp
            packed = pack_4bpp_fast(indexed)

            # Send to ESP32
            try:
                sock.sendall(packed)
            except OSError as e:
                print(f"\nSend error: {e}")
                break

            frame_count += 1

            # Frame rate limiting: sleep off whatever is left of this frame's slot
            remaining = frame_interval - (time.perf_counter() - frame_start)
            if remaining > 0:
                time.sleep(remaining)

            # Progress indicator
            if frame_count % 30 == 0:
                running = time.perf_counter() - start_time
                if running > 0:
                    actual_fps = frame_count / running
                    print(f"\rFrames: {frame_count}, FPS: {actual_fps:.1f} ", end='', flush=True)

    except KeyboardInterrupt:
        print(f"\nStopped after {frame_count} frames")

    finally:
        sock.close()
        elapsed = time.perf_counter() - start_time
        if elapsed > 0:
            print(f"Average FPS: {frame_count / elapsed:.1f}")
|
|
|
|
|
|
# Start streaming only when run as a script, not when imported as a module.
if __name__ == "__main__":
    main()
|