Browse Source

first commit

master
Joe DiPrima 1 year ago
commit
0b43e70f4a
  1. 3
      .gitignore
  2. 141
      dmx.py
  3. 85
      main.py
  4. 2
      requirements.txt
  5. 9
      run.bat
  6. 9
      send.bat
  7. 55
      sender.py
  8. 14
      win-install.bat

3
.gitignore

@ -0,0 +1,3 @@
venv
__pycache__
*.mkv

141
dmx.py

@ -0,0 +1,141 @@
# dmx.py
from PyQt5.QtCore import QObject, QThread, pyqtSignal
import socket
import struct
import numpy as np
import subprocess
import time
class DMXRecorder(QObject):
def __init__(self):
super().__init__()
self.thread = None
self.recording_thread = None
def start_recording(self):
self.thread = QThread()
self.recording_thread = RecordingThread()
self.recording_thread.moveToThread(self.thread)
self.thread.started.connect(self.recording_thread.run)
self.recording_thread.finished.connect(self.thread.quit)
self.recording_thread.finished.connect(self.recording_thread.deleteLater)
self.thread.finished.connect(self.thread.deleteLater)
self.thread.start()
def stop_recording(self):
if self.recording_thread:
self.recording_thread.stop()
if self.thread:
self.thread.quit()
self.thread.wait()
class RecordingThread(QObject):
finished = pyqtSignal()
def __init__(self):
super().__init__()
self.running = False
self.socket = None
self.ffmpeg_process = None
def run(self):
self.running = True
# Set up UDP socket to listen for Art-Net DMX data
UDP_IP = "0.0.0.0"
UDP_PORT = 6454
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.socket.bind((UDP_IP, UDP_PORT))
# Set socket to non-blocking
self.socket.setblocking(False)
# Prepare for recording
expected_universes = set(range(4)) # Universes 0 to 3 (adjust as needed)
num_universes = len(expected_universes)
universe_size = 512
# Define frame dimensions
frame_width = 64 # Adjust as needed (must divide evenly into total data size)
total_pixels = num_universes * universe_size
frame_height = total_pixels // frame_width
# Frame rate and timing
frame_rate = 30 # frames per second
frame_interval = 1.0 / frame_rate
last_frame_time = time.time()
# Set up ffmpeg process
ffmpeg_cmd = [
'ffmpeg',
'-y',
'-f', 'rawvideo',
'-pix_fmt', 'gray',
'-s', f'{frame_width}x{frame_height}',
'-r', str(frame_rate),
'-i', '-', # Input from stdin
'-c:v', 'ffv1',
'dmx_video.mkv'
]
self.ffmpeg_process = subprocess.Popen(ffmpeg_cmd, stdin=subprocess.PIPE)
universes_data = {}
last_received = {}
try:
while self.running:
current_time = time.time()
# Receive data
try:
data, addr = self.socket.recvfrom(1024)
if data.startswith(b'Art-Net'):
op_code = struct.unpack('<H', data[8:10])[0]
# OpCode for ArtDMX is 0x5000
if op_code == 0x5000:
# Parse universe
universe = struct.unpack('<H', data[14:16])[0]
# Get length
length = struct.unpack('>H', data[16:18])[0]
dmx_data = data[18:18+length]
# Store the data for this universe
universes_data[universe] = dmx_data
last_received[universe] = current_time
except BlockingIOError:
# No data received
pass
except Exception as e:
print(f"Error receiving data: {e}")
# Check if it's time to write a frame
if current_time - last_frame_time >= frame_interval:
# Assemble frame data
frame_data = b''
for universe in sorted(expected_universes):
dmx_data = universes_data.get(universe, b'\x00'*512)
if len(dmx_data) < 512:
dmx_data += b'\x00' * (512 - len(dmx_data))
frame_data += dmx_data
# Convert frame_data to numpy array
frame_array = np.frombuffer(frame_data, dtype=np.uint8)
# Reshape to image dimensions
frame_array = frame_array.reshape((frame_height, frame_width))
# Write frame to ffmpeg stdin
self.ffmpeg_process.stdin.write(frame_array.tobytes())
last_frame_time = current_time
time.sleep(0.001) # Sleep briefly to avoid maxing out CPU
finally:
# Clean up
self.ffmpeg_process.stdin.close()
self.ffmpeg_process.wait()
self.socket.close()
self.finished.emit()
def stop(self):
self.running = False

85
main.py

@ -0,0 +1,85 @@
# main.py
import sys
from PyQt5.QtWidgets import QApplication, QMainWindow, QPushButton, QWidget, QVBoxLayout
from PyQt5.QtCore import Qt
from dmx import DMXRecorder
class MainWindow(QMainWindow):
def __init__(self):
super().__init__()
self.initUI()
self.isRecording = False
self.recorder = None
def initUI(self):
self.setWindowTitle("DMX Recorder")
self.resize(300, 200)
# Use dark theme
self.setStyleSheet('''
QWidget {
background-color: #222222;
color: #FFFFFF;
}
QSlider::groove:horizontal {
border: 1px solid #444444;
height: 8px;
background: #555555;
}
QSlider::handle:horizontal {
background: #888888;
border: 1px solid #444444;
width: 18px;
margin: -8px 0;
}
QLineEdit {
background-color: #333333;
border: 1px solid #555555;
color: #FFFFFF;
padding: 4px;
}
QPushButton {
background-color: #444444;
border: 1px solid #666666;
padding: 6px;
}
QPushButton:hover {
background-color: #555555;
}
QLabel {
color: #FFFFFF;
}
''')
self.central_widget = QWidget()
self.setCentralWidget(self.central_widget)
self.layout = QVBoxLayout()
self.central_widget.setLayout(self.layout)
self.record_button = QPushButton("Record")
self.record_button.clicked.connect(self.toggle_recording)
self.layout.addWidget(self.record_button)
def toggle_recording(self):
if not self.isRecording:
# Start recording
self.recorder = DMXRecorder()
self.recorder.start_recording()
self.record_button.setText("Stop")
self.isRecording = True
else:
# Stop recording
if self.recorder:
self.recorder.stop_recording()
self.recorder = None
self.record_button.setText("Record")
self.isRecording = False
if __name__ == '__main__':
app = QApplication(sys.argv)
window = MainWindow()
window.show()
sys.exit(app.exec_())

2
requirements.txt

@ -0,0 +1,2 @@
PyQt5>=5.15.0
numpy>=1.19.0

9
run.bat

@ -0,0 +1,9 @@
@echo off
echo Activating the virtual environment...
call venv\Scripts\activate
echo Running main.py...
python main.py
echo Deactivating the virtual environment...
call venv\Scripts\deactivate

9
send.bat

@ -0,0 +1,9 @@
@echo off
echo Activating the virtual environment...
call venv\Scripts\activate
echo Running sender.py...
python sender.py
echo Deactivating the virtual environment...
call venv\Scripts\deactivate

55
sender.py

@ -0,0 +1,55 @@
import socket
import struct
import time
def send_dmx(universe=0, num_channels=512, fade_time=5):
UDP_IP = '255.255.255.255' # Broadcast address
UDP_PORT = 6454
# Create UDP socket
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
sequence = 0
physical = 0
# Fading parameters
fade_steps = 100
fade_interval = fade_time / fade_steps
try:
while True:
# Fade up
for value in range(0, 256):
dmx_data = [value] * num_channels
dmx_packet = build_artdmx_packet(sequence, physical, universe, dmx_data)
sock.sendto(dmx_packet, (UDP_IP, UDP_PORT))
time.sleep(fade_interval / 256)
# Fade down
for value in range(255, -1, -1):
dmx_data = [value] * num_channels
dmx_packet = build_artdmx_packet(sequence, physical, universe, dmx_data)
sock.sendto(dmx_packet, (UDP_IP, UDP_PORT))
time.sleep(fade_interval / 256)
except KeyboardInterrupt:
print("DMX transmission stopped.")
finally:
sock.close()
def build_artdmx_packet(sequence, physical, universe, dmx_data):
header = b'Art-Net\x00' # Protocol identifier
opcode = struct.pack('<H', 0x5000) # OpCode: ArtDMX (0x5000), little endian
protocol_version = struct.pack('>H', 14) # Protocol version (14 for Art-Net)
sequence = struct.pack('B', sequence) # Sequence
physical = struct.pack('B', physical) # Physical port
universe = struct.pack('<H', universe) # Universe, little endian
length = struct.pack('>H', len(dmx_data)) # Data length, big endian
data = bytes(dmx_data) # DMX data
packet = header + opcode + protocol_version + sequence + physical + universe + length + data
return packet
if __name__ == '__main__':
send_dmx()

14
win-install.bat

@ -0,0 +1,14 @@
@echo off
echo Creating virtual environment named venv...
python -m venv venv
echo Activating virtual environment...
call venv\Scripts\activate
echo Installing dependencies from requirements.txt...
pip install -r requirements.txt
echo Installation complete. Virtual environment is ready to use.
echo Deactivating the virtual environment...
call venv\Scripts\deactivate
Loading…
Cancel
Save