You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
573 lines
23 KiB
573 lines
23 KiB
#!/usr/bin/env python3
|
|
import os
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
import xml.etree.ElementTree as ET
|
|
from datetime import datetime, timedelta
|
|
from urllib.request import urlopen, Request
|
|
import urwid
|
|
from urllib.error import URLError
|
|
import threading
|
|
import time
|
|
|
|
# Configuration
# Playlist (M3U) and programme-guide (XMLTV) endpoints fetched by IPTVPlayer.load_data.
M3U_URL = "http://10.0.0.17:8409/iptv/channels.m3u"
XMLTV_URL = "http://10.0.0.17:8409/iptv/xmltv.xml"

# Base mpv command line; play_channel appends the stream URL to it.
MPV_COMMAND = [
    "mpv",
    "--really-quiet",
    "--no-terminal",
    "--force-window=immediate",
]

# Value of the User-Agent header sent with both HTTP requests.
USER_AGENT = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
    "AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/91.0.4472.124 Safari/537.36"
)

# Seconds the background worker waits between schedule refreshes (15 minutes).
UPDATE_INTERVAL = 15 * 60

# Color Palette
# Each entry is (attribute name, foreground, background) and is handed to
# urwid.MainLoop via its ``palette`` argument in IPTVPlayer.run.
PALETTE = [
    # Frame chrome
    ('header', 'white', 'dark blue'),
    ('footer', 'white', 'dark blue'),
    # Channel list rows
    ('channel', 'black', 'light gray'),
    ('channel_focus', 'white', 'dark blue'),
    # Programme guide section headings
    ('program_now', 'white', 'dark green'),
    ('program_next', 'black', 'light gray'),
    # Error rows and messages
    ('error', 'white', 'dark red'),
    ('divider', 'light gray', ''),
    # Programme detail text
    ('program_desc', 'light cyan', ''),
    ('time', 'yellow', ''),
    ('title', 'bold', ''),
    # "Schedule updated" notification
    ('update', 'light green', ''),
]
|
|
|
|
# ASCII Art
|
|
# Raw multi-line string that setup_ui prepends to the header text.
# NOTE(review): the lines inside this literal that contain only "|", and the
# single-space runs inside the art, look like artifacts of how this file was
# extracted rather than intended banner content -- confirm against the upstream
# file and restore the original spacing before relying on how this renders.
BANNER = r"""
|
|
__________._____. ___. .__ __ _______________ ____
|
|
\______ \__\_ |__\_ |__ |__|/ |_ \__ ___/\ \ / /
|
|
| | _/ || __ \| __ \| \ __\ | | \ Y /
|
|
| | \ || \_\ \ \_\ \ || | | | \ /
|
|
|______ /__||___ /___ /__||__| |____| \___/
|
|
\/ \/ \/
|
|
"""
|
|
|
|
class ChannelButton(urwid.Button):
    """Button representing one channel.

    The channel dict is stored on ``self.channel``. The label is the channel
    name, followed by a "Now:" line when the channel has a current show and a
    "Next:" line when it additionally has a next show.
    """

    def __init__(self, channel, *args, **kwargs):
        """Build the label from ``channel`` and forward the rest to ``urwid.Button``."""
        self.channel = channel

        airing = channel.get('current_show')
        if not airing:
            # No guide data for "now": the bare channel name is the label.
            text = channel['name']
        else:
            rows = [f"{channel['name']}", f"Now: {airing['title']}"]
            upcoming = channel.get('next_show')
            if upcoming:
                rows.append(f"Next: {upcoming['title']}")
            text = "\n".join(rows)

        super().__init__(text, *args, **kwargs)
|
|
|
|
class FocusAwareListBox(urwid.ListBox):
    """ListBox that invokes a callback when ``change_focus`` lands on a new widget."""

    def __init__(self, body, on_focus_change=None):
        """Wrap ``body``; ``on_focus_change`` (optional) receives the newly focused widget."""
        super().__init__(body)
        self._last_focus = None
        self.on_focus_change = on_focus_change

    def change_focus(self, size, position, *args, **kwargs):
        """Delegate to ``ListBox.change_focus``, then report a changed focus widget."""
        super().change_focus(size, position, *args, **kwargs)

        focused = self.focus
        moved = focused != self._last_focus
        if moved and self.on_focus_change:
            self.on_focus_change(focused)
            # Remember what was reported so the same widget is not reported twice.
            self._last_focus = focused
|
|
|
|
class IPTVPlayer:
    """Terminal IPTV browser: urwid channel list plus guide pane, playback via mpv.

    Channels come from the M3U playlist at ``M3U_URL`` and guide data from the
    XMLTV feed at ``XMLTV_URL``. A daemon thread wakes the UI every
    ``UPDATE_INTERVAL`` seconds so the "now"/"next" entries are recomputed.

    All guide timestamps are stored as naive datetimes in the machine's local
    time, because they are compared against ``datetime.now()``.
    """

    # Line prefixes accepted as a stream URL following an #EXTINF entry.
    STREAM_URL_PREFIXES = ('http://', 'https://')
    # Width, in characters, at which programme descriptions are chunked.
    DESC_LINE_WIDTH = 60
    # Maximum number of description lines shown for the upcoming programme.
    NEXT_DESC_MAX_LINES = 5
    # Seconds to wait for mpv to exit after terminate() before killing it.
    MPV_STOP_TIMEOUT = 3
    FOOTER_TEMPLATE = "Q: Quit | ↑↓: Navigate | Enter: Play Channel | L: Reload | Last update: {}"

    def __init__(self):
        """Initialise state, fetch playlist and guide (network I/O), build the UI."""
        self.channels = []
        self.programs = {}
        self.current_channel = None
        self.mpv_process = None
        self.loop = None
        self.last_update = datetime.now()
        self.update_thread = None
        self.update_running = True
        # Set to wake the update worker early so shutdown is prompt.
        self._stop_event = threading.Event()
        # Write end of the urwid watch pipe used to signal the main loop.
        self._update_pipe = None
        self.load_data()
        self.setup_ui()

    # ------------------------------------------------------------------
    # Diagnostics
    # ------------------------------------------------------------------

    def _log(self, message):
        """Print ``message`` unless the urwid main loop has been created.

        Once ``run`` has set ``self.loop`` the terminal belongs to urwid and a
        stray ``print`` would scribble over the rendered screen.
        """
        if getattr(self, "loop", None) is None:
            print(message)

    # ------------------------------------------------------------------
    # Data loading
    # ------------------------------------------------------------------

    @staticmethod
    def _fetch_text(url):
        """Download ``url`` (10 s timeout) and return the body decoded as UTF-8."""
        req = Request(url, headers={'User-Agent': USER_AGENT})
        with urlopen(req, timeout=10) as response:
            return response.read().decode('utf-8')

    @classmethod
    def _parse_m3u(cls, m3u_data):
        """Return the channel dicts described by the M3U text ``m3u_data``.

        A channel is emitted when an ``#EXTINF:`` line is followed by a line
        starting with one of ``STREAM_URL_PREFIXES``. Entries without such a
        URL line are dropped.
        """
        channels = []
        pending = None

        for raw_line in m3u_data.split('\n'):
            line = raw_line.strip()

            if line.startswith('#EXTINF:'):
                def attr(name, default, _line=line):
                    found = re.search(rf'{name}="([^"]*)"', _line)
                    return found.group(1) if found else default

                channel_title = line.split(',')[-1].strip()
                pending = {
                    'id': attr('tvg-id', ""),
                    'name': attr('tvg-name', channel_title),
                    'logo': attr('tvg-logo', ""),
                    'group': attr('group-title', "Other"),
                    'title': channel_title,
                    'url': None,
                    'current_show': None,
                    'next_show': None,
                }
            elif line.startswith(cls.STREAM_URL_PREFIXES):
                if pending:
                    pending['url'] = line
                    channels.append(pending)
                    pending = None

        return channels

    def _parse_xmltv(self, xml_data):
        """Return ``{channel id: [programme dict, ...]}`` parsed from XMLTV text.

        Each list is sorted by start time because ``update_current_shows``
        scans it in order and stops at the first match. Missing or empty
        ``<title>`` becomes "No Title"; missing or empty ``<desc>`` becomes "".
        """
        programs = {}
        root = ET.fromstring(xml_data)

        for programme in root.findall('programme'):
            programs.setdefault(programme.get('channel'), []).append({
                'start': self.parse_xmltv_time(programme.get('start')),
                'stop': self.parse_xmltv_time(programme.get('stop')),
                'title': programme.findtext('title') or "No Title",
                'desc': programme.findtext('desc') or "",
            })

        for shows in programs.values():
            shows.sort(key=lambda show: show['start'])
        return programs

    def load_data(self):
        """Fetch the playlist and, if that produced channels, the guide.

        Failures never propagate: a playlist failure adds a placeholder error
        channel, and a guide failure leaves the channels without guide data.
        Guide lists for channel ids present in the feed replace any previous
        list for that id.
        """
        self._log("\n[DEBUG] Loading IPTV data...")

        # Load M3U playlist
        try:
            m3u_data = self._fetch_text(M3U_URL)
            self._log(f"[DEBUG] Received M3U data (length: {len(m3u_data)} bytes)")
            self.channels.extend(self._parse_m3u(m3u_data))

            if not self.channels:
                self._log("[WARNING] No channels found in M3U file!")
                self.add_error_channel("No channels found in playlist")
            else:
                self._log(f"[DEBUG] Successfully loaded {len(self.channels)} channels")
        except Exception as e:
            # Best-effort boundary: surface the failure as a list entry.
            self._log(f"[ERROR] Failed to load M3U: {str(e)}")
            self.add_error_channel(f"Error loading playlist: {str(e)}")

        # Load XMLTV guide (skipped when the first entry is the error placeholder)
        if self.channels and not self.channels[0]['name'].startswith("Error:"):
            try:
                xml_data = self._fetch_text(XMLTV_URL)
                self._log(f"[DEBUG] Received XMLTV data (length: {len(xml_data)} bytes)")

                self.programs.update(self._parse_xmltv(xml_data))
                self._log(f"[DEBUG] Loaded EPG data for {len(self.programs)} channels")

                # Pre-load current and next shows for each channel
                self.update_current_shows()
            except Exception as e:
                self._log(f"[WARNING] Failed to load EPG: {str(e)}")

        self.last_update = datetime.now()

    def update_current_shows(self):
        """Recompute ``current_show`` / ``next_show`` for every channel with guide data.

        Channels whose id has no entry in ``self.programs`` are left untouched.
        Relies on each programme list being sorted by start time.
        """
        now = datetime.now()
        for channel in self.channels:
            shows = self.programs.get(channel['id'])
            if shows is None:
                continue

            channel['current_show'] = None
            channel['next_show'] = None
            for index, show in enumerate(shows):
                if show['start'] <= now < show['stop']:
                    channel['current_show'] = show
                    if index + 1 < len(shows):
                        channel['next_show'] = shows[index + 1]
                    break
                if show['start'] > now:
                    # Gap in the guide: nothing airing, but something is coming.
                    channel['next_show'] = show
                    break

    def add_error_channel(self, message):
        """Append a placeholder channel (empty URL) whose name carries ``message``."""
        self.channels.append({
            'id': "error_channel",
            'name': f"Error: {message}",
            'logo': "",
            'group': "Error",
            'url': "",
            'title': "Check your playlist URL",
            'current_show': None,
            'next_show': None,
        })

    def parse_xmltv_time(self, time_str):
        """Parse an XMLTV timestamp into a naive local-time ``datetime``.

        The first 14 characters are read as ``YYYYmmddHHMMSS``. When a UTC
        offset follows (for example ``" +0000"``) the instant is converted to
        the machine's local time, so it compares correctly with
        ``datetime.now()``. Without an offset, or with one that cannot be
        parsed, the digits are taken as local time.

        Returns ``datetime.now()`` when the timestamp itself is missing or
        malformed.
        """
        try:
            stamp = datetime.strptime(time_str[:14], "%Y%m%d%H%M%S")
        except (TypeError, ValueError):
            return datetime.now()

        offset = time_str[14:].strip()
        if not offset:
            return stamp
        try:
            aware = datetime.strptime(time_str[:14] + offset, "%Y%m%d%H%M%S%z")
            return aware.astimezone().replace(tzinfo=None)
        except (ValueError, OverflowError, OSError):
            # Unrecognised offset, or a date the platform cannot convert.
            return stamp

    # ------------------------------------------------------------------
    # UI construction
    # ------------------------------------------------------------------

    @staticmethod
    def _channel_label(channel):
        """Return the button label: name, plus Now:/Next: lines when known."""
        current = channel.get('current_show')
        if not current:
            return channel['name']
        label = f"{channel['name']}\nNow: {current['title']}"
        upcoming = channel.get('next_show')
        if upcoming:
            label += f"\nNext: {upcoming['title']}"
        return label

    def _build_channel_item(self, channel):
        """Return the styled, click-connected list row for ``channel``."""
        btn = ChannelButton(channel)
        # Bind the channel as a default argument so each button keeps its own.
        urwid.connect_signal(
            btn, 'click', lambda _button, c=channel: self.on_channel_select(c)
        )
        # Channels without a URL (the error placeholder) are drawn as errors.
        if channel.get('url'):
            return urwid.AttrMap(btn, 'channel', 'channel_focus')
        return urwid.AttrMap(btn, 'error', 'error')

    def setup_ui(self):
        """Build the widget tree and store the root widget in ``self.top``."""
        # Channel list with current show info
        self.channel_items = [self._build_channel_item(c) for c in self.channels]
        self.channel_list = FocusAwareListBox(
            urwid.SimpleFocusListWalker(self.channel_items),
            on_focus_change=self.on_focus_change
        )

        # Program guide with scrollable content
        self.program_walker = urwid.SimpleFocusListWalker([])
        self.program_listbox = urwid.ListBox(self.program_walker)

        # Footer showing key hints and the last update time
        self.footer_text = urwid.Text(
            self.FOOTER_TEMPLATE.format("Loading..."), align='center'
        )
        self.footer = urwid.AttrMap(self.footer_text, 'footer')

        program_frame = urwid.Frame(
            urwid.LineBox(self.program_listbox, title="Program Details"),
            footer=self.footer
        )

        # Header with ASCII art
        header_text = BANNER + "\nBibbitTV Terminal Player"
        header = urwid.AttrMap(urwid.Text(header_text, align='center'), 'header')

        columns = urwid.Columns([
            ('weight', 1, urwid.LineBox(self.channel_list, title="Channels")),
            ('weight', 2, program_frame)
        ], dividechars=1)

        layout = urwid.Pile([
            ('pack', header),
            ('pack', urwid.Divider()),
            columns
        ])

        # Overlay for centering
        self.top = urwid.Overlay(
            urwid.LineBox(layout, title=""),
            urwid.SolidFill(' '),
            align='center',
            width=('relative', 85),
            valign='middle',
            height=('relative', 85)
        )

        self.update_footer()

        # Fill the details pane for the first channel so it is not blank
        # until the first focus change.
        if self.channels:
            self.on_channel_hover(self.channels[0])

    def update_footer(self):
        """Update footer with last update time"""
        time_str = self.last_update.strftime("%H:%M:%S")
        self.footer_text.set_text(self.FOOTER_TEMPLATE.format(time_str))

    # ------------------------------------------------------------------
    # Periodic refresh
    # ------------------------------------------------------------------

    def start_update_thread(self):
        """Start the daemon thread that triggers periodic schedule refreshes.

        When the main loop already exists, a urwid watch pipe is registered so
        the worker can wake the loop from its own thread.
        """
        self.update_running = True
        self._stop_event.clear()
        if self.loop is not None and self._update_pipe is None:
            self._update_pipe = self.loop.watch_pipe(self._on_update_signal)
        self.update_thread = threading.Thread(target=self.update_worker, daemon=True)
        self.update_thread.start()

    def _on_update_signal(self, data):
        """Main-loop side of the watch pipe: refresh, and keep the pipe open."""
        self.refresh_schedule()
        return True

    def _stop_updates(self):
        """Ask the update worker to exit and wake it if it is waiting."""
        self.update_running = False
        self._stop_event.set()

    def update_worker(self):
        """Background worker: every ``UPDATE_INTERVAL`` seconds, wake the UI."""
        while self.update_running:
            # Event.wait is an interruptible sleep; True means "stop requested".
            if self._stop_event.wait(UPDATE_INTERVAL):
                break
            if not self.update_running:
                break

            if self._update_pipe is not None:
                # Writing to the watch pipe makes the main loop run the
                # refresh in its own thread.
                try:
                    os.write(self._update_pipe, b"u")
                except OSError:
                    break
            elif self.loop:
                # No pipe was registered (thread started before the loop
                # existed): fall back to scheduling an alarm.
                self.loop.set_alarm_in(0, self.refresh_schedule)

    def refresh_schedule(self, loop=None, user_data=None):
        """Recompute now/next from the cached guide and redraw the affected widgets.

        ``loop`` and ``user_data`` are accepted so this can be used as an urwid
        alarm callback; neither is used. Errors are shown in the details pane
        instead of being raised.
        """
        try:
            self._log("[DEBUG] Refreshing schedule data...")

            # Just update current shows without reloading everything
            self.update_current_shows()
            self.last_update = datetime.now()

            self.refresh_ui()
            self.update_footer()

            # Prepend a notification to the existing details content
            time_str = self.last_update.strftime("%H:%M:%S")
            notification = [
                urwid.Text([("update", f"Schedule updated at {time_str}")]),
                urwid.Text("")  # Empty line
            ]
            self.program_walker[:] = notification + self.program_walker[:]

            self._log("[DEBUG] Schedule refreshed successfully")
        except Exception as e:
            self._log(f"[ERROR] Failed to refresh schedule: {str(e)}")
            self.program_walker[:] = [
                urwid.Text([("error", f"Failed to refresh schedule: {str(e)}")])
            ]

    def refresh_ui(self):
        """Refresh UI elements with updated data"""
        for item, channel in zip(self.channel_items, self.channels):
            item.base_widget.set_label(self._channel_label(channel))

        # Update program info for the channel currently shown
        if self.current_channel:
            self.on_channel_hover(self.current_channel)

    # ------------------------------------------------------------------
    # Event handlers
    # ------------------------------------------------------------------

    def on_focus_change(self, focused_widget):
        """Update program info when focusing a channel"""
        if focused_widget:
            # Get the button inside the AttrMap
            button = focused_widget.base_widget
            if hasattr(button, 'channel'):
                self.on_channel_hover(button.channel)

    def on_channel_hover(self, channel):
        """Make ``channel`` the current channel and show its guide details."""
        self.current_channel = channel
        program_info = self.get_program_info(channel)
        # Convert each markup line to a Text widget
        self.program_walker[:] = [urwid.Text(line) for line in program_info]

    def on_channel_select(self, channel):
        """Play channel when selected"""
        if channel.get('url'):
            self.play_channel(channel['url'])

    # ------------------------------------------------------------------
    # Guide rendering
    # ------------------------------------------------------------------

    @classmethod
    def _description_lines(cls, desc, max_lines=None):
        """Return markup lines for a description block.

        The text is cut into ``DESC_LINE_WIDTH``-character pieces. When
        ``max_lines`` is given and the text needs more pieces than that, the
        extra pieces are replaced by a single truncation marker line.
        """
        width = cls.DESC_LINE_WIDTH
        chunks = [desc[i:i + width] for i in range(0, len(desc), width)]
        shown = chunks if max_lines is None else chunks[:max_lines]

        lines = [[], [("program_desc", "📝 Description:")]]
        lines.extend([('program_desc', chunk)] for chunk in shown)
        if len(shown) < len(chunks):
            lines.append([('program_desc', '... (description truncated)')])
        return lines

    def _next_show_lines(self, show):
        """Return the markup lines of the "UP NEXT" section for ``show``."""
        next_start = show['start'].strftime("%H:%M")
        next_end = show['stop'].strftime("%H:%M")
        lines = [
            [],  # Empty line
            [("program_next", "⏭ UP NEXT")],
            [("title", f"Title: {show['title']}")],
            [("time", f"Time: {next_start} - {next_end}")],
        ]
        if show['desc']:
            # Limit the description so the section cannot overflow the pane
            lines.extend(
                self._description_lines(show['desc'], self.NEXT_DESC_MAX_LINES)
            )
        return lines

    def get_program_info(self, channel):
        """Return the details pane content for ``channel`` as urwid text markup.

        The result is a list with one markup list per display line; an empty
        list stands for a blank line.
        """
        info = [[("header", f"📺 Channel: {channel['name']}")]]

        if channel.get('group'):
            info.append([("title", f"Group: {channel['group']}")])

        current = channel.get('current_show')
        upcoming = channel.get('next_show')

        if current:
            # total_seconds() keeps whole days, unlike timedelta.seconds;
            # clamp at zero in case the show ended since the last refresh.
            seconds_left = (current['stop'] - datetime.now()).total_seconds()
            remaining = max(0, int(seconds_left // 60))
            start_time = current['start'].strftime("%H:%M")
            end_time = current['stop'].strftime("%H:%M")

            info.append([])  # Empty line
            info.append([("program_now", "⏺ NOW PLAYING")])
            info.append([("title", f"Title: {current['title']}")])
            info.append([
                ("time", f"Time: {start_time} - {end_time} "),
                ("", f"({remaining} minutes remaining)")
            ])

            if current['desc']:
                info.extend(self._description_lines(current['desc']))

            if upcoming:
                info.append([])  # Empty line
                info.append([("divider", "─" * 50)])
                info.extend(self._next_show_lines(upcoming))
        elif upcoming:
            # Nothing airing right now, but the guide knows what comes next.
            info.extend(self._next_show_lines(upcoming))
        else:
            info.append([("", "No current program information available")])

        return info

    # ------------------------------------------------------------------
    # Playback
    # ------------------------------------------------------------------

    def _stop_mpv(self):
        """Stop the running mpv process, if any, without blocking indefinitely."""
        process = getattr(self, "mpv_process", None)
        if process is None:
            return
        self.mpv_process = None
        if process.poll() is not None:
            return  # already exited
        process.terminate()
        try:
            process.wait(timeout=self.MPV_STOP_TIMEOUT)
        except subprocess.TimeoutExpired:
            # mpv did not exit after terminate(); force it.
            process.kill()
            process.wait()

    def play_channel(self, url):
        """Stop any current playback and start mpv on ``url``.

        A failure to launch mpv is shown in the details pane.
        """
        self._stop_mpv()

        try:
            self.mpv_process = subprocess.Popen(MPV_COMMAND + [url])
        except (OSError, ValueError, subprocess.SubprocessError) as e:
            self.program_walker[:] = [
                urwid.Text([("error", f"Failed to play stream: {str(e)}")])
            ]

    # ------------------------------------------------------------------
    # Reload
    # ------------------------------------------------------------------

    def reload_data(self):
        """Reload all data from sources without rebuilding UI"""
        # Save current focus position
        current_focus_pos = (
            self.channel_list.focus_position if self.channel_list.body else None
        )

        # Reload data
        self.channels = []
        self.programs = {}
        self.load_data()

        # Rebuild the channel rows in place
        self.update_channel_items()

        # The previously hovered dict is no longer part of self.channels.
        self.current_channel = None

        if self.channels:
            restorable = (
                current_focus_pos is not None
                and current_focus_pos < len(self.channel_items)
            )
            position = current_focus_pos if restorable else 0
            self.channel_list.set_focus(position)
            self.on_channel_hover(self.channels[position])

        self.update_footer()

    def update_channel_items(self):
        """Replace the channel list rows with rows built from ``self.channels``."""
        self.channel_list.body.clear()
        self.channel_items = []
        for channel in self.channels:
            channel_item = self._build_channel_item(channel)
            self.channel_items.append(channel_item)
            self.channel_list.body.append(channel_item)

    # ------------------------------------------------------------------
    # Main loop
    # ------------------------------------------------------------------

    def run(self):
        """Run the urwid main loop until the user quits, then clean up."""
        # Enable 256 colors and mouse support
        screen = urwid.raw_display.Screen()
        screen.set_terminal_properties(colors=256)
        screen.set_mouse_tracking()

        self.loop = urwid.MainLoop(
            self.top,
            palette=PALETTE,
            screen=screen,
            unhandled_input=self.handle_input,
            handle_mouse=True
        )

        # Start background update thread
        self.start_update_thread()

        try:
            self.loop.run()
        finally:
            # Clean up on every exit path, including exceptions.
            self._stop_updates()
            if self.update_thread:
                self.update_thread.join(timeout=1.0)
            if self._update_pipe is not None:
                self.loop.remove_watch_pipe(self._update_pipe)
                self._update_pipe = None
            self._stop_mpv()

    def handle_input(self, key):
        """Handle keys no widget consumed: Q quits, L reloads, Enter plays."""
        if key in ('q', 'Q'):
            self.quit_player()
        elif key in ('l', 'L'):
            self.reload_data()
        elif key == 'enter':
            if self.current_channel and self.current_channel.get('url'):
                self.play_channel(self.current_channel['url'])

    def quit_player(self):
        """Stop playback and the update worker, then leave the main loop.

        Raises:
            urwid.ExitMainLoop: always, to end ``MainLoop.run``.
        """
        self._stop_mpv()
        self._stop_updates()
        raise urwid.ExitMainLoop()

    def __del__(self):
        """Best-effort cleanup; tolerates partially initialised instances."""
        self.update_running = False
        stop_event = getattr(self, '_stop_event', None)
        if stop_event is not None:
            stop_event.set()
        process = getattr(self, 'mpv_process', None)
        if process:
            try:
                process.terminate()
            except OSError:
                # The process is already gone; nothing left to clean up.
                pass
|
|
|
|
def check_mpv_installed():
    """Return True when an ``mpv`` executable can be found on the PATH.

    Uses ``shutil.which`` instead of running the external ``which`` /
    ``where.exe`` helpers, so a system that lacks those helpers reports a
    plain False/True instead of raising ``FileNotFoundError``.
    """
    # Imported here so the function does not depend on a module-level import.
    import shutil

    return shutil.which("mpv") is not None
|
|
|
|
def main():
    """Entry point: require mpv, then build the player and run its UI.

    Exits with status 1 (after printing install instructions) when
    ``check_mpv_installed`` reports that mpv is unavailable.
    """
    if check_mpv_installed():
        IPTVPlayer().run()
        return

    for message in (
        "Error: mpv player is required but not found. Please install mpv first.",
        "You can download it from: https://mpv.io/installation/",
    ):
        print(message)
    sys.exit(1)


if __name__ == "__main__":
    main()
|