#!/usr/bin/env python3 import os import re import subprocess import sys import json import xml.etree.ElementTree as ET from datetime import datetime, timedelta from urllib.request import urlopen, Request import urwid from urllib.error import URLError import threading import time import urllib.parse # Load configuration from JSON file try: with open('config.json') as config_file: config = json.load(config_file) M3U_URL = config.get('m3u_url', "http://10.0.0.17:8409/iptv/channels.m3u") XMLTV_URL = config.get('xmltv_url', "http://10.0.0.17:8409/iptv/xmltv.xml") USER_AGENT = config.get('user_agent', "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36") UPDATE_INTERVAL = config.get('update_interval', 900) MONITOR = config.get('monitor', None) DEBUG_MODE = config.get('debug_mode', False) # Base MPV command with monitor options if specified MPV_BASE = ["mpv"] if MONITOR is not None: MPV_BASE.extend(["--fs", f"--fs-screen={MONITOR}"]) MPV_BASE.extend(config.get('mpv_options', [ "--really-quiet", "--no-terminal", "--force-window=immediate" ])) except FileNotFoundError: print("Configuration file not found, using default settings") M3U_URL = "http://10.0.0.17:8409/iptv/channels.m3u" XMLTV_URL = "http://10.0.0.17:8409/iptv/xmltv.xml" USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" UPDATE_INTERVAL = 900 MONITOR = None DEBUG_MODE = False MPV_BASE = [ "mpv", "--really-quiet", "--no-terminal", "--force-window=immediate" ] # Enhanced color palette with animation colors PALETTE = [ # Original colors ('header', 'white', 'dark blue'), ('footer', 'white', 'dark blue'), ('channel', 'black', 'light cyan'), ('channel_focus', 'white', 'dark blue'), ('program_now', 'white', 'dark green'), ('program_next', 'black', 'light gray'), ('error', 'white', 'dark red'), ('divider', 'dark green', ''), ('program_desc', 'light cyan', ''), ('time', 'yellow', ''), ('title', 'bold', ''), ('update', 'light blue', ''), # Animation colors ('splash_1', 'light red', ''), ('splash_2', 'yellow', ''), ('splash_3', 'light green', ''), ('splash_4', 'light cyan', ''), ('splash_5', 'light blue', ''), ('splash_6', 'light magenta', ''), ('highlight', 'white,bold', 'dark blue'), ('loading', 'yellow', ''), ('pulse_1', 'dark cyan', ''), ('pulse_2', 'light cyan', ''), ('pulse_3', 'white', ''), ('flash', 'white,bold', 'dark green'), ('fade', 'dark gray', ''), ] # Function for debug prints that only outputs when debug mode is enabled def debug_print(*args, **kwargs): if DEBUG_MODE: print(*args, **kwargs) # ASCII Art and Animation Frames BANNER = r""" __________._____. ___. .__ __ _______________ ____ \______ \__\_ |__\_ |__ |__|/ |_ \__ ___/\ \ / / | | _/ || __ \| __ \| \ __\ | | \ Y / | | \ || \_\ \ \_\ \ || | | | \ / |______ /__||___ /___ /__||__| |____| \___/ \/ \/ \/ """ # Animation frames for the splash screen SPLASH_FRAMES = [ # Frame 1 - Just B r""" _________ |\\ | | \\ | | \\_____| | |_____| | | | |___|_____| """, # Frame 2 - BI r""" _________ ___ |\\ | | | | \\ | | | | \\_____| | | | |_____| | | | | | | |___|_____| |___| """, # Frame 3 - BIB r""" _________ ___ _________ |\\ | | | |\\ | | \\ | | | | \\ | | \\_____| | | | \\_____| | |_____| | | | |_____| | | | | | | | |___|_____| |___| |___|_____| """, # Frame 4 - BIBB r""" _________ ___ _________ _________ |\\ | | | |\\ | |\\ | | \\ | | | | \\ | | \\ | | \\_____| | | | \\_____| | \\_____| | |_____| | | | |_____| | |_____| | | | | | | | | | | |___|_____| |___| |___|_____| |___|_____| """, # Frame 5 - BIBBI r""" _________ ___ _________ _________ ___ |\\ | | | |\\ | |\\ | | | | \\ | | | | \\ | | \\ | | | | \\_____| | | | \\_____| | \\_____| | | | |_____| | | | |_____| | |_____| | | | | | | | | | | | | | |___|_____| |___| |___|_____| |___|_____| |___| """, # Frame 6 - BIBBIT r""" _________ ___ _________ _________ ___ _______ |\\ | | | |\\ | |\\ | | | | | | \\ | | | | \\ | | \\ | | | |_ _| | \\_____| | | | \\_____| | \\_____| | | | | | |_____| | | | |_____| | |_____| | | | | | | | | | | | | | | | | | |___|_____| |___| |___|_____| |___|_____| |___| |_______| """, ] # Loading animation characters LOADING_CHARS = ['⣾', '⣽', '⣻', '⢿', '⡿', '⣟', '⣯', '⣷'] class ChannelButton(urwid.Button): def __init__(self, channel, *args, **kwargs): self.channel = channel if channel.get('current_show'): label = f"{channel['name']}\nNow: {channel['current_show']['title']}" if channel.get('next_show'): label += f"\nNext: {channel['next_show']['title']}" else: label = channel['name'] super().__init__(label, *args, **kwargs) class FocusAwareListBox(urwid.ListBox): """ListBox that tracks focus changes""" def __init__(self, body, on_focus_change=None): super().__init__(body) self.on_focus_change = on_focus_change self._last_focus = None def change_focus(self, size, position, *args, **kwargs): super().change_focus(size, position, *args, **kwargs) current_focus = self.focus if current_focus != self._last_focus and self.on_focus_change: self.on_focus_change(current_focus) self._last_focus = current_focus class SettingsMenu: """Settings configuration menu""" def __init__(self, config, on_save_callback, on_cancel_callback): self.config = config.copy() self.on_save_callback = on_save_callback self.on_cancel_callback = on_cancel_callback self.setup_ui() def setup_ui(self): # Create styled form fields with consistent colors self.m3u_edit = urwid.Edit(("title", "M3U URL: "), self.config.get('m3u_url', '')) self.xmltv_edit = urwid.Edit(("title", "XMLTV URL: "), self.config.get('xmltv_url', '')) self.user_agent_edit = urwid.Edit(("title", "User Agent: "), self.config.get('user_agent', '')) self.update_interval_edit = urwid.IntEdit(("title", "Update Interval (seconds): "), self.config.get('update_interval', 900)) # Monitor selection - simplified as a numeric selection monitor_value = self.config.get('monitor', None) # Create a radio button group for monitor selection self.monitor_group = [] rb_none = urwid.RadioButton(self.monitor_group, "Default (No specific monitor)", state=(monitor_value is None)) rb_1 = urwid.RadioButton(self.monitor_group, "Monitor 1", state=(monitor_value == 1)) rb_2 = urwid.RadioButton(self.monitor_group, "Monitor 2", state=(monitor_value == 2)) rb_3 = urwid.RadioButton(self.monitor_group, "Monitor 3", state=(monitor_value == 3)) # Debug mode checkbox self.debug_checkbox = urwid.CheckBox("Enable Debug Mode", state=self.config.get('debug_mode', False)) # Create styled form sections form_items = [ # Header section urwid.AttrMap(urwid.Text("Settings Configuration", align='center'), 'header'), urwid.AttrMap(urwid.Divider("─"), 'divider'), urwid.Divider(), # Connection settings section urwid.AttrMap(urwid.Text("CONNECTION SETTINGS", align='left'), 'program_now'), urwid.Divider(), urwid.AttrMap(self.m3u_edit, None, 'channel_focus'), urwid.Divider(), urwid.AttrMap(self.xmltv_edit, None, 'channel_focus'), urwid.Divider(), urwid.AttrMap(self.user_agent_edit, None, 'channel_focus'), urwid.Divider(), # Application settings section urwid.AttrMap(urwid.Text("APPLICATION SETTINGS", align='left'), 'program_now'), urwid.Divider(), urwid.AttrMap(self.update_interval_edit, None, 'channel_focus'), urwid.Divider(), # Display settings section urwid.AttrMap(urwid.Text("DISPLAY SETTINGS", align='left'), 'program_now'), urwid.Divider(), urwid.AttrMap(urwid.Text("Fullscreen Monitor Selection:", align='left'), 'program_desc'), urwid.AttrMap(rb_none, None, 'channel_focus'), urwid.AttrMap(rb_1, None, 'channel_focus'), urwid.AttrMap(rb_2, None, 'channel_focus'), urwid.AttrMap(rb_3, None, 'channel_focus'), urwid.Divider(), # Debug settings section urwid.AttrMap(urwid.Text("DEBUG SETTINGS", align='left'), 'program_now'), urwid.Divider(), urwid.AttrMap(self.debug_checkbox, None, 'channel_focus'), urwid.Divider(), # Action buttons urwid.AttrMap(urwid.Divider("─"), 'divider'), urwid.Divider(), urwid.Columns([ ('weight', 1, urwid.AttrMap(urwid.Button("Save Settings", on_press=self.save_settings), 'program_now', 'program_desc')), ('weight', 1, urwid.AttrMap(urwid.Button("Cancel", on_press=self.cancel_settings), 'error', 'program_desc')) ], dividechars=2) ] listwalker = urwid.SimpleFocusListWalker(form_items) self.listbox = urwid.ListBox(listwalker) # Create the main widget with styled frame self.widget = urwid.Frame( urwid.AttrMap(urwid.LineBox(self.listbox, title=" Settings ", title_align='center'), 'channel'), footer=urwid.AttrMap( urwid.Text("Tab/Shift+Tab: Navigate | Enter: Select | Esc: Cancel", align='center'), 'footer' ) ) def save_settings(self, button=None): """Validate and save settings""" try: # Validate and collect settings new_config = { 'm3u_url': self.m3u_edit.edit_text.strip(), 'xmltv_url': self.xmltv_edit.edit_text.strip(), 'user_agent': self.user_agent_edit.edit_text.strip(), 'update_interval': self.update_interval_edit.value(), 'debug_mode': self.debug_checkbox.state } # Handle monitor setting from radio buttons # Find which radio button is selected monitor_value = None for i, rb in enumerate(self.monitor_group): if rb.state: if i == 0: # "Default" option monitor_value = None else: monitor_value = i # Monitor number (1, 2, 3) break new_config['monitor'] = monitor_value # Set default MPV options without exposing them in the UI standard_mpv_options = [ "--really-quiet", "--no-terminal", "--force-window=immediate" ] # Add fullscreen options if monitor is selected if monitor_value is not None: fs_options = [ "--fs", f"--fs-screen={monitor_value}" ] new_config['mpv_options'] = fs_options + standard_mpv_options else: new_config['mpv_options'] = standard_mpv_options # Validate required fields if not new_config['m3u_url']: self.show_error("M3U URL is required") return if not new_config['xmltv_url']: self.show_error("XMLTV URL is required") return if not new_config['user_agent']: self.show_error("User Agent is required") return if new_config['update_interval'] <= 0: self.show_error("Update interval must be positive") return self.on_save_callback(new_config) except Exception as e: self.show_error(f"Error saving settings: {str(e)}") def cancel_settings(self, button=None): """Cancel settings changes""" self.on_cancel_callback() def show_error(self, message): """Show error message in footer""" error_text = urwid.AttrMap(urwid.Text(f"Error: {message}", align='center'), 'error') self.widget.footer = error_text class IPTVPlayer: def __init__(self): self.channels = [] self.programs = {} self.current_channel = None self.mpv_process = None self.loop = None self.last_update = datetime.now() self.update_thread = None self.update_running = True self.settings_menu = None self.main_widget = None self.animate_startup = False # Disable animations for better performance self.splash_screen = None self.load_data() self.setup_ui() def display_splash_screen(self): """Disabled splash screen for better performance""" # Skip splash screen completely for better performance pass def load_data(self): debug_print("\n[DEBUG] Loading IPTV data...") # Load M3U playlist try: req = Request(M3U_URL, headers={'User-Agent': USER_AGENT}) with urlopen(req, timeout=10) as response: m3u_data = response.read().decode('utf-8') debug_print(f"[DEBUG] Received M3U data (length: {len(m3u_data)} bytes)") # Parse M3U lines = m3u_data.split('\n') current_channel = None for line in lines: line = line.strip() if line.startswith('#EXTINF:'): # Parse channel info tvg_id = re.search(r'tvg-id="([^"]*)"', line) tvg_name = re.search(r'tvg-name="([^"]*)"', line) tvg_logo = re.search(r'tvg-logo="([^"]*)"', line) group_title = re.search(r'group-title="([^"]*)"', line) channel_title = line.split(',')[-1].strip() current_channel = { 'id': tvg_id.group(1) if tvg_id else "", 'name': tvg_name.group(1) if tvg_name else channel_title, 'logo': tvg_logo.group(1) if tvg_logo else "", 'group': group_title.group(1) if group_title else "Other", 'title': channel_title, 'url': None, 'current_show': None, 'next_show': None } elif (line.startswith('http://') or line.startswith('https://') or line.startswith('rtmp://') or line.startswith('udp://') or line.startswith('rtp://') or line.startswith('mms://') or line.startswith('rtsp://') or line.startswith('file://')): if current_channel: current_channel['url'] = line self.channels.append(current_channel) current_channel = None if not self.channels: print("[WARNING] No channels found in M3U file!") self.add_error_channel("No channels found in playlist") else: debug_print(f"[DEBUG] Successfully loaded {len(self.channels)} channels") except Exception as e: print(f"[ERROR] Failed to load M3U: {str(e)}") self.add_error_channel(f"Error loading playlist: {str(e)}") # Load XMLTV guide if self.channels and not self.channels[0]['name'].startswith("Error:"): try: req = Request(XMLTV_URL, headers={'User-Agent': USER_AGENT}) with urlopen(req, timeout=10) as response: xml_data = response.read().decode('utf-8') debug_print(f"[DEBUG] Received XMLTV data (length: {len(xml_data)} bytes)") root = ET.fromstring(xml_data) for programme in root.findall('programme'): channel_id = programme.get('channel') start = self.parse_xmltv_time(programme.get('start')) stop = self.parse_xmltv_time(programme.get('stop')) title_elem = programme.find('title') desc_elem = programme.find('desc') # Extract additional episode information episode_num_elem = programme.find('episode-num') episode_num = "" season = None episode = None # Parse episode numbering systems if episode_num_elem is not None and episode_num_elem.text: episode_num = episode_num_elem.text debug_print(f"[DEBUG] Found episode number: {episode_num}, system: {episode_num_elem.get('system')}") # Extract season and episode from xmltv_ns format (common format: S.E.P/TOTAL where S=season, E=episode, P=part) if episode_num_elem.get('system') == 'xmltv_ns': # Format example: "2.9." means Season 3, Episode 10 (zero-based) parts = episode_num.split('.') debug_print(f"[DEBUG] Split episode_num into parts: {parts}") if len(parts) >= 2: try: # XMLTV uses zero-based numbering, so add 1 if parts[0].strip(): season = int(parts[0]) + 1 debug_print(f"[DEBUG] Parsed season: {season}") if parts[1].strip(): episode = int(parts[1]) + 1 debug_print(f"[DEBUG] Parsed episode: {episode}") except ValueError as e: debug_print(f"[DEBUG] Error parsing season/episode: {e}") # Also try to find other common formats elif episode_num.lower().startswith('s') and 'e' in episode_num.lower(): # Format like "S01E05" try: s_part = episode_num.lower().split('e')[0].strip('s') e_part = episode_num.lower().split('e')[1].strip() if s_part.isdigit(): season = int(s_part) debug_print(f"[DEBUG] Parsed season from SxxExx format: {season}") if e_part.isdigit(): episode = int(e_part) debug_print(f"[DEBUG] Parsed episode from SxxExx format: {episode}") except Exception as e: debug_print(f"[DEBUG] Error parsing SxxExx format: {e}") # Try digit format (like "105" for S01E05) elif episode_num.isdigit() and len(episode_num) >= 3: try: if len(episode_num) == 3: season = int(episode_num[0]) episode = int(episode_num[1:]) elif len(episode_num) == 4: season = int(episode_num[:2]) episode = int(episode_num[2:]) debug_print(f"[DEBUG] Parsed from numeric format: S{season}E{episode}") except Exception as e: debug_print(f"[DEBUG] Error parsing numeric format: {e}") # For debugging: also check direct episode and season tags season_elem = programme.find('season-num') if season_elem is not None and season_elem.text: try: season = int(season_elem.text) debug_print(f"[DEBUG] Found direct season tag: {season}") except ValueError: pass episode_elem = programme.find('episode-num') if episode_elem is not None and episode_elem.text and episode_elem.get('system') == 'onscreen': debug_print(f"[DEBUG] Found onscreen episode format: {episode_elem.text}") # Try to extract numbers from onscreen format (like "Episode 5") try: num_match = re.search(r'(\d+)', episode_elem.text) if num_match and episode is None: episode = int(num_match.group(1)) debug_print(f"[DEBUG] Extracted episode number from onscreen format: {episode}") except Exception as e: debug_print(f"[DEBUG] Error parsing onscreen format: {e}") # Also check if there's a direct 'episode' tag direct_episode_elem = programme.find('episode') if direct_episode_elem is not None and direct_episode_elem.text: try: episode = int(direct_episode_elem.text) debug_print(f"[DEBUG] Found direct episode tag: {episode}") except ValueError: pass # Detect if we have the season/episode directly in the title title_text = title_elem.text if title_elem is not None else "" if title_text and (season is None or episode is None): # Look for patterns like "S01E05" in the title se_match = re.search(r'S(\d+)E(\d+)', title_text, re.IGNORECASE) if se_match: season = int(se_match.group(1)) episode = int(se_match.group(2)) debug_print(f"[DEBUG] Extracted from title: S{season}E{episode} from '{title_text}'") # Check format like "1x05" elif 'x' in title_text.lower(): x_match = re.search(r'(\d+)x(\d+)', title_text, re.IGNORECASE) if x_match: season = int(x_match.group(1)) episode = int(x_match.group(2)) debug_print(f"[DEBUG] Extracted from title 'x' format: S{season}E{episode} from '{title_text}'") # Check for "Season X Episode Y" text elif 'season' in title_text.lower() and 'episode' in title_text.lower(): debug_print(f"[DEBUG] Title contains 'season' and 'episode': '{title_text}'") # Try to extract season and episode numbers season_match = re.search(r'season\s+(\d+)', title_text, re.IGNORECASE) episode_match = re.search(r'episode\s+(\d+)', title_text, re.IGNORECASE) if season_match: season = int(season_match.group(1)) debug_print(f"[DEBUG] Extracted season from text: {season}") if episode_match: episode = int(episode_match.group(1)) debug_print(f"[DEBUG] Extracted episode from text: {episode}") # Check for Episode number in brackets like "Show Name (5)" elif re.search(r'\(\s*\d+\s*\)', title_text): num_match = re.search(r'\(\s*(\d+)\s*\)', title_text) if num_match: # Assume this is an episode number if we don't have one yet if episode is None: episode = int(num_match.group(1)) debug_print(f"[DEBUG] Extracted episode from brackets: {episode}") # Check for common patterns like "E05" without season elif re.search(r'E\d+', title_text, re.IGNORECASE) and episode is None: e_match = re.search(r'E(\d+)', title_text, re.IGNORECASE) if e_match: episode = int(e_match.group(1)) debug_print(f"[DEBUG] Extracted episode from E-format: {episode}") if season is not None or episode is not None: debug_print(f"[DEBUG] Final season/episode for '{title_text}': S{season}E{episode}") # Get original air date if available date_elem = programme.find('date') air_date = date_elem.text if date_elem is not None else None if channel_id not in self.programs: self.programs[channel_id] = [] # Create formatted show title with season/episode if available formatted_title = title_elem.text if title_elem is not None else "No Title" self.programs[channel_id].append({ 'start': start, 'stop': stop, 'title': formatted_title, 'desc': desc_elem.text if desc_elem is not None else "", 'season': season, 'episode': episode, 'episode_num': episode_num, 'air_date': air_date }) debug_print(f"[DEBUG] Loaded EPG data for {len(self.programs)} channels") # Pre-load current and next shows for each channel self.update_current_shows() # Show a sample of program data with season/episode info for debugging for channel_id, programs in self.programs.items(): if programs and len(programs) > 0: sample = programs[0] if sample.get('season') is not None or sample.get('episode') is not None: debug_print(f"[DEBUG] Sample program: Title={sample.get('title')}, Season={sample.get('season')}, Episode={sample.get('episode')}, Episode_num={sample.get('episode_num')}") break except Exception as e: print(f"[WARNING] Failed to load EPG: {str(e)}") self.last_update = datetime.now() def update_current_shows(self): """Update current and next shows based on current time""" now = datetime.now() for channel in self.channels: if channel['id'] in self.programs: shows = self.programs[channel['id']] channel['current_show'] = None channel['next_show'] = None for i, show in enumerate(shows): if show['start'] <= now < show['stop']: channel['current_show'] = show if i+1 < len(shows): channel['next_show'] = shows[i+1] break elif show['start'] > now: channel['next_show'] = show break def add_error_channel(self, message): # Clear existing channels before adding error channel self.channels = [] self.channels.append({ 'id': "error_channel", 'name': f"Error: {message}", 'logo': "", 'group': "Error", 'url': "", 'title': "Check your playlist URL", 'current_show': None, 'next_show': None }) def parse_xmltv_time(self, time_str): try: return datetime.strptime(time_str[:14], "%Y%m%d%H%M%S") except: return datetime.now() def generate_imdb_url(self, show): """Generate IMDb search URL for a show or specific episode""" # Base title for search title = show.get('title', '') if not title: return None # Check if we have season and episode information season = show.get('season') episode = show.get('episode') # Advanced search for TV episodes is more accurate for specific episodes if season is not None and episode is not None: # For episodes, use IMDb's advanced title search which gives better results # Format properly for the advanced search with title, season and episode show_title = urllib.parse.quote_plus(title) # Use IMDb's advanced search for TV episodes # This directly targets the episode search with better filtering return f"https://www.imdb.com/search/title/?title={show_title}&title_type=tv_episode&season={season}&episode={episode}" else: # For shows without episode info, use regular search encoded_query = urllib.parse.quote_plus(title) return f"https://www.imdb.com/find?q={encoded_query}&s=tt&ttype=tv" # Removed clickable link function since it doesn't work in Windows CMD def setup_ui(self): # Create channel list with current show info self.channel_items = [] for channel in self.channels: # Create custom button that knows its channel btn = ChannelButton(channel) # Connect the signal with the correct channel using a closure def make_click_handler(c): return lambda button: self.on_channel_select(c) urwid.connect_signal(btn, 'click', make_click_handler(channel)) # Apply different colors based on channel status if channel.get('url'): channel_item = urwid.AttrMap(btn, 'channel', 'channel_focus') else: channel_item = urwid.AttrMap(btn, 'error', 'error') self.channel_items.append(channel_item) # Create list box that tracks focus changes self.channel_list = FocusAwareListBox( urwid.SimpleFocusListWalker(self.channel_items), on_focus_change=self.on_focus_change ) # Create program guide with scrollable content self.program_walker = urwid.SimpleFocusListWalker([]) self.program_listbox = urwid.ListBox(self.program_walker) # Add update time to footer monitor_info = f" | Monitor: {MONITOR}" if MONITOR is not None else "" self.footer_text = urwid.Text(f"Q: Quit | ↑↓: Navigate | Enter: Play | L: Reload | S: Settings | Last update: Loading...{monitor_info}", align='center') self.footer = urwid.AttrMap(self.footer_text, 'footer') program_frame = urwid.Frame( urwid.LineBox(self.program_listbox, title="Program Details"), footer=self.footer ) # Create header with ASCII art header_text = BANNER + "\nBibbitTV Terminal Player" header = urwid.AttrMap(urwid.Text(header_text, align='center'), 'header') # Create columns columns = urwid.Columns([ ('weight', 1, urwid.LineBox(self.channel_list, title="Channels")), ('weight', 2, program_frame) ], dividechars=1) # Main layout with header layout = urwid.Pile([ ('pack', header), ('pack', urwid.Divider()), columns ]) # Store the main layout self.main_widget = urwid.LineBox(layout, title="") # Overlay for centering self.top = urwid.Overlay( self.main_widget, urwid.SolidFill(' '), align='center', width=('relative', 85), valign='middle', height=('relative', 85) ) # Update footer with initial timestamp self.update_footer() def update_footer(self): """Update footer with last update time""" time_str = self.last_update.strftime("%H:%M:%S") monitor_info = f" | Monitor: {MONITOR}" if MONITOR is not None else "" self.footer_text.set_text(f"Q: Quit | ↑↓: Navigate | Enter: Play | L: Reload | S: Settings | Last update: {time_str}{monitor_info}") def show_settings_menu(self): """Show settings configuration menu""" # Get current config current_config = { 'm3u_url': M3U_URL, 'xmltv_url': XMLTV_URL, 'user_agent': USER_AGENT, 'update_interval': UPDATE_INTERVAL, 'monitor': MONITOR, 'mpv_options': MPV_BASE[1:] if MPV_BASE[0] == 'mpv' else MPV_BASE # Remove 'mpv' command } # Create settings menu self.settings_menu = SettingsMenu( current_config, self.on_settings_save, self.on_settings_cancel ) # Create settings overlay properly self.top = urwid.Overlay( self.settings_menu.widget, self.top, align='center', width=('relative', 80), valign='middle', height=('relative', 90) ) # Update the MainLoop's widget if self.loop: self.loop.widget = self.top def on_settings_save(self, new_config): """Handle settings save""" try: # Save to config.json with open('config.json', 'w') as config_file: json.dump(new_config, config_file, indent=4) # Update global variables global M3U_URL, XMLTV_URL, USER_AGENT, UPDATE_INTERVAL, MONITOR, MPV_BASE, DEBUG_MODE M3U_URL = new_config['m3u_url'] XMLTV_URL = new_config['xmltv_url'] USER_AGENT = new_config['user_agent'] UPDATE_INTERVAL = new_config['update_interval'] MONITOR = new_config['monitor'] DEBUG_MODE = new_config['debug_mode'] # Update MPV base command directly from new_config MPV_BASE = ["mpv"] MPV_BASE.extend(new_config['mpv_options']) # Close settings menu self.on_settings_cancel() # Reload data with new settings self.reload_data() except Exception as e: # Show error in settings menu if self.settings_menu: self.settings_menu.show_error(f"Failed to save config: {str(e)}") def on_settings_cancel(self): """Close settings menu""" self.settings_menu = None # Restore the original top widget structure self.top = urwid.Overlay( self.main_widget, urwid.SolidFill(' '), align='center', width=('relative', 85), valign='middle', height=('relative', 85) ) # Update the MainLoop's widget if self.loop: self.loop.widget = self.top def start_update_thread(self): """Start background thread for periodic updates""" self.update_running = True self.update_thread = threading.Thread(target=self.update_worker, daemon=True) self.update_thread.start() def update_worker(self): """Background worker for periodic updates""" while self.update_running: time.sleep(UPDATE_INTERVAL) if self.loop: # Schedule update on the main thread self.loop.set_alarm_in(0, self.refresh_schedule) def refresh_schedule(self, loop=None, user_data=None): """Update schedule information without animation for better performance""" try: debug_print("[DEBUG] Refreshing schedule data...") # Update current shows without animations self.update_current_shows() self.last_update = datetime.now() # Update UI self.refresh_ui() self.update_footer() # Add simple update notification time_str = self.last_update.strftime("%H:%M:%S") update_text = urwid.Text([("update", f"Schedule updated at {time_str}")]) empty_line = urwid.Text("") self.program_walker[:] = [update_text, empty_line] + self.program_walker[:] debug_print("[DEBUG] Schedule refreshed successfully") except Exception as e: print(f"[ERROR] Failed to refresh schedule: {str(e)}") def refresh_ui(self): """Refresh UI elements with updated data""" # Update channel buttons for i, channel in enumerate(self.channels): button = self.channel_items[i].base_widget if channel.get('current_show'): label = f"{channel['name']}\nNow: {channel['current_show']['title']}" if channel.get('next_show'): label += f"\nNext: {channel['next_show']['title']}" else: label = channel['name'] button.set_label(label) # Update program info for current channel if focused if self.current_channel: self.on_channel_hover(self.current_channel) def on_focus_change(self, focused_widget): """Update program info when focusing a channel""" if focused_widget: # Get the button inside the AttrMap button = focused_widget.base_widget if hasattr(button, 'channel'): self.on_channel_hover(button.channel) def on_channel_hover(self, channel): """Update program info without animation for better performance""" self.current_channel = channel program_info = self.get_program_info(channel) # Direct update without animation for better performance self.program_walker[:] = [urwid.Text(line) for line in program_info] def on_channel_select(self, channel): """Play channel when selected (no animation for better performance)""" if channel.get('url'): # Play the channel directly without animation self.play_channel(channel['url']) def get_program_info(self, channel): info = [] max_line_length = 60 # Define this at the top so it's available everywhere info.append([("header", f"--Channel: {channel['name']}")]) if channel.get('group'): info.append([("title", f"Group: {channel['group']}")]) # Debug program information if channel.get('current_show'): current_show = channel['current_show'] debug_print(f"[DEBUG] Program info - Title: {current_show.get('title')}") debug_print(f"[DEBUG] Program info - Season: {current_show.get('season')}") debug_print(f"[DEBUG] Program info - Episode: {current_show.get('episode')}") debug_print(f"[DEBUG] Program info - Air date: {current_show.get('air_date')}") if channel.get('current_show'): now = datetime.now() remaining = (channel['current_show']['stop'] - now).seconds // 60 start_time = channel['current_show']['start'].strftime("%H:%M") end_time = channel['current_show']['stop'].strftime("%H:%M") # Current show section with colorful formatting info.append([]) # Empty line info.append([("program_now", "--NOW PLAYING")]) # Display title with season and episode prominently if available current_show = channel['current_show'] # Extract and display season/episode information season = current_show.get('season') episode = current_show.get('episode') # If season/episode not available in metadata, try to extract from title if season is None or episode is None: # Try SxxExx format (S01E05) se_match = re.search(r'S(\d+)E(\d+)', current_show['title'], re.IGNORECASE) if se_match: season = int(se_match.group(1)) episode = int(se_match.group(2)) debug_print(f"[DEBUG] Extracted S{season}E{episode} from title format") # Try season x episode format (1x05) elif 'x' in current_show['title'].lower(): x_match = re.search(r'(\d+)x(\d+)', current_show['title'], re.IGNORECASE) if x_match: season = int(x_match.group(1)) episode = int(x_match.group(2)) debug_print(f"[DEBUG] Extracted S{season}E{episode} from Nx format") # Display title with or without season/episode info if season is not None and episode is not None: # Format title with season/episode info formatted_title = f"Title: {current_show['title']} (S{season:02d}E{episode:02d})" info.append([("title", formatted_title)]) # Also add a detailed version on a separate line info.append([("program_desc", f"Season {season}, Episode {episode}")]) # Update the current_show object with the extracted info if it wasn't there already current_show['season'] = season current_show['episode'] = episode else: info.append([("title", f"Title: {current_show['title']}")]) # Add air date if available if current_show.get('air_date'): info.append([("time", f"Original Air Date: {current_show['air_date']}")]) info.append([ ("time", f"Time: {start_time} - {end_time} "), ("", f"({remaining} minutes remaining)") ]) # Add IMDb link if we can generate one imdb_url = self.generate_imdb_url(current_show) # Skip IMDb URL display as it doesn't work well in Windows CMD # if imdb_url: # info.append([("update", f"IMDb: {imdb_url}")]) if channel['current_show']['desc']: info.append([]) # Empty line info.append([("program_desc", "--Description:")]) # Split long description into multiple lines desc = channel['current_show']['desc'] for i in range(0, len(desc), max_line_length): info.append([('program_desc', desc[i:i+max_line_length])]) # Next show section if channel.get('next_show'): next_start = channel['next_show']['start'].strftime("%H:%M") next_end = channel['next_show']['stop'].strftime("%H:%M") info.append([]) # Empty line info.append([("divider", "─" * 50)]) info.append([]) # Empty line info.append([("program_next", "--UP NEXT")]) # Display title with season and episode information next_show = channel['next_show'] # Extract season/episode information for next show season = next_show.get('season') episode = next_show.get('episode') # If season/episode not available in metadata, try to extract from title if season is None or episode is None: # Try SxxExx format (S01E05) se_match = re.search(r'S(\d+)E(\d+)', next_show['title'], re.IGNORECASE) if se_match: season = int(se_match.group(1)) episode = int(se_match.group(2)) debug_print(f"[DEBUG] Next show: Extracted S{season}E{episode} from title format") # Try season x episode format (1x05) elif 'x' in next_show['title'].lower(): x_match = re.search(r'(\d+)x(\d+)', next_show['title'], re.IGNORECASE) if x_match: season = int(x_match.group(1)) episode = int(x_match.group(2)) debug_print(f"[DEBUG] Next show: Extracted S{season}E{episode} from Nx format") # Display title with or without season/episode info if season is not None and episode is not None: # Format title with season/episode info formatted_title = f"Title: {next_show['title']} (S{season:02d}E{episode:02d})" info.append([("title", formatted_title)]) # Also add a detailed version on a separate line info.append([("program_desc", f"Season {season}, Episode {episode}")]) # Update the next_show object with the extracted info if it wasn't there already next_show['season'] = season next_show['episode'] = episode else: info.append([("title", f"Title: {next_show['title']}")]) # Add air date if available for next show if next_show.get('air_date'): info.append([("time", f"Original Air Date: {next_show['air_date']}")]) info.append([("time", f"Time: {next_start} - {next_end}")]) # Add IMDb link for next show if we can generate one imdb_url = self.generate_imdb_url(next_show) # Skip IMDb URL display as it doesn't work well in Windows CMD if channel['next_show']['desc']: info.append([]) # Empty line info.append([("program_desc", "--Description:")]) desc = channel['next_show']['desc'] # Limit description to 5 lines to prevent overflow max_lines = 5 line_count = 0 for i in range(0, len(desc), max_line_length): if line_count >= max_lines: info.append([('program_desc', '... (description truncated)')]) break info.append([('program_desc', desc[i:i+max_line_length])]) line_count += 1 else: # Handle case where there's no current show but there might be a next show if channel.get('next_show'): next_start = channel['next_show']['start'].strftime("%H:%M") next_end = channel['next_show']['stop'].strftime("%H:%M") info.append([]) # Empty line info.append([("program_next", "--UP NEXT")]) # Display title with season and episode information next_show = channel['next_show'] # Extract season/episode information for next show season = next_show.get('season') episode = next_show.get('episode') # If season/episode not available in metadata, try to extract from title if season is None or episode is None: # Try SxxExx format (S01E05) se_match = re.search(r'S(\d+)E(\d+)', next_show['title'], re.IGNORECASE) if se_match: season = int(se_match.group(1)) episode = int(se_match.group(2)) debug_print(f"[DEBUG] Next show: Extracted S{season}E{episode} from title format") # Try season x episode format (1x05) elif 'x' in next_show['title'].lower(): x_match = re.search(r'(\d+)x(\d+)', next_show['title'], re.IGNORECASE) if x_match: season = int(x_match.group(1)) episode = int(x_match.group(2)) debug_print(f"[DEBUG] Next show: Extracted S{season}E{episode} from Nx format") # Display title with or without season/episode info if season is not None and episode is not None: # Format title with season/episode info formatted_title = f"Title: {next_show['title']} (S{season:02d}E{episode:02d})" info.append([("title", formatted_title)]) # Also add a detailed version on a separate line info.append([("program_desc", f"Season {season}, Episode {episode}")]) # Update the next_show object with the extracted info if it wasn't there already next_show['season'] = season next_show['episode'] = episode else: info.append([("title", f"Title: {next_show['title']}")]) # Add air date if available for next show if next_show.get('air_date'): info.append([("time", f"Original Air Date: {next_show['air_date']}")]) info.append([("time", f"Time: {next_start} - {next_end}")]) # Add IMDb link for next show if we can generate one imdb_url = self.generate_imdb_url(next_show) # Skip IMDb URL display as it doesn't work well in Windows CMD if channel['next_show']['desc']: info.append([]) # Empty line info.append([("program_desc", "--Description:")]) desc = channel['next_show']['desc'] # Limit description to 5 lines to prevent overflow max_lines = 5 line_count = 0 for i in range(0, len(desc), max_line_length): if line_count >= max_lines: info.append([('program_desc', '... (description truncated)')]) break info.append([('program_desc', desc[i:i+max_line_length])]) line_count += 1 else: info.append([("", "No current program information available")]) return info def play_channel(self, url): if self.mpv_process: self.mpv_process.terminate() self.mpv_process.wait() try: self.mpv_process = subprocess.Popen(MPV_BASE + [url]) except Exception as e: # Create proper widget for error message self.program_walker[:] = [urwid.Text([("error", f"Failed to play stream: {str(e)}")])] def reload_data(self): """Reload all data from sources without animation for better performance""" # Save current focus position current_focus_pos = self.channel_list.focus_position if self.channel_list.body else None # Reload data self.channels = [] self.programs = {} self.load_data() # Update channel items in place self.update_channel_items() # Restore focus position if current_focus_pos is not None and current_focus_pos < len(self.channel_items): self.channel_list.set_focus(current_focus_pos) self.on_channel_hover(self.channels[current_focus_pos]) # Update program details if self.current_channel: self.on_channel_hover(self.current_channel) self.update_footer() def update_channel_items(self): """Update existing channel items with new data""" # Clear existing items self.channel_list.body.clear() # Recreate channel items with updated data self.channel_items = [] for channel in self.channels: # Create custom button that knows its channel btn = ChannelButton(channel) # Connect the signal with the correct channel using a closure def make_click_handler(c): return lambda button: self.on_channel_select(c) urwid.connect_signal(btn, 'click', make_click_handler(channel)) # Apply different colors based on channel status if channel.get('url'): channel_item = urwid.AttrMap(btn, 'channel', 'channel_focus') else: channel_item = urwid.AttrMap(btn, 'error', 'error') self.channel_items.append(channel_item) self.channel_list.body.append(channel_item) def run(self): # Enable mouse support and cursor visibility screen = urwid.raw_display.Screen() screen.set_terminal_properties(colors=256) screen.set_mouse_tracking() self.loop = urwid.MainLoop( self.top, palette=PALETTE, screen=screen, unhandled_input=self.handle_input, handle_mouse=True ) # Start background update thread self.start_update_thread() try: # Show splash screen animation before running the main loop self.display_splash_screen() # Start the main event loop self.loop.run() finally: # Clean up when exiting self.update_running = False if self.update_thread: self.update_thread.join(timeout=1.0) def handle_input(self, key): # Only handle main UI input if settings menu is not open if self.settings_menu: if key == 'esc': self.on_settings_cancel() return if key in ('q', 'Q'): self.quit_player() elif key in ('l', 'L'): self.reload_data() elif key in ('s', 'S'): self.show_settings_menu() elif key == 'enter': if self.current_channel and self.current_channel.get('url'): self.play_channel(self.current_channel['url']) def quit_player(self): if self.mpv_process: self.mpv_process.terminate() self.mpv_process.wait() self.update_running = False raise urwid.ExitMainLoop() def __del__(self): self.update_running = False if hasattr(self, 'mpv_process') and self.mpv_process: self.mpv_process.terminate() def check_mpv_installed(): try: if os.name == 'nt': subprocess.run(["where.exe", "mpv"], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) else: subprocess.run(["which", "mpv"], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) return True except subprocess.CalledProcessError: return False def main(): if not check_mpv_installed(): print("Error: mpv player is required but not found. Please install mpv first.") print("You can download it from: https://mpv.io/installation/") sys.exit(1) player = IPTVPlayer() player.run() if __name__ == "__main__": main()