You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 

352 lines
14 KiB

"""
Synth Router Module
Routes MIDI notes to appropriate synth channels based on arpeggio patterns.
Handles spatial routing patterns for lighting effects and musical distribution.
"""
import math
import random
from typing import Dict, List, Optional, Tuple
from PyQt5.QtCore import QObject, pyqtSignal
from .midi_channel_manager import MIDIChannelManager
class SynthRouter(QObject):
"""
Routes notes to synth channels based on musical and spatial patterns.
Integrates with MIDIChannelManager for voice allocation and channel management.
"""
# Signals
note_routed = pyqtSignal(int, int, int) # channel, note, velocity
pattern_changed = pyqtSignal(str)
# Routing pattern types
ROUTING_PATTERNS = [
# Musical patterns
"single_synth", "round_robin", "chord_spread", "random_musical",
# Spatial/lighting patterns
"bounce", "cycle", "wave", "ripple", "cascade",
"random_spatial", "spotlight", "alternating", "center_out",
"spiral", "zigzag"
]
def __init__(self, channel_manager: MIDIChannelManager):
super().__init__()
self.channel_manager = channel_manager
# Current routing settings
self.current_pattern = "single_synth"
self.primary_channel = 1 # For single synth mode
# Pattern state tracking
self.pattern_position = 0
self.pattern_direction = 1 # 1 for forward, -1 for reverse
self.last_channels = [] # Track recent channel usage
# Bounce pattern specific
self.bounce_position = 0
self.bounce_direction = 1
# Cycle pattern specific
self.cycle_position = 0
# Random state
self.random_weights = {} # Channel preference weights
def set_routing_pattern(self, pattern_name: str) -> bool:
"""Set the current routing pattern"""
if pattern_name in self.ROUTING_PATTERNS:
self.current_pattern = pattern_name
self.reset_pattern_state()
self.pattern_changed.emit(pattern_name)
return True
return False
def set_primary_channel(self, channel: int):
"""Set primary channel for single synth mode"""
if 1 <= channel <= 16:
self.primary_channel = channel
def reset_pattern_state(self):
"""Reset pattern-specific state variables"""
self.pattern_position = 0
self.pattern_direction = 1
self.bounce_position = 0
self.bounce_direction = 1
self.cycle_position = 0
self.last_channels.clear()
# Initialize random weights
active_channels = self.channel_manager.get_active_channels()
for channel in active_channels:
self.random_weights[channel] = 1.0
def route_note(self, note: int, velocity: int, chord_notes: List[int] = None) -> Optional[int]:
"""
Route a note to the appropriate synth channel based on current pattern.
Returns the selected channel number, or None if routing failed.
"""
active_channels = self.channel_manager.get_active_channels()
if not active_channels:
return None
# Handle chord spreading for specific patterns
if chord_notes and len(chord_notes) > 1 and self.current_pattern == "chord_spread":
return self._route_chord_spread(note, velocity, chord_notes)
# Route single note based on pattern
target_channel = self._select_target_channel(note, active_channels)
if target_channel:
# Check voice availability and allocate
if self.channel_manager.allocate_voice(target_channel, note):
self.note_routed.emit(target_channel, note, velocity)
self._update_pattern_state(target_channel, active_channels)
return target_channel
return None
def _select_target_channel(self, note: int, active_channels: List[int]) -> Optional[int]:
"""Select target channel based on current routing pattern"""
if self.current_pattern == "single_synth":
return self.primary_channel if self.primary_channel in active_channels else active_channels[0]
elif self.current_pattern == "round_robin":
return self._round_robin_select(active_channels)
elif self.current_pattern == "random_musical":
return random.choice(active_channels)
elif self.current_pattern == "bounce":
return self._bounce_select(active_channels)
elif self.current_pattern == "cycle":
return self._cycle_select(active_channels)
elif self.current_pattern == "wave":
return self._wave_select(active_channels)
elif self.current_pattern == "ripple":
return self._ripple_select(active_channels)
elif self.current_pattern == "cascade":
return self._cascade_select(active_channels)
elif self.current_pattern == "random_spatial":
return self._weighted_random_select(active_channels)
elif self.current_pattern == "spotlight":
return self._spotlight_select(active_channels)
elif self.current_pattern == "alternating":
return self._alternating_select(active_channels)
elif self.current_pattern == "center_out":
return self._center_out_select(active_channels)
elif self.current_pattern == "spiral":
return self._spiral_select(active_channels)
elif self.current_pattern == "zigzag":
return self._zigzag_select(active_channels)
# Default fallback
return active_channels[0]
def _round_robin_select(self, active_channels: List[int]) -> int:
"""Simple round-robin through channels"""
channel = active_channels[self.pattern_position % len(active_channels)]
return channel
def _bounce_select(self, active_channels: List[int]) -> int:
"""Bounce back and forth between first and last channels"""
if len(active_channels) == 1:
return active_channels[0]
# Calculate bounce position
if self.bounce_direction == 1:
if self.bounce_position >= len(active_channels) - 1:
self.bounce_direction = -1
else:
if self.bounce_position <= 0:
self.bounce_direction = 1
return active_channels[self.bounce_position]
def _cycle_select(self, active_channels: List[int]) -> int:
"""Cycle through channels in order"""
channel = active_channels[self.cycle_position % len(active_channels)]
return channel
def _wave_select(self, active_channels: List[int]) -> int:
"""Wave pattern across channels"""
wave_position = math.sin(self.pattern_position * 0.5) * 0.5 + 0.5
channel_index = int(wave_position * (len(active_channels) - 1))
return active_channels[channel_index]
def _ripple_select(self, active_channels: List[int]) -> int:
"""Ripple effect from center outward"""
center = len(active_channels) // 2
ripple_radius = (self.pattern_position // 2) % (len(active_channels) // 2 + 1)
# Select channels at current ripple radius
candidates = []
for i, channel in enumerate(active_channels):
distance = abs(i - center)
if distance == ripple_radius:
candidates.append(channel)
return random.choice(candidates) if candidates else active_channels[center]
def _cascade_select(self, active_channels: List[int]) -> int:
"""Cascade effect - sequential with overlap"""
cascade_width = 3 # Number of channels in cascade
cascade_position = self.pattern_position % (len(active_channels) + cascade_width)
# Find channels in current cascade window
candidates = []
for i in range(cascade_width):
idx = (cascade_position - i) % len(active_channels)
if idx >= 0:
candidates.append(active_channels[idx])
return random.choice(candidates) if candidates else active_channels[0]
def _weighted_random_select(self, active_channels: List[int]) -> int:
"""Random selection with dynamic weights"""
# Adjust weights to avoid recently used channels
for channel in self.last_channels[-3:]: # Last 3 channels get lower weight
if channel in self.random_weights:
self.random_weights[channel] *= 0.5
# Normalize weights
total_weight = sum(self.random_weights.get(ch, 1.0) for ch in active_channels)
if total_weight <= 0:
return random.choice(active_channels)
# Weighted random selection
rand_val = random.random() * total_weight
cumulative = 0
for channel in active_channels:
cumulative += self.random_weights.get(channel, 1.0)
if rand_val <= cumulative:
return channel
return active_channels[-1] # Fallback
def _spotlight_select(self, active_channels: List[int]) -> int:
"""Spotlight effect - focus on one channel at a time"""
spotlight_duration = 8 # Notes per spotlight
spotlight_channel_idx = (self.pattern_position // spotlight_duration) % len(active_channels)
return active_channels[spotlight_channel_idx]
def _alternating_select(self, active_channels: List[int]) -> int:
"""Alternate between even and odd channels"""
if len(active_channels) < 2:
return active_channels[0]
if self.pattern_position % 2 == 0:
# Even positions - select from first half
half = len(active_channels) // 2
return active_channels[self.pattern_position // 2 % (half if half > 0 else 1)]
else:
# Odd positions - select from second half
half = len(active_channels) // 2
second_half = active_channels[half:] if half > 0 else active_channels
return second_half[(self.pattern_position // 2) % len(second_half)]
def _center_out_select(self, active_channels: List[int]) -> int:
"""Select from center outward"""
center = len(active_channels) // 2
radius = (self.pattern_position // 2) % (len(active_channels) // 2 + 1)
# Alternate between left and right of center
if self.pattern_position % 2 == 0:
idx = center + radius
else:
idx = center - radius
idx = max(0, min(len(active_channels) - 1, idx))
return active_channels[idx]
def _spiral_select(self, active_channels: List[int]) -> int:
"""Spiral pattern through channels"""
# Create spiral by varying step size
spiral_step = 2 if len(active_channels) > 4 else 1
idx = (self.pattern_position * spiral_step) % len(active_channels)
return active_channels[idx]
def _zigzag_select(self, active_channels: List[int]) -> int:
"""Zigzag pattern through channels"""
period = len(active_channels) * 2 - 2
position = self.pattern_position % period
if position < len(active_channels):
idx = position
else:
idx = len(active_channels) - 2 - (position - len(active_channels))
idx = max(0, min(len(active_channels) - 1, idx))
return active_channels[idx]
def _route_chord_spread(self, note: int, velocity: int, chord_notes: List[int]) -> Optional[int]:
"""Spread chord notes across different channels"""
active_channels = self.channel_manager.get_active_channels()
# Find position of current note in chord
try:
note_index = chord_notes.index(note)
except ValueError:
note_index = 0
# Distribute chord notes across channels
if len(chord_notes) <= len(active_channels):
# Enough channels for each note
target_channel = active_channels[note_index % len(active_channels)]
else:
# More notes than channels - use round robin
target_channel = active_channels[note_index % len(active_channels)]
return target_channel
def _update_pattern_state(self, selected_channel: int, active_channels: List[int]):
"""Update pattern state after routing a note"""
self.pattern_position += 1
# Track recent channels
self.last_channels.append(selected_channel)
if len(self.last_channels) > 5:
self.last_channels.pop(0)
# Update bounce position
if self.current_pattern == "bounce":
self.bounce_position += self.bounce_direction
if self.bounce_position >= len(active_channels) - 1:
self.bounce_direction = -1
elif self.bounce_position <= 0:
self.bounce_direction = 1
# Update cycle position
elif self.current_pattern == "cycle":
self.cycle_position += 1
# Restore random weights gradually
for channel in active_channels:
if channel in self.random_weights:
self.random_weights[channel] = min(1.0, self.random_weights[channel] + 0.1)
def get_pattern_info(self) -> Dict:
"""Get current pattern state information"""
return {
'pattern': self.current_pattern,
'position': self.pattern_position,
'primary_channel': self.primary_channel,
'bounce_position': self.bounce_position,
'bounce_direction': self.bounce_direction,
'cycle_position': self.cycle_position,
'recent_channels': self.last_channels.copy()
}