import sys import socket import struct import time import threading import json import math from PyQt5.QtWidgets import ( QApplication, QWidget, QPushButton, QVBoxLayout, QHBoxLayout, QLabel, QLineEdit, QComboBox, QMessageBox ) from PyQt5.QtCore import pyqtSignal, QObject class DMXController(QObject): # Signal to notify parameter changes params_changed = pyqtSignal() def __init__(self): super().__init__() self.load_params() self.running = False self.lock = threading.Lock() self.thread = None def load_params(self): try: with open('dmx_params.json', 'r') as f: params = json.load(f) except (FileNotFoundError, json.JSONDecodeError): params = { 'ip_address': '255.255.255.255', 'port': 6454, 'universe': 0, 'num_channels': 512, 'fade_time': 5, 'pattern': 'Fade Up and Down', 'speed': 1.0 # Default speed for Moving Sine Wave } self.ip_address = params.get('ip_address', '255.255.255.255') self.port = params.get('port', 6454) self.universe = params.get('universe', 0) self.num_channels = params.get('num_channels', 512) self.fade_time = params.get('fade_time', 5) self.pattern = params.get('pattern', 'Fade Up and Down') self.speed = params.get('speed', 1.0) def save_params(self): params = { 'ip_address': self.ip_address, 'port': self.port, 'universe': self.universe, 'num_channels': self.num_channels, 'fade_time': self.fade_time, 'pattern': self.pattern, 'speed': self.speed } with open('dmx_params.json', 'w') as f: json.dump(params, f, indent=4) def start(self): if not self.running: self.running = True self.thread = threading.Thread(target=self.run) self.thread.start() def stop(self): if self.running: self.running = False if self.thread is not None: self.thread.join() def run(self): try: # Create UDP socket sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) UDP_IP = self.ip_address UDP_PORT = self.port universe = self.universe sequence = 0 physical = 0 if self.pattern == 'Fade Up and Down': self.fade_up_down(sock, UDP_IP, UDP_PORT, universe) elif self.pattern == 'Moving Sine Wave': self.moving_sine_wave(sock, UDP_IP, UDP_PORT, universe) else: print("Unknown pattern selected.") except Exception as e: print(f"Error in DMXController: {e}") finally: sock.close() def fade_up_down(self, sock, UDP_IP, UDP_PORT, universe): number_of_steps = 256 sleep_time = self.fade_time / number_of_steps while self.running: # Fade up for value in range(0, 256): if not self.running: break dmx_data = [value] * self.num_channels dmx_packet = self.build_artdmx_packet(universe, dmx_data) sock.sendto(dmx_packet, (UDP_IP, UDP_PORT)) time.sleep(sleep_time) # Fade down for value in range(255, -1, -1): if not self.running: break dmx_data = [value] * self.num_channels dmx_packet = self.build_artdmx_packet(universe, dmx_data) sock.sendto(dmx_packet, (UDP_IP, UDP_PORT)) time.sleep(sleep_time) def moving_sine_wave(self, sock, UDP_IP, UDP_PORT, universe): while self.running: for t in range(0, 360): if not self.running: break dmx_data = [] for channel in range(self.num_channels): angle = (channel * 10 + t) % 360 value = int((math.sin(math.radians(angle)) + 1) * 127) dmx_data.append(value) dmx_packet = self.build_artdmx_packet(universe, dmx_data) sock.sendto(dmx_packet, (UDP_IP, UDP_PORT)) time.sleep(0.02 / self.speed) # Adjust sleep time based on speed def build_artdmx_packet(self, universe, dmx_data): header = b'Art-Net\x00' # Protocol identifier opcode = struct.pack('H', 14) # Protocol version (14 for Art-Net) sequence = struct.pack('B', 0) # Sequence (ignored) physical = struct.pack('B', 0) # Physical port (ignored) universe = struct.pack('H', len(dmx_data)) # Data length, big endian data = bytes(dmx_data) # DMX data packet = header + opcode + protocol_version + sequence + physical + universe + length + data return packet def update_params(self, ip_address, port, universe, num_channels, fade_time, pattern, speed): with self.lock: self.ip_address = ip_address self.port = port self.universe = universe self.num_channels = num_channels self.fade_time = fade_time self.pattern = pattern self.speed = speed self.save_params() self.params_changed.emit() class DMXApp(QWidget): def __init__(self): super().__init__() self.controller = DMXController() self.init_ui() self.bind_signals() # Use dark theme self.setStyleSheet(''' QWidget { background-color: #222222; color: #FFFFFF; } QSlider::groove:horizontal { border: 1px solid #444444; height: 8px; background: #555555; } QSlider::handle:horizontal { background: #888888; border: 1px solid #444444; width: 18px; margin: -8px 0; } QLineEdit { background-color: #333333; border: 1px solid #555555; color: #FFFFFF; padding: 4px; } QPushButton { background-color: #444444; border: 1px solid #666666; padding: 6px; } QPushButton:hover { background-color: #555555; } QLabel { color: #FFFFFF; } ''') def init_ui(self): # Labels and input fields self.ip_label = QLabel('IP Address:') self.ip_input = QLineEdit(self.controller.ip_address) self.port_label = QLabel('Port:') self.port_input = QLineEdit(str(self.controller.port)) self.universe_label = QLabel('Universe:') self.universe_input = QLineEdit(str(self.controller.universe)) self.channels_label = QLabel('Number of Channels:') self.channels_input = QLineEdit(str(self.controller.num_channels)) self.fade_time_label = QLabel('Fade Time (s):') self.fade_time_input = QLineEdit(str(self.controller.fade_time)) self.speed_label = QLabel('Speed (1-10):') self.speed_input = QLineEdit(str(self.controller.speed)) self.pattern_label = QLabel('Pattern:') self.pattern_combo = QComboBox() self.pattern_combo.addItems(['Fade Up and Down', 'Moving Sine Wave']) self.pattern_combo.setCurrentText(self.controller.pattern) # Buttons self.start_button = QPushButton('Start') self.stop_button = QPushButton('Stop') self.stop_button.setEnabled(False) # Layouts layout = QVBoxLayout() layout.addWidget(self.ip_label) layout.addWidget(self.ip_input) layout.addWidget(self.port_label) layout.addWidget(self.port_input) layout.addWidget(self.universe_label) layout.addWidget(self.universe_input) layout.addWidget(self.channels_label) layout.addWidget(self.channels_input) layout.addWidget(self.pattern_label) layout.addWidget(self.pattern_combo) layout.addWidget(self.fade_time_label) layout.addWidget(self.fade_time_input) layout.addWidget(self.speed_label) layout.addWidget(self.speed_input) button_layout = QHBoxLayout() button_layout.addWidget(self.start_button) button_layout.addWidget(self.stop_button) layout.addLayout(button_layout) self.setLayout(layout) self.setWindowTitle('DMX Controller') # Initially hide or show inputs based on the selected pattern if self.controller.pattern == 'Moving Sine Wave': self.fade_time_label.hide() self.fade_time_input.hide() else: self.speed_label.hide() self.speed_input.hide() def bind_signals(self): # Start and stop buttons self.start_button.clicked.connect(self.start_dmx) self.stop_button.clicked.connect(self.stop_dmx) # Input fields self.ip_input.editingFinished.connect(self.update_params) self.port_input.editingFinished.connect(self.update_params) self.universe_input.editingFinished.connect(self.update_params) self.channels_input.editingFinished.connect(self.update_params) self.fade_time_input.editingFinished.connect(self.update_params) self.speed_input.editingFinished.connect(self.update_params) self.pattern_combo.currentIndexChanged.connect(self.pattern_changed) # Parameter change signal self.controller.params_changed.connect(self.on_params_changed) def start_dmx(self): self.controller.start() self.start_button.setEnabled(False) self.stop_button.setEnabled(True) def stop_dmx(self): self.controller.stop() self.start_button.setEnabled(True) self.stop_button.setEnabled(False) def update_params(self): # Validate and update parameters try: ip_address = self.ip_input.text() socket.inet_aton(ip_address) # Validate IP port = int(self.port_input.text()) universe = int(self.universe_input.text()) num_channels = int(self.channels_input.text()) pattern = self.pattern_combo.currentText() if pattern == 'Fade Up and Down': fade_time = float(self.fade_time_input.text()) speed = self.controller.speed # Keep existing speed elif pattern == 'Moving Sine Wave': speed = float(self.speed_input.text()) fade_time = self.controller.fade_time # Keep existing fade_time else: fade_time = self.controller.fade_time speed = self.controller.speed self.controller.update_params( ip_address, port, universe, num_channels, fade_time, pattern, speed ) except Exception as e: QMessageBox.warning(self, 'Invalid Input', str(e)) def pattern_changed(self): pattern = self.pattern_combo.currentText() if pattern == 'Moving Sine Wave': self.fade_time_label.hide() self.fade_time_input.hide() self.speed_label.show() self.speed_input.show() else: self.fade_time_label.show() self.fade_time_input.show() self.speed_label.hide() self.speed_input.hide() self.update_params() def on_params_changed(self): # Update the UI if needed pass if __name__ == '__main__': app = QApplication(sys.argv) window = DMXApp() window.show() sys.exit(app.exec_())