You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
1284 lines
57 KiB
1284 lines
57 KiB
#!/usr/bin/env python3
|
|
import os
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
import json
|
|
import xml.etree.ElementTree as ET
|
|
from datetime import datetime, timedelta
|
|
from urllib.request import urlopen, Request
|
|
import urwid
|
|
from urllib.error import URLError
|
|
import threading
|
|
import time
|
|
import urllib.parse
|
|
|
|
# Load configuration from JSON file
|
|
try:
|
|
with open('config.json') as config_file:
|
|
config = json.load(config_file)
|
|
M3U_URL = config.get('m3u_url', "http://10.0.0.17:8409/iptv/channels.m3u")
|
|
XMLTV_URL = config.get('xmltv_url', "http://10.0.0.17:8409/iptv/xmltv.xml")
|
|
USER_AGENT = config.get('user_agent', "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36")
|
|
UPDATE_INTERVAL = config.get('update_interval', 900)
|
|
MONITOR = config.get('monitor', None)
|
|
DEBUG_MODE = config.get('debug_mode', False)
|
|
|
|
# Base MPV command with monitor options if specified
|
|
MPV_BASE = ["mpv"]
|
|
if MONITOR is not None:
|
|
MPV_BASE.extend(["--fs", f"--fs-screen={MONITOR}"])
|
|
MPV_BASE.extend(config.get('mpv_options', [
|
|
"--really-quiet",
|
|
"--no-terminal",
|
|
"--force-window=immediate"
|
|
]))
|
|
except FileNotFoundError:
|
|
print("Configuration file not found, using default settings")
|
|
M3U_URL = "http://10.0.0.17:8409/iptv/channels.m3u"
|
|
XMLTV_URL = "http://10.0.0.17:8409/iptv/xmltv.xml"
|
|
USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
|
|
UPDATE_INTERVAL = 900
|
|
MONITOR = None
|
|
DEBUG_MODE = False
|
|
MPV_BASE = [
|
|
"mpv",
|
|
"--really-quiet",
|
|
"--no-terminal",
|
|
"--force-window=immediate"
|
|
]
|
|
|
|
# Enhanced color palette with animation colors
|
|
PALETTE = [
|
|
# Original colors
|
|
('header', 'white', 'dark blue'),
|
|
('footer', 'white', 'dark blue'),
|
|
('channel', 'black', 'light cyan'),
|
|
('channel_focus', 'white', 'dark blue'),
|
|
('program_now', 'white', 'dark green'),
|
|
('program_next', 'black', 'light gray'),
|
|
('error', 'white', 'dark red'),
|
|
('divider', 'dark green', ''),
|
|
('program_desc', 'light cyan', ''),
|
|
('time', 'yellow', ''),
|
|
('title', 'bold', ''),
|
|
('update', 'light blue', ''),
|
|
|
|
# Animation colors
|
|
('splash_1', 'light red', ''),
|
|
('splash_2', 'yellow', ''),
|
|
('splash_3', 'light green', ''),
|
|
('splash_4', 'light cyan', ''),
|
|
('splash_5', 'light blue', ''),
|
|
('splash_6', 'light magenta', ''),
|
|
('highlight', 'white,bold', 'dark blue'),
|
|
('loading', 'yellow', ''),
|
|
('pulse_1', 'dark cyan', ''),
|
|
('pulse_2', 'light cyan', ''),
|
|
('pulse_3', 'white', ''),
|
|
('flash', 'white,bold', 'dark green'),
|
|
('fade', 'dark gray', ''),
|
|
]
|
|
|
|
# Function for debug prints that only outputs when debug mode is enabled
|
|
def debug_print(*args, **kwargs):
|
|
if DEBUG_MODE:
|
|
print(*args, **kwargs)
|
|
|
|
# ASCII Art and Animation Frames
|
|
BANNER = r"""
|
|
__________._____. ___. .__ __ _______________ ____
|
|
\______ \__\_ |__\_ |__ |__|/ |_ \__ ___/\ \ / /
|
|
| | _/ || __ \| __ \| \ __\ | | \ Y /
|
|
| | \ || \_\ \ \_\ \ || | | | \ /
|
|
|______ /__||___ /___ /__||__| |____| \___/
|
|
\/ \/ \/
|
|
"""
|
|
|
|
# Animation frames for the splash screen
|
|
SPLASH_FRAMES = [
|
|
# Frame 1 - Just B
|
|
r"""
|
|
_________
|
|
|\\ |
|
|
| \\ |
|
|
| \\_____|
|
|
| |_____|
|
|
| | |
|
|
|___|_____|
|
|
|
|
""",
|
|
# Frame 2 - BI
|
|
r"""
|
|
_________ ___
|
|
|\\ | | |
|
|
| \\ | | |
|
|
| \\_____| | |
|
|
| |_____| | |
|
|
| | | |
|
|
|___|_____| |___|
|
|
|
|
""",
|
|
# Frame 3 - BIB
|
|
r"""
|
|
_________ ___ _________
|
|
|\\ | | | |\\ |
|
|
| \\ | | | | \\ |
|
|
| \\_____| | | | \\_____|
|
|
| |_____| | | | |_____|
|
|
| | | | | | |
|
|
|___|_____| |___| |___|_____|
|
|
|
|
""",
|
|
# Frame 4 - BIBB
|
|
r"""
|
|
_________ ___ _________ _________
|
|
|\\ | | | |\\ | |\\ |
|
|
| \\ | | | | \\ | | \\ |
|
|
| \\_____| | | | \\_____| | \\_____|
|
|
| |_____| | | | |_____| | |_____|
|
|
| | | | | | | | | |
|
|
|___|_____| |___| |___|_____| |___|_____|
|
|
|
|
""",
|
|
# Frame 5 - BIBBI
|
|
r"""
|
|
_________ ___ _________ _________ ___
|
|
|\\ | | | |\\ | |\\ | | |
|
|
| \\ | | | | \\ | | \\ | | |
|
|
| \\_____| | | | \\_____| | \\_____| | |
|
|
| |_____| | | | |_____| | |_____| | |
|
|
| | | | | | | | | | |
|
|
|___|_____| |___| |___|_____| |___|_____| |___|
|
|
|
|
""",
|
|
# Frame 6 - BIBBIT
|
|
r"""
|
|
_________ ___ _________ _________ ___ _______
|
|
|\\ | | | |\\ | |\\ | | | | |
|
|
| \\ | | | | \\ | | \\ | | | |_ _|
|
|
| \\_____| | | | \\_____| | \\_____| | | | |
|
|
| |_____| | | | |_____| | |_____| | | | |
|
|
| | | | | | | | | | | | |
|
|
|___|_____| |___| |___|_____| |___|_____| |___| |_______|
|
|
|
|
""",
|
|
]
|
|
|
|
# Loading animation characters
|
|
LOADING_CHARS = ['⣾', '⣽', '⣻', '⢿', '⡿', '⣟', '⣯', '⣷']
|
|
|
|
class ChannelButton(urwid.Button):
|
|
def __init__(self, channel, *args, **kwargs):
|
|
self.channel = channel
|
|
if channel.get('current_show'):
|
|
label = f"{channel['name']}\nNow: {channel['current_show']['title']}"
|
|
if channel.get('next_show'):
|
|
label += f"\nNext: {channel['next_show']['title']}"
|
|
else:
|
|
label = channel['name']
|
|
super().__init__(label, *args, **kwargs)
|
|
|
|
class FocusAwareListBox(urwid.ListBox):
|
|
"""ListBox that tracks focus changes"""
|
|
def __init__(self, body, on_focus_change=None):
|
|
super().__init__(body)
|
|
self.on_focus_change = on_focus_change
|
|
self._last_focus = None
|
|
|
|
def change_focus(self, size, position, *args, **kwargs):
|
|
super().change_focus(size, position, *args, **kwargs)
|
|
current_focus = self.focus
|
|
if current_focus != self._last_focus and self.on_focus_change:
|
|
self.on_focus_change(current_focus)
|
|
self._last_focus = current_focus
|
|
|
|
class SettingsMenu:
|
|
"""Settings configuration menu"""
|
|
def __init__(self, config, on_save_callback, on_cancel_callback):
|
|
self.config = config.copy()
|
|
self.on_save_callback = on_save_callback
|
|
self.on_cancel_callback = on_cancel_callback
|
|
self.setup_ui()
|
|
|
|
def setup_ui(self):
|
|
# Create styled form fields with consistent colors
|
|
self.m3u_edit = urwid.Edit(("title", "M3U URL: "), self.config.get('m3u_url', ''))
|
|
self.xmltv_edit = urwid.Edit(("title", "XMLTV URL: "), self.config.get('xmltv_url', ''))
|
|
self.user_agent_edit = urwid.Edit(("title", "User Agent: "), self.config.get('user_agent', ''))
|
|
self.update_interval_edit = urwid.IntEdit(("title", "Update Interval (seconds): "), self.config.get('update_interval', 900))
|
|
|
|
# Monitor selection - simplified as a numeric selection
|
|
monitor_value = self.config.get('monitor', None)
|
|
|
|
# Create a radio button group for monitor selection
|
|
self.monitor_group = []
|
|
rb_none = urwid.RadioButton(self.monitor_group, "Default (No specific monitor)", state=(monitor_value is None))
|
|
rb_1 = urwid.RadioButton(self.monitor_group, "Monitor 1", state=(monitor_value == 1))
|
|
rb_2 = urwid.RadioButton(self.monitor_group, "Monitor 2", state=(monitor_value == 2))
|
|
rb_3 = urwid.RadioButton(self.monitor_group, "Monitor 3", state=(monitor_value == 3))
|
|
|
|
# Debug mode checkbox
|
|
self.debug_checkbox = urwid.CheckBox("Enable Debug Mode", state=self.config.get('debug_mode', False))
|
|
|
|
# Create styled form sections
|
|
form_items = [
|
|
# Header section
|
|
urwid.AttrMap(urwid.Text("Settings Configuration", align='center'), 'header'),
|
|
urwid.AttrMap(urwid.Divider("─"), 'divider'),
|
|
urwid.Divider(),
|
|
|
|
# Connection settings section
|
|
urwid.AttrMap(urwid.Text("CONNECTION SETTINGS", align='left'), 'program_now'),
|
|
urwid.Divider(),
|
|
urwid.AttrMap(self.m3u_edit, None, 'channel_focus'),
|
|
urwid.Divider(),
|
|
urwid.AttrMap(self.xmltv_edit, None, 'channel_focus'),
|
|
urwid.Divider(),
|
|
urwid.AttrMap(self.user_agent_edit, None, 'channel_focus'),
|
|
urwid.Divider(),
|
|
|
|
# Application settings section
|
|
urwid.AttrMap(urwid.Text("APPLICATION SETTINGS", align='left'), 'program_now'),
|
|
urwid.Divider(),
|
|
urwid.AttrMap(self.update_interval_edit, None, 'channel_focus'),
|
|
urwid.Divider(),
|
|
|
|
# Display settings section
|
|
urwid.AttrMap(urwid.Text("DISPLAY SETTINGS", align='left'), 'program_now'),
|
|
urwid.Divider(),
|
|
urwid.AttrMap(urwid.Text("Fullscreen Monitor Selection:", align='left'), 'program_desc'),
|
|
urwid.AttrMap(rb_none, None, 'channel_focus'),
|
|
urwid.AttrMap(rb_1, None, 'channel_focus'),
|
|
urwid.AttrMap(rb_2, None, 'channel_focus'),
|
|
urwid.AttrMap(rb_3, None, 'channel_focus'),
|
|
urwid.Divider(),
|
|
|
|
# Debug settings section
|
|
urwid.AttrMap(urwid.Text("DEBUG SETTINGS", align='left'), 'program_now'),
|
|
urwid.Divider(),
|
|
urwid.AttrMap(self.debug_checkbox, None, 'channel_focus'),
|
|
urwid.Divider(),
|
|
|
|
# Action buttons
|
|
urwid.AttrMap(urwid.Divider("─"), 'divider'),
|
|
urwid.Divider(),
|
|
urwid.Columns([
|
|
('weight', 1, urwid.AttrMap(urwid.Button("Save Settings", on_press=self.save_settings), 'program_now', 'program_desc')),
|
|
('weight', 1, urwid.AttrMap(urwid.Button("Cancel", on_press=self.cancel_settings), 'error', 'program_desc'))
|
|
], dividechars=2)
|
|
]
|
|
|
|
listwalker = urwid.SimpleFocusListWalker(form_items)
|
|
self.listbox = urwid.ListBox(listwalker)
|
|
|
|
# Create the main widget with styled frame
|
|
self.widget = urwid.Frame(
|
|
urwid.AttrMap(urwid.LineBox(self.listbox, title=" Settings ", title_align='center'), 'channel'),
|
|
footer=urwid.AttrMap(
|
|
urwid.Text("Tab/Shift+Tab: Navigate | Enter: Select | Esc: Cancel", align='center'),
|
|
'footer'
|
|
)
|
|
)
|
|
|
|
def save_settings(self, button=None):
|
|
"""Validate and save settings"""
|
|
try:
|
|
# Validate and collect settings
|
|
new_config = {
|
|
'm3u_url': self.m3u_edit.edit_text.strip(),
|
|
'xmltv_url': self.xmltv_edit.edit_text.strip(),
|
|
'user_agent': self.user_agent_edit.edit_text.strip(),
|
|
'update_interval': self.update_interval_edit.value(),
|
|
'debug_mode': self.debug_checkbox.state
|
|
}
|
|
|
|
# Handle monitor setting from radio buttons
|
|
# Find which radio button is selected
|
|
monitor_value = None
|
|
for i, rb in enumerate(self.monitor_group):
|
|
if rb.state:
|
|
if i == 0: # "Default" option
|
|
monitor_value = None
|
|
else:
|
|
monitor_value = i # Monitor number (1, 2, 3)
|
|
break
|
|
|
|
new_config['monitor'] = monitor_value
|
|
|
|
# Set default MPV options without exposing them in the UI
|
|
standard_mpv_options = [
|
|
"--really-quiet",
|
|
"--no-terminal",
|
|
"--force-window=immediate"
|
|
]
|
|
|
|
# Add fullscreen options if monitor is selected
|
|
if monitor_value is not None:
|
|
fs_options = [
|
|
"--fs",
|
|
f"--fs-screen={monitor_value}"
|
|
]
|
|
new_config['mpv_options'] = fs_options + standard_mpv_options
|
|
else:
|
|
new_config['mpv_options'] = standard_mpv_options
|
|
|
|
# Validate required fields
|
|
if not new_config['m3u_url']:
|
|
self.show_error("M3U URL is required")
|
|
return
|
|
if not new_config['xmltv_url']:
|
|
self.show_error("XMLTV URL is required")
|
|
return
|
|
if not new_config['user_agent']:
|
|
self.show_error("User Agent is required")
|
|
return
|
|
if new_config['update_interval'] <= 0:
|
|
self.show_error("Update interval must be positive")
|
|
return
|
|
|
|
self.on_save_callback(new_config)
|
|
except Exception as e:
|
|
self.show_error(f"Error saving settings: {str(e)}")
|
|
|
|
def cancel_settings(self, button=None):
|
|
"""Cancel settings changes"""
|
|
self.on_cancel_callback()
|
|
|
|
def show_error(self, message):
|
|
"""Show error message in footer"""
|
|
error_text = urwid.AttrMap(urwid.Text(f"Error: {message}", align='center'), 'error')
|
|
self.widget.footer = error_text
|
|
|
|
|
|
class IPTVPlayer:
|
|
def __init__(self):
|
|
self.channels = []
|
|
self.programs = {}
|
|
self.current_channel = None
|
|
self.mpv_process = None
|
|
self.loop = None
|
|
self.last_update = datetime.now()
|
|
self.update_thread = None
|
|
self.update_running = True
|
|
self.settings_menu = None
|
|
self.main_widget = None
|
|
self.animate_startup = False # Disable animations for better performance
|
|
self.splash_screen = None
|
|
self.load_data()
|
|
self.setup_ui()
|
|
|
|
def display_splash_screen(self):
|
|
"""Disabled splash screen for better performance"""
|
|
# Skip splash screen completely for better performance
|
|
pass
|
|
|
|
def load_data(self):
|
|
debug_print("\n[DEBUG] Loading IPTV data...")
|
|
|
|
# Load M3U playlist
|
|
try:
|
|
req = Request(M3U_URL, headers={'User-Agent': USER_AGENT})
|
|
with urlopen(req, timeout=10) as response:
|
|
m3u_data = response.read().decode('utf-8')
|
|
debug_print(f"[DEBUG] Received M3U data (length: {len(m3u_data)} bytes)")
|
|
|
|
# Parse M3U
|
|
lines = m3u_data.split('\n')
|
|
current_channel = None
|
|
|
|
for line in lines:
|
|
line = line.strip()
|
|
if line.startswith('#EXTINF:'):
|
|
# Parse channel info
|
|
tvg_id = re.search(r'tvg-id="([^"]*)"', line)
|
|
tvg_name = re.search(r'tvg-name="([^"]*)"', line)
|
|
tvg_logo = re.search(r'tvg-logo="([^"]*)"', line)
|
|
group_title = re.search(r'group-title="([^"]*)"', line)
|
|
channel_title = line.split(',')[-1].strip()
|
|
|
|
current_channel = {
|
|
'id': tvg_id.group(1) if tvg_id else "",
|
|
'name': tvg_name.group(1) if tvg_name else channel_title,
|
|
'logo': tvg_logo.group(1) if tvg_logo else "",
|
|
'group': group_title.group(1) if group_title else "Other",
|
|
'title': channel_title,
|
|
'url': None,
|
|
'current_show': None,
|
|
'next_show': None
|
|
}
|
|
elif (line.startswith('http://') or line.startswith('https://') or
|
|
line.startswith('rtmp://') or line.startswith('udp://') or
|
|
line.startswith('rtp://') or line.startswith('mms://') or
|
|
line.startswith('rtsp://') or line.startswith('file://')):
|
|
if current_channel:
|
|
current_channel['url'] = line
|
|
self.channels.append(current_channel)
|
|
current_channel = None
|
|
|
|
if not self.channels:
|
|
print("[WARNING] No channels found in M3U file!")
|
|
self.add_error_channel("No channels found in playlist")
|
|
else:
|
|
debug_print(f"[DEBUG] Successfully loaded {len(self.channels)} channels")
|
|
|
|
except Exception as e:
|
|
print(f"[ERROR] Failed to load M3U: {str(e)}")
|
|
self.add_error_channel(f"Error loading playlist: {str(e)}")
|
|
|
|
# Load XMLTV guide
|
|
if self.channels and not self.channels[0]['name'].startswith("Error:"):
|
|
try:
|
|
req = Request(XMLTV_URL, headers={'User-Agent': USER_AGENT})
|
|
with urlopen(req, timeout=10) as response:
|
|
xml_data = response.read().decode('utf-8')
|
|
debug_print(f"[DEBUG] Received XMLTV data (length: {len(xml_data)} bytes)")
|
|
|
|
root = ET.fromstring(xml_data)
|
|
for programme in root.findall('programme'):
|
|
channel_id = programme.get('channel')
|
|
start = self.parse_xmltv_time(programme.get('start'))
|
|
stop = self.parse_xmltv_time(programme.get('stop'))
|
|
title_elem = programme.find('title')
|
|
desc_elem = programme.find('desc')
|
|
|
|
# Extract additional episode information
|
|
episode_num_elem = programme.find('episode-num')
|
|
episode_num = ""
|
|
season = None
|
|
episode = None
|
|
|
|
# Parse episode numbering systems
|
|
if episode_num_elem is not None and episode_num_elem.text:
|
|
episode_num = episode_num_elem.text
|
|
debug_print(f"[DEBUG] Found episode number: {episode_num}, system: {episode_num_elem.get('system')}")
|
|
|
|
# Extract season and episode from xmltv_ns format (common format: S.E.P/TOTAL where S=season, E=episode, P=part)
|
|
if episode_num_elem.get('system') == 'xmltv_ns':
|
|
# Format example: "2.9." means Season 3, Episode 10 (zero-based)
|
|
parts = episode_num.split('.')
|
|
debug_print(f"[DEBUG] Split episode_num into parts: {parts}")
|
|
if len(parts) >= 2:
|
|
try:
|
|
# XMLTV uses zero-based numbering, so add 1
|
|
if parts[0].strip():
|
|
season = int(parts[0]) + 1
|
|
debug_print(f"[DEBUG] Parsed season: {season}")
|
|
if parts[1].strip():
|
|
episode = int(parts[1]) + 1
|
|
debug_print(f"[DEBUG] Parsed episode: {episode}")
|
|
except ValueError as e:
|
|
debug_print(f"[DEBUG] Error parsing season/episode: {e}")
|
|
# Also try to find other common formats
|
|
elif episode_num.lower().startswith('s') and 'e' in episode_num.lower():
|
|
# Format like "S01E05"
|
|
try:
|
|
s_part = episode_num.lower().split('e')[0].strip('s')
|
|
e_part = episode_num.lower().split('e')[1].strip()
|
|
if s_part.isdigit():
|
|
season = int(s_part)
|
|
debug_print(f"[DEBUG] Parsed season from SxxExx format: {season}")
|
|
if e_part.isdigit():
|
|
episode = int(e_part)
|
|
debug_print(f"[DEBUG] Parsed episode from SxxExx format: {episode}")
|
|
except Exception as e:
|
|
debug_print(f"[DEBUG] Error parsing SxxExx format: {e}")
|
|
# Try digit format (like "105" for S01E05)
|
|
elif episode_num.isdigit() and len(episode_num) >= 3:
|
|
try:
|
|
if len(episode_num) == 3:
|
|
season = int(episode_num[0])
|
|
episode = int(episode_num[1:])
|
|
elif len(episode_num) == 4:
|
|
season = int(episode_num[:2])
|
|
episode = int(episode_num[2:])
|
|
debug_print(f"[DEBUG] Parsed from numeric format: S{season}E{episode}")
|
|
except Exception as e:
|
|
debug_print(f"[DEBUG] Error parsing numeric format: {e}")
|
|
|
|
# For debugging: also check direct episode and season tags
|
|
season_elem = programme.find('season-num')
|
|
if season_elem is not None and season_elem.text:
|
|
try:
|
|
season = int(season_elem.text)
|
|
debug_print(f"[DEBUG] Found direct season tag: {season}")
|
|
except ValueError:
|
|
pass
|
|
|
|
episode_elem = programme.find('episode-num')
|
|
if episode_elem is not None and episode_elem.text and episode_elem.get('system') == 'onscreen':
|
|
debug_print(f"[DEBUG] Found onscreen episode format: {episode_elem.text}")
|
|
# Try to extract numbers from onscreen format (like "Episode 5")
|
|
try:
|
|
num_match = re.search(r'(\d+)', episode_elem.text)
|
|
if num_match and episode is None:
|
|
episode = int(num_match.group(1))
|
|
debug_print(f"[DEBUG] Extracted episode number from onscreen format: {episode}")
|
|
except Exception as e:
|
|
debug_print(f"[DEBUG] Error parsing onscreen format: {e}")
|
|
|
|
# Also check if there's a direct 'episode' tag
|
|
direct_episode_elem = programme.find('episode')
|
|
if direct_episode_elem is not None and direct_episode_elem.text:
|
|
try:
|
|
episode = int(direct_episode_elem.text)
|
|
debug_print(f"[DEBUG] Found direct episode tag: {episode}")
|
|
except ValueError:
|
|
pass
|
|
|
|
# Detect if we have the season/episode directly in the title
|
|
title_text = title_elem.text if title_elem is not None else ""
|
|
if title_text and (season is None or episode is None):
|
|
# Look for patterns like "S01E05" in the title
|
|
se_match = re.search(r'S(\d+)E(\d+)', title_text, re.IGNORECASE)
|
|
if se_match:
|
|
season = int(se_match.group(1))
|
|
episode = int(se_match.group(2))
|
|
debug_print(f"[DEBUG] Extracted from title: S{season}E{episode} from '{title_text}'")
|
|
# Check format like "1x05"
|
|
elif 'x' in title_text.lower():
|
|
x_match = re.search(r'(\d+)x(\d+)', title_text, re.IGNORECASE)
|
|
if x_match:
|
|
season = int(x_match.group(1))
|
|
episode = int(x_match.group(2))
|
|
debug_print(f"[DEBUG] Extracted from title 'x' format: S{season}E{episode} from '{title_text}'")
|
|
# Check for "Season X Episode Y" text
|
|
elif 'season' in title_text.lower() and 'episode' in title_text.lower():
|
|
debug_print(f"[DEBUG] Title contains 'season' and 'episode': '{title_text}'")
|
|
# Try to extract season and episode numbers
|
|
season_match = re.search(r'season\s+(\d+)', title_text, re.IGNORECASE)
|
|
episode_match = re.search(r'episode\s+(\d+)', title_text, re.IGNORECASE)
|
|
|
|
if season_match:
|
|
season = int(season_match.group(1))
|
|
debug_print(f"[DEBUG] Extracted season from text: {season}")
|
|
if episode_match:
|
|
episode = int(episode_match.group(1))
|
|
debug_print(f"[DEBUG] Extracted episode from text: {episode}")
|
|
# Check for Episode number in brackets like "Show Name (5)"
|
|
elif re.search(r'\(\s*\d+\s*\)', title_text):
|
|
num_match = re.search(r'\(\s*(\d+)\s*\)', title_text)
|
|
if num_match:
|
|
# Assume this is an episode number if we don't have one yet
|
|
if episode is None:
|
|
episode = int(num_match.group(1))
|
|
debug_print(f"[DEBUG] Extracted episode from brackets: {episode}")
|
|
# Check for common patterns like "E05" without season
|
|
elif re.search(r'E\d+', title_text, re.IGNORECASE) and episode is None:
|
|
e_match = re.search(r'E(\d+)', title_text, re.IGNORECASE)
|
|
if e_match:
|
|
episode = int(e_match.group(1))
|
|
debug_print(f"[DEBUG] Extracted episode from E-format: {episode}")
|
|
|
|
if season is not None or episode is not None:
|
|
debug_print(f"[DEBUG] Final season/episode for '{title_text}': S{season}E{episode}")
|
|
|
|
# Get original air date if available
|
|
date_elem = programme.find('date')
|
|
air_date = date_elem.text if date_elem is not None else None
|
|
|
|
if channel_id not in self.programs:
|
|
self.programs[channel_id] = []
|
|
|
|
# Create formatted show title with season/episode if available
|
|
formatted_title = title_elem.text if title_elem is not None else "No Title"
|
|
|
|
self.programs[channel_id].append({
|
|
'start': start,
|
|
'stop': stop,
|
|
'title': formatted_title,
|
|
'desc': desc_elem.text if desc_elem is not None else "",
|
|
'season': season,
|
|
'episode': episode,
|
|
'episode_num': episode_num,
|
|
'air_date': air_date
|
|
})
|
|
debug_print(f"[DEBUG] Loaded EPG data for {len(self.programs)} channels")
|
|
|
|
# Pre-load current and next shows for each channel
|
|
self.update_current_shows()
|
|
|
|
# Show a sample of program data with season/episode info for debugging
|
|
for channel_id, programs in self.programs.items():
|
|
if programs and len(programs) > 0:
|
|
sample = programs[0]
|
|
if sample.get('season') is not None or sample.get('episode') is not None:
|
|
debug_print(f"[DEBUG] Sample program: Title={sample.get('title')}, Season={sample.get('season')}, Episode={sample.get('episode')}, Episode_num={sample.get('episode_num')}")
|
|
break
|
|
|
|
except Exception as e:
|
|
print(f"[WARNING] Failed to load EPG: {str(e)}")
|
|
|
|
self.last_update = datetime.now()
|
|
|
|
def update_current_shows(self):
|
|
"""Update current and next shows based on current time"""
|
|
now = datetime.now()
|
|
for channel in self.channels:
|
|
if channel['id'] in self.programs:
|
|
shows = self.programs[channel['id']]
|
|
channel['current_show'] = None
|
|
channel['next_show'] = None
|
|
for i, show in enumerate(shows):
|
|
if show['start'] <= now < show['stop']:
|
|
channel['current_show'] = show
|
|
if i+1 < len(shows):
|
|
channel['next_show'] = shows[i+1]
|
|
break
|
|
elif show['start'] > now:
|
|
channel['next_show'] = show
|
|
break
|
|
|
|
def add_error_channel(self, message):
|
|
# Clear existing channels before adding error channel
|
|
self.channels = []
|
|
self.channels.append({
|
|
'id': "error_channel",
|
|
'name': f"Error: {message}",
|
|
'logo': "",
|
|
'group': "Error",
|
|
'url': "",
|
|
'title': "Check your playlist URL",
|
|
'current_show': None,
|
|
'next_show': None
|
|
})
|
|
|
|
def parse_xmltv_time(self, time_str):
|
|
try:
|
|
return datetime.strptime(time_str[:14], "%Y%m%d%H%M%S")
|
|
except:
|
|
return datetime.now()
|
|
|
|
def generate_imdb_url(self, show):
|
|
"""Generate IMDb search URL for a show or specific episode"""
|
|
# Base title for search
|
|
title = show.get('title', '')
|
|
if not title:
|
|
return None
|
|
|
|
# Check if we have season and episode information
|
|
season = show.get('season')
|
|
episode = show.get('episode')
|
|
|
|
# Advanced search for TV episodes is more accurate for specific episodes
|
|
if season is not None and episode is not None:
|
|
# For episodes, use IMDb's advanced title search which gives better results
|
|
# Format properly for the advanced search with title, season and episode
|
|
show_title = urllib.parse.quote_plus(title)
|
|
|
|
# Use IMDb's advanced search for TV episodes
|
|
# This directly targets the episode search with better filtering
|
|
return f"https://www.imdb.com/search/title/?title={show_title}&title_type=tv_episode&season={season}&episode={episode}"
|
|
else:
|
|
# For shows without episode info, use regular search
|
|
encoded_query = urllib.parse.quote_plus(title)
|
|
return f"https://www.imdb.com/find?q={encoded_query}&s=tt&ttype=tv"
|
|
|
|
# Removed clickable link function since it doesn't work in Windows CMD
|
|
|
|
def setup_ui(self):
|
|
# Create channel list with current show info
|
|
self.channel_items = []
|
|
for channel in self.channels:
|
|
# Create custom button that knows its channel
|
|
btn = ChannelButton(channel)
|
|
|
|
# Connect the signal with the correct channel using a closure
|
|
def make_click_handler(c):
|
|
return lambda button: self.on_channel_select(c)
|
|
|
|
urwid.connect_signal(btn, 'click', make_click_handler(channel))
|
|
|
|
# Apply different colors based on channel status
|
|
if channel.get('url'):
|
|
channel_item = urwid.AttrMap(btn, 'channel', 'channel_focus')
|
|
else:
|
|
channel_item = urwid.AttrMap(btn, 'error', 'error')
|
|
|
|
self.channel_items.append(channel_item)
|
|
|
|
# Create list box that tracks focus changes
|
|
self.channel_list = FocusAwareListBox(
|
|
urwid.SimpleFocusListWalker(self.channel_items),
|
|
on_focus_change=self.on_focus_change
|
|
)
|
|
|
|
# Create program guide with scrollable content
|
|
self.program_walker = urwid.SimpleFocusListWalker([])
|
|
self.program_listbox = urwid.ListBox(self.program_walker)
|
|
|
|
# Add update time to footer
|
|
monitor_info = f" | Monitor: {MONITOR}" if MONITOR is not None else ""
|
|
self.footer_text = urwid.Text(f"Q: Quit | ↑↓: Navigate | Enter: Play | L: Reload | S: Settings | Last update: Loading...{monitor_info}", align='center')
|
|
self.footer = urwid.AttrMap(self.footer_text, 'footer')
|
|
|
|
program_frame = urwid.Frame(
|
|
urwid.LineBox(self.program_listbox, title="Program Details"),
|
|
footer=self.footer
|
|
)
|
|
|
|
# Create header with ASCII art
|
|
header_text = BANNER + "\nBibbitTV Terminal Player"
|
|
header = urwid.AttrMap(urwid.Text(header_text, align='center'), 'header')
|
|
|
|
# Create columns
|
|
columns = urwid.Columns([
|
|
('weight', 1, urwid.LineBox(self.channel_list, title="Channels")),
|
|
('weight', 2, program_frame)
|
|
], dividechars=1)
|
|
|
|
# Main layout with header
|
|
layout = urwid.Pile([
|
|
('pack', header),
|
|
('pack', urwid.Divider()),
|
|
columns
|
|
])
|
|
|
|
# Store the main layout
|
|
self.main_widget = urwid.LineBox(layout, title="")
|
|
|
|
# Overlay for centering
|
|
self.top = urwid.Overlay(
|
|
self.main_widget,
|
|
urwid.SolidFill(' '),
|
|
align='center',
|
|
width=('relative', 85),
|
|
valign='middle',
|
|
height=('relative', 85)
|
|
)
|
|
|
|
# Update footer with initial timestamp
|
|
self.update_footer()
|
|
|
|
def update_footer(self):
|
|
"""Update footer with last update time"""
|
|
time_str = self.last_update.strftime("%H:%M:%S")
|
|
monitor_info = f" | Monitor: {MONITOR}" if MONITOR is not None else ""
|
|
self.footer_text.set_text(f"Q: Quit | ↑↓: Navigate | Enter: Play | L: Reload | S: Settings | Last update: {time_str}{monitor_info}")
|
|
|
|
def show_settings_menu(self):
|
|
"""Show settings configuration menu"""
|
|
# Get current config
|
|
current_config = {
|
|
'm3u_url': M3U_URL,
|
|
'xmltv_url': XMLTV_URL,
|
|
'user_agent': USER_AGENT,
|
|
'update_interval': UPDATE_INTERVAL,
|
|
'monitor': MONITOR,
|
|
'mpv_options': MPV_BASE[1:] if MPV_BASE[0] == 'mpv' else MPV_BASE # Remove 'mpv' command
|
|
}
|
|
|
|
# Create settings menu
|
|
self.settings_menu = SettingsMenu(
|
|
current_config,
|
|
self.on_settings_save,
|
|
self.on_settings_cancel
|
|
)
|
|
|
|
# Create settings overlay properly
|
|
self.top = urwid.Overlay(
|
|
self.settings_menu.widget,
|
|
self.top,
|
|
align='center',
|
|
width=('relative', 80),
|
|
valign='middle',
|
|
height=('relative', 90)
|
|
)
|
|
|
|
# Update the MainLoop's widget
|
|
if self.loop:
|
|
self.loop.widget = self.top
|
|
|
|
def on_settings_save(self, new_config):
|
|
"""Handle settings save"""
|
|
try:
|
|
# Save to config.json
|
|
with open('config.json', 'w') as config_file:
|
|
json.dump(new_config, config_file, indent=4)
|
|
|
|
# Update global variables
|
|
global M3U_URL, XMLTV_URL, USER_AGENT, UPDATE_INTERVAL, MONITOR, MPV_BASE, DEBUG_MODE
|
|
M3U_URL = new_config['m3u_url']
|
|
XMLTV_URL = new_config['xmltv_url']
|
|
USER_AGENT = new_config['user_agent']
|
|
UPDATE_INTERVAL = new_config['update_interval']
|
|
MONITOR = new_config['monitor']
|
|
DEBUG_MODE = new_config['debug_mode']
|
|
|
|
# Update MPV base command directly from new_config
|
|
MPV_BASE = ["mpv"]
|
|
MPV_BASE.extend(new_config['mpv_options'])
|
|
|
|
# Close settings menu
|
|
self.on_settings_cancel()
|
|
|
|
# Reload data with new settings
|
|
self.reload_data()
|
|
|
|
except Exception as e:
|
|
# Show error in settings menu
|
|
if self.settings_menu:
|
|
self.settings_menu.show_error(f"Failed to save config: {str(e)}")
|
|
|
|
def on_settings_cancel(self):
|
|
"""Close settings menu"""
|
|
self.settings_menu = None
|
|
# Restore the original top widget structure
|
|
self.top = urwid.Overlay(
|
|
self.main_widget,
|
|
urwid.SolidFill(' '),
|
|
align='center',
|
|
width=('relative', 85),
|
|
valign='middle',
|
|
height=('relative', 85)
|
|
)
|
|
|
|
# Update the MainLoop's widget
|
|
if self.loop:
|
|
self.loop.widget = self.top
|
|
|
|
def start_update_thread(self):
|
|
"""Start background thread for periodic updates"""
|
|
self.update_running = True
|
|
self.update_thread = threading.Thread(target=self.update_worker, daemon=True)
|
|
self.update_thread.start()
|
|
|
|
def update_worker(self):
|
|
"""Background worker for periodic updates"""
|
|
while self.update_running:
|
|
time.sleep(UPDATE_INTERVAL)
|
|
if self.loop:
|
|
# Schedule update on the main thread
|
|
self.loop.set_alarm_in(0, self.refresh_schedule)
|
|
|
|
def refresh_schedule(self, loop=None, user_data=None):
|
|
"""Update schedule information without animation for better performance"""
|
|
try:
|
|
debug_print("[DEBUG] Refreshing schedule data...")
|
|
|
|
# Update current shows without animations
|
|
self.update_current_shows()
|
|
self.last_update = datetime.now()
|
|
|
|
# Update UI
|
|
self.refresh_ui()
|
|
self.update_footer()
|
|
|
|
# Add simple update notification
|
|
time_str = self.last_update.strftime("%H:%M:%S")
|
|
update_text = urwid.Text([("update", f"Schedule updated at {time_str}")])
|
|
empty_line = urwid.Text("")
|
|
self.program_walker[:] = [update_text, empty_line] + self.program_walker[:]
|
|
|
|
debug_print("[DEBUG] Schedule refreshed successfully")
|
|
except Exception as e:
|
|
print(f"[ERROR] Failed to refresh schedule: {str(e)}")
|
|
|
|
def refresh_ui(self):
|
|
"""Refresh UI elements with updated data"""
|
|
# Update channel buttons
|
|
for i, channel in enumerate(self.channels):
|
|
button = self.channel_items[i].base_widget
|
|
if channel.get('current_show'):
|
|
label = f"{channel['name']}\nNow: {channel['current_show']['title']}"
|
|
if channel.get('next_show'):
|
|
label += f"\nNext: {channel['next_show']['title']}"
|
|
else:
|
|
label = channel['name']
|
|
button.set_label(label)
|
|
|
|
# Update program info for current channel if focused
|
|
if self.current_channel:
|
|
self.on_channel_hover(self.current_channel)
|
|
|
|
def on_focus_change(self, focused_widget):
|
|
"""Update program info when focusing a channel"""
|
|
if focused_widget:
|
|
# Get the button inside the AttrMap
|
|
button = focused_widget.base_widget
|
|
if hasattr(button, 'channel'):
|
|
self.on_channel_hover(button.channel)
|
|
|
|
def on_channel_hover(self, channel):
|
|
"""Update program info without animation for better performance"""
|
|
self.current_channel = channel
|
|
program_info = self.get_program_info(channel)
|
|
|
|
# Direct update without animation for better performance
|
|
self.program_walker[:] = [urwid.Text(line) for line in program_info]
|
|
|
|
def on_channel_select(self, channel):
|
|
"""Play channel when selected (no animation for better performance)"""
|
|
if channel.get('url'):
|
|
# Play the channel directly without animation
|
|
self.play_channel(channel['url'])
|
|
|
|
def get_program_info(self, channel):
|
|
info = []
|
|
max_line_length = 60 # Define this at the top so it's available everywhere
|
|
|
|
info.append([("header", f"--Channel: {channel['name']}")])
|
|
|
|
if channel.get('group'):
|
|
info.append([("title", f"Group: {channel['group']}")])
|
|
|
|
# Debug program information
|
|
if channel.get('current_show'):
|
|
current_show = channel['current_show']
|
|
debug_print(f"[DEBUG] Program info - Title: {current_show.get('title')}")
|
|
debug_print(f"[DEBUG] Program info - Season: {current_show.get('season')}")
|
|
debug_print(f"[DEBUG] Program info - Episode: {current_show.get('episode')}")
|
|
debug_print(f"[DEBUG] Program info - Air date: {current_show.get('air_date')}")
|
|
|
|
if channel.get('current_show'):
|
|
now = datetime.now()
|
|
remaining = (channel['current_show']['stop'] - now).seconds // 60
|
|
start_time = channel['current_show']['start'].strftime("%H:%M")
|
|
end_time = channel['current_show']['stop'].strftime("%H:%M")
|
|
|
|
# Current show section with colorful formatting
|
|
info.append([]) # Empty line
|
|
info.append([("program_now", "--NOW PLAYING")])
|
|
# Display title with season and episode prominently if available
|
|
current_show = channel['current_show']
|
|
|
|
# Extract and display season/episode information
|
|
season = current_show.get('season')
|
|
episode = current_show.get('episode')
|
|
|
|
# If season/episode not available in metadata, try to extract from title
|
|
if season is None or episode is None:
|
|
# Try SxxExx format (S01E05)
|
|
se_match = re.search(r'S(\d+)E(\d+)', current_show['title'], re.IGNORECASE)
|
|
if se_match:
|
|
season = int(se_match.group(1))
|
|
episode = int(se_match.group(2))
|
|
debug_print(f"[DEBUG] Extracted S{season}E{episode} from title format")
|
|
# Try season x episode format (1x05)
|
|
elif 'x' in current_show['title'].lower():
|
|
x_match = re.search(r'(\d+)x(\d+)', current_show['title'], re.IGNORECASE)
|
|
if x_match:
|
|
season = int(x_match.group(1))
|
|
episode = int(x_match.group(2))
|
|
debug_print(f"[DEBUG] Extracted S{season}E{episode} from Nx format")
|
|
|
|
# Display title with or without season/episode info
|
|
if season is not None and episode is not None:
|
|
# Format title with season/episode info
|
|
formatted_title = f"Title: {current_show['title']} (S{season:02d}E{episode:02d})"
|
|
info.append([("title", formatted_title)])
|
|
# Also add a detailed version on a separate line
|
|
info.append([("program_desc", f"Season {season}, Episode {episode}")])
|
|
# Update the current_show object with the extracted info if it wasn't there already
|
|
current_show['season'] = season
|
|
current_show['episode'] = episode
|
|
else:
|
|
info.append([("title", f"Title: {current_show['title']}")])
|
|
|
|
# Add air date if available
|
|
if current_show.get('air_date'):
|
|
info.append([("time", f"Original Air Date: {current_show['air_date']}")])
|
|
|
|
info.append([
|
|
("time", f"Time: {start_time} - {end_time} "),
|
|
("", f"({remaining} minutes remaining)")
|
|
])
|
|
|
|
# Add IMDb link if we can generate one
|
|
imdb_url = self.generate_imdb_url(current_show)
|
|
# Skip IMDb URL display as it doesn't work well in Windows CMD
|
|
# if imdb_url:
|
|
# info.append([("update", f"IMDb: {imdb_url}")])
|
|
|
|
if channel['current_show']['desc']:
|
|
info.append([]) # Empty line
|
|
info.append([("program_desc", "--Description:")])
|
|
# Split long description into multiple lines
|
|
desc = channel['current_show']['desc']
|
|
for i in range(0, len(desc), max_line_length):
|
|
info.append([('program_desc', desc[i:i+max_line_length])])
|
|
|
|
# Next show section
|
|
if channel.get('next_show'):
|
|
next_start = channel['next_show']['start'].strftime("%H:%M")
|
|
next_end = channel['next_show']['stop'].strftime("%H:%M")
|
|
|
|
info.append([]) # Empty line
|
|
info.append([("divider", "─" * 50)])
|
|
info.append([]) # Empty line
|
|
info.append([("program_next", "--UP NEXT")])
|
|
|
|
# Display title with season and episode information
|
|
next_show = channel['next_show']
|
|
|
|
# Extract season/episode information for next show
|
|
season = next_show.get('season')
|
|
episode = next_show.get('episode')
|
|
|
|
# If season/episode not available in metadata, try to extract from title
|
|
if season is None or episode is None:
|
|
# Try SxxExx format (S01E05)
|
|
se_match = re.search(r'S(\d+)E(\d+)', next_show['title'], re.IGNORECASE)
|
|
if se_match:
|
|
season = int(se_match.group(1))
|
|
episode = int(se_match.group(2))
|
|
debug_print(f"[DEBUG] Next show: Extracted S{season}E{episode} from title format")
|
|
# Try season x episode format (1x05)
|
|
elif 'x' in next_show['title'].lower():
|
|
x_match = re.search(r'(\d+)x(\d+)', next_show['title'], re.IGNORECASE)
|
|
if x_match:
|
|
season = int(x_match.group(1))
|
|
episode = int(x_match.group(2))
|
|
debug_print(f"[DEBUG] Next show: Extracted S{season}E{episode} from Nx format")
|
|
|
|
# Display title with or without season/episode info
|
|
if season is not None and episode is not None:
|
|
# Format title with season/episode info
|
|
formatted_title = f"Title: {next_show['title']} (S{season:02d}E{episode:02d})"
|
|
info.append([("title", formatted_title)])
|
|
# Also add a detailed version on a separate line
|
|
info.append([("program_desc", f"Season {season}, Episode {episode}")])
|
|
# Update the next_show object with the extracted info if it wasn't there already
|
|
next_show['season'] = season
|
|
next_show['episode'] = episode
|
|
else:
|
|
info.append([("title", f"Title: {next_show['title']}")])
|
|
|
|
# Add air date if available for next show
|
|
if next_show.get('air_date'):
|
|
info.append([("time", f"Original Air Date: {next_show['air_date']}")])
|
|
|
|
info.append([("time", f"Time: {next_start} - {next_end}")])
|
|
|
|
# Add IMDb link for next show if we can generate one
|
|
imdb_url = self.generate_imdb_url(next_show)
|
|
# Skip IMDb URL display as it doesn't work well in Windows CMD
|
|
|
|
if channel['next_show']['desc']:
|
|
info.append([]) # Empty line
|
|
info.append([("program_desc", "--Description:")])
|
|
desc = channel['next_show']['desc']
|
|
# Limit description to 5 lines to prevent overflow
|
|
max_lines = 5
|
|
line_count = 0
|
|
for i in range(0, len(desc), max_line_length):
|
|
if line_count >= max_lines:
|
|
info.append([('program_desc', '... (description truncated)')])
|
|
break
|
|
info.append([('program_desc', desc[i:i+max_line_length])])
|
|
line_count += 1
|
|
else:
|
|
# Handle case where there's no current show but there might be a next show
|
|
if channel.get('next_show'):
|
|
next_start = channel['next_show']['start'].strftime("%H:%M")
|
|
next_end = channel['next_show']['stop'].strftime("%H:%M")
|
|
|
|
info.append([]) # Empty line
|
|
info.append([("program_next", "--UP NEXT")])
|
|
|
|
# Display title with season and episode information
|
|
next_show = channel['next_show']
|
|
|
|
# Extract season/episode information for next show
|
|
season = next_show.get('season')
|
|
episode = next_show.get('episode')
|
|
|
|
# If season/episode not available in metadata, try to extract from title
|
|
if season is None or episode is None:
|
|
# Try SxxExx format (S01E05)
|
|
se_match = re.search(r'S(\d+)E(\d+)', next_show['title'], re.IGNORECASE)
|
|
if se_match:
|
|
season = int(se_match.group(1))
|
|
episode = int(se_match.group(2))
|
|
debug_print(f"[DEBUG] Next show: Extracted S{season}E{episode} from title format")
|
|
# Try season x episode format (1x05)
|
|
elif 'x' in next_show['title'].lower():
|
|
x_match = re.search(r'(\d+)x(\d+)', next_show['title'], re.IGNORECASE)
|
|
if x_match:
|
|
season = int(x_match.group(1))
|
|
episode = int(x_match.group(2))
|
|
debug_print(f"[DEBUG] Next show: Extracted S{season}E{episode} from Nx format")
|
|
|
|
# Display title with or without season/episode info
|
|
if season is not None and episode is not None:
|
|
# Format title with season/episode info
|
|
formatted_title = f"Title: {next_show['title']} (S{season:02d}E{episode:02d})"
|
|
info.append([("title", formatted_title)])
|
|
# Also add a detailed version on a separate line
|
|
info.append([("program_desc", f"Season {season}, Episode {episode}")])
|
|
# Update the next_show object with the extracted info if it wasn't there already
|
|
next_show['season'] = season
|
|
next_show['episode'] = episode
|
|
else:
|
|
info.append([("title", f"Title: {next_show['title']}")])
|
|
|
|
# Add air date if available for next show
|
|
if next_show.get('air_date'):
|
|
info.append([("time", f"Original Air Date: {next_show['air_date']}")])
|
|
|
|
info.append([("time", f"Time: {next_start} - {next_end}")])
|
|
|
|
# Add IMDb link for next show if we can generate one
|
|
imdb_url = self.generate_imdb_url(next_show)
|
|
# Skip IMDb URL display as it doesn't work well in Windows CMD
|
|
|
|
if channel['next_show']['desc']:
|
|
info.append([]) # Empty line
|
|
info.append([("program_desc", "--Description:")])
|
|
desc = channel['next_show']['desc']
|
|
# Limit description to 5 lines to prevent overflow
|
|
max_lines = 5
|
|
line_count = 0
|
|
for i in range(0, len(desc), max_line_length):
|
|
if line_count >= max_lines:
|
|
info.append([('program_desc', '... (description truncated)')])
|
|
break
|
|
info.append([('program_desc', desc[i:i+max_line_length])])
|
|
line_count += 1
|
|
else:
|
|
info.append([("", "No current program information available")])
|
|
|
|
return info
|
|
|
|
def play_channel(self, url):
|
|
if self.mpv_process:
|
|
self.mpv_process.terminate()
|
|
self.mpv_process.wait()
|
|
|
|
try:
|
|
self.mpv_process = subprocess.Popen(MPV_BASE + [url])
|
|
except Exception as e:
|
|
# Create proper widget for error message
|
|
self.program_walker[:] = [urwid.Text([("error", f"Failed to play stream: {str(e)}")])]
|
|
|
|
def reload_data(self):
|
|
"""Reload all data from sources without animation for better performance"""
|
|
# Save current focus position
|
|
current_focus_pos = self.channel_list.focus_position if self.channel_list.body else None
|
|
|
|
# Reload data
|
|
self.channels = []
|
|
self.programs = {}
|
|
self.load_data()
|
|
|
|
# Update channel items in place
|
|
self.update_channel_items()
|
|
|
|
# Restore focus position
|
|
if current_focus_pos is not None and current_focus_pos < len(self.channel_items):
|
|
self.channel_list.set_focus(current_focus_pos)
|
|
self.on_channel_hover(self.channels[current_focus_pos])
|
|
|
|
# Update program details
|
|
if self.current_channel:
|
|
self.on_channel_hover(self.current_channel)
|
|
|
|
self.update_footer()
|
|
|
|
def update_channel_items(self):
|
|
"""Update existing channel items with new data"""
|
|
# Clear existing items
|
|
self.channel_list.body.clear()
|
|
|
|
# Recreate channel items with updated data
|
|
self.channel_items = []
|
|
for channel in self.channels:
|
|
# Create custom button that knows its channel
|
|
btn = ChannelButton(channel)
|
|
|
|
# Connect the signal with the correct channel using a closure
|
|
def make_click_handler(c):
|
|
return lambda button: self.on_channel_select(c)
|
|
|
|
urwid.connect_signal(btn, 'click', make_click_handler(channel))
|
|
|
|
# Apply different colors based on channel status
|
|
if channel.get('url'):
|
|
channel_item = urwid.AttrMap(btn, 'channel', 'channel_focus')
|
|
else:
|
|
channel_item = urwid.AttrMap(btn, 'error', 'error')
|
|
|
|
self.channel_items.append(channel_item)
|
|
self.channel_list.body.append(channel_item)
|
|
|
|
def run(self):
|
|
# Enable mouse support and cursor visibility
|
|
screen = urwid.raw_display.Screen()
|
|
screen.set_terminal_properties(colors=256)
|
|
screen.set_mouse_tracking()
|
|
|
|
self.loop = urwid.MainLoop(
|
|
self.top,
|
|
palette=PALETTE,
|
|
screen=screen,
|
|
unhandled_input=self.handle_input,
|
|
handle_mouse=True
|
|
)
|
|
|
|
# Start background update thread
|
|
self.start_update_thread()
|
|
|
|
try:
|
|
# Show splash screen animation before running the main loop
|
|
self.display_splash_screen()
|
|
|
|
# Start the main event loop
|
|
self.loop.run()
|
|
finally:
|
|
# Clean up when exiting
|
|
self.update_running = False
|
|
if self.update_thread:
|
|
self.update_thread.join(timeout=1.0)
|
|
|
|
def handle_input(self, key):
|
|
# Only handle main UI input if settings menu is not open
|
|
if self.settings_menu:
|
|
if key == 'esc':
|
|
self.on_settings_cancel()
|
|
return
|
|
|
|
if key in ('q', 'Q'):
|
|
self.quit_player()
|
|
elif key in ('l', 'L'):
|
|
self.reload_data()
|
|
elif key in ('s', 'S'):
|
|
self.show_settings_menu()
|
|
elif key == 'enter':
|
|
if self.current_channel and self.current_channel.get('url'):
|
|
self.play_channel(self.current_channel['url'])
|
|
|
|
def quit_player(self):
|
|
if self.mpv_process:
|
|
self.mpv_process.terminate()
|
|
self.mpv_process.wait()
|
|
self.update_running = False
|
|
raise urwid.ExitMainLoop()
|
|
|
|
def __del__(self):
|
|
self.update_running = False
|
|
if hasattr(self, 'mpv_process') and self.mpv_process:
|
|
self.mpv_process.terminate()
|
|
|
|
def check_mpv_installed():
|
|
try:
|
|
if os.name == 'nt':
|
|
subprocess.run(["where.exe", "mpv"], check=True,
|
|
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
|
else:
|
|
subprocess.run(["which", "mpv"], check=True,
|
|
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
|
return True
|
|
except subprocess.CalledProcessError:
|
|
return False
|
|
|
|
def main():
|
|
if not check_mpv_installed():
|
|
print("Error: mpv player is required but not found. Please install mpv first.")
|
|
print("You can download it from: https://mpv.io/installation/")
|
|
sys.exit(1)
|
|
|
|
player = IPTVPlayer()
|
|
player.run()
|
|
|
|
if __name__ == "__main__":
|
|
main()
|