""" Arpeggiator Engine Module Core arpeggiator logic with FL Studio-style functionality. Generates arpeggio patterns, handles timing, and integrates with routing and volume systems. """ import time import math import threading from typing import Dict, List, Optional, Tuple, Set from PyQt5.QtCore import QObject, pyqtSignal, QTimer from .midi_channel_manager import MIDIChannelManager from .synth_router import SynthRouter from .volume_pattern_engine import VolumePatternEngine from .output_manager import OutputManager class ArpeggiatorEngine(QObject): """ Main arpeggiator engine with FL Studio-style functionality. Handles pattern generation, timing, scale processing, and note scheduling. """ # Signals note_triggered = pyqtSignal(int, int, int, float) # channel, note, velocity, duration pattern_step = pyqtSignal(int) # current step tempo_changed = pyqtSignal(float) # BPM playing_state_changed = pyqtSignal(bool) # is_playing armed_state_changed = pyqtSignal() # armed state changed # Arpeggio pattern types (FL Studio style) PATTERN_TYPES = [ "up", "down", "up_down", "down_up", "random", "note_order", "chord", "random_chord", "custom" ] # Channel distribution patterns (how notes are spread across channels) CHANNEL_DISTRIBUTION_PATTERNS = [ "up", "down", "up_down", "bounce", "random", "cycle", "alternating", "single_channel" ] # Musical scales SCALES = { "chromatic": [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11], "major": [0, 2, 4, 5, 7, 9, 11], "minor": [0, 2, 3, 5, 7, 8, 10], "dorian": [0, 2, 3, 5, 7, 9, 10], "phrygian": [0, 1, 3, 5, 7, 8, 10], "lydian": [0, 2, 4, 6, 7, 9, 11], "mixolydian": [0, 2, 4, 5, 7, 9, 10], "locrian": [0, 1, 3, 5, 6, 8, 10], "harmonic_minor": [0, 2, 3, 5, 7, 8, 11], "melodic_minor": [0, 2, 3, 5, 7, 9, 11], "pentatonic_major": [0, 2, 4, 7, 9], "pentatonic_minor": [0, 3, 5, 7, 10], "blues": [0, 3, 5, 6, 7, 10] } # Note speeds (as fractions of a beat) NOTE_SPEEDS = { "1/32": 1/32, "1/32T": 1/48, "1/16": 1/16, "1/16T": 1/24, "1/8": 1/8, "1/8T": 1/12, "1/4": 1/4, "1/4T": 1/6, "1/2": 1/2, "1/2T": 1/3, "1/1": 1, "2/1": 2, "2/1T": 4/3, "4/1": 4, "4/1T": 8/3 } def __init__(self, channel_manager: MIDIChannelManager, synth_router: SynthRouter, volume_engine: VolumePatternEngine, output_manager: OutputManager): super().__init__() self.channel_manager = channel_manager self.synth_router = synth_router self.volume_engine = volume_engine self.output_manager = output_manager # Arpeggiator settings self.root_note = 60 # Middle C self.scale = "major" self.pattern_type = "up" self.octave_range = 1 # 1-4 octaves self.note_speed = "1/8" # Note duration self.gate = 1.0 # Note length as fraction of step (0.1-2.0) self.swing = 0.0 # Swing amount (-100% to +100%) self.velocity = 80 # Base velocity # Channel distribution settings self.channel_distribution = "up" # How notes are distributed across channels self.channel_position = 0 # Current position in channel distribution pattern # Armed state system for pattern-end changes self.armed_root_note = None self.armed_scale = None self.armed_pattern_type = None self.armed_channel_distribution = None self.armed_preset_data = None self.preset_apply_callback = None # Callback function for applying presets # Armed state timeout (30 seconds safety timeout) self.armed_timeout = QTimer() self.armed_timeout.setSingleShot(True) self.armed_timeout.timeout.connect(self.armed_timeout_expired) self.armed_timeout_duration = 30000 # 30 seconds in milliseconds self.last_armed_apply_time = 0 # Track when armed changes were last applied # Pattern loop tracking self.pattern_loops_completed = 0 self.pattern_start_position = 0 # Playback state self.is_playing = False self.tempo = 120.0 # BPM self.current_step = 0 self.pattern_length = 0 self.user_pattern_length = 8 # User-settable pattern length (1-16) # Delay/Echo settings self.delay_enabled = False self.delay_length = 3 # Number of repeats (0-8) self.delay_timing = "1/4" # Timing between delays self.delay_fade = 0.3 # Volume fade per repeat (0.0-1.0) self.delay_step_duration = 0.0 # Calculated delay timing self.scheduled_delays = [] # List of (time, channel, note, volume) tuples # Input notes (what's being held down) self.held_notes: Set[int] = set() self.input_chord: List[int] = [] # Generated pattern self.current_pattern: List[int] = [] self.pattern_position = 0 # Timing self.last_step_time = 0.0 self.step_duration = 0.0 # Seconds per step self.next_step_time = 0.0 # Active notes tracking for note-off self.active_notes: Dict[Tuple[int, int], float] = {} # (channel, note) -> end_time # Threading for timing precision self.timing_thread = None self.stop_timing = False # Setup timing calculation self.calculate_step_duration() self.calculate_delay_step_duration() # Initialize volume engine with pattern length self.volume_engine.set_bar_length(self.user_pattern_length) # Setup update timer self.update_timer = QTimer() self.update_timer.timeout.connect(self.update) self.update_timer.start(1) # 1ms updates for precise timing def set_root_note(self, note: int): """Set root note (0-127)""" if 0 <= note <= 127: self.root_note = note self.regenerate_pattern() def arm_root_note(self, note: int): """Arm a root note to change at pattern end""" if 0 <= note <= 127: self.armed_root_note = note self.armed_state_changed.emit() def clear_armed_root_note(self): """Clear armed root note""" self.armed_root_note = None self.armed_state_changed.emit() def set_scale(self, scale_name: str): """Set musical scale""" if scale_name in self.SCALES: self.scale = scale_name self.regenerate_pattern() def arm_scale(self, scale_name: str): """Arm a scale to change at pattern end""" if scale_name in self.SCALES: self.armed_scale = scale_name self.armed_state_changed.emit() def clear_armed_scale(self): """Clear armed scale""" self.armed_scale = None self.armed_state_changed.emit() def set_pattern_type(self, pattern_type: str): """Set arpeggio pattern type""" if pattern_type in self.PATTERN_TYPES: self.pattern_type = pattern_type self.regenerate_pattern() def arm_pattern_type(self, pattern_type: str): """Arm a pattern type to change at pattern end""" if pattern_type in self.PATTERN_TYPES: self.armed_pattern_type = pattern_type self.armed_state_changed.emit() def clear_armed_pattern_type(self): """Clear armed pattern type""" self.armed_pattern_type = None self.armed_state_changed.emit() def set_octave_range(self, octaves: int): """Set octave range (1-4)""" if 1 <= octaves <= 4: self.octave_range = octaves self.regenerate_pattern() def set_note_speed(self, speed: str): """Set note speed""" if speed in self.NOTE_SPEEDS: self.note_speed = speed self.calculate_step_duration() # Also update delay timing if it matches note speed if self.delay_timing == speed: self.calculate_delay_step_duration() def set_gate(self, gate: float): """Set gate (note length) 0.1-2.0""" self.gate = max(0.1, min(2.0, gate)) def set_swing(self, swing: float): """Set swing amount -1.0 to 1.0""" self.swing = max(-1.0, min(1.0, swing)) def set_velocity(self, velocity: int): """Set base velocity 0-127""" self.velocity = max(0, min(127, velocity)) def set_channel_distribution(self, distribution: str): """Set channel distribution pattern""" if distribution in self.CHANNEL_DISTRIBUTION_PATTERNS: self.channel_distribution = distribution self.channel_position = 0 # Reset position def arm_channel_distribution(self, distribution: str): """Arm a distribution pattern to change at pattern end""" if distribution in self.CHANNEL_DISTRIBUTION_PATTERNS: self.armed_channel_distribution = distribution self.armed_state_changed.emit() def clear_armed_channel_distribution(self): """Clear armed distribution pattern""" self.armed_channel_distribution = None self.armed_state_changed.emit() def arm_preset(self, preset_data: dict): """Arm a preset for switching at pattern end""" if preset_data: self.armed_preset_data = preset_data # Start timeout timer self.armed_timeout.start(self.armed_timeout_duration) self.armed_state_changed.emit() def clear_armed_preset(self): """Clear armed preset""" self.armed_preset_data = None self.armed_state_changed.emit() def set_preset_apply_callback(self, callback): """Set callback function for applying presets""" self.preset_apply_callback = callback def has_armed_changes(self) -> bool: """Check if any changes are armed""" return any([ self.armed_root_note is not None, self.armed_scale is not None, self.armed_pattern_type is not None, self.armed_channel_distribution is not None, self.armed_preset_data is not None ]) def get_armed_preset_data(self): """Get armed preset data""" return self.armed_preset_data def force_apply_armed_changes(self): """Force apply all armed changes immediately (emergency override)""" self.apply_armed_changes() self.last_armed_apply_time = time.time() # Update cooldown timer def clear_all_armed_changes(self): """Clear all armed changes without applying them""" self.armed_root_note = None self.armed_scale = None self.armed_pattern_type = None self.armed_channel_distribution = None self.armed_preset_data = None self.armed_timeout.stop() # Stop timeout timer self.armed_state_changed.emit() def armed_timeout_expired(self): """Handle armed state timeout - clear all armed changes""" print("Armed state timeout expired - clearing all armed changes") self.clear_all_armed_changes() def set_tempo(self, bpm: float): """Set tempo in BPM""" if 40 <= bpm <= 200: self.tempo = bpm self.calculate_step_duration() self.calculate_delay_step_duration() self.tempo_changed.emit(bpm) def set_pattern_length(self, length: int): """Set user-defined pattern length""" if 1 <= length <= 16: self.user_pattern_length = length # Update volume engine bar length to match pattern length self.volume_engine.set_bar_length(length) self.regenerate_pattern() def set_delay_enabled(self, enabled: bool): """Enable or disable delay/echo""" self.delay_enabled = enabled if not enabled: self.scheduled_delays.clear() def set_delay_length(self, length: int): """Set number of delay repeats""" if 0 <= length <= 8: self.delay_length = length def set_delay_timing(self, timing: str): """Set delay timing""" if timing in self.NOTE_SPEEDS: self.delay_timing = timing self.calculate_delay_step_duration() def set_delay_fade(self, fade: float): """Set delay fade amount (0.0 to 1.0)""" self.delay_fade = max(0.0, min(1.0, fade)) def calculate_step_duration(self): """Calculate time between steps based on tempo and note speed""" beats_per_second = self.tempo / 60.0 note_duration = self.NOTE_SPEEDS[self.note_speed] self.step_duration = note_duration / beats_per_second def calculate_delay_step_duration(self): """Calculate time between delay steps based on tempo and delay timing""" beats_per_second = self.tempo / 60.0 # Get delay timing duration in beats delay_timing_duration = self.NOTE_SPEEDS[self.delay_timing] # Calculate actual delay step duration in seconds self.delay_step_duration = delay_timing_duration / beats_per_second def schedule_delays(self, channel: int, note: int, original_volume: int): """Schedule delay/echo repeats for a note""" current_time = time.time() current_volume = original_volume for delay_step in range(1, self.delay_length + 1): # Calculate delay time delay_time = current_time + (delay_step * self.delay_step_duration) # Calculate faded volume for this delay step fade_factor = (1.0 - self.delay_fade) ** delay_step delayed_volume = int(current_volume * fade_factor) # Don't schedule if volume becomes too quiet if delayed_volume < 5: break # Schedule the delayed note self.scheduled_delays.append({ 'time': delay_time, 'channel': channel, 'note': note, 'volume': delayed_volume, 'velocity': self.velocity, # Use original velocity 'duration': self.step_duration * self.gate }) def process_delays(self): """Process scheduled delay/echo notes""" current_time = time.time() delays_to_remove = [] for i, delay in enumerate(self.scheduled_delays): if current_time >= delay['time']: # Time to play this delayed note channel = delay['channel'] note = delay['note'] volume = delay['volume'] velocity = delay['velocity'] duration = delay['duration'] # Set the faded volume self.output_manager.send_volume_change(channel, volume) # Send the delayed note self.output_manager.send_note_on(channel, note, velocity) # Schedule note off for delayed note note_end_time = current_time + duration self.active_notes[(channel, note)] = note_end_time # Mark for removal delays_to_remove.append(i) # Remove processed delays (in reverse order to maintain indices) for i in reversed(delays_to_remove): del self.scheduled_delays[i] def note_on(self, note: int): """Register a note being pressed""" self.held_notes.add(note) self.update_input_chord() self.regenerate_pattern() def note_off(self, note: int): """Register a note being released""" self.held_notes.discard(note) self.update_input_chord() if not self.held_notes: self.stop() else: self.regenerate_pattern() def update_input_chord(self): """Update the chord from held notes""" self.input_chord = sorted(list(self.held_notes)) def start(self): """Start arpeggiator playback""" if not self.held_notes: return False if not self.is_playing: self.is_playing = True self.current_step = 0 self.pattern_position = 0 self.last_step_time = time.time() self.next_step_time = self.last_step_time + self.step_duration self.playing_state_changed.emit(True) return True return False def stop(self): """Stop arpeggiator playback""" if self.is_playing: self.is_playing = False self.all_notes_off() self.playing_state_changed.emit(False) def all_notes_off(self): """Send note off for all active notes""" current_time = time.time() for (channel, note) in list(self.active_notes.keys()): self.output_manager.send_note_off(channel, note) self.channel_manager.release_voice(channel, note) self.active_notes.clear() def regenerate_pattern(self): """Regenerate arpeggio pattern based on current settings""" if not self.input_chord: self.current_pattern = [] return # Get scale notes for the key scale_intervals = self.SCALES[self.scale] # Generate pattern based on type if self.pattern_type == "up": self.current_pattern = self._generate_up_pattern() elif self.pattern_type == "down": self.current_pattern = self._generate_down_pattern() elif self.pattern_type == "up_down": self.current_pattern = self._generate_up_down_pattern() elif self.pattern_type == "down_up": self.current_pattern = self._generate_down_up_pattern() elif self.pattern_type == "random": self.current_pattern = self._generate_random_pattern() elif self.pattern_type == "note_order": self.current_pattern = self._generate_note_order_pattern() elif self.pattern_type == "chord": self.current_pattern = self._generate_chord_pattern() elif self.pattern_type == "random_chord": self.current_pattern = self._generate_random_chord_pattern() # Apply user pattern length intelligently based on pattern type if self.current_pattern: original_pattern = self.current_pattern.copy() # For directional patterns, adapt the pattern to fit the length if self.pattern_type in ["up_down", "down_up"] and self.user_pattern_length >= 4: # For up_down with 4 steps: take first half up, second half down scale_notes = self._generate_scale_notes() half_length = self.user_pattern_length // 2 if self.pattern_type == "up_down": # First half: up progression up_part = scale_notes[:half_length] if len(scale_notes) >= half_length else scale_notes * ((half_length // len(scale_notes)) + 1) up_part = up_part[:half_length] # Second half: down progression down_part = list(reversed(scale_notes))[:half_length] if len(scale_notes) >= half_length else list(reversed(scale_notes)) * ((half_length // len(scale_notes)) + 1) down_part = down_part[:half_length] self.current_pattern = up_part + down_part elif self.pattern_type == "down_up": # First half: down progression down_part = list(reversed(scale_notes))[:half_length] if len(scale_notes) >= half_length else list(reversed(scale_notes)) * ((half_length // len(scale_notes)) + 1) down_part = down_part[:half_length] # Second half: up progression up_part = scale_notes[:half_length] if len(scale_notes) >= half_length else scale_notes * ((half_length // len(scale_notes)) + 1) up_part = up_part[:half_length] self.current_pattern = down_part + up_part else: # For other patterns, use the original logic if len(self.current_pattern) > self.user_pattern_length: # Truncate pattern to user length self.current_pattern = self.current_pattern[:self.user_pattern_length] elif len(self.current_pattern) < self.user_pattern_length: # Repeat pattern to fill user length while len(self.current_pattern) < self.user_pattern_length: remaining_steps = self.user_pattern_length - len(self.current_pattern) self.current_pattern.extend(original_pattern[:remaining_steps]) self.pattern_length = len(self.current_pattern) self.pattern_position = 0 def _generate_scale_notes(self) -> List[int]: """Generate all scale notes within octave range""" scale_intervals = self.SCALES[self.scale] notes = [] # Start from root note base_octave = self.root_note // 12 root_in_octave = self.root_note % 12 # Find closest scale degree to root closest_degree = 0 min_distance = 12 for i, interval in enumerate(scale_intervals): distance = abs((root_in_octave - interval) % 12) if distance < min_distance: min_distance = distance closest_degree = i # Generate notes across octave range for octave in range(self.octave_range): for degree, interval in enumerate(scale_intervals): note = base_octave * 12 + root_in_octave + interval + (octave * 12) if 0 <= note <= 127: notes.append(note) return sorted(notes) def _generate_up_pattern(self) -> List[int]: """Generate ascending arpeggio pattern""" scale_notes = self._generate_scale_notes() return scale_notes def _generate_down_pattern(self) -> List[int]: """Generate descending arpeggio pattern""" scale_notes = self._generate_scale_notes() return list(reversed(scale_notes)) def _generate_up_down_pattern(self) -> List[int]: """Generate up then down pattern""" scale_notes = self._generate_scale_notes() # Up, then down (avoiding duplicate at top) return scale_notes + list(reversed(scale_notes[:-1])) def _generate_down_up_pattern(self) -> List[int]: """Generate down then up pattern""" scale_notes = self._generate_scale_notes() # Down, then up (avoiding duplicate at bottom) return list(reversed(scale_notes)) + scale_notes[1:] def _generate_random_pattern(self) -> List[int]: """Generate random pattern from scale notes""" import random scale_notes = self._generate_scale_notes() pattern_length = max(8, len(scale_notes)) return [random.choice(scale_notes) for _ in range(pattern_length)] def _generate_note_order_pattern(self) -> List[int]: """Generate pattern in the order notes were played""" return self.input_chord * self.octave_range def _generate_chord_pattern(self) -> List[int]: """Generate chord pattern (all notes together, represented as sequence)""" return self.input_chord def _generate_random_chord_pattern(self) -> List[int]: """Generate pattern with random chord combinations""" import random import itertools if len(self.input_chord) < 2: return self.input_chord # Generate various chord combinations patterns = [] # Individual notes patterns.extend(self.input_chord) # Pairs for pair in itertools.combinations(self.input_chord, 2): patterns.extend(pair) # Full chord patterns.extend(self.input_chord) return patterns def _get_next_channel(self) -> int: """Get next channel based on distribution pattern""" active_channels = self.channel_manager.get_active_channels() if not active_channels: return 1 if self.channel_distribution == "single_channel": return active_channels[0] elif self.channel_distribution == "up": # 1, 2, 3, 4, 5, 6... channel_idx = self.channel_position % len(active_channels) self.channel_position += 1 return active_channels[channel_idx] elif self.channel_distribution == "down": # 6, 5, 4, 3, 2, 1... channel_idx = (len(active_channels) - 1 - self.channel_position) % len(active_channels) self.channel_position += 1 return active_channels[channel_idx] elif self.channel_distribution == "up_down": # 1, 2, 3, 4, 5, 6, 5, 4, 3, 2, 1, 2, 3... cycle_length = (len(active_channels) - 1) * 2 pos = self.channel_position % cycle_length if pos < len(active_channels): channel_idx = pos else: channel_idx = len(active_channels) - 2 - (pos - len(active_channels)) self.channel_position += 1 return active_channels[max(0, channel_idx)] elif self.channel_distribution == "bounce": # 1, 2, 3, 4, 5, 6, 5, 4, 3, 2, 1, 2, 3... (same as up_down but clearer name) return self._get_bounce_channel(active_channels) elif self.channel_distribution == "random": import random return random.choice(active_channels) elif self.channel_distribution == "cycle": # Simple cycle through channels channel_idx = self.channel_position % len(active_channels) self.channel_position += 1 return active_channels[channel_idx] elif self.channel_distribution == "alternating": # Alternate between first and last, second and second-to-last, etc. half_point = len(active_channels) // 2 if self.channel_position % 2 == 0: # Even steps: use first half idx = (self.channel_position // 2) % half_point else: # Odd steps: use second half (from end) idx = len(active_channels) - 1 - ((self.channel_position // 2) % half_point) self.channel_position += 1 return active_channels[idx] # Default to up pattern return active_channels[self.channel_position % len(active_channels)] def _get_bounce_channel(self, active_channels: List[int]) -> int: """Get channel for bounce pattern with proper bounce logic""" if len(active_channels) == 1: return active_channels[0] # Create bounce sequence: 0,1,2,3,2,1,0,1,2,3... bounce_length = (len(active_channels) - 1) * 2 pos = self.channel_position % bounce_length if pos < len(active_channels): # Going up: 0,1,2,3 channel_idx = pos else: # Going down: 2,1,0 (skip the last one to avoid duplicate) channel_idx = len(active_channels) - 2 - (pos - len(active_channels)) self.channel_position += 1 return active_channels[max(0, min(len(active_channels) - 1, channel_idx))] def update(self): """Main update loop - called frequently for timing precision""" if not self.is_playing: self.check_note_offs() self.volume_engine.update_pattern(0.016) # ~60fps return current_time = time.time() # Check if it's time for the next step if current_time >= self.next_step_time and self.current_pattern: self.process_step() self.advance_step() # Check for notes to turn off self.check_note_offs() # Process scheduled delays self.process_delays() # Update volume patterns self.volume_engine.update_pattern(0.016) def process_step(self): """Process the current arpeggio step""" if not self.current_pattern: return # Get note from pattern note = self.current_pattern[self.pattern_position] # Apply swing swing_offset = 0 if self.swing != 0 and self.current_step % 2 == 1: swing_offset = self.step_duration * self.swing * 0.1 # Route note to appropriate channel using distribution pattern target_channel = self._get_next_channel() if target_channel: # Use static velocity (not modified by volume patterns) static_velocity = self.velocity # Calculate note duration note_duration = self.step_duration * self.gate note_end_time = time.time() + note_duration + swing_offset # Calculate and set volume for this channel (once per note) active_channel_count = len(set(self.channel_manager.active_voices.keys())) if active_channel_count == 0: active_channel_count = 1 # At least count the channel we're about to play on volume = self.volume_engine.get_channel_volume(target_channel, active_channel_count) midi_volume = int(volume * 127) self.output_manager.send_volume_change(target_channel, midi_volume) # Send note on self.output_manager.send_note_on(target_channel, note, static_velocity) # Schedule delay/echo if enabled if self.delay_enabled and self.delay_length > 0: self.schedule_delays(target_channel, note, midi_volume) # Schedule note off self.active_notes[(target_channel, note)] = note_end_time # Emit signal for GUI self.note_triggered.emit(target_channel, note, static_velocity, note_duration) def advance_step(self): """Advance to next step in pattern""" old_pattern_position = self.pattern_position self.pattern_position = (self.pattern_position + 1) % self.pattern_length self.current_step += 1 # Check if pattern completed a full loop pattern_completed = False if self.pattern_length == 1: # For single-note patterns, consider every step a completion pattern_completed = True self.pattern_loops_completed += 1 elif old_pattern_position != 0 and self.pattern_position == 0: # For multi-note patterns, completion is when we wrap back to 0 pattern_completed = True self.pattern_loops_completed += 1 if pattern_completed: # Apply armed changes with cooldown to prevent rapid firing current_time = time.time() min_cooldown_time = 1.0 # Minimum 1 second between armed changes if (self.has_armed_changes() and current_time - self.last_armed_apply_time >= min_cooldown_time): self.apply_armed_changes() self.last_armed_apply_time = current_time # Calculate next step time with swing base_time = self.next_step_time + self.step_duration # Apply swing to next step if it's an off-beat if self.swing != 0 and (self.current_step + 1) % 2 == 1: swing_offset = self.step_duration * self.swing * 0.1 self.next_step_time = base_time + swing_offset else: self.next_step_time = base_time self.pattern_step.emit(self.current_step) def apply_armed_changes(self): """Apply armed changes at pattern end""" changes_applied = False # Apply armed root note if self.armed_root_note is not None: self.root_note = self.armed_root_note self.armed_root_note = None changes_applied = True # Apply armed scale if self.armed_scale is not None: self.scale = self.armed_scale self.armed_scale = None changes_applied = True # Apply armed pattern type if self.armed_pattern_type is not None: self.pattern_type = self.armed_pattern_type self.armed_pattern_type = None changes_applied = True # Apply armed channel distribution if self.armed_channel_distribution is not None: self.channel_distribution = self.armed_channel_distribution self.channel_position = 0 # Reset position self.armed_channel_distribution = None changes_applied = True # Apply armed preset if self.armed_preset_data is not None and self.preset_apply_callback: try: self.preset_apply_callback(self.armed_preset_data) self.armed_preset_data = None changes_applied = True except Exception as e: print(f"Error applying armed preset: {e}") # Clear the armed preset to prevent lockup self.armed_preset_data = None changes_applied = True # If any changes were applied, regenerate pattern and emit signal if changes_applied: self.armed_timeout.stop() # Stop timeout timer since changes were applied self.regenerate_pattern() self.armed_state_changed.emit() def check_note_offs(self): """Check for notes that should be turned off""" current_time = time.time() notes_to_remove = [] channels_becoming_inactive = set() for (channel, note), end_time in self.active_notes.items(): if current_time >= end_time: self.output_manager.send_note_off(channel, note) self.channel_manager.release_voice(channel, note) notes_to_remove.append((channel, note)) # Check if this channel will have no more active notes remaining_notes_on_channel = [k for k in self.active_notes.keys() if k[0] == channel and k != (channel, note)] if not remaining_notes_on_channel: channels_becoming_inactive.add(channel) for key in notes_to_remove: del self.active_notes[key] # Turn off visual display for channels that just became inactive for channel in channels_becoming_inactive: # Send volume change signal with 0 volume for black display self.output_manager.volume_sent.emit(channel, 0) # Black display def get_current_state(self) -> Dict: """Get current arpeggiator state""" return { 'is_playing': self.is_playing, 'root_note': self.root_note, 'scale': self.scale, 'pattern_type': self.pattern_type, 'octave_range': self.octave_range, 'note_speed': self.note_speed, 'gate': self.gate, 'swing': self.swing, 'velocity': self.velocity, 'tempo': self.tempo, 'current_step': self.current_step, 'pattern_position': self.pattern_position, 'pattern_length': self.pattern_length, 'held_notes': list(self.held_notes), 'current_pattern': self.current_pattern.copy() }