""" Arpeggiator Engine Module Core arpeggiator logic with FL Studio-style functionality. Generates arpeggio patterns, handles timing, and integrates with routing and volume systems. """ import time import math import threading from typing import Dict, List, Optional, Tuple, Set from PyQt5.QtCore import QObject, pyqtSignal, QTimer from .midi_channel_manager import MIDIChannelManager from .synth_router import SynthRouter from .volume_pattern_engine import VolumePatternEngine from .output_manager import OutputManager class ArpeggiatorEngine(QObject): """ Main arpeggiator engine with FL Studio-style functionality. Handles pattern generation, timing, scale processing, and note scheduling. """ # Signals note_triggered = pyqtSignal(int, int, int, float) # channel, note, velocity, duration pattern_step = pyqtSignal(int) # current step tempo_changed = pyqtSignal(float) # BPM playing_state_changed = pyqtSignal(bool) # is_playing armed_state_changed = pyqtSignal() # armed state changed # Arpeggio pattern types (FL Studio style) PATTERN_TYPES = [ "up", "down", "up_down", "down_up", "random", "note_order", "chord", "random_chord", "custom" ] # Channel distribution patterns (how notes are spread across channels) CHANNEL_DISTRIBUTION_PATTERNS = [ "up", "down", "up_down", "bounce", "random", "cycle", "alternating", "single_channel" ] # Musical scales SCALES = { "chromatic": [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11], "major": [0, 2, 4, 5, 7, 9, 11], "minor": [0, 2, 3, 5, 7, 8, 10], "dorian": [0, 2, 3, 5, 7, 9, 10], "phrygian": [0, 1, 3, 5, 7, 8, 10], "lydian": [0, 2, 4, 6, 7, 9, 11], "mixolydian": [0, 2, 4, 5, 7, 9, 10], "locrian": [0, 1, 3, 5, 6, 8, 10], "harmonic_minor": [0, 2, 3, 5, 7, 8, 11], "melodic_minor": [0, 2, 3, 5, 7, 9, 11], "pentatonic_major": [0, 2, 4, 7, 9], "pentatonic_minor": [0, 3, 5, 7, 10], "blues": [0, 3, 5, 6, 7, 10] } # Note speeds (as fractions of a beat) NOTE_SPEEDS = { "1/32": 1/32, "1/16": 1/16, "1/8": 1/8, "1/4": 1/4, "1/2": 1/2, "1/1": 1, "2/1": 2 } def __init__(self, channel_manager: MIDIChannelManager, synth_router: SynthRouter, volume_engine: VolumePatternEngine, output_manager: OutputManager): super().__init__() self.channel_manager = channel_manager self.synth_router = synth_router self.volume_engine = volume_engine self.output_manager = output_manager # Arpeggiator settings self.root_note = 60 # Middle C self.scale = "major" self.pattern_type = "up" self.octave_range = 1 # 1-4 octaves self.note_speed = "1/8" # Note duration self.gate = 1.0 # Note length as fraction of step (0.1-2.0) self.swing = 0.0 # Swing amount (-100% to +100%) self.velocity = 80 # Base velocity # Channel distribution settings self.channel_distribution = "up" # How notes are distributed across channels self.channel_position = 0 # Current position in channel distribution pattern # Armed state system for pattern-end changes self.armed_root_note = None self.armed_scale = None self.armed_pattern_type = None self.armed_channel_distribution = None # Pattern loop tracking self.pattern_loops_completed = 0 self.pattern_start_position = 0 # Playback state self.is_playing = False self.tempo = 120.0 # BPM self.current_step = 0 self.pattern_length = 0 # Input notes (what's being held down) self.held_notes: Set[int] = set() self.input_chord: List[int] = [] # Generated pattern self.current_pattern: List[int] = [] self.pattern_position = 0 # Timing self.last_step_time = 0.0 self.step_duration = 0.0 # Seconds per step self.next_step_time = 0.0 # Active notes tracking for note-off self.active_notes: Dict[Tuple[int, int], float] = {} # (channel, note) -> end_time # Threading for timing precision self.timing_thread = None self.stop_timing = False # Setup timing calculation self.calculate_step_duration() # Setup update timer self.update_timer = QTimer() self.update_timer.timeout.connect(self.update) self.update_timer.start(1) # 1ms updates for precise timing def set_root_note(self, note: int): """Set root note (0-127)""" if 0 <= note <= 127: self.root_note = note self.regenerate_pattern() def arm_root_note(self, note: int): """Arm a root note to change at pattern end""" if 0 <= note <= 127: self.armed_root_note = note self.armed_state_changed.emit() def clear_armed_root_note(self): """Clear armed root note""" self.armed_root_note = None self.armed_state_changed.emit() def set_scale(self, scale_name: str): """Set musical scale""" if scale_name in self.SCALES: self.scale = scale_name self.regenerate_pattern() def arm_scale(self, scale_name: str): """Arm a scale to change at pattern end""" if scale_name in self.SCALES: self.armed_scale = scale_name self.armed_state_changed.emit() def clear_armed_scale(self): """Clear armed scale""" self.armed_scale = None self.armed_state_changed.emit() def set_pattern_type(self, pattern_type: str): """Set arpeggio pattern type""" if pattern_type in self.PATTERN_TYPES: self.pattern_type = pattern_type self.regenerate_pattern() def arm_pattern_type(self, pattern_type: str): """Arm a pattern type to change at pattern end""" if pattern_type in self.PATTERN_TYPES: self.armed_pattern_type = pattern_type self.armed_state_changed.emit() def clear_armed_pattern_type(self): """Clear armed pattern type""" self.armed_pattern_type = None self.armed_state_changed.emit() def set_octave_range(self, octaves: int): """Set octave range (1-4)""" if 1 <= octaves <= 4: self.octave_range = octaves self.regenerate_pattern() def set_note_speed(self, speed: str): """Set note speed""" if speed in self.NOTE_SPEEDS: self.note_speed = speed self.calculate_step_duration() def set_gate(self, gate: float): """Set gate (note length) 0.1-2.0""" self.gate = max(0.1, min(2.0, gate)) def set_swing(self, swing: float): """Set swing amount -1.0 to 1.0""" self.swing = max(-1.0, min(1.0, swing)) def set_velocity(self, velocity: int): """Set base velocity 0-127""" self.velocity = max(0, min(127, velocity)) def set_channel_distribution(self, distribution: str): """Set channel distribution pattern""" if distribution in self.CHANNEL_DISTRIBUTION_PATTERNS: self.channel_distribution = distribution self.channel_position = 0 # Reset position def arm_channel_distribution(self, distribution: str): """Arm a distribution pattern to change at pattern end""" if distribution in self.CHANNEL_DISTRIBUTION_PATTERNS: self.armed_channel_distribution = distribution self.armed_state_changed.emit() def clear_armed_channel_distribution(self): """Clear armed distribution pattern""" self.armed_channel_distribution = None self.armed_state_changed.emit() def set_tempo(self, bpm: float): """Set tempo in BPM""" if 40 <= bpm <= 200: self.tempo = bpm self.calculate_step_duration() self.tempo_changed.emit(bpm) def calculate_step_duration(self): """Calculate time between steps based on tempo and note speed""" beats_per_second = self.tempo / 60.0 note_duration = self.NOTE_SPEEDS[self.note_speed] self.step_duration = note_duration / beats_per_second def note_on(self, note: int): """Register a note being pressed""" self.held_notes.add(note) self.update_input_chord() self.regenerate_pattern() def note_off(self, note: int): """Register a note being released""" self.held_notes.discard(note) self.update_input_chord() if not self.held_notes: self.stop() else: self.regenerate_pattern() def update_input_chord(self): """Update the chord from held notes""" self.input_chord = sorted(list(self.held_notes)) def start(self): """Start arpeggiator playback""" if not self.held_notes: return False if not self.is_playing: self.is_playing = True self.current_step = 0 self.pattern_position = 0 self.last_step_time = time.time() self.next_step_time = self.last_step_time + self.step_duration self.playing_state_changed.emit(True) return True return False def stop(self): """Stop arpeggiator playback""" if self.is_playing: self.is_playing = False self.all_notes_off() self.playing_state_changed.emit(False) def all_notes_off(self): """Send note off for all active notes""" current_time = time.time() for (channel, note) in list(self.active_notes.keys()): self.output_manager.send_note_off(channel, note) self.channel_manager.release_voice(channel, note) self.active_notes.clear() def regenerate_pattern(self): """Regenerate arpeggio pattern based on current settings""" if not self.input_chord: self.current_pattern = [] return # Get scale notes for the key scale_intervals = self.SCALES[self.scale] # Generate pattern based on type if self.pattern_type == "up": self.current_pattern = self._generate_up_pattern() elif self.pattern_type == "down": self.current_pattern = self._generate_down_pattern() elif self.pattern_type == "up_down": self.current_pattern = self._generate_up_down_pattern() elif self.pattern_type == "down_up": self.current_pattern = self._generate_down_up_pattern() elif self.pattern_type == "random": self.current_pattern = self._generate_random_pattern() elif self.pattern_type == "note_order": self.current_pattern = self._generate_note_order_pattern() elif self.pattern_type == "chord": self.current_pattern = self._generate_chord_pattern() elif self.pattern_type == "random_chord": self.current_pattern = self._generate_random_chord_pattern() self.pattern_length = len(self.current_pattern) self.pattern_position = 0 def _generate_scale_notes(self) -> List[int]: """Generate all scale notes within octave range""" scale_intervals = self.SCALES[self.scale] notes = [] # Start from root note base_octave = self.root_note // 12 root_in_octave = self.root_note % 12 # Find closest scale degree to root closest_degree = 0 min_distance = 12 for i, interval in enumerate(scale_intervals): distance = abs((root_in_octave - interval) % 12) if distance < min_distance: min_distance = distance closest_degree = i # Generate notes across octave range for octave in range(self.octave_range): for degree, interval in enumerate(scale_intervals): note = base_octave * 12 + root_in_octave + interval + (octave * 12) if 0 <= note <= 127: notes.append(note) return sorted(notes) def _generate_up_pattern(self) -> List[int]: """Generate ascending arpeggio pattern""" scale_notes = self._generate_scale_notes() return scale_notes def _generate_down_pattern(self) -> List[int]: """Generate descending arpeggio pattern""" scale_notes = self._generate_scale_notes() return list(reversed(scale_notes)) def _generate_up_down_pattern(self) -> List[int]: """Generate up then down pattern""" scale_notes = self._generate_scale_notes() # Up, then down (avoiding duplicate at top) return scale_notes + list(reversed(scale_notes[:-1])) def _generate_down_up_pattern(self) -> List[int]: """Generate down then up pattern""" scale_notes = self._generate_scale_notes() # Down, then up (avoiding duplicate at bottom) return list(reversed(scale_notes)) + scale_notes[1:] def _generate_random_pattern(self) -> List[int]: """Generate random pattern from scale notes""" import random scale_notes = self._generate_scale_notes() pattern_length = max(8, len(scale_notes)) return [random.choice(scale_notes) for _ in range(pattern_length)] def _generate_note_order_pattern(self) -> List[int]: """Generate pattern in the order notes were played""" return self.input_chord * self.octave_range def _generate_chord_pattern(self) -> List[int]: """Generate chord pattern (all notes together, represented as sequence)""" return self.input_chord def _generate_random_chord_pattern(self) -> List[int]: """Generate pattern with random chord combinations""" import random import itertools if len(self.input_chord) < 2: return self.input_chord # Generate various chord combinations patterns = [] # Individual notes patterns.extend(self.input_chord) # Pairs for pair in itertools.combinations(self.input_chord, 2): patterns.extend(pair) # Full chord patterns.extend(self.input_chord) return patterns def _get_next_channel(self) -> int: """Get next channel based on distribution pattern""" active_channels = self.channel_manager.get_active_channels() if not active_channels: return 1 if self.channel_distribution == "single_channel": return active_channels[0] elif self.channel_distribution == "up": # 1, 2, 3, 4, 5, 6... channel_idx = self.channel_position % len(active_channels) self.channel_position += 1 return active_channels[channel_idx] elif self.channel_distribution == "down": # 6, 5, 4, 3, 2, 1... channel_idx = (len(active_channels) - 1 - self.channel_position) % len(active_channels) self.channel_position += 1 return active_channels[channel_idx] elif self.channel_distribution == "up_down": # 1, 2, 3, 4, 5, 6, 5, 4, 3, 2, 1, 2, 3... cycle_length = (len(active_channels) - 1) * 2 pos = self.channel_position % cycle_length if pos < len(active_channels): channel_idx = pos else: channel_idx = len(active_channels) - 2 - (pos - len(active_channels)) self.channel_position += 1 return active_channels[max(0, channel_idx)] elif self.channel_distribution == "bounce": # 1, 2, 3, 4, 5, 6, 5, 4, 3, 2, 1, 2, 3... (same as up_down but clearer name) return self._get_bounce_channel(active_channels) elif self.channel_distribution == "random": import random return random.choice(active_channels) elif self.channel_distribution == "cycle": # Simple cycle through channels channel_idx = self.channel_position % len(active_channels) self.channel_position += 1 return active_channels[channel_idx] elif self.channel_distribution == "alternating": # Alternate between first and last, second and second-to-last, etc. half_point = len(active_channels) // 2 if self.channel_position % 2 == 0: # Even steps: use first half idx = (self.channel_position // 2) % half_point else: # Odd steps: use second half (from end) idx = len(active_channels) - 1 - ((self.channel_position // 2) % half_point) self.channel_position += 1 return active_channels[idx] # Default to up pattern return active_channels[self.channel_position % len(active_channels)] def _get_bounce_channel(self, active_channels: List[int]) -> int: """Get channel for bounce pattern with proper bounce logic""" if len(active_channels) == 1: return active_channels[0] # Create bounce sequence: 0,1,2,3,2,1,0,1,2,3... bounce_length = (len(active_channels) - 1) * 2 pos = self.channel_position % bounce_length if pos < len(active_channels): # Going up: 0,1,2,3 channel_idx = pos else: # Going down: 2,1,0 (skip the last one to avoid duplicate) channel_idx = len(active_channels) - 2 - (pos - len(active_channels)) self.channel_position += 1 return active_channels[max(0, min(len(active_channels) - 1, channel_idx))] def update(self): """Main update loop - called frequently for timing precision""" if not self.is_playing: self.check_note_offs() self.volume_engine.update_pattern(0.016) # ~60fps return current_time = time.time() # Check if it's time for the next step if current_time >= self.next_step_time and self.current_pattern: self.process_step() self.advance_step() # Check for notes to turn off self.check_note_offs() # Update volume patterns self.volume_engine.update_pattern(0.016) def process_step(self): """Process the current arpeggio step""" if not self.current_pattern: return # Get note from pattern note = self.current_pattern[self.pattern_position] # Apply swing swing_offset = 0 if self.swing != 0 and self.current_step % 2 == 1: swing_offset = self.step_duration * self.swing * 0.1 # Route note to appropriate channel using distribution pattern target_channel = self._get_next_channel() if target_channel: # Use static velocity (not modified by volume patterns) static_velocity = self.velocity # Calculate note duration note_duration = self.step_duration * self.gate note_end_time = time.time() + note_duration + swing_offset # Calculate and set volume for this channel (once per note) active_channel_count = len(set(self.channel_manager.active_voices.keys())) if active_channel_count == 0: active_channel_count = 1 # At least count the channel we're about to play on volume = self.volume_engine.get_channel_volume(target_channel, active_channel_count) midi_volume = int(volume * 127) self.output_manager.send_volume_change(target_channel, midi_volume) # Send note on self.output_manager.send_note_on(target_channel, note, static_velocity) # Schedule note off self.active_notes[(target_channel, note)] = note_end_time # Emit signal for GUI self.note_triggered.emit(target_channel, note, static_velocity, note_duration) def advance_step(self): """Advance to next step in pattern""" old_pattern_position = self.pattern_position self.pattern_position = (self.pattern_position + 1) % self.pattern_length self.current_step += 1 # Check if pattern completed a full loop if old_pattern_position != 0 and self.pattern_position == 0: self.pattern_loops_completed += 1 self.apply_armed_changes() # Calculate next step time with swing base_time = self.next_step_time + self.step_duration # Apply swing to next step if it's an off-beat if self.swing != 0 and (self.current_step + 1) % 2 == 1: swing_offset = self.step_duration * self.swing * 0.1 self.next_step_time = base_time + swing_offset else: self.next_step_time = base_time self.pattern_step.emit(self.current_step) def apply_armed_changes(self): """Apply armed changes at pattern end""" changes_applied = False # Apply armed root note if self.armed_root_note is not None: self.root_note = self.armed_root_note self.armed_root_note = None changes_applied = True # Apply armed scale if self.armed_scale is not None: self.scale = self.armed_scale self.armed_scale = None changes_applied = True # Apply armed pattern type if self.armed_pattern_type is not None: self.pattern_type = self.armed_pattern_type self.armed_pattern_type = None changes_applied = True # Apply armed channel distribution if self.armed_channel_distribution is not None: self.channel_distribution = self.armed_channel_distribution self.channel_position = 0 # Reset position self.armed_channel_distribution = None changes_applied = True # If any changes were applied, regenerate pattern and emit signal if changes_applied: self.regenerate_pattern() self.armed_state_changed.emit() def check_note_offs(self): """Check for notes that should be turned off""" current_time = time.time() notes_to_remove = [] channels_becoming_inactive = set() for (channel, note), end_time in self.active_notes.items(): if current_time >= end_time: self.output_manager.send_note_off(channel, note) self.channel_manager.release_voice(channel, note) notes_to_remove.append((channel, note)) # Check if this channel will have no more active notes remaining_notes_on_channel = [k for k in self.active_notes.keys() if k[0] == channel and k != (channel, note)] if not remaining_notes_on_channel: channels_becoming_inactive.add(channel) for key in notes_to_remove: del self.active_notes[key] # Dim visual display for channels that just became inactive for channel in channels_becoming_inactive: # Send volume change signal with low volume for visual feedback self.output_manager.volume_sent.emit(channel, 20) # Dim display def get_current_state(self) -> Dict: """Get current arpeggiator state""" return { 'is_playing': self.is_playing, 'root_note': self.root_note, 'scale': self.scale, 'pattern_type': self.pattern_type, 'octave_range': self.octave_range, 'note_speed': self.note_speed, 'gate': self.gate, 'swing': self.swing, 'velocity': self.velocity, 'tempo': self.tempo, 'current_step': self.current_step, 'pattern_position': self.pattern_position, 'pattern_length': self.pattern_length, 'held_notes': list(self.held_notes), 'current_pattern': self.current_pattern.copy() }