You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 

1077 lines
43 KiB

"""
Arpeggiator Engine Module
Core arpeggiator logic with FL Studio-style functionality.
Generates arpeggio patterns, handles timing, and integrates with routing and volume systems.
"""
import time
import math
import threading
from typing import Dict, List, Optional, Tuple, Set
from PyQt5.QtCore import QObject, pyqtSignal, QTimer
from .midi_channel_manager import MIDIChannelManager
from .synth_router import SynthRouter
from .volume_pattern_engine import VolumePatternEngine
from .output_manager import OutputManager
class ArpeggiatorEngine(QObject):
"""
Main arpeggiator engine with FL Studio-style functionality.
Handles pattern generation, timing, scale processing, and note scheduling.
"""
# Signals
note_triggered = pyqtSignal(int, int, int, float) # channel, note, velocity, duration
pattern_step = pyqtSignal(int) # current step
tempo_changed = pyqtSignal(float) # BPM
playing_state_changed = pyqtSignal(bool) # is_playing
armed_state_changed = pyqtSignal() # armed state changed
settings_changed = pyqtSignal() # Any setting changed (for GUI updates)
# Arpeggio pattern types (FL Studio style)
PATTERN_TYPES = [
"up", "down", "up_down", "down_up", "random",
"note_order", "chord", "random_chord", "custom"
]
# Channel distribution patterns (how notes are spread across channels)
CHANNEL_DISTRIBUTION_PATTERNS = [
"up", "down", "up_down", "bounce", "random", "cycle",
"alternating", "single_channel"
]
# Musical scales
SCALES = {
"chromatic": [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11],
"major": [0, 2, 4, 5, 7, 9, 11],
"minor": [0, 2, 3, 5, 7, 8, 10],
"dorian": [0, 2, 3, 5, 7, 9, 10],
"phrygian": [0, 1, 3, 5, 7, 8, 10],
"lydian": [0, 2, 4, 6, 7, 9, 11],
"mixolydian": [0, 2, 4, 5, 7, 9, 10],
"locrian": [0, 1, 3, 5, 6, 8, 10],
"harmonic_minor": [0, 2, 3, 5, 7, 8, 11],
"melodic_minor": [0, 2, 3, 5, 7, 9, 11],
"pentatonic_major": [0, 2, 4, 7, 9],
"pentatonic_minor": [0, 3, 5, 7, 10],
"blues": [0, 3, 5, 6, 7, 10]
}
# Note speeds (as fractions of a beat)
NOTE_SPEEDS = {
"1/32": 1/32, "1/32T": 1/48, "1/16": 1/16, "1/16T": 1/24,
"1/8": 1/8, "1/8T": 1/12, "1/4": 1/4, "1/4T": 1/6,
"1/2": 1/2, "1/2T": 1/3, "1/1": 1, "2/1": 2, "2/1T": 4/3, "4/1": 4, "4/1T": 8/3
}
def __init__(self, channel_manager: MIDIChannelManager, synth_router: SynthRouter,
volume_engine: VolumePatternEngine, output_manager: OutputManager):
super().__init__()
self.channel_manager = channel_manager
self.synth_router = synth_router
self.volume_engine = volume_engine
self.output_manager = output_manager
# Arpeggiator settings
self.root_note = 60 # Middle C
self.scale = "major"
self.pattern_type = "up"
self.octave_range = 1 # 1-4 octaves
self.note_speed = "1/8" # Note duration
self.gate = 1.0 # Note length as fraction of step (0.1-2.0)
self.swing = 0.0 # Swing amount (-100% to +100%)
self.velocity = 80 # Base velocity
self.scale_note_start = 0 # Starting scale note index (0 = root)
# Channel distribution settings
self.channel_distribution = "up" # How notes are distributed across channels
self.channel_position = 0 # Current position in channel distribution pattern
# Armed state system for pattern-end changes
self.armed_root_note = None
self.armed_scale = None
self.armed_pattern_type = None
self.armed_channel_distribution = None
self.armed_scale_note_start = None
self.armed_pattern_length = None
self.armed_note_limit = None
self.armed_preset_data = None
self.preset_apply_callback = None # Callback function for applying presets
# Armed state timeout (30 seconds safety timeout)
self.armed_timeout = QTimer()
self.armed_timeout.setSingleShot(True)
self.armed_timeout.timeout.connect(self.armed_timeout_expired)
self.armed_timeout_duration = 30000 # 30 seconds in milliseconds
self.last_armed_apply_time = 0 # Track when armed changes were last applied
# Pattern loop tracking
self.pattern_loops_completed = 0
self.pattern_start_position = 0
# Playback state
self.is_playing = False
self.tempo = 120.0 # BPM
self.current_step = 0
self.pattern_length = 0
self.user_pattern_length = 8 # User-settable pattern length (1-16)
self.note_limit = 7 # Limit how many notes from the scale to use (1-7)
# Delay/Echo settings
self.delay_enabled = False
self.delay_length = 3 # Number of repeats (0-8)
self.delay_timing = "1/4" # Timing between delays
self.delay_fade = 0.3 # Volume fade per repeat (0.0-1.0)
self.delay_step_duration = 0.0 # Calculated delay timing
self.scheduled_delays = [] # List of delay dicts with time, channel, note, velocity, duration
# Input notes (what's being held down)
self.held_notes: Set[int] = set()
self.input_chord: List[int] = []
# Generated pattern
self.current_pattern: List[int] = []
self.pattern_position = 0
# Timing
self.last_step_time = 0.0
self.step_duration = 0.0 # Seconds per step
self.next_step_time = 0.0
# Active notes tracking for note-off
self.active_notes: Dict[Tuple[int, int], float] = {} # (channel, note) -> end_time
# Threading for timing precision
self.timing_thread = None
self.stop_timing = False
# Setup timing calculation
self.calculate_step_duration()
self.calculate_delay_step_duration()
# Initialize volume engine with pattern length
self.volume_engine.set_bar_length(self.user_pattern_length)
# Setup update timer
self.update_timer = QTimer()
self.update_timer.timeout.connect(self.update)
self.update_timer.start(1) # 1ms updates for precise timing
def set_root_note(self, note: int):
"""Set root note (0-127)"""
if 0 <= note <= 127:
self.root_note = note
self.regenerate_pattern()
def arm_root_note(self, note: int):
"""Arm a root note to change at pattern end"""
if 0 <= note <= 127:
self.armed_root_note = note
self.armed_state_changed.emit()
def clear_armed_root_note(self):
"""Clear armed root note"""
self.armed_root_note = None
self.armed_state_changed.emit()
def set_scale(self, scale_name: str):
"""Set musical scale"""
if scale_name in self.SCALES:
self.scale = scale_name
self.regenerate_pattern()
def arm_scale(self, scale_name: str):
"""Arm a scale to change at pattern end"""
if scale_name in self.SCALES:
self.armed_scale = scale_name
self.armed_state_changed.emit()
def clear_armed_scale(self):
"""Clear armed scale"""
self.armed_scale = None
self.armed_state_changed.emit()
def set_pattern_type(self, pattern_type: str):
"""Set arpeggio pattern type"""
if pattern_type in self.PATTERN_TYPES:
self.pattern_type = pattern_type
self.regenerate_pattern()
def arm_pattern_type(self, pattern_type: str):
"""Arm a pattern type to change at pattern end"""
if pattern_type in self.PATTERN_TYPES:
self.armed_pattern_type = pattern_type
self.armed_state_changed.emit()
def clear_armed_pattern_type(self):
"""Clear armed pattern type"""
self.armed_pattern_type = None
self.armed_state_changed.emit()
def set_octave_range(self, octaves: int):
"""Set octave range (1-4)"""
if 1 <= octaves <= 4:
self.octave_range = octaves
self.regenerate_pattern()
def set_note_speed(self, speed: str):
"""Set note speed"""
if speed in self.NOTE_SPEEDS:
self.note_speed = speed
self.calculate_step_duration()
# Always recalculate delay timing since it's relative to note speed
self.calculate_delay_step_duration()
def set_gate(self, gate: float):
"""Set gate (note length) 0.1-2.0"""
self.gate = max(0.1, min(2.0, gate))
def set_swing(self, swing: float):
"""Set swing amount -1.0 to 1.0"""
self.swing = max(-1.0, min(1.0, swing))
def set_velocity(self, velocity: int):
"""Set base velocity 0-127"""
self.velocity = max(0, min(127, velocity))
def set_scale_note_start(self, scale_note_index: int):
"""Set which scale note to start the arpeggio from"""
self.scale_note_start = max(0, scale_note_index)
self.regenerate_pattern()
def arm_scale_note_start(self, scale_note_index: int):
"""Arm a scale note start position to change at pattern end"""
self.armed_scale_note_start = max(0, scale_note_index)
self.armed_state_changed.emit()
def clear_armed_scale_note_start(self):
"""Clear armed scale note start position"""
self.armed_scale_note_start = None
self.armed_state_changed.emit()
def set_channel_distribution(self, distribution: str):
"""Set channel distribution pattern"""
if distribution in self.CHANNEL_DISTRIBUTION_PATTERNS:
self.channel_distribution = distribution
self.channel_position = 0 # Reset position
def arm_channel_distribution(self, distribution: str):
"""Arm a distribution pattern to change at pattern end"""
if distribution in self.CHANNEL_DISTRIBUTION_PATTERNS:
self.armed_channel_distribution = distribution
self.armed_state_changed.emit()
def clear_armed_channel_distribution(self):
"""Clear armed distribution pattern"""
self.armed_channel_distribution = None
self.armed_state_changed.emit()
def arm_preset(self, preset_data: dict):
"""Arm a preset for switching at pattern end"""
if preset_data:
self.armed_preset_data = preset_data
# Start timeout timer
self.armed_timeout.start(self.armed_timeout_duration)
self.armed_state_changed.emit()
def clear_armed_preset(self):
"""Clear armed preset"""
self.armed_preset_data = None
self.armed_state_changed.emit()
def set_preset_apply_callback(self, callback):
"""Set callback function for applying presets"""
self.preset_apply_callback = callback
def has_armed_changes(self) -> bool:
"""Check if any changes are armed"""
return any([
self.armed_root_note is not None,
self.armed_scale is not None,
self.armed_pattern_type is not None,
self.armed_channel_distribution is not None,
self.armed_scale_note_start is not None,
self.armed_pattern_length is not None,
self.armed_note_limit is not None,
self.armed_preset_data is not None
])
def get_armed_preset_data(self):
"""Get armed preset data"""
return self.armed_preset_data
def force_apply_armed_changes(self):
"""Force apply all armed changes immediately (emergency override)"""
self.apply_armed_changes()
self.last_armed_apply_time = time.time() # Update cooldown timer
def clear_all_armed_changes(self):
"""Clear all armed changes without applying them"""
self.armed_root_note = None
self.armed_scale = None
self.armed_pattern_type = None
self.armed_channel_distribution = None
self.armed_scale_note_start = None
self.armed_pattern_length = None
self.armed_note_limit = None
self.armed_preset_data = None
self.armed_timeout.stop() # Stop timeout timer
self.armed_state_changed.emit()
def armed_timeout_expired(self):
"""Handle armed state timeout - clear all armed changes"""
print("Armed state timeout expired - clearing all armed changes")
self.clear_all_armed_changes()
def set_tempo(self, bpm: float):
"""Set tempo in BPM"""
if 40 <= bpm <= 200:
self.tempo = bpm
self.calculate_step_duration()
self.calculate_delay_step_duration()
self.tempo_changed.emit(bpm)
def set_pattern_length(self, length: int):
"""Set user-defined pattern length (armed - applies at pattern end)"""
if 1 <= length <= 16:
self.armed_pattern_length = length
self.armed_state_changed.emit()
def set_note_limit(self, limit: int):
"""Set the note limit (1-7) - how many notes from the scale to use (armed - applies at pattern end)"""
if 1 <= limit <= 7:
self.armed_note_limit = limit
self.armed_state_changed.emit()
def set_delay_enabled(self, enabled: bool):
"""Enable or disable delay/echo"""
self.delay_enabled = enabled
if not enabled:
self.scheduled_delays.clear()
def set_delay_length(self, length: int):
"""Set number of delay repeats"""
if 0 <= length <= 8:
self.delay_length = length
def set_delay_timing(self, timing: str):
"""Set delay timing"""
if timing in self.NOTE_SPEEDS:
self.delay_timing = timing
self.calculate_delay_step_duration()
def set_delay_fade(self, fade: float):
"""Set delay fade amount (0.0 to 1.0)"""
self.delay_fade = max(0.0, min(1.0, fade))
def calculate_step_duration(self):
"""Calculate time between steps based on tempo and note speed"""
beats_per_second = self.tempo / 60.0
note_duration = self.NOTE_SPEEDS[self.note_speed]
self.step_duration = note_duration / beats_per_second
def calculate_delay_step_duration(self):
"""Calculate time between delay steps as a fraction of the current note speed"""
beats_per_second = self.tempo / 60.0
# Get current note speed duration in beats
note_speed_duration = self.NOTE_SPEEDS[self.note_speed]
# Get delay timing as a fraction
delay_timing_fraction = self.NOTE_SPEEDS[self.delay_timing]
# Delay interval = note_speed × delay_timing_fraction
# For example: note_speed=1/1, delay_timing=1/4 → delay every 1/4 beat
# Or: note_speed=1/2, delay_timing=1/4 → delay every 1/8 beat (1/2 × 1/4)
delay_interval_beats = note_speed_duration * delay_timing_fraction
# Convert to seconds
self.delay_step_duration = delay_interval_beats / beats_per_second
def schedule_delays(self, channel: int, note: int, original_velocity: int):
"""Schedule delay/echo repeats for a note"""
current_time = time.time()
current_velocity = original_velocity
for delay_step in range(1, self.delay_length + 1):
# Calculate delay time
delay_time = current_time + (delay_step * self.delay_step_duration)
# Calculate faded velocity for this delay step
fade_factor = (1.0 - self.delay_fade) ** delay_step
delayed_velocity = int(current_velocity * fade_factor)
# Don't schedule if velocity becomes too quiet (MIDI velocity range is 1-127)
if delayed_velocity < 5:
break
# Schedule the delayed note
self.scheduled_delays.append({
'time': delay_time,
'channel': channel,
'note': note,
'velocity': delayed_velocity, # Use faded velocity
'duration': self.step_duration * self.gate
})
def process_delays(self):
"""Process scheduled delay/echo notes"""
current_time = time.time()
delays_to_remove = []
for i, delay in enumerate(self.scheduled_delays):
if current_time >= delay['time']:
# Time to play this delayed note
channel = delay['channel']
note = delay['note']
velocity = delay['velocity']
duration = delay['duration']
# Send the delayed note with faded velocity (no volume change needed)
self.output_manager.send_note_on(channel, note, velocity)
# Schedule note off for delayed note
note_end_time = current_time + duration
self.active_notes[(channel, note)] = note_end_time
# Mark for removal
delays_to_remove.append(i)
# Remove processed delays (in reverse order to maintain indices)
for i in reversed(delays_to_remove):
del self.scheduled_delays[i]
def note_on(self, note: int):
"""Register a note being pressed"""
self.held_notes.add(note)
self.update_input_chord()
self.regenerate_pattern()
def note_off(self, note: int):
"""Register a note being released"""
self.held_notes.discard(note)
self.update_input_chord()
if not self.held_notes:
self.stop()
else:
self.regenerate_pattern()
def update_input_chord(self):
"""Update the chord from held notes"""
self.input_chord = sorted(list(self.held_notes))
def start(self):
"""Start arpeggiator playback"""
if not self.held_notes:
return False
if not self.is_playing:
self.is_playing = True
self.current_step = 0
self.pattern_position = 0
self.last_step_time = time.time()
self.next_step_time = self.last_step_time + self.step_duration
# Synchronize volume patterns with arpeggiator start
self.volume_engine.sync_with_arpeggiator_start()
self.playing_state_changed.emit(True)
return True
return False
def stop(self):
"""Stop arpeggiator playback"""
if self.is_playing:
self.is_playing = False
self.all_notes_off()
self.playing_state_changed.emit(False)
def all_notes_off(self):
"""Send note off for all active notes"""
current_time = time.time()
for (channel, note) in list(self.active_notes.keys()):
self.output_manager.send_note_off(channel, note)
self.channel_manager.release_voice(channel, note)
self.active_notes.clear()
def regenerate_pattern(self):
"""Regenerate arpeggio pattern based on current settings"""
if not self.input_chord:
self.current_pattern = []
return
# Get scale notes for the key
scale_intervals = self.SCALES[self.scale]
# Generate pattern based on type
if self.pattern_type == "up":
self.current_pattern = self._generate_up_pattern()
elif self.pattern_type == "down":
self.current_pattern = self._generate_down_pattern()
elif self.pattern_type == "up_down":
self.current_pattern = self._generate_up_down_pattern()
elif self.pattern_type == "down_up":
self.current_pattern = self._generate_down_up_pattern()
elif self.pattern_type == "random":
self.current_pattern = self._generate_random_pattern()
elif self.pattern_type == "note_order":
self.current_pattern = self._generate_note_order_pattern()
elif self.pattern_type == "chord":
self.current_pattern = self._generate_chord_pattern()
elif self.pattern_type == "random_chord":
self.current_pattern = self._generate_random_chord_pattern()
# Apply user pattern length intelligently based on pattern type
if self.current_pattern:
original_pattern = self.current_pattern.copy()
# For directional patterns, adapt the pattern to fit the length
if self.pattern_type in ["up_down", "down_up"] and self.user_pattern_length >= 4:
# For up_down with 4 steps: take first half up, second half down
scale_notes = self._get_all_scale_notes()
half_length = self.user_pattern_length // 2
if self.pattern_type == "up_down":
# First half: up progression
up_part = scale_notes[:half_length] if len(scale_notes) >= half_length else scale_notes * ((half_length // len(scale_notes)) + 1)
up_part = up_part[:half_length]
# Second half: down progression
down_part = list(reversed(scale_notes))[:half_length] if len(scale_notes) >= half_length else list(reversed(scale_notes)) * ((half_length // len(scale_notes)) + 1)
down_part = down_part[:half_length]
self.current_pattern = up_part + down_part
elif self.pattern_type == "down_up":
# First half: down progression
down_part = list(reversed(scale_notes))[:half_length] if len(scale_notes) >= half_length else list(reversed(scale_notes)) * ((half_length // len(scale_notes)) + 1)
down_part = down_part[:half_length]
# Second half: up progression
up_part = scale_notes[:half_length] if len(scale_notes) >= half_length else scale_notes * ((half_length // len(scale_notes)) + 1)
up_part = up_part[:half_length]
self.current_pattern = down_part + up_part
else:
# For other patterns, use the original logic
if len(self.current_pattern) > self.user_pattern_length:
# Truncate pattern to user length
self.current_pattern = self.current_pattern[:self.user_pattern_length]
elif len(self.current_pattern) < self.user_pattern_length:
# Repeat pattern to fill user length
while len(self.current_pattern) < self.user_pattern_length:
remaining_steps = self.user_pattern_length - len(self.current_pattern)
self.current_pattern.extend(original_pattern[:remaining_steps])
self.pattern_length = len(self.current_pattern)
self.pattern_position = 0
def _generate_scale_notes_up(self) -> List[int]:
"""Generate scale notes going upward from the selected starting note"""
scale_intervals = self.SCALES[self.scale]
notes = []
# Calculate the starting note based on root note and scale_note_start
base_octave = self.root_note // 12
root_in_octave = self.root_note % 12
# Get the interval for the selected scale degree
start_degree = self.scale_note_start % len(scale_intervals)
start_interval = scale_intervals[start_degree]
starting_note = base_octave * 12 + root_in_octave + start_interval
# Generate notes going upward for octave_range octaves
current_note = starting_note
notes_per_octave = len(scale_intervals)
total_notes_needed = notes_per_octave * self.octave_range
for i in range(total_notes_needed):
if 0 <= current_note <= 127:
notes.append(current_note)
# Move to next scale degree
current_degree = (start_degree + i + 1) % notes_per_octave
if current_degree == 0: # Wrapped to next octave
base_octave += 1
next_interval = scale_intervals[current_degree]
current_note = base_octave * 12 + root_in_octave + next_interval
return notes
def _generate_scale_notes_down(self) -> List[int]:
"""Generate scale notes going downward from the selected starting note"""
scale_intervals = self.SCALES[self.scale]
notes = []
# Calculate the starting note based on root note and scale_note_start
base_octave = self.root_note // 12
root_in_octave = self.root_note % 12
# Get the interval for the selected scale degree
start_degree = self.scale_note_start % len(scale_intervals)
start_interval = scale_intervals[start_degree]
starting_note = base_octave * 12 + root_in_octave + start_interval
# Generate notes going downward for octave_range octaves
current_note = starting_note
notes_per_octave = len(scale_intervals)
total_notes_needed = notes_per_octave * self.octave_range
for i in range(total_notes_needed):
if 0 <= current_note <= 127:
notes.append(current_note)
# Calculate next degree going down
next_degree = (start_degree - i - 1) % notes_per_octave
# Check if we need to move to previous octave
# This happens when we wrap around from a lower degree to a higher degree
if i == 0:
# First step down from starting note
if next_degree > start_degree: # Wrapped around (e.g., from 0 to 6)
base_octave -= 1
else:
# Subsequent steps - check if we wrapped around
prev_degree = (start_degree - i) % notes_per_octave
if next_degree > prev_degree: # Wrapped around
base_octave -= 1
current_degree = next_degree
next_interval = scale_intervals[current_degree]
current_note = base_octave * 12 + root_in_octave + next_interval
return notes
def _get_limited_scale_notes(self) -> List[int]:
"""Get scale notes limited by note_limit setting"""
scale_intervals = self.SCALES[self.scale]
notes = []
# Calculate the starting note based on root note and scale_note_start
base_octave = self.root_note // 12
root_in_octave = self.root_note % 12
# Get the interval for the selected scale degree
start_degree = self.scale_note_start % len(scale_intervals)
# Generate only the number of notes specified by note_limit
for i in range(self.note_limit):
degree = (start_degree + i) % len(scale_intervals)
# If we wrapped around, go to next octave
if i > 0 and degree < (start_degree + i - 1) % len(scale_intervals):
base_octave += 1
interval = scale_intervals[degree]
note = base_octave * 12 + root_in_octave + interval
if 0 <= note <= 127:
notes.append(note)
return notes
def _get_all_scale_notes(self) -> List[int]:
"""Get all available scale notes in both directions for patterns that need full range"""
# Use limited scale notes if note_limit is less than full scale
if self.note_limit < 7:
return self._get_limited_scale_notes()
# Original behavior for full scale (note_limit = 7)
up_notes = self._generate_scale_notes_up()
down_notes = self._generate_scale_notes_down()
# Combine and remove duplicates while preserving order
all_notes = []
seen = set()
# Add down notes (excluding starting note)
for note in reversed(down_notes[1:]):
if note not in seen:
all_notes.append(note)
seen.add(note)
# Add up notes (including starting note)
for note in up_notes:
if note not in seen:
all_notes.append(note)
seen.add(note)
return sorted(all_notes) # Return sorted for consistency
def _generate_up_pattern(self) -> List[int]:
"""Generate ascending arpeggio pattern"""
if self.note_limit < 7:
return self._get_limited_scale_notes()
return self._generate_scale_notes_up()
def _generate_down_pattern(self) -> List[int]:
"""Generate descending arpeggio pattern"""
if self.note_limit < 7:
limited_notes = self._get_limited_scale_notes()
return list(reversed(limited_notes))
return self._generate_scale_notes_down()
def _generate_up_down_pattern(self) -> List[int]:
"""Generate up then down pattern"""
if self.note_limit < 7:
limited_notes = self._get_limited_scale_notes()
# Up, then down (avoiding duplicate at starting note)
return limited_notes + list(reversed(limited_notes))[1:]
up_notes = self._generate_scale_notes_up()
down_notes = self._generate_scale_notes_down()
# Up, then down (avoiding duplicate at starting note)
return up_notes + down_notes[1:]
def _generate_down_up_pattern(self) -> List[int]:
"""Generate down then up pattern"""
if self.note_limit < 7:
limited_notes = self._get_limited_scale_notes()
# Down, then up (avoiding duplicate at starting note)
return list(reversed(limited_notes)) + limited_notes[1:]
down_notes = self._generate_scale_notes_down()
up_notes = self._generate_scale_notes_up()
# Down, then up (avoiding duplicate at starting note)
return down_notes + up_notes[1:]
def _generate_random_pattern(self) -> List[int]:
"""Generate random pattern from scale notes"""
import random
scale_notes = self._get_all_scale_notes()
pattern_length = max(8, len(scale_notes))
return [random.choice(scale_notes) for _ in range(pattern_length)]
def _generate_note_order_pattern(self) -> List[int]:
"""Generate pattern in the order notes were played"""
return self.input_chord * self.octave_range
def _generate_chord_pattern(self) -> List[int]:
"""Generate chord pattern (all notes together, represented as sequence)"""
return self.input_chord
def _generate_random_chord_pattern(self) -> List[int]:
"""Generate pattern with random chord combinations"""
import random
import itertools
if len(self.input_chord) < 2:
return self.input_chord
# Generate various chord combinations
patterns = []
# Individual notes
patterns.extend(self.input_chord)
# Pairs
for pair in itertools.combinations(self.input_chord, 2):
patterns.extend(pair)
# Full chord
patterns.extend(self.input_chord)
return patterns
def _get_next_channel(self) -> int:
"""Get next channel based on distribution pattern"""
active_channels = self.channel_manager.get_active_channels()
if not active_channels:
return 1
if self.channel_distribution == "single_channel":
return active_channels[0]
elif self.channel_distribution == "up":
# 1, 2, 3, 4, 5, 6...
channel_idx = self.channel_position % len(active_channels)
self.channel_position += 1
return active_channels[channel_idx]
elif self.channel_distribution == "down":
# 6, 5, 4, 3, 2, 1...
channel_idx = (len(active_channels) - 1 - self.channel_position) % len(active_channels)
self.channel_position += 1
return active_channels[channel_idx]
elif self.channel_distribution == "up_down":
# 1, 2, 3, 4, 5, 6, 5, 4, 3, 2, 1, 2, 3...
cycle_length = (len(active_channels) - 1) * 2
pos = self.channel_position % cycle_length
if pos < len(active_channels):
channel_idx = pos
else:
channel_idx = len(active_channels) - 2 - (pos - len(active_channels))
self.channel_position += 1
return active_channels[max(0, channel_idx)]
elif self.channel_distribution == "bounce":
# 1, 2, 3, 4, 5, 6, 5, 4, 3, 2, 1, 2, 3... (same as up_down but clearer name)
return self._get_bounce_channel(active_channels)
elif self.channel_distribution == "random":
import random
return random.choice(active_channels)
elif self.channel_distribution == "cycle":
# Simple cycle through channels
channel_idx = self.channel_position % len(active_channels)
self.channel_position += 1
return active_channels[channel_idx]
elif self.channel_distribution == "alternating":
# Alternate between first and last, second and second-to-last, etc.
half_point = len(active_channels) // 2
if self.channel_position % 2 == 0:
# Even steps: use first half
idx = (self.channel_position // 2) % half_point
else:
# Odd steps: use second half (from end)
idx = len(active_channels) - 1 - ((self.channel_position // 2) % half_point)
self.channel_position += 1
return active_channels[idx]
# Default to up pattern
return active_channels[self.channel_position % len(active_channels)]
def _get_bounce_channel(self, active_channels: List[int]) -> int:
"""Get channel for bounce pattern with proper bounce logic"""
if len(active_channels) == 1:
return active_channels[0]
# Create bounce sequence: 0,1,2,3,2,1,0,1,2,3...
bounce_length = (len(active_channels) - 1) * 2
pos = self.channel_position % bounce_length
if pos < len(active_channels):
# Going up: 0,1,2,3
channel_idx = pos
else:
# Going down: 2,1,0 (skip the last one to avoid duplicate)
channel_idx = len(active_channels) - 2 - (pos - len(active_channels))
self.channel_position += 1
return active_channels[max(0, min(len(active_channels) - 1, channel_idx))]
def update(self):
"""Main update loop - called frequently for timing precision"""
if not self.is_playing:
self.check_note_offs()
return
current_time = time.time()
# Check if it's time for the next step
if current_time >= self.next_step_time and self.current_pattern:
self.process_step()
self.advance_step()
# Check for notes to turn off
self.check_note_offs()
# Process scheduled delays
self.process_delays()
def process_step(self):
"""Process the current arpeggio step"""
if not self.current_pattern:
return
# Get note from pattern
note = self.current_pattern[self.pattern_position]
# Apply swing
swing_offset = 0
if self.swing != 0 and self.current_step % 2 == 1:
swing_offset = self.step_duration * self.swing * 0.1
# Route note to appropriate channel using distribution pattern
target_channel = self._get_next_channel()
if target_channel:
# Use static velocity (not modified by volume patterns)
static_velocity = self.velocity
# Calculate note duration
note_duration = self.step_duration * self.gate
note_end_time = time.time() + note_duration + swing_offset
# Update volume pattern position to sync with arpeggiator step
self.volume_engine.update_pattern_step(self.current_step, self.pattern_length)
# Calculate and set volume for this channel (once per note)
active_channel_count = len(set(self.channel_manager.active_voices.keys()))
if active_channel_count == 0:
active_channel_count = 1 # At least count the channel we're about to play on
volume = self.volume_engine.get_channel_volume(target_channel, active_channel_count)
midi_volume = int(volume * 127)
self.output_manager.send_volume_change(target_channel, midi_volume)
# Send note on
self.output_manager.send_note_on(target_channel, note, static_velocity)
# Schedule delay/echo if enabled
if self.delay_enabled and self.delay_length > 0:
self.schedule_delays(target_channel, note, static_velocity)
# Schedule note off
self.active_notes[(target_channel, note)] = note_end_time
# Emit signal for GUI
self.note_triggered.emit(target_channel, note, static_velocity, note_duration)
def advance_step(self):
"""Advance to next step in pattern"""
old_pattern_position = self.pattern_position
self.pattern_position = (self.pattern_position + 1) % self.pattern_length
self.current_step += 1
# Check if pattern completed a full loop
pattern_completed = False
if self.pattern_length == 1:
# For single-note patterns, consider every step a completion
pattern_completed = True
self.pattern_loops_completed += 1
elif old_pattern_position != 0 and self.pattern_position == 0:
# For multi-note patterns, completion is when we wrap back to 0
pattern_completed = True
self.pattern_loops_completed += 1
if pattern_completed:
# Apply armed changes with cooldown to prevent rapid firing
current_time = time.time()
min_cooldown_time = 1.0 # Minimum 1 second between armed changes
if (self.has_armed_changes() and
current_time - self.last_armed_apply_time >= min_cooldown_time):
self.apply_armed_changes()
self.last_armed_apply_time = current_time
# Calculate next step time with swing
base_time = self.next_step_time + self.step_duration
# Apply swing to next step if it's an off-beat
if self.swing != 0 and (self.current_step + 1) % 2 == 1:
swing_offset = self.step_duration * self.swing * 0.1
self.next_step_time = base_time + swing_offset
else:
self.next_step_time = base_time
self.pattern_step.emit(self.current_step)
def apply_armed_changes(self):
"""Apply armed changes at pattern end"""
changes_applied = False
# Apply armed root note
if self.armed_root_note is not None:
self.root_note = self.armed_root_note
self.armed_root_note = None
changes_applied = True
# Apply armed scale
if self.armed_scale is not None:
self.scale = self.armed_scale
self.armed_scale = None
changes_applied = True
# Apply armed pattern type
if self.armed_pattern_type is not None:
self.pattern_type = self.armed_pattern_type
self.armed_pattern_type = None
changes_applied = True
# Apply armed channel distribution
if self.armed_channel_distribution is not None:
self.channel_distribution = self.armed_channel_distribution
self.channel_position = 0 # Reset position
self.armed_channel_distribution = None
changes_applied = True
# Apply armed scale note start
if self.armed_scale_note_start is not None:
self.scale_note_start = self.armed_scale_note_start
self.armed_scale_note_start = None
changes_applied = True
# Apply armed pattern length
if self.armed_pattern_length is not None:
self.user_pattern_length = self.armed_pattern_length
self.volume_engine.set_bar_length(self.armed_pattern_length)
self.armed_pattern_length = None
changes_applied = True
# Apply armed note limit
if self.armed_note_limit is not None:
self.note_limit = self.armed_note_limit
self.armed_note_limit = None
changes_applied = True
# Apply armed preset
if self.armed_preset_data is not None and self.preset_apply_callback:
try:
self.preset_apply_callback(self.armed_preset_data)
self.armed_preset_data = None
changes_applied = True
except Exception as e:
print(f"Error applying armed preset: {e}")
# Clear the armed preset to prevent lockup
self.armed_preset_data = None
changes_applied = True
# If any changes were applied, regenerate pattern and emit signal
if changes_applied:
self.armed_timeout.stop() # Stop timeout timer since changes were applied
self.regenerate_pattern()
self.armed_state_changed.emit()
self.settings_changed.emit() # Update GUI to reflect new settings
def check_note_offs(self):
"""Check for notes that should be turned off"""
current_time = time.time()
notes_to_remove = []
channels_becoming_inactive = set()
for (channel, note), end_time in self.active_notes.items():
if current_time >= end_time:
self.output_manager.send_note_off(channel, note)
self.channel_manager.release_voice(channel, note)
notes_to_remove.append((channel, note))
# Check if this channel will have no more active notes
remaining_notes_on_channel = [k for k in self.active_notes.keys() if k[0] == channel and k != (channel, note)]
if not remaining_notes_on_channel:
channels_becoming_inactive.add(channel)
for key in notes_to_remove:
del self.active_notes[key]
# Turn off visual display for channels that just became inactive
for channel in channels_becoming_inactive:
# Send volume change signal with 0 volume for black display
self.output_manager.volume_sent.emit(channel, 0) # Black display
def get_current_state(self) -> Dict:
"""Get current arpeggiator state"""
return {
'is_playing': self.is_playing,
'root_note': self.root_note,
'scale': self.scale,
'pattern_type': self.pattern_type,
'octave_range': self.octave_range,
'note_speed': self.note_speed,
'gate': self.gate,
'swing': self.swing,
'velocity': self.velocity,
'tempo': self.tempo,
'current_step': self.current_step,
'pattern_position': self.pattern_position,
'pattern_length': self.pattern_length,
'held_notes': list(self.held_notes),
'current_pattern': self.current_pattern.copy()
}