Browse Source

added mpv player and updated code

master
melancholytron 2 months ago
parent
commit
4f66d3d71d
  1. BIN
      mpv.com
  2. BIN
      mpv.exe
  3. BIN
      mpv.pdb
  4. 215
      terminalTv.py
  5. BIN
      vulkan-1.dll

BIN
mpv.com

BIN
mpv.exe

BIN
mpv.pdb

215
terminalTv.py

@ -8,12 +8,15 @@ from datetime import datetime, timedelta
from urllib.request import urlopen, Request
import urwid
from urllib.error import URLError
import threading
import time
# Configuration
M3U_URL = "http://10.0.0.17:8409/iptv/channels.m3u"
XMLTV_URL = "http://10.0.0.17:8409/iptv/xmltv.xml"
MPV_COMMAND = ["mpv", "--really-quiet", "--no-terminal", "--force-window=immediate"]
USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
UPDATE_INTERVAL = 900 # 15 minutes in seconds
# Color Palette
PALETTE = [
@ -28,6 +31,7 @@ PALETTE = [
('program_desc', 'light cyan', ''), # New color for descriptions
('time', 'yellow', ''), # New color for time displays
('title', 'bold', ''), # New color for titles
('update', 'light green', ''), # Color for update notifications
]
# ASCII Art
@ -71,6 +75,10 @@ class IPTVPlayer:
self.programs = {}
self.current_channel = None
self.mpv_process = None
self.loop = None
self.last_update = datetime.now()
self.update_thread = None
self.update_running = True
self.load_data()
self.setup_ui()
@ -152,22 +160,30 @@ class IPTVPlayer:
print(f"[DEBUG] Loaded EPG data for {len(self.programs)} channels")
# Pre-load current and next shows for each channel
now = datetime.now()
for channel in self.channels:
if channel['id'] in self.programs:
shows = self.programs[channel['id']]
for i, show in enumerate(shows):
if show['start'] <= now < show['stop']:
channel['current_show'] = show
if i+1 < len(shows):
channel['next_show'] = shows[i+1]
break
elif show['start'] > now:
channel['next_show'] = show
break
self.update_current_shows()
except Exception as e:
print(f"[WARNING] Failed to load EPG: {str(e)}")
self.last_update = datetime.now()
def update_current_shows(self):
"""Update current and next shows based on current time"""
now = datetime.now()
for channel in self.channels:
if channel['id'] in self.programs:
shows = self.programs[channel['id']]
channel['current_show'] = None
channel['next_show'] = None
for i, show in enumerate(shows):
if show['start'] <= now < show['stop']:
channel['current_show'] = show
if i+1 < len(shows):
channel['next_show'] = shows[i+1]
break
elif show['start'] > now:
channel['next_show'] = show
break
def add_error_channel(self, message):
self.channels.append({
@ -189,7 +205,7 @@ class IPTVPlayer:
def setup_ui(self):
# Create channel list with current show info
channel_items = []
self.channel_items = []
for channel in self.channels:
# Create custom button that knows its channel
btn = ChannelButton(channel)
@ -206,20 +222,25 @@ class IPTVPlayer:
else:
channel_item = urwid.AttrMap(btn, 'error', 'error')
channel_items.append(channel_item)
self.channel_items.append(channel_item)
# Create list box that tracks focus changes
self.channel_list = FocusAwareListBox(
urwid.SimpleFocusListWalker(channel_items),
urwid.SimpleFocusListWalker(self.channel_items),
on_focus_change=self.on_focus_change
)
# Create program guide with scrollable content
self.program_walker = urwid.SimpleFocusListWalker([])
self.program_listbox = urwid.ListBox(self.program_walker)
# Add update time to footer
self.footer_text = urwid.Text("Q: Quit | ↑↓: Navigate | Enter: Play Channel | L: Reload | Last update: Loading...", align='center')
self.footer = urwid.AttrMap(self.footer_text, 'footer')
program_frame = urwid.Frame(
urwid.LineBox(self.program_listbox, title="Program Details"),
footer=urwid.AttrMap(urwid.Text("Q: Quit | ↑↓: Navigate | Enter: Play Channel | L: Reload"), 'footer')
footer=self.footer
)
# Create header with ASCII art
@ -248,6 +269,71 @@ class IPTVPlayer:
valign='middle',
height=('relative', 85)
)
# Update footer with initial timestamp
self.update_footer()
def update_footer(self):
"""Update footer with last update time"""
time_str = self.last_update.strftime("%H:%M:%S")
self.footer_text.set_text(f"Q: Quit | ↑↓: Navigate | Enter: Play Channel | L: Reload | Last update: {time_str}")
def start_update_thread(self):
"""Start background thread for periodic updates"""
self.update_running = True
self.update_thread = threading.Thread(target=self.update_worker, daemon=True)
self.update_thread.start()
def update_worker(self):
"""Background worker for periodic updates"""
while self.update_running:
time.sleep(UPDATE_INTERVAL)
if self.loop:
# Schedule update on the main thread
self.loop.set_alarm_in(0, self.refresh_schedule)
def refresh_schedule(self, loop=None, user_data=None):
"""Update schedule information"""
try:
print("[DEBUG] Refreshing schedule data...")
# Just update current shows without reloading everything
self.update_current_shows()
self.last_update = datetime.now()
# Update UI
self.refresh_ui()
self.update_footer()
# Add notification to program view
time_str = self.last_update.strftime("%H:%M:%S")
notification = [
urwid.Text([("update", f"Schedule updated at {time_str}")]),
urwid.Text("") # Empty line
]
# Prepend notification to existing content
self.program_walker[:] = notification + self.program_walker[:]
print("[DEBUG] Schedule refreshed successfully")
except Exception as e:
print(f"[ERROR] Failed to refresh schedule: {str(e)}")
def refresh_ui(self):
"""Refresh UI elements with updated data"""
# Update channel buttons
for i, channel in enumerate(self.channels):
button = self.channel_items[i].base_widget
if channel.get('current_show'):
label = f"{channel['name']}\nNow: {channel['current_show']['title']}"
if channel.get('next_show'):
label += f"\nNext: {channel['next_show']['title']}"
else:
label = channel['name']
button.set_label(label)
# Update program info for current channel if focused
if self.current_channel:
self.on_channel_hover(self.current_channel)
def on_focus_change(self, focused_widget):
"""Update program info when focusing a channel"""
@ -261,6 +347,7 @@ class IPTVPlayer:
"""Update program info"""
self.current_channel = channel
program_info = self.get_program_info(channel)
# Convert each line to a Text widget
self.program_walker[:] = [urwid.Text(line) for line in program_info]
def on_channel_select(self, channel):
@ -270,6 +357,8 @@ class IPTVPlayer:
def get_program_info(self, channel):
info = []
max_line_length = 60 # Define this at the top so it's available everywhere
info.append([("header", f"📺 Channel: {channel['name']}")])
if channel.get('group'):
@ -295,7 +384,6 @@ class IPTVPlayer:
info.append([("program_desc", "📝 Description:")])
# Split long description into multiple lines
desc = channel['current_show']['desc']
max_line_length = 60
for i in range(0, len(desc), max_line_length):
info.append([('program_desc', desc[i:i+max_line_length])])
@ -325,7 +413,31 @@ class IPTVPlayer:
info.append([('program_desc', desc[i:i+max_line_length])])
line_count += 1
else:
info.append([("", "\nNo current program information available")])
# Handle case where there's no current show but there might be a next show
if channel.get('next_show'):
next_start = channel['next_show']['start'].strftime("%H:%M")
next_end = channel['next_show']['stop'].strftime("%H:%M")
info.append([]) # Empty line
info.append([("program_next", "⏭ UP NEXT")])
info.append([("title", f"Title: {channel['next_show']['title']}")])
info.append([("time", f"Time: {next_start} - {next_end}")])
if channel['next_show']['desc']:
info.append([]) # Empty line
info.append([("program_desc", "📝 Description:")])
desc = channel['next_show']['desc']
# Limit description to 5 lines to prevent overflow
max_lines = 5
line_count = 0
for i in range(0, len(desc), max_line_length):
if line_count >= max_lines:
info.append([('program_desc', '... (description truncated)')])
break
info.append([('program_desc', desc[i:i+max_line_length])])
line_count += 1
else:
info.append([("", "No current program information available")])
return info
@ -337,13 +449,58 @@ class IPTVPlayer:
try:
self.mpv_process = subprocess.Popen(MPV_COMMAND + [url])
except Exception as e:
self.program_walker[:] = [urwid.Text(("error", f"Failed to play stream:\n{str(e)}"))]
# Create proper widget for error message
self.program_walker[:] = [urwid.Text([("error", f"Failed to play stream: {str(e)}")])]
def reload_data(self):
"""Reload all data from sources without rebuilding UI"""
# Save current focus position
current_focus_pos = self.channel_list.focus_position if self.channel_list.body else None
# Reload data
self.channels = []
self.programs = {}
self.load_data()
self.setup_ui()
# Update channel items in place
self.update_channel_items()
# Restore focus position
if current_focus_pos is not None and current_focus_pos < len(self.channel_items):
self.channel_list.set_focus(current_focus_pos)
self.on_channel_hover(self.channels[current_focus_pos])
# Update program details
if self.current_channel:
self.on_channel_hover(self.current_channel)
self.update_footer()
def update_channel_items(self):
"""Update existing channel items with new data"""
# Clear existing items
self.channel_list.body.clear()
# Recreate channel items with updated data
self.channel_items = []
for channel in self.channels:
# Create custom button that knows its channel
btn = ChannelButton(channel)
# Connect the signal with the correct channel using a closure
def make_click_handler(c):
return lambda button: self.on_channel_select(c)
urwid.connect_signal(btn, 'click', make_click_handler(channel))
# Apply different colors based on channel status
if channel.get('url'):
channel_item = urwid.AttrMap(btn, 'channel', 'channel_focus')
else:
channel_item = urwid.AttrMap(btn, 'error', 'error')
self.channel_items.append(channel_item)
self.channel_list.body.append(channel_item)
def run(self):
# Enable mouse support and cursor visibility
@ -351,14 +508,24 @@ class IPTVPlayer:
screen.set_terminal_properties(colors=256)
screen.set_mouse_tracking()
loop = urwid.MainLoop(
self.loop = urwid.MainLoop(
self.top,
palette=PALETTE,
screen=screen,
unhandled_input=self.handle_input,
handle_mouse=True
)
loop.run()
# Start background update thread
self.start_update_thread()
try:
self.loop.run()
finally:
# Clean up when exiting
self.update_running = False
if self.update_thread:
self.update_thread.join(timeout=1.0)
def handle_input(self, key):
if key in ('q', 'Q'):
@ -373,9 +540,11 @@ class IPTVPlayer:
if self.mpv_process:
self.mpv_process.terminate()
self.mpv_process.wait()
self.update_running = False
raise urwid.ExitMainLoop()
def __del__(self):
self.update_running = False
if hasattr(self, 'mpv_process') and self.mpv_process:
self.mpv_process.terminate()

BIN
vulkan-1.dll

Loading…
Cancel
Save