iptv player
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 

602 lines
24 KiB

#!/usr/bin/env python3
import os
import re
import subprocess
import sys
import json
import xml.etree.ElementTree as ET
from datetime import datetime, timedelta
from urllib.request import urlopen, Request
import urwid
from urllib.error import URLError
import threading
import time
# Load configuration from config.json, falling back to built-in defaults.
# Every default is defined exactly once so the "file missing" and
# "key missing" paths cannot drift apart.
_DEFAULT_M3U_URL = "http://10.0.0.17:8409/iptv/channels.m3u"
_DEFAULT_XMLTV_URL = "http://10.0.0.17:8409/iptv/xmltv.xml"
_DEFAULT_USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
_DEFAULT_UPDATE_INTERVAL = 900
_DEFAULT_MPV_OPTIONS = [
    "--really-quiet",
    "--no-terminal",
    "--force-window=immediate",
]

config = {}
try:
    with open('config.json', encoding='utf-8') as config_file:
        config = json.load(config_file)
except FileNotFoundError:
    print("Configuration file not found, using default settings")
except (json.JSONDecodeError, UnicodeDecodeError) as config_error:
    # A malformed file used to abort start-up with a traceback; treat it
    # like a missing file instead.
    print(f"Configuration file is invalid ({config_error}), using default settings")

if not isinstance(config, dict):
    # Valid JSON whose top level is not an object has no usable settings.
    print("Configuration file must contain a JSON object, using default settings")
    config = {}

M3U_URL = config.get('m3u_url', _DEFAULT_M3U_URL)
XMLTV_URL = config.get('xmltv_url', _DEFAULT_XMLTV_URL)
USER_AGENT = config.get('user_agent', _DEFAULT_USER_AGENT)
UPDATE_INTERVAL = config.get('update_interval', _DEFAULT_UPDATE_INTERVAL)
MONITOR = config.get('monitor', None)

# Base MPV command with monitor options if specified
MPV_BASE = ["mpv"]
if MONITOR is not None:
    MPV_BASE.extend(["--fs", f"--fs-screen={MONITOR}"])
MPV_BASE.extend(config.get('mpv_options', _DEFAULT_MPV_OPTIONS))
# Color palette handed to urwid.MainLoop(palette=...).
# Each entry is (attribute name, foreground, background); the attribute
# names are the ones referenced by AttrMap wrappers and text markup
# elsewhere in this file. An empty background string leaves the
# background at the terminal default.
PALETTE = [
    ('header', 'white', 'dark blue'),
    ('footer', 'white', 'dark blue'),
    ('channel', 'black', 'light cyan'),
    ('channel_focus', 'white', 'dark blue'),
    ('program_now', 'white', 'dark green'),
    ('program_next', 'black', 'light gray'),
    ('error', 'white', 'dark red'),
    ('divider', 'dark green', ''),
    ('program_desc', 'light cyan', ''),
    ('time', 'yellow', ''),
    ('title', 'bold', ''),
    ('update', 'light blue', ''),
]
# ASCII-art banner; IPTVPlayer.setup_ui prepends it to the header text.
# Kept as a raw string so the backslashes in the artwork are literal.
BANNER = r"""
__________._____. ___. .__ __ _______________ ____
\______ \__\_ |__\_ |__ |__|/ |_ \__ ___/\ \ / /
| | _/ || __ \| __ \| \ __\ | | \ Y /
| | \ || \_\ \ \_\ \ || | | | \ /
|______ /__||___ /___ /__||__| |____| \___/
\/ \/ \/
"""
class ChannelButton(urwid.Button):
    """Button for a single channel entry.

    The channel dict is kept on ``self.channel`` so focus and click
    handlers can recover it from the widget. The label is the channel
    name; when the channel has a current show a "Now:" line is added,
    followed by a "Next:" line if a next show is also known.
    """

    def __init__(self, channel, *args, **kwargs):
        self.channel = channel
        airing = channel.get('current_show')
        if not airing:
            caption = channel['name']
        else:
            pieces = [f"{channel['name']}", f"Now: {airing['title']}"]
            upcoming = channel.get('next_show')
            if upcoming:
                pieces.append(f"Next: {upcoming['title']}")
            caption = "\n".join(pieces)
        super().__init__(caption, *args, **kwargs)
class FocusAwareListBox(urwid.ListBox):
    """ListBox that reports focus changes to an optional callback.

    ``on_focus_change`` (if given) is called with the newly focused
    widget whenever ``change_focus`` leaves the focus on a different
    widget than the one seen on the previous call.
    """

    def __init__(self, body, on_focus_change=None):
        super().__init__(body)
        self.on_focus_change = on_focus_change
        self._last_focus = None

    def change_focus(self, size, position, *args, **kwargs):
        """Delegate to ListBox.change_focus, then notify on a real change."""
        super().change_focus(size, position, *args, **kwargs)
        focused = self.focus
        moved = focused != self._last_focus
        if moved and self.on_focus_change:
            self.on_focus_change(focused)
        # Remember the widget only after the callback has been given its turn.
        self._last_focus = focused
class IPTVPlayer:
    """Terminal IPTV player: urwid channel list and guide pane, mpv playback.

    Channels are read from the M3U playlist at ``M3U_URL`` and programme
    data from the XMLTV document at ``XMLTV_URL``. A background thread
    re-evaluates the "now/next" programme for every channel once per
    ``UPDATE_INTERVAL`` seconds.
    """

    # Seconds to wait for mpv to exit after terminate() before killing it.
    _MPV_STOP_TIMEOUT = 3.0
    # Width, in characters, of each wrapped description line.
    _DESC_LINE_LENGTH = 60
    # Maximum number of description lines shown for the upcoming programme.
    _NEXT_DESC_MAX_LINES = 5

    # Attribute patterns for "#EXTINF:" playlist lines.
    _TVG_ID_RE = re.compile(r'tvg-id="([^"]*)"')
    _TVG_NAME_RE = re.compile(r'tvg-name="([^"]*)"')
    _TVG_LOGO_RE = re.compile(r'tvg-logo="([^"]*)"')
    _GROUP_TITLE_RE = re.compile(r'group-title="([^"]*)"')
    # Any "scheme://" line counts as a stream URL (http, https, rtsp, udp, ...).
    _STREAM_URL_RE = re.compile(r'^[A-Za-z][A-Za-z0-9+.\-]*://')

    def __init__(self):
        self.channels = []
        self.programs = {}
        self.current_channel = None
        self.mpv_process = None
        self.loop = None
        self.last_update = datetime.now()
        self.update_thread = None
        self.update_running = True
        # Set to wake the update thread immediately when shutting down.
        self._stop_event = threading.Event()
        # Write end of the urwid watch pipe; created in run().
        self._update_pipe = None
        self.load_data()
        self.setup_ui()

    # ------------------------------------------------------------------
    # Diagnostics
    # ------------------------------------------------------------------
    def _log(self, message):
        """Print a diagnostic line unless the urwid UI owns the terminal.

        Once ``self.loop`` exists the screen is drawn by urwid, and stray
        ``print`` output would corrupt it, so messages are dropped then.
        """
        if getattr(self, 'loop', None) is None:
            print(message)

    # ------------------------------------------------------------------
    # Data loading
    # ------------------------------------------------------------------
    def load_data(self):
        """Fetch the playlist and, if it yielded channels, the guide.

        Appends to ``self.channels`` and replaces ``self.programs``.
        Failures are not raised: a playlist failure is represented by an
        error pseudo-channel, a guide failure leaves channels without
        programme data.
        """
        self._log("\n[DEBUG] Loading IPTV data...")
        self._load_playlist()
        # Skip the guide when the only "channel" is the error placeholder.
        if self.channels and not self.channels[0]['name'].startswith("Error:"):
            self._load_guide()
        self.last_update = datetime.now()

    def _load_playlist(self):
        """Download the M3U playlist and append its channels."""
        try:
            req = Request(M3U_URL, headers={'User-Agent': USER_AGENT})
            with urlopen(req, timeout=10) as response:
                # Tolerate stray non-UTF-8 bytes instead of losing the list.
                m3u_data = response.read().decode('utf-8', errors='replace')
            self._log(f"[DEBUG] Received M3U data (length: {len(m3u_data)} bytes)")
            self.channels.extend(self._parse_m3u(m3u_data))
            if not self.channels:
                self._log("[WARNING] No channels found in M3U file!")
                self.add_error_channel("No channels found in playlist")
            else:
                self._log(f"[DEBUG] Successfully loaded {len(self.channels)} channels")
        except Exception as e:
            # UI boundary: any failure becomes a visible error entry.
            self._log(f"[ERROR] Failed to load M3U: {str(e)}")
            self.add_error_channel(f"Error loading playlist: {str(e)}")

    @classmethod
    def _parse_m3u(cls, m3u_data):
        """Return the channel dicts described by extended-M3U text."""
        channels = []
        pending = None
        for raw_line in m3u_data.split('\n'):
            line = raw_line.strip()
            if line.startswith('#EXTINF:'):
                tvg_id = cls._TVG_ID_RE.search(line)
                tvg_name = cls._TVG_NAME_RE.search(line)
                tvg_logo = cls._TVG_LOGO_RE.search(line)
                group_title = cls._GROUP_TITLE_RE.search(line)
                channel_title = line.split(',')[-1].strip()
                pending = {
                    'id': tvg_id.group(1) if tvg_id else "",
                    'name': tvg_name.group(1) if tvg_name else channel_title,
                    'logo': tvg_logo.group(1) if tvg_logo else "",
                    'group': group_title.group(1) if group_title else "Other",
                    'title': channel_title,
                    'url': None,
                    'current_show': None,
                    'next_show': None,
                }
            elif pending is not None and cls._STREAM_URL_RE.match(line):
                pending['url'] = line
                channels.append(pending)
                pending = None
        return channels

    def _load_guide(self):
        """Download the XMLTV guide and refresh every channel's now/next."""
        try:
            req = Request(XMLTV_URL, headers={'User-Agent': USER_AGENT})
            with urlopen(req, timeout=10) as response:
                # Parse the raw bytes so the XML declaration's encoding is honoured.
                xml_data = response.read()
            self._log(f"[DEBUG] Received XMLTV data (length: {len(xml_data)} bytes)")
            root = ET.fromstring(xml_data)
            programs = {}
            for programme in root.findall('programme'):
                title_elem = programme.find('title')
                desc_elem = programme.find('desc')
                programs.setdefault(programme.get('channel'), []).append({
                    'start': self.parse_xmltv_time(programme.get('start')),
                    'stop': self.parse_xmltv_time(programme.get('stop')),
                    # An empty element has text None; fall back like a missing one.
                    'title': (title_elem.text if title_elem is not None else None) or "No Title",
                    'desc': (desc_elem.text if desc_elem is not None else None) or "",
                })
            self.programs = programs
            self._log(f"[DEBUG] Loaded EPG data for {len(self.programs)} channels")
            # Pre-load current and next shows for each channel
            self.update_current_shows()
        except Exception as e:
            # Best effort: the player still works without a guide.
            self._log(f"[WARNING] Failed to load EPG: {str(e)}")

    def update_current_shows(self):
        """Set each channel's current and next show for the present time.

        Only channels whose id appears in ``self.programs`` are touched.
        Selection does not depend on the order of the guide entries.
        """
        now = datetime.now()
        for channel in self.channels:
            shows = self.programs.get(channel['id'])
            if shows is None:
                continue
            current = next(
                (show for show in shows if show['start'] <= now < show['stop']),
                None,
            )
            # "Next" is the earliest programme starting after the current
            # one began, or after now when nothing is airing.
            after = current['start'] if current else now
            upcoming = [show for show in shows if show['start'] > after]
            channel['current_show'] = current
            channel['next_show'] = min(
                upcoming, key=lambda show: show['start'], default=None
            )

    def add_error_channel(self, message):
        """Append a non-playable placeholder channel carrying ``message``."""
        self.channels.append({
            'id': "error_channel",
            'name': f"Error: {message}",
            'logo': "",
            'group': "Error",
            'url': "",
            'title': "Check your playlist URL",
            'current_show': None,
            'next_show': None
        })

    def parse_xmltv_time(self, time_str):
        """Parse an XMLTV timestamp into a naive local ``datetime``.

        Accepts ``YYYYMMDDhhmmss`` optionally followed by a UTC offset
        such as ``+0100``. With an offset the value is converted to local
        time so it can be compared with ``datetime.now()``; without one
        the digits are taken as local time. Unparseable input yields the
        current time.
        """
        try:
            stamp = time_str.strip()
            base = datetime.strptime(stamp[:14], "%Y%m%d%H%M%S")
        except (AttributeError, TypeError, ValueError):
            return datetime.now()
        offset = stamp[14:].strip()
        if not offset:
            return base
        try:
            aware = datetime.strptime(stamp[:14] + offset, "%Y%m%d%H%M%S%z")
            return aware.astimezone().replace(tzinfo=None)
        except (ValueError, OverflowError, OSError):
            # Unrecognised offset or platform limit: keep the wall-clock digits.
            return base

    # ------------------------------------------------------------------
    # UI construction
    # ------------------------------------------------------------------
    def _build_channel_item(self, channel):
        """Return the styled, click-connected list entry for ``channel``."""
        btn = ChannelButton(channel)
        # Bind the channel as a default argument so each handler keeps its own.
        urwid.connect_signal(
            btn, 'click', lambda button, c=channel: self.on_channel_select(c)
        )
        if channel.get('url'):
            return urwid.AttrMap(btn, 'channel', 'channel_focus')
        return urwid.AttrMap(btn, 'error', 'error')

    @staticmethod
    def _channel_label(channel):
        """Return the button label: name plus optional Now/Next lines."""
        current = channel.get('current_show')
        if not current:
            return channel['name']
        label = f"{channel['name']}\nNow: {current['title']}"
        upcoming = channel.get('next_show')
        if upcoming:
            label += f"\nNext: {upcoming['title']}"
        return label

    def _footer_message(self, update_label):
        """Return the footer text with ``update_label`` as last-update value."""
        monitor_info = f" | Monitor: {MONITOR}" if MONITOR is not None else ""
        return f"Q: Quit | ↑↓: Navigate | Enter: Play Channel | L: Reload | Last update: {update_label}{monitor_info}"

    def setup_ui(self):
        """Build the widget tree and store its root in ``self.top``."""
        self.channel_items = [self._build_channel_item(c) for c in self.channels]
        # Create list box that tracks focus changes
        self.channel_list = FocusAwareListBox(
            urwid.SimpleFocusListWalker(self.channel_items),
            on_focus_change=self.on_focus_change
        )
        # Create program guide with scrollable content
        self.program_walker = urwid.SimpleFocusListWalker([])
        self.program_listbox = urwid.ListBox(self.program_walker)
        self.footer_text = urwid.Text(self._footer_message("Loading..."), align='center')
        self.footer = urwid.AttrMap(self.footer_text, 'footer')
        program_frame = urwid.Frame(
            urwid.LineBox(self.program_listbox, title="Program Details"),
            footer=self.footer
        )
        # Create header with ASCII art
        header_text = BANNER + "\nBibbitTV Terminal Player"
        header = urwid.AttrMap(urwid.Text(header_text, align='center'), 'header')
        columns = urwid.Columns([
            ('weight', 1, urwid.LineBox(self.channel_list, title="Channels")),
            ('weight', 2, program_frame)
        ], dividechars=1)
        layout = urwid.Pile([
            ('pack', header),
            ('pack', urwid.Divider()),
            columns
        ])
        # Overlay for centering
        self.top = urwid.Overlay(
            urwid.LineBox(layout, title=""),
            urwid.SolidFill(' '),
            align='center',
            width=('relative', 85),
            valign='middle',
            height=('relative', 85)
        )
        # Update footer with initial timestamp
        self.update_footer()

    def update_footer(self):
        """Update footer with last update time"""
        self.footer_text.set_text(
            self._footer_message(self.last_update.strftime("%H:%M:%S"))
        )

    # ------------------------------------------------------------------
    # Periodic refresh
    # ------------------------------------------------------------------
    def start_update_thread(self):
        """Start background thread for periodic updates"""
        self.update_running = True
        self._stop_event.clear()
        self.update_thread = threading.Thread(target=self.update_worker, daemon=True)
        self.update_thread.start()

    def update_worker(self):
        """Background worker: request a refresh every UPDATE_INTERVAL seconds.

        The refresh itself runs on the main-loop thread. This thread only
        writes a byte to the urwid watch pipe, because urwid's alarm API
        must not be called from another thread.
        """
        while self.update_running:
            # wait() returns True as soon as a stop is requested.
            if self._stop_event.wait(UPDATE_INTERVAL) or not self.update_running:
                break
            pipe = self._update_pipe
            if pipe is None:
                continue
            try:
                os.write(pipe, b"r")
            except OSError:
                # Pipe already closed during shutdown.
                break

    def _on_refresh_signal(self, data):
        """Watch-pipe callback (main-loop thread): run the refresh."""
        self.refresh_schedule()
        return True  # returning False would remove the watch

    def _stop_updates(self):
        """Ask the update thread to exit promptly."""
        self.update_running = False
        stop_event = getattr(self, '_stop_event', None)
        if stop_event is not None:
            stop_event.set()

    def refresh_schedule(self, loop=None, user_data=None):
        """Recompute now/next for all channels and redraw dependent widgets.

        ``loop`` and ``user_data`` are accepted so the method can also be
        used as an urwid alarm callback; they are not used.
        """
        try:
            self._log("[DEBUG] Refreshing schedule data...")
            # Just update current shows without reloading everything
            self.update_current_shows()
            self.last_update = datetime.now()
            self.refresh_ui()
            self.update_footer()
            time_str = self.last_update.strftime("%H:%M:%S")
            notification = [
                urwid.Text([("update", f"Schedule updated at {time_str}")]),
                urwid.Text("")  # Empty line
            ]
            # refresh_ui rebuilt the pane only if a channel is selected;
            # otherwise drop old content so notifications do not pile up.
            existing = list(self.program_walker) if self.current_channel else []
            self.program_walker[:] = notification + existing
            self._log("[DEBUG] Schedule refreshed successfully")
        except Exception as e:
            self._log(f"[ERROR] Failed to refresh schedule: {str(e)}")
            self.program_walker[:] = [
                urwid.Text([("error", f"Failed to refresh schedule: {str(e)}")])
            ]

    def refresh_ui(self):
        """Refresh UI elements with updated data"""
        for item, channel in zip(self.channel_items, self.channels):
            item.base_widget.set_label(self._channel_label(channel))
        # Update program info for current channel if focused
        if self.current_channel:
            self.on_channel_hover(self.current_channel)

    # ------------------------------------------------------------------
    # Event handlers
    # ------------------------------------------------------------------
    def on_focus_change(self, focused_widget):
        """Update program info when focusing a channel"""
        if focused_widget:
            # Get the button inside the AttrMap
            button = focused_widget.base_widget
            if hasattr(button, 'channel'):
                self.on_channel_hover(button.channel)

    def on_channel_hover(self, channel):
        """Make ``channel`` current and show its programme details."""
        self.current_channel = channel
        # One Text widget per markup line.
        self.program_walker[:] = [
            urwid.Text(line) for line in self.get_program_info(channel)
        ]

    def on_channel_select(self, channel):
        """Play channel when selected"""
        if channel.get('url'):
            self.play_channel(channel['url'])

    # ------------------------------------------------------------------
    # Programme details
    # ------------------------------------------------------------------
    @staticmethod
    def _wrap_description(desc, width, max_lines=None):
        """Split ``desc`` into ``width``-character markup lines.

        When ``max_lines`` is given and exceeded, the output is cut there
        and a truncation marker line is appended.
        """
        chunks = [desc[i:i + width] for i in range(0, len(desc), width)]
        lines = [[('program_desc', chunk)] for chunk in chunks[:max_lines]]
        if max_lines is not None and len(chunks) > max_lines:
            lines.append([('program_desc', '... (description truncated)')])
        return lines

    def _next_show_lines(self, show):
        """Return the "UP NEXT" markup lines for ``show``."""
        start = show['start'].strftime("%H:%M")
        end = show['stop'].strftime("%H:%M")
        lines = [
            [("program_next", "--UP NEXT")],
            [("title", f"Title: {show['title']}")],
            [("time", f"Time: {start} - {end}")],
        ]
        if show['desc']:
            lines.append([])  # Empty line
            lines.append([("program_desc", "--Description:")])
            # Limit the description so it cannot push everything off screen.
            lines.extend(self._wrap_description(
                show['desc'], self._DESC_LINE_LENGTH, self._NEXT_DESC_MAX_LINES
            ))
        return lines

    def get_program_info(self, channel):
        """Return urwid text markup describing ``channel``'s programmes.

        The result is a list of lines; each line is a list of
        ``(attribute, text)`` tuples, with an empty list for a blank line.
        """
        info = [[("header", f"--Channel: {channel['name']}")]]
        if channel.get('group'):
            info.append([("title", f"Group: {channel['group']}")])
        current = channel.get('current_show')
        upcoming = channel.get('next_show')
        if current:
            # total_seconds() keeps whole days and the sign; clamp a show
            # that has just ended to zero.
            left = (current['stop'] - datetime.now()).total_seconds()
            remaining = max(0, int(left // 60))
            start_time = current['start'].strftime("%H:%M")
            end_time = current['stop'].strftime("%H:%M")
            info.append([])  # Empty line
            info.append([("program_now", "--NOW PLAYING")])
            info.append([("title", f"Title: {current['title']}")])
            info.append([
                ("time", f"Time: {start_time} - {end_time} "),
                ("", f"({remaining} minutes remaining)")
            ])
            if current['desc']:
                info.append([])  # Empty line
                info.append([("program_desc", "--Description:")])
                info.extend(self._wrap_description(
                    current['desc'], self._DESC_LINE_LENGTH
                ))
            if upcoming:
                info.append([])  # Empty line
                # The divider text used to be an empty string repeated 50 times.
                info.append([("divider", "-" * 50)])
                info.append([])  # Empty line
                info.extend(self._next_show_lines(upcoming))
        elif upcoming:
            # No current show, but the guide knows what comes next.
            info.append([])  # Empty line
            info.extend(self._next_show_lines(upcoming))
        else:
            info.append([("", "No current program information available")])
        return info

    # ------------------------------------------------------------------
    # Playback
    # ------------------------------------------------------------------
    def _stop_mpv(self):
        """Terminate the running mpv process, killing it if it lingers."""
        proc = getattr(self, 'mpv_process', None)
        if not proc:
            return
        if proc.poll() is None:
            proc.terminate()
            try:
                # Bounded wait so an unresponsive player cannot freeze the UI.
                proc.wait(timeout=self._MPV_STOP_TIMEOUT)
            except subprocess.TimeoutExpired:
                proc.kill()
                proc.wait()
        self.mpv_process = None

    def play_channel(self, url):
        """Stop any running player and start mpv on ``url``."""
        self._stop_mpv()
        try:
            # Detach mpv from the terminal so it can neither read the UI's
            # keystrokes nor write over the urwid screen.
            self.mpv_process = subprocess.Popen(
                MPV_BASE + [url],
                stdin=subprocess.DEVNULL,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
            )
        except Exception as e:
            self.program_walker[:] = [urwid.Text([("error", f"Failed to play stream: {str(e)}")])]

    # ------------------------------------------------------------------
    # Reload
    # ------------------------------------------------------------------
    def reload_data(self):
        """Reload all data from sources without rebuilding UI"""
        had_items = bool(self.channel_list.body)
        focus_pos = self.channel_list.focus_position if had_items else 0
        self.channels = []
        self.programs = {}
        # The previously selected dict belongs to the discarded list.
        self.current_channel = None
        self.load_data()
        self.update_channel_items()
        if self.channels:
            # Clamp rather than skip so the pane always matches the focus.
            focus_pos = min(focus_pos, len(self.channels) - 1)
            self.channel_list.set_focus(focus_pos)
            self.on_channel_hover(self.channels[focus_pos])
        else:
            self.program_walker[:] = []
        self.update_footer()

    def update_channel_items(self):
        """Rebuild the channel list widgets from ``self.channels``."""
        self.channel_items = [self._build_channel_item(c) for c in self.channels]
        self.channel_list.body.clear()
        self.channel_list.body.extend(self.channel_items)

    # ------------------------------------------------------------------
    # Main loop
    # ------------------------------------------------------------------
    def run(self):
        """Run the urwid main loop until the user quits."""
        # Enable mouse support and cursor visibility
        screen = urwid.raw_display.Screen()
        screen.set_terminal_properties(colors=256)
        screen.set_mouse_tracking()
        self.loop = urwid.MainLoop(
            self.top,
            palette=PALETTE,
            screen=screen,
            unhandled_input=self.handle_input,
            handle_mouse=True
        )
        # Thread-safe wake-up channel for the background update thread.
        self._update_pipe = self.loop.watch_pipe(self._on_refresh_signal)
        self.start_update_thread()
        try:
            self.loop.run()
        finally:
            # Stop the worker before closing the pipe it writes to.
            self._stop_updates()
            if self.update_thread:
                self.update_thread.join(timeout=1.0)
            if self._update_pipe is not None:
                self.loop.remove_watch_pipe(self._update_pipe)
                self._update_pipe = None

    def handle_input(self, key):
        """Handle keys no widget consumed: quit, reload, play."""
        if key in ('q', 'Q'):
            self.quit_player()
        elif key in ('l', 'L'):
            self.reload_data()
        elif key == 'enter':
            if self.current_channel and self.current_channel.get('url'):
                self.play_channel(self.current_channel['url'])

    def quit_player(self):
        """Stop playback and updates, then leave the main loop.

        Raises:
            urwid.ExitMainLoop: always, to terminate ``MainLoop.run``.
        """
        self._stop_mpv()
        self._stop_updates()
        raise urwid.ExitMainLoop()

    def __del__(self):
        self._stop_updates()
        proc = getattr(self, 'mpv_process', None)
        if proc:
            try:
                proc.terminate()
            except Exception:
                # Finalizers may run during interpreter shutdown; never raise.
                pass
def check_mpv_installed():
    """Return True when an ``mpv`` executable can be found on ``PATH``.

    Uses ``shutil.which`` rather than spawning ``which``/``where.exe``,
    so the check works on every platform and cannot fail because the
    lookup helper itself is missing.
    """
    import shutil  # local import: only needed for this start-up check

    return shutil.which("mpv") is not None
def main():
    """Start the player, or exit with status 1 when mpv is unavailable."""
    if check_mpv_installed():
        IPTVPlayer().run()
        return
    # Without mpv nothing can be played, so explain and bail out.
    print("Error: mpv player is required but not found. Please install mpv first.")
    print("You can download it from: https://mpv.io/installation/")
    sys.exit(1)


if __name__ == "__main__":
    main()