1#!/usr/local/bin/python3.8 2import configparser 3import gi 4import os 5import requests 6import shutil 7import string 8import threading 9import re 10from gi.repository import GObject 11from random import choice 12 13# M3U parsing regex 14PARAMS = re.compile(r'(\S+)="(.*?)"') 15EXTINF = re.compile(r'^#EXTINF:(?P<duration>-?\d+?) ?(?P<params>.*),(?P<title>.*?)$') 16SERIES = re.compile(r"(?P<series>.*?) S(?P<season>.\d{1,2}).*E(?P<episode>.\d{1,2})$", re.IGNORECASE) 17 18PROVIDERS_PATH = os.path.expanduser("~/.hypnotix/providers") 19 20TV_GROUP, MOVIES_GROUP, SERIES_GROUP = range(3) 21 22BADGES = {} 23BADGES['musik'] = "music" 24BADGES['zeland'] = "newzealand" 25 26# Used as a decorator to run things in the background 27def async_function(func): 28 def wrapper(*args, **kwargs): 29 thread = threading.Thread(target=func, args=args, kwargs=kwargs) 30 thread.daemon = True 31 thread.start() 32 return thread 33 return wrapper 34 35# Used as a decorator to run things in the main loop, from another thread 36def idle_function(func): 37 def wrapper(*args): 38 GObject.idle_add(func, *args) 39 return wrapper 40 41def slugify(string): 42 """ 43 Normalizes string, converts to lowercase, removes non-alpha characters, 44 and converts spaces to hyphens. 45 """ 46 return "".join(x.lower() for x in string if x.isalnum()) 47 48class Provider(): 49 def __init__(self, name, provider_info): 50 if provider_info != None: 51 self.name, self.type_id, self.url, self.username, self.password, self.epg = provider_info.split(":::") 52 else: 53 self.name = name 54 self.path = os.path.join(PROVIDERS_PATH, slugify(self.name)) 55 self.groups = [] 56 self.channels = [] 57 self.movies = [] 58 self.series = [] 59 60 def get_info(self): 61 return "%s:::%s:::%s:::%s:::%s:::%s" % (self.name, self.type_id, self.url, self.username, self.password, self.epg) 62 63class Group(): 64 def __init__(self, name): 65 if "VOD" in name.split(): 66 self.group_type = MOVIES_GROUP 67 elif "SERIES" in name.split(): 68 self.group_type = SERIES_GROUP 69 else: 70 self.group_type = TV_GROUP 71 self.name = name 72 self.channels = [] 73 self.series = [] 74 75class Serie(): 76 def __init__(self, name): 77 self.name = name 78 self.logo = None 79 self.logo_path = None 80 self.seasons = {} 81 self.episodes = [] 82 83class Season(): 84 def __init__(self, name): 85 self.name = name 86 self.episodes = {} 87 88class Channel(): 89 def __init__(self, provider, info): 90 self.info = info 91 self.id = None 92 self.name = None 93 self.logo = None 94 self.logo_path = None 95 self.group_title = None 96 self.title = None 97 self.url = None 98 match = EXTINF.fullmatch(info) 99 if match != None: 100 res = match.groupdict() 101 if 'params' in res: 102 params = dict(PARAMS.findall(res['params'])) 103 if "tvg-name" in params and params['tvg-name'].strip() != "": 104 self.name = params['tvg-name'].strip() 105 if "tvg-logo" in params and params['tvg-logo'].strip() != "": 106 self.logo = params['tvg-logo'].strip() 107 if "group-title" in params and params['group-title'].strip() != "": 108 self.group_title = params['group-title'].strip().replace(";", " ").replace(" ", " ") 109 if 'title' in res: 110 self.title = res['title'] 111 if self.name == None and "," in info: 112 self.name = info.split(",")[-1].strip() 113 if self.logo != None: 114 ext = None 115 for known_ext in [".png", ".jpg", ".gif", ".jpeg"]: 116 if self.logo.lower().endswith(known_ext): 117 ext = known_ext 118 break 119 if ext == ".jpeg": 120 ext = ".jpg" 121 self.logo_path = os.path.join(PROVIDERS_PATH, "%s-%s%s" % (slugify(provider.name), slugify(self.name), ext)) 122 123class Manager(): 124 125 def __init__(self, settings): 126 os.system("mkdir -p '%s'" % PROVIDERS_PATH) 127 self.verbose = False 128 self.settings = settings 129 130 def debug(self, *args): 131 if self.verbose: 132 print(args) 133 134 def get_playlist(self, provider, refresh=False): 135 try: 136 if "file://" in provider.url: 137 # local file 138 provider.path = provider.url.replace("file://", "") 139 elif "://" in provider.url: 140 # Other protocol, assume it's http 141 if refresh or not os.path.exists(provider.path): 142 headers = { 143 'User-Agent': self.settings.get_string("user-agent"), 144 'Referer': self.settings.get_string("http-referer") 145 } 146 response = requests.get(provider.url, headers=headers, timeout=10) 147 if response.status_code == 200: 148 try: 149 source = response.content.decode("UTF-8") 150 except UnicodeDecodeError as e: 151 source = response.content.decode("latin1") 152 with open(provider.path, "w") as file: 153 file.write(source) 154 else: 155 print("HTTP error %d while retrieving from %s!" % (response.status_code, provider.url)) 156 else: 157 # No protocol, assume it's local 158 provider.path = provider.url 159 except Exception as e: 160 print(e) 161 162 def check_playlist(self, provider): 163 legit = False 164 if os.path.exists(provider.path): 165 with open(provider.path, "r") as file: 166 content = file.read() 167 if ("#EXTM3U" in content and "#EXTINF" in content): 168 legit = True 169 self.debug("Content looks legit: %s" % provider.name) 170 else: 171 self.debug("Nope: %s" % provider.path) 172 return legit 173 174 def load_channels(self, provider): 175 with open(provider.path, "r") as file: 176 channel = None 177 group = None 178 groups = {} 179 series = {} 180 for line in file: 181 line = line.strip() 182 if line.startswith("#EXTM3U"): 183 continue 184 if line.startswith("#EXTINF"): 185 channel = Channel(provider, line) 186 self.debug("New channel: ", line) 187 continue 188 if "://" in line and not (line.startswith("#")): 189 self.debug(" ", line) 190 if channel == None: 191 self.debug(" --> channel is None") 192 continue 193 if channel.url != None: 194 # We already found the URL, skip the line 195 self.debug(" --> channel URL was already found") 196 continue 197 if channel.name == None or "***" in channel.name: 198 self.debug(" --> channel name is None") 199 continue 200 channel.url = line 201 self.debug(" --> URL found: ", line) 202 203 serie = None 204 f = SERIES.fullmatch(channel.name) 205 if f != None: 206 res = f.groupdict() 207 series_name = res['series'] 208 if series_name in series.keys(): 209 serie = series[series_name] 210 else: 211 serie = Serie(series_name) 212 #todo put in group 213 provider.series.append(serie) 214 series[series_name] = serie 215 serie.logo = channel.logo 216 serie.logo_path = channel.logo_path 217 season_name = res['season'] 218 if season_name in serie.seasons.keys(): 219 season = serie.seasons[season_name] 220 else: 221 season = Season(season_name) 222 serie.seasons[season_name] = season 223 224 episode_name = res['episode'] 225 season.episodes[episode_name] = channel 226 serie.episodes.append(channel) 227 228 if channel.group_title != None and channel.group_title.strip() != "": 229 if group == None or group.name != channel.group_title: 230 if channel.group_title in groups.keys(): 231 group = groups[channel.group_title] 232 else: 233 group = Group(channel.group_title) 234 provider.groups.append(group) 235 groups[channel.group_title] = group 236 if serie != None and serie not in group.series: 237 group.series.append(serie) 238 group.channels.append(channel) 239 if group.group_type == TV_GROUP: 240 provider.channels.append(channel) 241 elif group.group_type == MOVIES_GROUP: 242 provider.movies.append(channel) 243 else: 244 provider.channels.append(channel) 245