iptv player
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

1284 lines
57 KiB

#!/usr/bin/env python3
import os
import re
import subprocess
import sys
import json
import xml.etree.ElementTree as ET
from datetime import datetime, timedelta
from urllib.request import urlopen, Request
import urwid
from urllib.error import URLError
import threading
import time
import urllib.parse
# Load configuration from JSON file
# Built-in defaults, used for every key that config.json does not provide.
DEFAULT_M3U_URL = "http://10.0.0.17:8409/iptv/channels.m3u"
DEFAULT_XMLTV_URL = "http://10.0.0.17:8409/iptv/xmltv.xml"
DEFAULT_USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
DEFAULT_UPDATE_INTERVAL = 900
DEFAULT_MPV_OPTIONS = [
    "--really-quiet",
    "--no-terminal",
    "--force-window=immediate",
]


def load_settings(path='config.json'):
    """Read the JSON settings file and return its contents as a dict.

    Returns an empty dict (so every setting falls back to its default) when
    the file is missing, unreadable, not valid JSON, or does not contain a
    JSON object. A message is printed in each of those cases.
    """
    try:
        with open(path) as config_file:
            loaded = json.load(config_file)
    except FileNotFoundError:
        print("Configuration file not found, using default settings")
        return {}
    except (OSError, ValueError) as exc:
        # json.JSONDecodeError and UnicodeDecodeError are ValueError subclasses.
        print(f"Configuration file could not be read ({exc}), using default settings")
        return {}
    if not isinstance(loaded, dict):
        print("Configuration file does not contain a JSON object, using default settings")
        return {}
    return loaded


def build_mpv_command(monitor, options):
    """Return the base mpv argument list for the given monitor and options.

    When *monitor* is not None the fullscreen flags are placed right after
    "mpv". Any copy of those same flags already present in *options* is
    dropped so they are not passed twice.
    """
    fullscreen = [] if monitor is None else ["--fs", f"--fs-screen={monitor}"]
    extra = [opt for opt in options if opt not in fullscreen]
    return ["mpv", *fullscreen, *extra]


config = load_settings()
M3U_URL = config.get('m3u_url', DEFAULT_M3U_URL)
XMLTV_URL = config.get('xmltv_url', DEFAULT_XMLTV_URL)
USER_AGENT = config.get('user_agent', DEFAULT_USER_AGENT)
UPDATE_INTERVAL = config.get('update_interval', DEFAULT_UPDATE_INTERVAL)
MONITOR = config.get('monitor', None)
DEBUG_MODE = config.get('debug_mode', False)
# Base MPV command with monitor options if specified
MPV_BASE = build_mpv_command(MONITOR, config.get('mpv_options', DEFAULT_MPV_OPTIONS))
# Enhanced color palette with animation colors
# Every entry is a (name, foreground, background) triple.
_BASE_COLORS = (
    # Original colors
    ('header', 'white', 'dark blue'),
    ('footer', 'white', 'dark blue'),
    ('channel', 'black', 'light cyan'),
    ('channel_focus', 'white', 'dark blue'),
    ('program_now', 'white', 'dark green'),
    ('program_next', 'black', 'light gray'),
    ('error', 'white', 'dark red'),
    ('divider', 'dark green', ''),
    ('program_desc', 'light cyan', ''),
    ('time', 'yellow', ''),
    ('title', 'bold', ''),
    ('update', 'light blue', ''),
)
_ANIMATION_COLORS = (
    # Animation colors
    ('splash_1', 'light red', ''),
    ('splash_2', 'yellow', ''),
    ('splash_3', 'light green', ''),
    ('splash_4', 'light cyan', ''),
    ('splash_5', 'light blue', ''),
    ('splash_6', 'light magenta', ''),
    ('highlight', 'white,bold', 'dark blue'),
    ('loading', 'yellow', ''),
    ('pulse_1', 'dark cyan', ''),
    ('pulse_2', 'light cyan', ''),
    ('pulse_3', 'white', ''),
    ('flash', 'white,bold', 'dark green'),
    ('fade', 'dark gray', ''),
)
PALETTE = [*_BASE_COLORS, *_ANIMATION_COLORS]
# Function for debug prints that only outputs when debug mode is enabled
def debug_print(*args, **kwargs):
    """Forward all arguments to print(), but only while DEBUG_MODE is truthy."""
    if not DEBUG_MODE:
        return
    print(*args, **kwargs)
# ASCII Art and Animation Frames
# BANNER is a raw multi-line string (backslashes are literal).
# NOTE(review): the column alignment of the art looks collapsed here --
# presumably runs of spaces were lost in transit; confirm against the
# original file before relying on how it renders.
BANNER = r"""
__________._____. ___. .__ __ _______________ ____
\______ \__\_ |__\_ |__ |__|/ |_ \__ ___/\ \ / /
| | _/ || __ \| __ \| \ __\ | | \ Y /
| | \ || \_\ \ \_\ \ || | | | \ /
|______ /__||___ /___ /__||__| |____| \___/
\/ \/ \/
"""
# Animation frames for the splash screen
# Six raw-string frames; per the per-frame comments each one adds another
# letter to the logo.
# NOTE(review): as with the banner, the art's spacing looks collapsed --
# presumably lost in transit; confirm against the original file.
SPLASH_FRAMES = [
# Frame 1 - Just B
r"""
_________
|\\ |
| \\ |
| \\_____|
| |_____|
| | |
|___|_____|
""",
# Frame 2 - BI
r"""
_________ ___
|\\ | | |
| \\ | | |
| \\_____| | |
| |_____| | |
| | | |
|___|_____| |___|
""",
# Frame 3 - BIB
r"""
_________ ___ _________
|\\ | | | |\\ |
| \\ | | | | \\ |
| \\_____| | | | \\_____|
| |_____| | | | |_____|
| | | | | | |
|___|_____| |___| |___|_____|
""",
# Frame 4 - BIBB
r"""
_________ ___ _________ _________
|\\ | | | |\\ | |\\ |
| \\ | | | | \\ | | \\ |
| \\_____| | | | \\_____| | \\_____|
| |_____| | | | |_____| | |_____|
| | | | | | | | | |
|___|_____| |___| |___|_____| |___|_____|
""",
# Frame 5 - BIBBI
r"""
_________ ___ _________ _________ ___
|\\ | | | |\\ | |\\ | | |
| \\ | | | | \\ | | \\ | | |
| \\_____| | | | \\_____| | \\_____| | |
| |_____| | | | |_____| | |_____| | |
| | | | | | | | | | |
|___|_____| |___| |___|_____| |___|_____| |___|
""",
# Frame 6 - BIBBIT
r"""
_________ ___ _________ _________ ___ _______
|\\ | | | |\\ | |\\ | | | | |
| \\ | | | | \\ | | \\ | | | |_ _|
| \\_____| | | | \\_____| | \\_____| | | | |
| |_____| | | | |_____| | |_____| | | | |
| | | | | | | | | | | | |
|___|_____| |___| |___|_____| |___|_____| |___| |_______|
""",
]
# Loading animation characters
# NOTE(review): all eight entries are empty strings here -- presumably the
# spinner glyphs were lost in an encoding/extraction step; confirm against
# the original file.
LOADING_CHARS = ['', '', '', '', '', '', '', '']
class ChannelButton(urwid.Button):
    """Button that remembers its channel dict and labels itself from it.

    The label is the channel name; when the channel has a current show a
    "Now:" line is added, followed by a "Next:" line if a next show is known.
    """

    def __init__(self, channel, *args, **kwargs):
        self.channel = channel
        now_showing = channel.get('current_show')
        if not now_showing:
            text = channel['name']
        else:
            text = f"{channel['name']}\nNow: {channel['current_show']['title']}"
            # The next show is only mentioned together with a current one.
            if channel.get('next_show'):
                text += f"\nNext: {channel['next_show']['title']}"
        super().__init__(text, *args, **kwargs)
class FocusAwareListBox(urwid.ListBox):
    """ListBox that reports focus changes to an optional callback."""

    def __init__(self, body, on_focus_change=None):
        super().__init__(body)
        self._last_focus = None
        self.on_focus_change = on_focus_change

    def change_focus(self, size, position, *args, **kwargs):
        """Delegate to ListBox, then notify the callback if the focus widget differs."""
        super().change_focus(size, position, *args, **kwargs)
        focused = self.focus
        moved = focused != self._last_focus
        if moved and self.on_focus_change:
            self.on_focus_change(focused)
            self._last_focus = focused
class SettingsMenu:
    """Settings configuration menu.

    Builds an urwid form from a config dict, validates the entered values and
    hands the resulting dict to ``on_save_callback``; ``on_cancel_callback``
    is invoked (with no arguments) when the user cancels.
    The finished widget tree is available as ``self.widget``.
    """

    # Monitor values that always get a radio button (None = no specific monitor).
    MONITOR_CHOICES = (None, 1, 2, 3)
    # mpv options used when the incoming config carries no 'mpv_options'.
    STANDARD_MPV_OPTIONS = (
        "--really-quiet",
        "--no-terminal",
        "--force-window=immediate",
    )

    def __init__(self, config, on_save_callback, on_cancel_callback):
        self.config = config.copy()
        self.on_save_callback = on_save_callback
        self.on_cancel_callback = on_cancel_callback
        self.setup_ui()

    def setup_ui(self):
        """Create the form widgets and assemble ``self.widget``."""
        # Create styled form fields with consistent colors
        self.m3u_edit = urwid.Edit(("title", "M3U URL: "), self.config.get('m3u_url', ''))
        self.xmltv_edit = urwid.Edit(("title", "XMLTV URL: "), self.config.get('xmltv_url', ''))
        self.user_agent_edit = urwid.Edit(("title", "User Agent: "), self.config.get('user_agent', ''))
        self.update_interval_edit = urwid.IntEdit(("title", "Update Interval (seconds): "), self.config.get('update_interval', 900))

        # Monitor selection as a radio button group. A configured monitor that
        # is not one of the standard choices gets its own button so that
        # saving does not silently reset it.
        monitor_value = self.config.get('monitor', None)
        self.monitor_choices = list(self.MONITOR_CHOICES)
        if monitor_value not in self.monitor_choices:
            self.monitor_choices.append(monitor_value)
        self.monitor_group = []
        monitor_rows = []
        for choice in self.monitor_choices:
            if choice is None:
                caption = "Default (No specific monitor)"
            else:
                caption = f"Monitor {choice}"
            radio = urwid.RadioButton(self.monitor_group, caption, state=(choice == monitor_value))
            monitor_rows.append(urwid.AttrMap(radio, None, 'channel_focus'))

        # Debug mode checkbox
        self.debug_checkbox = urwid.CheckBox("Enable Debug Mode", state=self.config.get('debug_mode', False))

        # Create styled form sections
        # NOTE(review): the two Divider("") calls use an empty divider
        # character -- possibly a glyph lost in transit; confirm.
        form_items = [
            # Header section
            urwid.AttrMap(urwid.Text("Settings Configuration", align='center'), 'header'),
            urwid.AttrMap(urwid.Divider(""), 'divider'),
            urwid.Divider(),
            # Connection settings section
            urwid.AttrMap(urwid.Text("CONNECTION SETTINGS", align='left'), 'program_now'),
            urwid.Divider(),
            urwid.AttrMap(self.m3u_edit, None, 'channel_focus'),
            urwid.Divider(),
            urwid.AttrMap(self.xmltv_edit, None, 'channel_focus'),
            urwid.Divider(),
            urwid.AttrMap(self.user_agent_edit, None, 'channel_focus'),
            urwid.Divider(),
            # Application settings section
            urwid.AttrMap(urwid.Text("APPLICATION SETTINGS", align='left'), 'program_now'),
            urwid.Divider(),
            urwid.AttrMap(self.update_interval_edit, None, 'channel_focus'),
            urwid.Divider(),
            # Display settings section
            urwid.AttrMap(urwid.Text("DISPLAY SETTINGS", align='left'), 'program_now'),
            urwid.Divider(),
            urwid.AttrMap(urwid.Text("Fullscreen Monitor Selection:", align='left'), 'program_desc'),
            *monitor_rows,
            urwid.Divider(),
            # Debug settings section
            urwid.AttrMap(urwid.Text("DEBUG SETTINGS", align='left'), 'program_now'),
            urwid.Divider(),
            urwid.AttrMap(self.debug_checkbox, None, 'channel_focus'),
            urwid.Divider(),
            # Action buttons
            urwid.AttrMap(urwid.Divider(""), 'divider'),
            urwid.Divider(),
            urwid.Columns([
                ('weight', 1, urwid.AttrMap(urwid.Button("Save Settings", on_press=self.save_settings), 'program_now', 'program_desc')),
                ('weight', 1, urwid.AttrMap(urwid.Button("Cancel", on_press=self.cancel_settings), 'error', 'program_desc'))
            ], dividechars=2)
        ]
        listwalker = urwid.SimpleFocusListWalker(form_items)
        self.listbox = urwid.ListBox(listwalker)
        # Create the main widget with styled frame
        self.widget = urwid.Frame(
            urwid.AttrMap(urwid.LineBox(self.listbox, title=" Settings ", title_align='center'), 'channel'),
            footer=urwid.AttrMap(
                urwid.Text("Tab/Shift+Tab: Navigate | Enter: Select | Esc: Cancel", align='center'),
                'footer'
            )
        )

    def _selected_monitor(self):
        """Return the monitor value of the selected radio button (None if none is selected)."""
        for choice, radio in zip(self.monitor_choices, self.monitor_group):
            if radio.state:
                return choice
        return None

    def _mpv_options_for(self, monitor_value):
        """Build the 'mpv_options' list to save for the chosen monitor.

        Options carried in the incoming config are kept (the standard set is
        used when there are none). The fullscreen flags that belonged to the
        previously configured monitor are removed and, if a monitor is
        selected, the flags for the new one are placed first.
        """
        carried = self.config.get('mpv_options')
        if carried is None:
            carried = self.STANDARD_MPV_OPTIONS
        previous_monitor = self.config.get('monitor')
        if previous_monitor is None:
            stale_flags = []
        else:
            stale_flags = ["--fs", f"--fs-screen={previous_monitor}"]
        if monitor_value is None:
            fs_options = []
        else:
            fs_options = ["--fs", f"--fs-screen={monitor_value}"]
        kept = [opt for opt in carried if opt not in stale_flags and opt not in fs_options]
        return fs_options + kept

    def save_settings(self, button=None):
        """Validate the form and pass the new config dict to the save callback.

        Validation failures and any exception raised while saving are shown
        in the footer instead of propagating.
        """
        try:
            # Validate and collect settings
            new_config = {
                'm3u_url': self.m3u_edit.edit_text.strip(),
                'xmltv_url': self.xmltv_edit.edit_text.strip(),
                'user_agent': self.user_agent_edit.edit_text.strip(),
                'update_interval': self.update_interval_edit.value(),
                'debug_mode': self.debug_checkbox.state
            }
            # Handle monitor setting from radio buttons
            monitor_value = self._selected_monitor()
            new_config['monitor'] = monitor_value
            # MPV options are not exposed in the UI; carry them over and only
            # adjust the fullscreen flags for the selected monitor.
            new_config['mpv_options'] = self._mpv_options_for(monitor_value)
            # Validate required fields
            if not new_config['m3u_url']:
                self.show_error("M3U URL is required")
                return
            if not new_config['xmltv_url']:
                self.show_error("XMLTV URL is required")
                return
            if not new_config['user_agent']:
                self.show_error("User Agent is required")
                return
            if new_config['update_interval'] <= 0:
                self.show_error("Update interval must be positive")
                return
            self.on_save_callback(new_config)
        except Exception as e:
            self.show_error(f"Error saving settings: {str(e)}")

    def cancel_settings(self, button=None):
        """Cancel settings changes"""
        self.on_cancel_callback()

    def show_error(self, message):
        """Show error message in footer (replaces the key-hint footer)."""
        error_text = urwid.AttrMap(urwid.Text(f"Error: {message}", align='center'), 'error')
        self.widget.footer = error_text
class IPTVPlayer:
def __init__(self):
    """Initialise player state, then load the playlist/guide and build the UI."""
    # Guide data
    self.channels = []
    self.programs = {}
    self.current_channel = None
    self.last_update = datetime.now()
    # Playback and event-loop handles, assigned later
    self.mpv_process = None
    self.loop = None
    # Background refresh bookkeeping
    self.update_thread = None
    self.update_running = True
    # UI pieces
    self.settings_menu = None
    self.main_widget = None
    self.splash_screen = None
    self.animate_startup = False  # Disable animations for better performance
    self.load_data()
    self.setup_ui()
def display_splash_screen(self):
    """Disabled splash screen for better performance"""
    # Intentionally a no-op: the splash screen is skipped completely.
    return None
def load_data(self):
    """Download and parse the M3U playlist, then the XMLTV guide.

    Channels are appended to ``self.channels`` and programmes to
    ``self.programs`` (keyed by XMLTV channel id). If the playlist cannot be
    loaded or is empty, a single error channel replaces the channel list and
    the guide download is skipped. Guide failures only print a warning.
    ``self.last_update`` is set to the current time at the end.
    """
    debug_print("\n[DEBUG] Loading IPTV data...")
    # Load M3U playlist
    self._load_playlist()
    # Load XMLTV guide
    if self.channels and not self.channels[0]['name'].startswith("Error:"):
        self._load_guide()
    self.last_update = datetime.now()


def _load_playlist(self):
    """Fetch M3U_URL and append its channels; fall back to an error channel."""
    try:
        req = Request(M3U_URL, headers={'User-Agent': USER_AGENT})
        with urlopen(req, timeout=10) as response:
            m3u_data = response.read().decode('utf-8')
        debug_print(f"[DEBUG] Received M3U data (length: {len(m3u_data)} bytes)")
        self._parse_m3u(m3u_data)
        if not self.channels:
            print("[WARNING] No channels found in M3U file!")
            self.add_error_channel("No channels found in playlist")
        else:
            debug_print(f"[DEBUG] Successfully loaded {len(self.channels)} channels")
    except Exception as e:
        print(f"[ERROR] Failed to load M3U: {str(e)}")
        self.add_error_channel(f"Error loading playlist: {str(e)}")


def _parse_m3u(self, m3u_data):
    """Append one channel per '#EXTINF' line that is followed by a stream URL."""
    stream_schemes = (
        'http://', 'https://', 'rtmp://', 'udp://',
        'rtp://', 'mms://', 'rtsp://', 'file://',
    )
    current_channel = None
    for raw_line in m3u_data.split('\n'):
        line = raw_line.strip()
        if line.startswith('#EXTINF:'):
            current_channel = self._parse_extinf(line)
        elif line.startswith(stream_schemes):
            # A URL without a preceding #EXTINF line is ignored.
            if current_channel:
                current_channel['url'] = line
                self.channels.append(current_channel)
                current_channel = None


def _parse_extinf(self, line):
    """Build a channel dict (without URL) from one '#EXTINF:' line."""
    def attribute(name):
        found = re.search(name + r'="([^"]*)"', line)
        return found.group(1) if found else None

    channel_title = self._extinf_title(line)
    tvg_id = attribute('tvg-id')
    tvg_name = attribute('tvg-name')
    tvg_logo = attribute('tvg-logo')
    group_title = attribute('group-title')
    return {
        'id': tvg_id if tvg_id is not None else "",
        'name': tvg_name if tvg_name is not None else channel_title,
        'logo': tvg_logo if tvg_logo is not None else "",
        'group': group_title if group_title is not None else "Other",
        'title': channel_title,
        'url': None,
        'current_show': None,
        'next_show': None
    }


def _extinf_title(self, line):
    """Return the display title of an '#EXTINF:' line.

    The title is everything after the first comma that is not inside a
    double-quoted attribute value, so titles (and attribute values) that
    contain commas are kept whole. If no such comma exists the text after
    the last comma is used.
    """
    in_quotes = False
    for index, char in enumerate(line):
        if char == '"':
            in_quotes = not in_quotes
        elif char == ',' and not in_quotes:
            return line[index + 1:].strip()
    return line.split(',')[-1].strip()


def _load_guide(self):
    """Fetch XMLTV_URL, store every programme and refresh the now/next shows."""
    try:
        req = Request(XMLTV_URL, headers={'User-Agent': USER_AGENT})
        with urlopen(req, timeout=10) as response:
            xml_data = response.read().decode('utf-8')
        debug_print(f"[DEBUG] Received XMLTV data (length: {len(xml_data)} bytes)")
        root = ET.fromstring(xml_data)
        for programme in root.findall('programme'):
            self._store_programme(programme)
        debug_print(f"[DEBUG] Loaded EPG data for {len(self.programs)} channels")
        # Pre-load current and next shows for each channel
        self.update_current_shows()
        # Show a sample of program data with season/episode info for debugging
        for channel_id, programs in self.programs.items():
            if programs:
                sample = programs[0]
                if sample.get('season') is not None or sample.get('episode') is not None:
                    debug_print(f"[DEBUG] Sample program: Title={sample.get('title')}, Season={sample.get('season')}, Episode={sample.get('episode')}, Episode_num={sample.get('episode_num')}")
                    break
    except Exception as e:
        print(f"[WARNING] Failed to load EPG: {str(e)}")


def _store_programme(self, programme):
    """Convert one <programme> element to a dict and file it under its channel id."""
    channel_id = programme.get('channel')
    start = self.parse_xmltv_time(programme.get('start'))
    stop = self.parse_xmltv_time(programme.get('stop'))
    title_elem = programme.find('title')
    desc_elem = programme.find('desc')
    # An element without text has .text == None; treat that like a missing one.
    title_text = (title_elem.text if title_elem is not None else "") or ""
    desc_text = (desc_elem.text if desc_elem is not None else "") or ""

    episode_num, season, episode = self._parse_episode_metadata(programme)
    # Detect if we have the season/episode directly in the title
    if title_text and (season is None or episode is None):
        season, episode = self._episode_from_title(title_text, season, episode)
    if season is not None or episode is not None:
        debug_print(f"[DEBUG] Final season/episode for '{title_text}': S{season}E{episode}")

    # Get original air date if available
    date_elem = programme.find('date')
    air_date = date_elem.text if date_elem is not None else None

    self.programs.setdefault(channel_id, []).append({
        'start': start,
        'stop': stop,
        'title': title_text or "No Title",
        'desc': desc_text,
        'season': season,
        'episode': episode,
        'episode_num': episode_num,
        'air_date': air_date
    })


def _xmltv_ns_number(self, part):
    """Turn one zero-based xmltv_ns component ('4' or '4/12') into a one-based int.

    Returns None for an empty or non-numeric component.
    """
    value = part.split('/')[0].strip()
    if not value:
        return None
    try:
        # XMLTV uses zero-based numbering, so add 1
        return int(value) + 1
    except ValueError as e:
        debug_print(f"[DEBUG] Error parsing season/episode: {e}")
        return None


def _parse_episode_metadata(self, programme):
    """Read season/episode information from a programme's metadata tags.

    Looks at the first <episode-num> element (xmltv_ns, "S01E05"-style,
    3-4 digit and onscreen forms) and at <season-num> / <episode> tags.
    Returns ``(episode_num_text, season, episode)``; the numbers are None
    when they could not be determined.
    """
    episode_num_elem = programme.find('episode-num')
    episode_num = ""
    season = None
    episode = None
    system = None
    if episode_num_elem is not None and episode_num_elem.text:
        episode_num = episode_num_elem.text
        system = episode_num_elem.get('system')
        lowered = episode_num.lower()
        debug_print(f"[DEBUG] Found episode number: {episode_num}, system: {system}")
        if system == 'xmltv_ns':
            # Format example: "2.9." means Season 3, Episode 10 (zero-based)
            parts = episode_num.split('.')
            debug_print(f"[DEBUG] Split episode_num into parts: {parts}")
            if len(parts) >= 2:
                season = self._xmltv_ns_number(parts[0])
                if season is not None:
                    debug_print(f"[DEBUG] Parsed season: {season}")
                episode = self._xmltv_ns_number(parts[1])
                if episode is not None:
                    debug_print(f"[DEBUG] Parsed episode: {episode}")
        elif lowered.startswith('s') and 'e' in lowered:
            # Format like "S01E05"
            pieces = lowered.split('e')
            s_part = pieces[0].strip('s')
            e_part = pieces[1].strip()
            try:
                if s_part.isdigit():
                    season = int(s_part)
                    debug_print(f"[DEBUG] Parsed season from SxxExx format: {season}")
                if e_part.isdigit():
                    episode = int(e_part)
                    debug_print(f"[DEBUG] Parsed episode from SxxExx format: {episode}")
            except ValueError as e:
                debug_print(f"[DEBUG] Error parsing SxxExx format: {e}")
        elif episode_num.isdigit() and len(episode_num) >= 3:
            # Digit format (like "105" for S01E05)
            try:
                if len(episode_num) == 3:
                    season = int(episode_num[0])
                    episode = int(episode_num[1:])
                elif len(episode_num) == 4:
                    season = int(episode_num[:2])
                    episode = int(episode_num[2:])
                debug_print(f"[DEBUG] Parsed from numeric format: S{season}E{episode}")
            except ValueError as e:
                debug_print(f"[DEBUG] Error parsing numeric format: {e}")

    # A direct season tag overrides whatever was parsed above.
    season_elem = programme.find('season-num')
    if season_elem is not None and season_elem.text:
        try:
            season = int(season_elem.text)
            debug_print(f"[DEBUG] Found direct season tag: {season}")
        except ValueError:
            pass

    # Onscreen format (like "Episode 5") only fills a still-missing episode.
    if episode_num and system == 'onscreen':
        debug_print(f"[DEBUG] Found onscreen episode format: {episode_num}")
        num_match = re.search(r'(\d+)', episode_num)
        if num_match and episode is None:
            episode = int(num_match.group(1))
            debug_print(f"[DEBUG] Extracted episode number from onscreen format: {episode}")

    # A direct 'episode' tag overrides whatever was parsed above.
    direct_episode_elem = programme.find('episode')
    if direct_episode_elem is not None and direct_episode_elem.text:
        try:
            episode = int(direct_episode_elem.text)
            debug_print(f"[DEBUG] Found direct episode tag: {episode}")
        except ValueError:
            pass

    return episode_num, season, episode


def _episode_from_title(self, title_text, season, episode):
    """Try to read season/episode numbers out of a programme title.

    Patterns are tried in order and the first one that actually matches
    wins: "S01E05", "1x05", "Season X ... Episode Y", a number in
    parentheses, and a bare "E05". Each branch is gated on its regex match,
    so a title that merely contains the letter "x" no longer prevents the
    later patterns from being tried. Returns ``(season, episode)``.
    """
    se_match = re.search(r'S(\d+)E(\d+)', title_text, re.IGNORECASE)
    if se_match:
        season = int(se_match.group(1))
        episode = int(se_match.group(2))
        debug_print(f"[DEBUG] Extracted from title: S{season}E{episode} from '{title_text}'")
        return season, episode

    # Check format like "1x05"
    x_match = re.search(r'(\d+)x(\d+)', title_text, re.IGNORECASE)
    if x_match:
        season = int(x_match.group(1))
        episode = int(x_match.group(2))
        debug_print(f"[DEBUG] Extracted from title 'x' format: S{season}E{episode} from '{title_text}'")
        return season, episode

    # Check for "Season X Episode Y" text (both words must be present)
    lowered = title_text.lower()
    if 'season' in lowered and 'episode' in lowered:
        season_match = re.search(r'season\s+(\d+)', title_text, re.IGNORECASE)
        episode_match = re.search(r'episode\s+(\d+)', title_text, re.IGNORECASE)
        if season_match or episode_match:
            debug_print(f"[DEBUG] Title contains 'season' and 'episode': '{title_text}'")
            if season_match:
                season = int(season_match.group(1))
                debug_print(f"[DEBUG] Extracted season from text: {season}")
            if episode_match:
                episode = int(episode_match.group(1))
                debug_print(f"[DEBUG] Extracted episode from text: {episode}")
            return season, episode

    # The remaining patterns only ever fill in a missing episode number.
    if episode is not None:
        return season, episode

    # Episode number in brackets like "Show Name (5)"
    bracket_match = re.search(r'\(\s*(\d+)\s*\)', title_text)
    if bracket_match:
        episode = int(bracket_match.group(1))
        debug_print(f"[DEBUG] Extracted episode from brackets: {episode}")
        return season, episode

    # Common pattern like "E05" without season
    e_match = re.search(r'E(\d+)', title_text, re.IGNORECASE)
    if e_match:
        episode = int(e_match.group(1))
        debug_print(f"[DEBUG] Extracted episode from E-format: {episode}")
    return season, episode
def update_current_shows(self):
    """Update current and next shows based on current time.

    For every channel whose id has guide entries in ``self.programs``,
    ``current_show`` becomes the programme running now (or None) and
    ``next_show`` the one that follows it, or the first upcoming one when
    nothing is running. Channels without guide entries are left untouched.
    """
    now = datetime.now()
    for channel in self.channels:
        if channel['id'] not in self.programs:
            continue
        # Guide data is not guaranteed to arrive in chronological order;
        # the scan below relies on it, so work on a sorted copy.
        shows = sorted(self.programs[channel['id']], key=lambda show: show['start'])
        channel['current_show'] = None
        channel['next_show'] = None
        for i, show in enumerate(shows):
            if show['start'] <= now < show['stop']:
                channel['current_show'] = show
                if i + 1 < len(shows):
                    channel['next_show'] = shows[i + 1]
                break
            if show['start'] > now:
                # Nothing is on air right now; this is the first upcoming show.
                channel['next_show'] = show
                break
def add_error_channel(self, message):
    """Replace the channel list with a single placeholder entry describing *message*."""
    placeholder = {
        'id': "error_channel",
        'name': f"Error: {message}",
        'logo': "",
        'group': "Error",
        'url': "",
        'title': "Check your playlist URL",
        'current_show': None,
        'next_show': None
    }
    # Any channels loaded so far are discarded.
    self.channels = [placeholder]
def parse_xmltv_time(self, time_str):
    """Parse an XMLTV timestamp into a naive datetime.

    The first 14 characters are read as ``YYYYMMDDhhmmss``. If a UTC offset
    follows (e.g. ``" +0100"``), the time is converted to the system's local
    time zone so it can be compared with ``datetime.now()``; without an
    offset (or with an unparseable one) the stamp is returned as written.
    If the stamp itself cannot be parsed, the current time is returned.
    """
    try:
        stamp_text = time_str[:14]
        stamp = datetime.strptime(stamp_text, "%Y%m%d%H%M%S")
    except (TypeError, ValueError):
        return datetime.now()
    offset = time_str[14:].strip()
    if not offset:
        return stamp
    try:
        aware = datetime.strptime(stamp_text + offset, "%Y%m%d%H%M%S%z")
    except ValueError:
        return stamp
    # astimezone() with no argument converts to the local time zone.
    return aware.astimezone().replace(tzinfo=None)
def generate_imdb_url(self, show):
    """Generate IMDb search URL for a show or specific episode.

    Returns None when the show has no title. When both season and episode
    are known the advanced TV-episode search is used, otherwise the regular
    title search.
    """
    title = show.get('title', '')
    if not title:
        return None
    encoded_title = urllib.parse.quote_plus(title)
    season, episode = show.get('season'), show.get('episode')
    if season is None or episode is None:
        # No complete episode info: plain title search.
        return f"https://www.imdb.com/find?q={encoded_title}&s=tt&ttype=tv"
    # Episode known: target the advanced episode search.
    return f"https://www.imdb.com/search/title/?title={encoded_title}&title_type=tv_episode&season={season}&episode={episode}"
# Removed clickable link function since it doesn't work in Windows CMD
def setup_ui(self):
    """Build the main widget tree: banner header, channel list, program pane and footer."""
    def build_item(channel):
        # One button per channel; the default argument pins the channel for the click handler.
        button = ChannelButton(channel)
        urwid.connect_signal(
            button, 'click',
            lambda _button, chosen=channel: self.on_channel_select(chosen)
        )
        # Channels without a URL are shown with the error colors.
        if not channel.get('url'):
            return urwid.AttrMap(button, 'error', 'error')
        return urwid.AttrMap(button, 'channel', 'channel_focus')

    # Channel list with current show info, tracking focus changes
    self.channel_items = [build_item(channel) for channel in self.channels]
    channel_walker = urwid.SimpleFocusListWalker(self.channel_items)
    self.channel_list = FocusAwareListBox(channel_walker, on_focus_change=self.on_focus_change)

    # Program guide with scrollable content
    self.program_walker = urwid.SimpleFocusListWalker([])
    self.program_listbox = urwid.ListBox(self.program_walker)

    # Footer, including the update time placeholder
    monitor_info = "" if MONITOR is None else f" | Monitor: {MONITOR}"
    self.footer_text = urwid.Text(f"Q: Quit | ↑↓: Navigate | Enter: Play | L: Reload | S: Settings | Last update: Loading...{monitor_info}", align='center')
    self.footer = urwid.AttrMap(self.footer_text, 'footer')
    program_box = urwid.LineBox(self.program_listbox, title="Program Details")
    program_frame = urwid.Frame(program_box, footer=self.footer)

    # Header with ASCII art
    banner_widget = urwid.Text(BANNER + "\nBibbitTV Terminal Player", align='center')
    header = urwid.AttrMap(banner_widget, 'header')

    # Two columns: channels (weight 1) and program details (weight 2)
    channel_box = urwid.LineBox(self.channel_list, title="Channels")
    columns = urwid.Columns(
        [('weight', 1, channel_box), ('weight', 2, program_frame)],
        dividechars=1
    )

    # Main layout with header
    layout = urwid.Pile([
        ('pack', header),
        ('pack', urwid.Divider()),
        columns
    ])
    self.main_widget = urwid.LineBox(layout, title="")

    # Overlay for centering
    self.top = urwid.Overlay(
        self.main_widget,
        urwid.SolidFill(' '),
        align='center',
        width=('relative', 85),
        valign='middle',
        height=('relative', 85)
    )
    # Replace the "Loading..." placeholder with the initial timestamp
    self.update_footer()
def update_footer(self):
    """Rewrite the footer text with the time of the last update (and the monitor, if set)."""
    stamp = self.last_update.strftime("%H:%M:%S")
    suffix = "" if MONITOR is None else f" | Monitor: {MONITOR}"
    footer_line = f"Q: Quit | ↑↓: Navigate | Enter: Play | L: Reload | S: Settings | Last update: {stamp}{suffix}"
    self.footer_text.set_text(footer_line)
def show_settings_menu(self):
    """Show settings configuration menu.

    Builds a SettingsMenu pre-filled from the current module-level settings
    and overlays it on the current top widget. Does nothing if the menu is
    already open.
    """
    # Re-opening on top of an open menu would stack overlays and discard
    # whatever the user has typed so far.
    if self.settings_menu is not None:
        return
    # Get current config
    current_config = {
        'm3u_url': M3U_URL,
        'xmltv_url': XMLTV_URL,
        'user_agent': USER_AGENT,
        'update_interval': UPDATE_INTERVAL,
        'monitor': MONITOR,
        # Without this the debug checkbox always starts unchecked and any
        # save silently switches debug mode off.
        'debug_mode': DEBUG_MODE,
        'mpv_options': MPV_BASE[1:] if MPV_BASE[0] == 'mpv' else MPV_BASE  # Remove 'mpv' command
    }
    # Create settings menu
    self.settings_menu = SettingsMenu(
        current_config,
        self.on_settings_save,
        self.on_settings_cancel
    )
    # Create settings overlay on top of the current view
    self.top = urwid.Overlay(
        self.settings_menu.widget,
        self.top,
        align='center',
        width=('relative', 80),
        valign='middle',
        height=('relative', 90)
    )
    # Update the MainLoop's widget
    if self.loop:
        self.loop.widget = self.top
def on_settings_save(self, new_config):
    """Persist *new_config* to config.json, apply it to the module settings and reload.

    Any exception is reported through the settings menu if it is still open.
    """
    global M3U_URL, XMLTV_URL, USER_AGENT, UPDATE_INTERVAL, MONITOR, MPV_BASE, DEBUG_MODE
    try:
        # Save to config.json
        with open('config.json', 'w') as config_file:
            json.dump(new_config, config_file, indent=4)
        # Update global variables
        M3U_URL = new_config['m3u_url']
        XMLTV_URL = new_config['xmltv_url']
        USER_AGENT = new_config['user_agent']
        UPDATE_INTERVAL = new_config['update_interval']
        MONITOR = new_config['monitor']
        DEBUG_MODE = new_config['debug_mode']
        # Rebuild the MPV base command directly from new_config
        MPV_BASE = ["mpv", *new_config['mpv_options']]
        # Close settings menu, then reload data with the new settings
        self.on_settings_cancel()
        self.reload_data()
    except Exception as e:
        menu = self.settings_menu
        if menu:
            # Show error in settings menu
            menu.show_error(f"Failed to save config: {str(e)}")
def on_settings_cancel(self):
    """Close the settings menu and put the main view back on screen."""
    self.settings_menu = None
    # Rebuild the centered overlay around the stored main widget
    backdrop = urwid.SolidFill(' ')
    self.top = urwid.Overlay(
        self.main_widget,
        backdrop,
        align='center',
        width=('relative', 85),
        valign='middle',
        height=('relative', 85)
    )
    # Hand the restored widget to the MainLoop, if one exists
    if self.loop:
        self.loop.widget = self.top
def start_update_thread(self):
    """Launch the daemon thread that runs update_worker for periodic updates."""
    self.update_running = True
    worker = threading.Thread(target=self.update_worker, daemon=True)
    self.update_thread = worker
    worker.start()
def update_worker(self):
    """Background worker for periodic updates.

    Sleeps UPDATE_INTERVAL seconds between rounds and, while
    ``self.update_running`` is true, asks the main loop to run
    ``refresh_schedule``.
    """
    while self.update_running:
        time.sleep(UPDATE_INTERVAL)
        # The flag may have been cleared while sleeping; do not schedule
        # another refresh after a stop was requested.
        if not self.update_running:
            break
        if self.loop:
            # Schedule update on the main thread
            # NOTE(review): set_alarm_in is called from this worker thread;
            # confirm against the urwid docs that this wakes the main loop
            # (MainLoop.watch_pipe is the documented cross-thread mechanism).
            self.loop.set_alarm_in(0, self.refresh_schedule)
def refresh_schedule(self, loop=None, user_data=None):
    """Recompute now/next shows, refresh the widgets and prepend an update notice.

    The two parameters are unused by the body; failures are printed, not raised.
    """
    try:
        debug_print("[DEBUG] Refreshing schedule data...")
        # Update current shows without animations
        self.update_current_shows()
        self.last_update = datetime.now()
        # Update UI
        self.refresh_ui()
        self.update_footer()
        # Put a simple update notification above the existing program lines
        stamp = self.last_update.strftime("%H:%M:%S")
        notice = [
            urwid.Text([("update", f"Schedule updated at {stamp}")]),
            urwid.Text(""),
        ]
        existing_lines = self.program_walker[:]
        self.program_walker[:] = notice + existing_lines
        debug_print("[DEBUG] Schedule refreshed successfully")
    except Exception as e:
        print(f"[ERROR] Failed to refresh schedule: {str(e)}")
def refresh_ui(self):
    """Relabel every channel button from its channel dict and refresh the program pane."""
    for index, channel in enumerate(self.channels):
        current = channel.get('current_show')
        if not current:
            text = channel['name']
        else:
            text = f"{channel['name']}\nNow: {channel['current_show']['title']}"
            if channel.get('next_show'):
                text += f"\nNext: {channel['next_show']['title']}"
        # channel_items holds the AttrMap wrappers; base_widget is the button itself.
        self.channel_items[index].base_widget.set_label(text)
    # Re-render the program info for the channel that is currently shown
    if self.current_channel:
        self.on_channel_hover(self.current_channel)
def on_focus_change(self, focused_widget):
    """Show the program info of the channel whose list entry just received focus."""
    if not focused_widget:
        return
    # The list entries are wrappers; base_widget is the button inside.
    inner = focused_widget.base_widget
    if not hasattr(inner, 'channel'):
        return
    self.on_channel_hover(inner.channel)
def on_channel_hover(self, channel):
    """Remember *channel* as current and replace the program pane with its info lines."""
    self.current_channel = channel
    # One Text widget per info line, swapped in directly (no animation)
    rendered = []
    for line in self.get_program_info(channel):
        rendered.append(urwid.Text(line))
    self.program_walker[:] = rendered
def on_channel_select(self, channel):
    """Start playback of *channel* if it has a stream URL; otherwise do nothing."""
    if not channel.get('url'):
        return
    # Play the channel directly without animation
    self.play_channel(channel['url'])
def get_program_info(self, channel):
info = []
max_line_length = 60 # Define this at the top so it's available everywhere
info.append([("header", f"--Channel: {channel['name']}")])
if channel.get('group'):
info.append([("title", f"Group: {channel['group']}")])
# Debug program information
if channel.get('current_show'):
current_show = channel['current_show']
debug_print(f"[DEBUG] Program info - Title: {current_show.get('title')}")
debug_print(f"[DEBUG] Program info - Season: {current_show.get('season')}")
debug_print(f"[DEBUG] Program info - Episode: {current_show.get('episode')}")
debug_print(f"[DEBUG] Program info - Air date: {current_show.get('air_date')}")
if channel.get('current_show'):
now = datetime.now()
remaining = (channel['current_show']['stop'] - now).seconds // 60
start_time = channel['current_show']['start'].strftime("%H:%M")
end_time = channel['current_show']['stop'].strftime("%H:%M")
# Current show section with colorful formatting
info.append([]) # Empty line
info.append([("program_now", "--NOW PLAYING")])
# Display title with season and episode prominently if available
current_show = channel['current_show']
# Extract and display season/episode information
season = current_show.get('season')
episode = current_show.get('episode')
# If season/episode not available in metadata, try to extract from title
if season is None or episode is None:
# Try SxxExx format (S01E05)
se_match = re.search(r'S(\d+)E(\d+)', current_show['title'], re.IGNORECASE)
if se_match:
season = int(se_match.group(1))
episode = int(se_match.group(2))
debug_print(f"[DEBUG] Extracted S{season}E{episode} from title format")
# Try season x episode format (1x05)
elif 'x' in current_show['title'].lower():
x_match = re.search(r'(\d+)x(\d+)', current_show['title'], re.IGNORECASE)
if x_match:
season = int(x_match.group(1))
episode = int(x_match.group(2))
debug_print(f"[DEBUG] Extracted S{season}E{episode} from Nx format")
# Display title with or without season/episode info
if season is not None and episode is not None:
# Format title with season/episode info
formatted_title = f"Title: {current_show['title']} (S{season:02d}E{episode:02d})"
info.append([("title", formatted_title)])
# Also add a detailed version on a separate line
info.append([("program_desc", f"Season {season}, Episode {episode}")])
# Update the current_show object with the extracted info if it wasn't there already
current_show['season'] = season
current_show['episode'] = episode
else:
info.append([("title", f"Title: {current_show['title']}")])
# Add air date if available
if current_show.get('air_date'):
info.append([("time", f"Original Air Date: {current_show['air_date']}")])
info.append([
("time", f"Time: {start_time} - {end_time} "),
("", f"({remaining} minutes remaining)")
])
# Add IMDb link if we can generate one
imdb_url = self.generate_imdb_url(current_show)
# Skip IMDb URL display as it doesn't work well in Windows CMD
# if imdb_url:
# info.append([("update", f"IMDb: {imdb_url}")])
if channel['current_show']['desc']:
info.append([]) # Empty line
info.append([("program_desc", "--Description:")])
# Split long description into multiple lines
desc = channel['current_show']['desc']
for i in range(0, len(desc), max_line_length):
info.append([('program_desc', desc[i:i+max_line_length])])
# Next show section
if channel.get('next_show'):
next_start = channel['next_show']['start'].strftime("%H:%M")
next_end = channel['next_show']['stop'].strftime("%H:%M")
info.append([]) # Empty line
info.append([("divider", "" * 50)])
info.append([]) # Empty line
info.append([("program_next", "--UP NEXT")])
# Display title with season and episode information
next_show = channel['next_show']
# Extract season/episode information for next show
season = next_show.get('season')
episode = next_show.get('episode')
# If season/episode not available in metadata, try to extract from title
if season is None or episode is None:
# Try SxxExx format (S01E05)
se_match = re.search(r'S(\d+)E(\d+)', next_show['title'], re.IGNORECASE)
if se_match:
season = int(se_match.group(1))
episode = int(se_match.group(2))
debug_print(f"[DEBUG] Next show: Extracted S{season}E{episode} from title format")
# Try season x episode format (1x05)
elif 'x' in next_show['title'].lower():
x_match = re.search(r'(\d+)x(\d+)', next_show['title'], re.IGNORECASE)
if x_match:
season = int(x_match.group(1))
episode = int(x_match.group(2))
debug_print(f"[DEBUG] Next show: Extracted S{season}E{episode} from Nx format")
# Display title with or without season/episode info
if season is not None and episode is not None:
# Format title with season/episode info
formatted_title = f"Title: {next_show['title']} (S{season:02d}E{episode:02d})"
info.append([("title", formatted_title)])
# Also add a detailed version on a separate line
info.append([("program_desc", f"Season {season}, Episode {episode}")])
# Update the next_show object with the extracted info if it wasn't there already
next_show['season'] = season
next_show['episode'] = episode
else:
info.append([("title", f"Title: {next_show['title']}")])
# Add air date if available for next show
if next_show.get('air_date'):
info.append([("time", f"Original Air Date: {next_show['air_date']}")])
info.append([("time", f"Time: {next_start} - {next_end}")])
# Add IMDb link for next show if we can generate one
imdb_url = self.generate_imdb_url(next_show)
# Skip IMDb URL display as it doesn't work well in Windows CMD
if channel['next_show']['desc']:
info.append([]) # Empty line
info.append([("program_desc", "--Description:")])
desc = channel['next_show']['desc']
# Limit description to 5 lines to prevent overflow
max_lines = 5
line_count = 0
for i in range(0, len(desc), max_line_length):
if line_count >= max_lines:
info.append([('program_desc', '... (description truncated)')])
break
info.append([('program_desc', desc[i:i+max_line_length])])
line_count += 1
else:
# Handle case where there's no current show but there might be a next show
if channel.get('next_show'):
next_start = channel['next_show']['start'].strftime("%H:%M")
next_end = channel['next_show']['stop'].strftime("%H:%M")
info.append([]) # Empty line
info.append([("program_next", "--UP NEXT")])
# Display title with season and episode information
next_show = channel['next_show']
# Extract season/episode information for next show
season = next_show.get('season')
episode = next_show.get('episode')
# If season/episode not available in metadata, try to extract from title
if season is None or episode is None:
# Try SxxExx format (S01E05)
se_match = re.search(r'S(\d+)E(\d+)', next_show['title'], re.IGNORECASE)
if se_match:
season = int(se_match.group(1))
episode = int(se_match.group(2))
debug_print(f"[DEBUG] Next show: Extracted S{season}E{episode} from title format")
# Try season x episode format (1x05)
elif 'x' in next_show['title'].lower():
x_match = re.search(r'(\d+)x(\d+)', next_show['title'], re.IGNORECASE)
if x_match:
season = int(x_match.group(1))
episode = int(x_match.group(2))
debug_print(f"[DEBUG] Next show: Extracted S{season}E{episode} from Nx format")
# Display title with or without season/episode info
if season is not None and episode is not None:
# Format title with season/episode info
formatted_title = f"Title: {next_show['title']} (S{season:02d}E{episode:02d})"
info.append([("title", formatted_title)])
# Also add a detailed version on a separate line
info.append([("program_desc", f"Season {season}, Episode {episode}")])
# Update the next_show object with the extracted info if it wasn't there already
next_show['season'] = season
next_show['episode'] = episode
else:
info.append([("title", f"Title: {next_show['title']}")])
# Add air date if available for next show
if next_show.get('air_date'):
info.append([("time", f"Original Air Date: {next_show['air_date']}")])
info.append([("time", f"Time: {next_start} - {next_end}")])
# Add IMDb link for next show if we can generate one
imdb_url = self.generate_imdb_url(next_show)
# Skip IMDb URL display as it doesn't work well in Windows CMD
if channel['next_show']['desc']:
info.append([]) # Empty line
info.append([("program_desc", "--Description:")])
desc = channel['next_show']['desc']
# Limit description to 5 lines to prevent overflow
max_lines = 5
line_count = 0
for i in range(0, len(desc), max_line_length):
if line_count >= max_lines:
info.append([('program_desc', '... (description truncated)')])
break
info.append([('program_desc', desc[i:i+max_line_length])])
line_count += 1
else:
info.append([("", "No current program information available")])
return info
def play_channel(self, url):
    """Stop any running mpv instance and start a new one for *url*.

    The previous player is asked to terminate and given a few seconds to
    exit; if it does not, it is killed so the UI cannot block forever.
    When the new process cannot be launched, the error is shown in the
    program pane instead of being raised.
    """
    if self.mpv_process:
        self.mpv_process.terminate()
        try:
            self.mpv_process.wait(timeout=5)
        except subprocess.TimeoutExpired:
            # The player ignored the polite request; force it down.
            self.mpv_process.kill()
            self.mpv_process.wait()
        # Drop the stale handle so a failed launch below does not leave
        # a finished process behind.
        self.mpv_process = None

    try:
        self.mpv_process = subprocess.Popen(MPV_BASE + [url])
    except Exception as e:
        # Best effort: report the failure in the UI rather than crash.
        self.program_walker[:] = [urwid.Text([("error", f"Failed to play stream: {str(e)}")])]
def reload_data(self):
    """Reload channels and programs from their sources, without animation.

    The focused list position is remembered and restored when it still
    exists after the reload. The program pane is then rebuilt exactly
    once: for the re-focused channel, or otherwise for the previously
    current channel if there was one. The footer is updated last.
    """
    # focus_position is only read when the list has entries.
    saved_pos = self.channel_list.focus_position if self.channel_list.body else None

    # Start from empty containers, then rebuild data and widgets.
    self.channels = []
    self.programs = {}
    self.load_data()
    self.update_channel_items()

    # The position must be valid for both the widget list and the data
    # list, because it is used to index each of them.
    limit = min(len(self.channel_items), len(self.channels))
    if saved_pos is not None and saved_pos < limit:
        self.channel_list.set_focus(saved_pos)
        self.on_channel_hover(self.channels[saved_pos])
    elif self.current_channel:
        # Focus could not be restored; refresh the pane for the channel
        # that was current before the reload.
        self.on_channel_hover(self.current_channel)

    self.update_footer()
def update_channel_items(self):
    """Rebuild the channel list widgets from ``self.channels``.

    The list body is emptied and one button per channel is created.
    Channels with a URL use the 'channel' / 'channel_focus' attributes;
    channels without one are drawn with 'error' in both states.
    """
    def click_handler_for(target):
        # A factory gives every button its own binding of the channel.
        return lambda button: self.on_channel_select(target)

    self.channel_list.body.clear()
    self.channel_items = []

    for channel in self.channels:
        btn = ChannelButton(channel)
        urwid.connect_signal(btn, 'click', click_handler_for(channel))

        if channel.get('url'):
            normal_attr, focus_attr = 'channel', 'channel_focus'
        else:
            normal_attr, focus_attr = 'error', 'error'
        wrapped = urwid.AttrMap(btn, normal_attr, focus_attr)

        self.channel_items.append(wrapped)
        self.channel_list.body.append(wrapped)
def run(self):
    """Set up the screen and main loop, then run the UI until it exits.

    The background update thread is started before the loop; on the way
    out (normal exit or exception) the thread is told to stop and joined
    for at most one second.
    """
    # 256-colour raw display with mouse tracking switched on.
    term = urwid.raw_display.Screen()
    term.set_terminal_properties(colors=256)
    term.set_mouse_tracking()

    self.loop = urwid.MainLoop(
        self.top,
        palette=PALETTE,
        screen=term,
        handle_mouse=True,
        unhandled_input=self.handle_input,
    )

    self.start_update_thread()

    try:
        # The splash screen is displayed before the event loop takes over.
        self.display_splash_screen()
        self.loop.run()
    finally:
        self.update_running = False
        worker = self.update_thread
        if worker:
            worker.join(timeout=1.0)
def handle_input(self, key):
    """Handle key presses that no widget consumed.

    While the settings menu is open only 'esc' is acted on (it cancels
    the menu); every other key is ignored. Otherwise: q quits, l reloads
    the data, s opens the settings menu (all case-insensitive) and enter
    plays the current channel when it has a URL.
    """
    if self.settings_menu:
        if key == 'esc':
            self.on_settings_cancel()
        return

    if key == 'enter':
        channel = self.current_channel
        if channel and channel.get('url'):
            self.play_channel(channel['url'])
        return

    # Accepted keys -> name of the method that handles them.
    shortcuts = (
        (('q', 'Q'), 'quit_player'),
        (('l', 'L'), 'reload_data'),
        (('s', 'S'), 'show_settings_menu'),
    )
    for accepted, method_name in shortcuts:
        if key in accepted:
            getattr(self, method_name)()
            return
def quit_player(self):
    """Stop playback and the update loop, then leave the urwid main loop.

    Raises:
        urwid.ExitMainLoop: always, to end the main loop.
    """
    player = self.mpv_process
    if player:
        # Wait for the player to exit before tearing down the UI.
        player.terminate()
        player.wait()

    self.update_running = False
    raise urwid.ExitMainLoop()
def __del__(self):
    """Flag the update loop to stop and terminate mpv if it was started."""
    self.update_running = False

    # The attribute may be missing entirely, hence the default.
    player = getattr(self, 'mpv_process', None)
    if player:
        player.terminate()
def check_mpv_installed():
    """Return True when an ``mpv`` executable can be found on PATH.

    The lookup is done in-process with ``shutil.which``, so it works the
    same on Windows and POSIX and cannot fail just because a helper tool
    such as ``which`` or ``where.exe`` is itself unavailable.
    """
    # Function-scope import: shutil is not among the file's top-level imports.
    import shutil

    return shutil.which("mpv") is not None
def main():
    """Start the player, or exit with status 1 when mpv is not installed."""
    if check_mpv_installed():
        IPTVPlayer().run()
        return

    # Without mpv nothing can be played: explain and bail out.
    print("Error: mpv player is required but not found. Please install mpv first.")
    print("You can download it from: https://mpv.io/installation/")
    sys.exit(1)
# Run the player only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()