You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 

336 lines
13 KiB

"""
Output Manager Module
Manages MIDI output routing between simulator mode and hardware mode.
Handles program changes, note on/off messages, and volume control.
"""
import time
from typing import Dict, List, Optional, Tuple
from PyQt5.QtCore import QObject, pyqtSignal
# Probe the optional MIDI backends; each flag records whether its import succeeded
try:
    import mido
except ImportError:
    print("Warning: mido not available, creating fallback")
    MIDO_AVAILABLE = False
else:
    MIDO_AVAILABLE = True

try:
    import rtmidi
except ImportError:
    print("Warning: rtmidi not available")
    RTMIDI_AVAILABLE = False
else:
    RTMIDI_AVAILABLE = True

# When mido is missing, bind the name `mido` to a minimal stand-in object
if not MIDO_AVAILABLE:

    class Message:
        """Bare-bones MIDI message used only when mido could not be imported."""

        def __init__(self, msg_type, **kwargs):
            """Store the message type, then every keyword as an attribute."""
            self.type = msg_type
            vars(self).update(kwargs)

        def bytes(self):
            """Return the message as a list of ints, or [] for an unknown type."""
            # message type -> (status nibble, data attributes in wire order)
            layouts = {
                'note_on': (0x90, ('note', 'velocity')),
                'note_off': (0x80, ('note', 'velocity')),
                'program_change': (0xC0, ('program',)),
                'control_change': (0xB0, ('control', 'value')),
            }
            layout = layouts.get(self.type)
            if layout is None:
                return []
            status, data_fields = layout
            encoded = [status | self.channel]
            encoded.extend(getattr(self, field) for field in data_fields)
            return encoded

    class MockMido:
        """Stand-in for the mido module: one placeholder device, no real ports."""

        Message = Message

        @staticmethod
        def get_output_names():
            """Return the single placeholder device name."""
            return ["No MIDI - Simulator Only"]

        @staticmethod
        def open_output(name):
            """Open nothing; there is no port to return."""
            return None

    mido = MockMido()
class OutputManager(QObject):
    """
    Manages MIDI output to either hardware devices or internal simulator.
    Provides seamless switching between modes and handles all MIDI communication.

    The public ``send_*`` methods take 1-based channel numbers (1-16) and
    subtract one when building the MIDI message. Arguments outside the valid
    range make the call return without doing anything. Errors raised while
    talking to the hardware port are reported through ``error_occurred``
    rather than propagated to the caller.
    """

    # Signals
    mode_changed = pyqtSignal(str)  # "simulator" or "hardware"
    midi_device_changed = pyqtSignal(str)  # device name
    note_sent = pyqtSignal(int, int, int, bool)  # channel, note, velocity, is_note_on
    program_sent = pyqtSignal(int, int)  # channel, program
    volume_sent = pyqtSignal(int, int)  # channel, volume
    error_occurred = pyqtSignal(str)  # error message

    # Controller numbers used in the control_change messages below
    _CC_CHANNEL_VOLUME = 7
    _CC_ALL_SOUND_OFF = 120
    _CC_RESET_ALL_CONTROLLERS = 121
    _CC_ALL_NOTES_OFF = 123

    def __init__(self, simulator_engine=None):
        """
        Start in simulator mode and scan for MIDI output devices.

        Args:
            simulator_engine: Optional object that receives the simulator
                calls made by this class (play_note, stop_note, ...). When it
                is None those calls are skipped.
        """
        super().__init__()
        # Mode selection
        self.current_mode = "simulator"  # "simulator" or "hardware"
        self.simulator_engine = simulator_engine
        # Hardware MIDI
        self.midi_output = None
        self.available_outputs = []
        self.selected_output = None
        # Channel volumes (CC7), keyed by 1-based channel
        self.channel_volumes: Dict[int, int] = {i: 100 for i in range(1, 17)}
        # Program changes, keyed by 1-based channel
        self.channel_programs: Dict[int, int] = {i: 0 for i in range(1, 17)}
        # Initialize MIDI
        self.refresh_midi_devices()

    def refresh_midi_devices(self):
        """Refresh list of available MIDI output devices.

        On failure the list is emptied and ``error_occurred`` is emitted.
        """
        try:
            self.available_outputs = mido.get_output_names()
        except Exception as e:
            self.error_occurred.emit(f"Error refreshing MIDI devices: {str(e)}")
            self.available_outputs = []

    def get_available_outputs(self) -> List[str]:
        """Get a copy of the list of available MIDI output device names."""
        return self.available_outputs.copy()

    def set_mode(self, mode: str) -> bool:
        """Set output mode: 'simulator' or 'hardware'.

        Returns:
            True when the mode was accepted (``mode_changed`` is emitted),
            False for any other mode string.
        """
        if mode not in ("simulator", "hardware"):
            return False

        old_mode = self.current_mode
        self.current_mode = mode
        if mode == "hardware" and old_mode == "simulator":
            # Switching to hardware - sync current state
            self._sync_to_hardware()
        elif mode == "simulator" and old_mode == "hardware":
            # Switching to simulator - stop all hardware notes
            self._all_notes_off_hardware()
        self.mode_changed.emit(mode)
        return True

    def set_midi_output(self, device_name: str) -> bool:
        """Set hardware MIDI output device.

        Any port that is currently open is closed first. If opening the new
        device fails, no port is left selected.

        Returns:
            True when the device was opened, False when the name is not in
            ``available_outputs`` or opening it raised.
        """
        if device_name not in self.available_outputs:
            return False

        # Close existing connection
        if self.midi_output:
            try:
                self.midi_output.close()
            except Exception:
                # Best effort: the old port is dropped whether or not it closed cleanly
                pass
            self.midi_output = None
        # The previous selection no longer has an open port behind it, so it
        # must not survive a failed open below.
        self.selected_output = None

        # Open new connection
        try:
            self.midi_output = mido.open_output(device_name)
            self.selected_output = device_name
            self.midi_device_changed.emit(device_name)
            # Sync current state to new device
            if self.current_mode == "hardware":
                self._sync_to_hardware()
            return True
        except Exception as e:
            self.error_occurred.emit(f"Error opening MIDI device {device_name}: {str(e)}")
            return False

    def _send_hardware(self, action: str, msg_type: str, **fields) -> None:
        """Build one message and send it to the open port, reporting failures.

        Args:
            action: Text placed after "Error " in the emitted error message.
            msg_type: Message type passed to ``mido.Message``.
            **fields: Message fields; ``channel`` is already 0-based here.
        """
        try:
            self.midi_output.send(mido.Message(msg_type, **fields))
        except Exception as e:
            self.error_occurred.emit(f"Error {action}: {str(e)}")

    def send_note_on(self, channel: int, note: int, velocity: int):
        """Send note on message (channel 1-16, note and velocity 0-127)."""
        if not (1 <= channel <= 16 and 0 <= note <= 127 and 0 <= velocity <= 127):
            return
        if self.current_mode == "simulator" and self.simulator_engine:
            self.simulator_engine.play_note(channel, note, velocity)
        elif self.current_mode == "hardware" and self.midi_output:
            self._send_hardware(
                "sending note on", 'note_on',
                channel=channel - 1, note=note, velocity=velocity,
            )
        # Always update simulator for visual feedback, regardless of mode
        if self.simulator_engine:
            self.simulator_engine.update_lighting(channel, velocity, velocity / 127.0)
        self.note_sent.emit(channel, note, velocity, True)

    def send_note_off(self, channel: int, note: int):
        """Send note off message (channel 1-16, note 0-127)."""
        if not (1 <= channel <= 16 and 0 <= note <= 127):
            return
        if self.current_mode == "simulator" and self.simulator_engine:
            self.simulator_engine.stop_note(channel, note)
        elif self.current_mode == "hardware" and self.midi_output:
            self._send_hardware(
                "sending note off", 'note_off',
                channel=channel - 1, note=note, velocity=0,
            )
        # Always update simulator for visual feedback, regardless of mode
        if self.simulator_engine:
            self.simulator_engine.fade_lighting(channel)
        self.note_sent.emit(channel, note, 0, False)

    def send_program_change(self, channel: int, program: int):
        """Send program change message and remember it for later syncs."""
        if not (1 <= channel <= 16 and 0 <= program <= 127):
            return
        self.channel_programs[channel] = program
        if self.current_mode == "simulator" and self.simulator_engine:
            self.simulator_engine.change_program(channel, program)
        elif self.current_mode == "hardware" and self.midi_output:
            self._send_hardware(
                "sending program change", 'program_change',
                channel=channel - 1, program=program,
            )
        self.program_sent.emit(channel, program)

    def send_volume_change(self, channel: int, volume: int):
        """Send channel volume change (CC7) and remember it for later syncs."""
        if not (1 <= channel <= 16 and 0 <= volume <= 127):
            return
        self.channel_volumes[channel] = volume
        if self.current_mode == "simulator" and self.simulator_engine:
            self.simulator_engine.set_channel_volume(channel, volume)
        elif self.current_mode == "hardware" and self.midi_output:
            self._send_hardware(
                "sending volume change", 'control_change',
                channel=channel - 1, control=self._CC_CHANNEL_VOLUME, value=volume,
            )
        self.volume_sent.emit(channel, volume)

    def send_all_notes_off(self, channel: Optional[int] = None):
        """Send all notes off message.

        Args:
            channel: 1-based channel to silence, or None for all 16 channels.
                A value outside 1-16 is ignored.
        """
        # Compare against None explicitly: a falsy-but-invalid value such as 0
        # must be rejected by the range check, not widened to every channel.
        channels = range(1, 17) if channel is None else [channel]
        for ch in channels:
            if not (1 <= ch <= 16):
                continue
            if self.current_mode == "simulator" and self.simulator_engine:
                self.simulator_engine.all_notes_off(ch)
            elif self.current_mode == "hardware" and self.midi_output:
                # Send CC 123 (All Notes Off)
                self._send_hardware(
                    "sending all notes off", 'control_change',
                    channel=ch - 1, control=self._CC_ALL_NOTES_OFF, value=0,
                )

    def send_panic(self):
        """Send panic message to all channels.

        In hardware mode with an open port, CC 120, 123 and 121 are sent on
        every channel; the first send error aborts the remaining messages.
        Otherwise the simulator engine's ``panic`` is called when one is set.
        """
        if self.current_mode == "hardware" and self.midi_output:
            try:
                for channel in range(16):  # 0-based, straight into the message
                    # All Sound Off (CC 120), All Notes Off (CC 123),
                    # Reset All Controllers (CC 121) - in that order
                    for control in (
                        self._CC_ALL_SOUND_OFF,
                        self._CC_ALL_NOTES_OFF,
                        self._CC_RESET_ALL_CONTROLLERS,
                    ):
                        msg = mido.Message('control_change', channel=channel, control=control, value=0)
                        self.midi_output.send(msg)
            except Exception as e:
                self.error_occurred.emit(f"Error sending panic: {str(e)}")
        elif self.simulator_engine:
            self.simulator_engine.panic()

    def _sync_to_hardware(self):
        """Sync current program and volume settings to hardware.

        Does nothing unless hardware mode is active and a port is open.
        """
        if not (self.current_mode == "hardware" and self.midi_output):
            return
        try:
            # Send current program changes
            for channel, program in self.channel_programs.items():
                if 1 <= channel <= 16:
                    msg = mido.Message('program_change', channel=channel - 1, program=program)
                    self.midi_output.send(msg)
                    time.sleep(0.001)  # Small delay between messages
            # Send current volume settings
            for channel, volume in self.channel_volumes.items():
                if 1 <= channel <= 16:
                    msg = mido.Message(
                        'control_change', channel=channel - 1,
                        control=self._CC_CHANNEL_VOLUME, value=volume,
                    )
                    self.midi_output.send(msg)
                    time.sleep(0.001)
        except Exception as e:
            self.error_occurred.emit(f"Error syncing to hardware: {str(e)}")

    def _all_notes_off_hardware(self):
        """Send all notes off to hardware when switching away."""
        if self.midi_output:
            try:
                for channel in range(16):  # 0-based, straight into the message
                    msg = mido.Message(
                        'control_change', channel=channel,
                        control=self._CC_ALL_NOTES_OFF, value=0,
                    )
                    self.midi_output.send(msg)
            except Exception as e:
                self.error_occurred.emit(f"Error turning off hardware notes: {str(e)}")

    def get_channel_volume(self, channel: int) -> int:
        """Get current volume for a channel (100 for an unknown channel)."""
        return self.channel_volumes.get(channel, 100)

    def get_channel_program(self, channel: int) -> int:
        """Get current program for a channel (0 for an unknown channel)."""
        return self.channel_programs.get(channel, 0)

    def is_connected(self) -> bool:
        """Check if the output for the current mode is available.

        Simulator mode needs a simulator engine; hardware mode needs an open
        port.
        """
        if self.current_mode == "simulator":
            return self.simulator_engine is not None
        elif self.current_mode == "hardware":
            return self.midi_output is not None
        return False

    def get_status_info(self) -> Dict:
        """Get comprehensive status information as a dict of copies."""
        return {
            'mode': self.current_mode,
            'connected': self.is_connected(),
            'selected_output': self.selected_output,
            'available_outputs': self.available_outputs.copy(),
            'channel_volumes': self.channel_volumes.copy(),
            'channel_programs': self.channel_programs.copy(),
        }

    def close(self):
        """Close MIDI connection, silencing the hardware first (best effort)."""
        # getattr: __del__ can call this on an instance whose __init__ stopped
        # before midi_output was assigned.
        port = getattr(self, "midi_output", None)
        if port:
            try:
                self._all_notes_off_hardware()
                port.close()
            except Exception:
                # Best effort during shutdown; the port reference is dropped below
                pass
            self.midi_output = None

    def __del__(self):
        """Cleanup on destruction"""
        self.close()