You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
340 lines
12 KiB
340 lines
12 KiB
import sys
|
|
import socket
|
|
import struct
|
|
import time
|
|
import threading
|
|
import json
|
|
import math
|
|
from PyQt5.QtWidgets import (
|
|
QApplication, QWidget, QPushButton, QVBoxLayout, QHBoxLayout,
|
|
QLabel, QLineEdit, QComboBox, QMessageBox
|
|
)
|
|
from PyQt5.QtCore import pyqtSignal, QObject
|
|
|
|
class DMXController(QObject):
|
|
# Signal to notify parameter changes
|
|
params_changed = pyqtSignal()
|
|
|
|
def __init__(self):
|
|
super().__init__()
|
|
self.load_params()
|
|
self.running = False
|
|
self.lock = threading.Lock()
|
|
self.thread = None
|
|
|
|
def load_params(self):
|
|
try:
|
|
with open('dmx_params.json', 'r') as f:
|
|
params = json.load(f)
|
|
except (FileNotFoundError, json.JSONDecodeError):
|
|
params = {
|
|
'ip_address': '255.255.255.255',
|
|
'port': 6454,
|
|
'universe': 0,
|
|
'num_channels': 512,
|
|
'fade_time': 5,
|
|
'pattern': 'Fade Up and Down',
|
|
'speed': 1.0 # Default speed for Moving Sine Wave
|
|
}
|
|
self.ip_address = params.get('ip_address', '255.255.255.255')
|
|
self.port = params.get('port', 6454)
|
|
self.universe = params.get('universe', 0)
|
|
self.num_channels = params.get('num_channels', 512)
|
|
self.fade_time = params.get('fade_time', 5)
|
|
self.pattern = params.get('pattern', 'Fade Up and Down')
|
|
self.speed = params.get('speed', 1.0)
|
|
|
|
def save_params(self):
|
|
params = {
|
|
'ip_address': self.ip_address,
|
|
'port': self.port,
|
|
'universe': self.universe,
|
|
'num_channels': self.num_channels,
|
|
'fade_time': self.fade_time,
|
|
'pattern': self.pattern,
|
|
'speed': self.speed
|
|
}
|
|
with open('dmx_params.json', 'w') as f:
|
|
json.dump(params, f, indent=4)
|
|
|
|
def start(self):
|
|
if not self.running:
|
|
self.running = True
|
|
self.thread = threading.Thread(target=self.run)
|
|
self.thread.start()
|
|
|
|
def stop(self):
|
|
if self.running:
|
|
self.running = False
|
|
if self.thread is not None:
|
|
self.thread.join()
|
|
|
|
def run(self):
|
|
try:
|
|
# Create UDP socket
|
|
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
|
|
UDP_IP = self.ip_address
|
|
UDP_PORT = self.port
|
|
universe = self.universe
|
|
|
|
sequence = 0
|
|
physical = 0
|
|
|
|
if self.pattern == 'Fade Up and Down':
|
|
self.fade_up_down(sock, UDP_IP, UDP_PORT, universe)
|
|
elif self.pattern == 'Moving Sine Wave':
|
|
self.moving_sine_wave(sock, UDP_IP, UDP_PORT, universe)
|
|
else:
|
|
print("Unknown pattern selected.")
|
|
except Exception as e:
|
|
print(f"Error in DMXController: {e}")
|
|
finally:
|
|
sock.close()
|
|
|
|
def fade_up_down(self, sock, UDP_IP, UDP_PORT, universe):
|
|
number_of_steps = 256
|
|
sleep_time = self.fade_time / number_of_steps
|
|
|
|
while self.running:
|
|
# Fade up
|
|
for value in range(0, 256):
|
|
if not self.running:
|
|
break
|
|
dmx_data = [value] * self.num_channels
|
|
dmx_packet = self.build_artdmx_packet(universe, dmx_data)
|
|
sock.sendto(dmx_packet, (UDP_IP, UDP_PORT))
|
|
time.sleep(sleep_time)
|
|
# Fade down
|
|
for value in range(255, -1, -1):
|
|
if not self.running:
|
|
break
|
|
dmx_data = [value] * self.num_channels
|
|
dmx_packet = self.build_artdmx_packet(universe, dmx_data)
|
|
sock.sendto(dmx_packet, (UDP_IP, UDP_PORT))
|
|
time.sleep(sleep_time)
|
|
|
|
def moving_sine_wave(self, sock, UDP_IP, UDP_PORT, universe):
|
|
while self.running:
|
|
for t in range(0, 360):
|
|
if not self.running:
|
|
break
|
|
dmx_data = []
|
|
for channel in range(self.num_channels):
|
|
angle = (channel * 10 + t) % 360
|
|
value = int((math.sin(math.radians(angle)) + 1) * 127)
|
|
dmx_data.append(value)
|
|
dmx_packet = self.build_artdmx_packet(universe, dmx_data)
|
|
sock.sendto(dmx_packet, (UDP_IP, UDP_PORT))
|
|
time.sleep(0.02 / self.speed) # Adjust sleep time based on speed
|
|
|
|
def build_artdmx_packet(self, universe, dmx_data):
|
|
header = b'Art-Net\x00' # Protocol identifier
|
|
opcode = struct.pack('<H', 0x5000) # OpCode: ArtDMX (0x5000), little endian
|
|
protocol_version = struct.pack('>H', 14) # Protocol version (14 for Art-Net)
|
|
sequence = struct.pack('B', 0) # Sequence (ignored)
|
|
physical = struct.pack('B', 0) # Physical port (ignored)
|
|
universe = struct.pack('<H', universe) # Universe, little endian
|
|
length = struct.pack('>H', len(dmx_data)) # Data length, big endian
|
|
data = bytes(dmx_data) # DMX data
|
|
|
|
packet = header + opcode + protocol_version + sequence + physical + universe + length + data
|
|
return packet
|
|
|
|
def update_params(self, ip_address, port, universe, num_channels, fade_time, pattern, speed):
|
|
with self.lock:
|
|
self.ip_address = ip_address
|
|
self.port = port
|
|
self.universe = universe
|
|
self.num_channels = num_channels
|
|
self.fade_time = fade_time
|
|
self.pattern = pattern
|
|
self.speed = speed
|
|
self.save_params()
|
|
self.params_changed.emit()
|
|
|
|
class DMXApp(QWidget):
|
|
def __init__(self):
|
|
super().__init__()
|
|
self.controller = DMXController()
|
|
self.init_ui()
|
|
self.bind_signals()
|
|
|
|
# Use dark theme
|
|
self.setStyleSheet('''
|
|
QWidget {
|
|
background-color: #222222;
|
|
color: #FFFFFF;
|
|
}
|
|
QSlider::groove:horizontal {
|
|
border: 1px solid #444444;
|
|
height: 8px;
|
|
background: #555555;
|
|
}
|
|
QSlider::handle:horizontal {
|
|
background: #888888;
|
|
border: 1px solid #444444;
|
|
width: 18px;
|
|
margin: -8px 0;
|
|
}
|
|
QLineEdit {
|
|
background-color: #333333;
|
|
border: 1px solid #555555;
|
|
color: #FFFFFF;
|
|
padding: 4px;
|
|
}
|
|
QPushButton {
|
|
background-color: #444444;
|
|
border: 1px solid #666666;
|
|
padding: 6px;
|
|
}
|
|
QPushButton:hover {
|
|
background-color: #555555;
|
|
}
|
|
QLabel {
|
|
color: #FFFFFF;
|
|
}
|
|
''')
|
|
|
|
def init_ui(self):
|
|
# Labels and input fields
|
|
self.ip_label = QLabel('IP Address:')
|
|
self.ip_input = QLineEdit(self.controller.ip_address)
|
|
|
|
self.port_label = QLabel('Port:')
|
|
self.port_input = QLineEdit(str(self.controller.port))
|
|
|
|
self.universe_label = QLabel('Universe:')
|
|
self.universe_input = QLineEdit(str(self.controller.universe))
|
|
|
|
self.channels_label = QLabel('Number of Channels:')
|
|
self.channels_input = QLineEdit(str(self.controller.num_channels))
|
|
|
|
self.fade_time_label = QLabel('Fade Time (s):')
|
|
self.fade_time_input = QLineEdit(str(self.controller.fade_time))
|
|
|
|
self.speed_label = QLabel('Speed (1-10):')
|
|
self.speed_input = QLineEdit(str(self.controller.speed))
|
|
|
|
self.pattern_label = QLabel('Pattern:')
|
|
self.pattern_combo = QComboBox()
|
|
self.pattern_combo.addItems(['Fade Up and Down', 'Moving Sine Wave'])
|
|
self.pattern_combo.setCurrentText(self.controller.pattern)
|
|
|
|
# Buttons
|
|
self.start_button = QPushButton('Start')
|
|
self.stop_button = QPushButton('Stop')
|
|
self.stop_button.setEnabled(False)
|
|
|
|
# Layouts
|
|
layout = QVBoxLayout()
|
|
|
|
layout.addWidget(self.ip_label)
|
|
layout.addWidget(self.ip_input)
|
|
layout.addWidget(self.port_label)
|
|
layout.addWidget(self.port_input)
|
|
layout.addWidget(self.universe_label)
|
|
layout.addWidget(self.universe_input)
|
|
layout.addWidget(self.channels_label)
|
|
layout.addWidget(self.channels_input)
|
|
layout.addWidget(self.pattern_label)
|
|
layout.addWidget(self.pattern_combo)
|
|
layout.addWidget(self.fade_time_label)
|
|
layout.addWidget(self.fade_time_input)
|
|
layout.addWidget(self.speed_label)
|
|
layout.addWidget(self.speed_input)
|
|
|
|
button_layout = QHBoxLayout()
|
|
button_layout.addWidget(self.start_button)
|
|
button_layout.addWidget(self.stop_button)
|
|
|
|
layout.addLayout(button_layout)
|
|
|
|
self.setLayout(layout)
|
|
self.setWindowTitle('DMX Controller')
|
|
|
|
# Initially hide or show inputs based on the selected pattern
|
|
if self.controller.pattern == 'Moving Sine Wave':
|
|
self.fade_time_label.hide()
|
|
self.fade_time_input.hide()
|
|
else:
|
|
self.speed_label.hide()
|
|
self.speed_input.hide()
|
|
|
|
def bind_signals(self):
|
|
# Start and stop buttons
|
|
self.start_button.clicked.connect(self.start_dmx)
|
|
self.stop_button.clicked.connect(self.stop_dmx)
|
|
|
|
# Input fields
|
|
self.ip_input.editingFinished.connect(self.update_params)
|
|
self.port_input.editingFinished.connect(self.update_params)
|
|
self.universe_input.editingFinished.connect(self.update_params)
|
|
self.channels_input.editingFinished.connect(self.update_params)
|
|
self.fade_time_input.editingFinished.connect(self.update_params)
|
|
self.speed_input.editingFinished.connect(self.update_params)
|
|
self.pattern_combo.currentIndexChanged.connect(self.pattern_changed)
|
|
|
|
# Parameter change signal
|
|
self.controller.params_changed.connect(self.on_params_changed)
|
|
|
|
def start_dmx(self):
|
|
self.controller.start()
|
|
self.start_button.setEnabled(False)
|
|
self.stop_button.setEnabled(True)
|
|
|
|
def stop_dmx(self):
|
|
self.controller.stop()
|
|
self.start_button.setEnabled(True)
|
|
self.stop_button.setEnabled(False)
|
|
|
|
def update_params(self):
|
|
# Validate and update parameters
|
|
try:
|
|
ip_address = self.ip_input.text()
|
|
socket.inet_aton(ip_address) # Validate IP
|
|
|
|
port = int(self.port_input.text())
|
|
universe = int(self.universe_input.text())
|
|
num_channels = int(self.channels_input.text())
|
|
pattern = self.pattern_combo.currentText()
|
|
|
|
if pattern == 'Fade Up and Down':
|
|
fade_time = float(self.fade_time_input.text())
|
|
speed = self.controller.speed # Keep existing speed
|
|
elif pattern == 'Moving Sine Wave':
|
|
speed = float(self.speed_input.text())
|
|
fade_time = self.controller.fade_time # Keep existing fade_time
|
|
else:
|
|
fade_time = self.controller.fade_time
|
|
speed = self.controller.speed
|
|
|
|
self.controller.update_params(
|
|
ip_address, port, universe, num_channels, fade_time, pattern, speed
|
|
)
|
|
except Exception as e:
|
|
QMessageBox.warning(self, 'Invalid Input', str(e))
|
|
|
|
def pattern_changed(self):
|
|
pattern = self.pattern_combo.currentText()
|
|
if pattern == 'Moving Sine Wave':
|
|
self.fade_time_label.hide()
|
|
self.fade_time_input.hide()
|
|
self.speed_label.show()
|
|
self.speed_input.show()
|
|
else:
|
|
self.fade_time_label.show()
|
|
self.fade_time_input.show()
|
|
self.speed_label.hide()
|
|
self.speed_input.hide()
|
|
self.update_params()
|
|
|
|
def on_params_changed(self):
|
|
# Update the UI if needed
|
|
pass
|
|
|
|
if __name__ == '__main__':
|
|
app = QApplication(sys.argv)
|
|
window = DMXApp()
|
|
window.show()
|
|
sys.exit(app.exec_())
|