1#!/usr/local/bin/python3.8
2import configparser
3import gi
4import os
5import requests
6import shutil
7import string
8import threading
9import re
10from gi.repository import GObject
11from random import choice
12
# Regular expressions used to parse M3U playlists
# key="value" attribute pairs inside an #EXTINF line
PARAMS = re.compile(r'(\S+)="(.*?)"')
# A complete #EXTINF line: duration, attribute text, then the title after the comma
EXTINF = re.compile(r'^#EXTINF:(?P<duration>-?\d+?) ?(?P<params>.*),(?P<title>.*?)$')
# A channel name shaped like "<series> S<season>...E<episode>" (case-insensitive)
SERIES = re.compile(r"(?P<series>.*?) S(?P<season>.\d{1,2}).*E(?P<episode>.\d{1,2})$", re.IGNORECASE)

# Base directory under which provider playlists and logo files are placed
PROVIDERS_PATH = os.path.expanduser("~/.hypnotix/providers")

# Identifiers for the kind of content a Group holds
TV_GROUP = 0
MOVIES_GROUP = 1
SERIES_GROUP = 2

# Keyword -> badge name mapping (its consumers are outside this part of the file)
BADGES = {
    'musik': "music",
    'zeland': "newzealand",
}
25
# Used as a decorator to run things in the background
def async_function(func):
    """
    Decorator that runs ``func`` in a daemon thread each time it is called.

    The decorated callable returns the started ``threading.Thread``
    immediately; the return value of ``func`` itself is not available to
    the caller.
    """
    # Imported locally so the decorator does not depend on the file's
    # import block being changed.
    import functools

    # Preserve the wrapped function's name and docstring.
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        thread = threading.Thread(target=func, args=args, kwargs=kwargs)
        # Daemon threads do not keep the process alive on exit.
        thread.daemon = True
        thread.start()
        return thread
    return wrapper
34
# Used as a decorator to run things in the main loop, from another thread
def idle_function(func):
    """
    Decorator that hands calls to ``func`` over to ``GObject.idle_add``.

    The decorated callable accepts positional arguments only, forwards
    them to ``GObject.idle_add(func, *args)`` and returns None; ``func``
    is not invoked directly by the wrapper.
    """
    # Imported locally so the decorator does not depend on the file's
    # import block being changed.
    import functools

    # Preserve the wrapped function's name and docstring.
    @functools.wraps(func)
    def wrapper(*args):
        GObject.idle_add(func, *args)
    return wrapper
40
def slugify(string):
    """
    Returns a lowercase copy of the string containing only its
    alphanumeric characters; everything else (spaces and punctuation
    included) is dropped.
    """
    kept = [char.lower() for char in string if char.isalnum()]
    return "".join(kept)
47
class Provider():
    """
    A playlist provider together with the content loaded from it.

    A provider is built either from a serialized ``provider_info`` string
    (six fields joined by ":::", as produced by get_info()) or, when
    ``provider_info`` is None, from a bare ``name``.
    """
    def __init__(self, name, provider_info):
        """
        name: provider name, used only when provider_info is None.
        provider_info: "name:::type_id:::url:::username:::password:::epg"
            or None. A string with a different number of fields raises
            ValueError from the tuple unpacking.
        """
        if provider_info is not None:
            (self.name, self.type_id, self.url,
             self.username, self.password, self.epg) = provider_info.split(":::")
        else:
            self.name = name
            # Give the remaining fields a value so that get_info() does not
            # raise AttributeError before the caller has filled them in.
            self.type_id = ""
            self.url = ""
            self.username = ""
            self.password = ""
            self.epg = ""
        # Location derived from the slugified provider name
        self.path = os.path.join(PROVIDERS_PATH, slugify(self.name))
        self.groups = []
        self.channels = []
        self.movies = []
        self.series = []

    def get_info(self):
        """Returns the provider serialized as six fields joined by ":::"."""
        return "%s:::%s:::%s:::%s:::%s:::%s" % (self.name, self.type_id, self.url, self.username, self.password, self.epg)
62
class Group():
    """
    A named group of channels.

    The group type is derived from the whitespace-separated words of the
    name: a "VOD" word means movies, otherwise a "SERIES" word means
    series, otherwise TV.
    """
    def __init__(self, name):
        words = name.split()
        if "VOD" in words:
            kind = MOVIES_GROUP
        elif "SERIES" in words:
            kind = SERIES_GROUP
        else:
            kind = TV_GROUP
        self.group_type = kind
        self.name = name
        self.channels = []
        self.series = []
74
class Serie():
    """A series: its name, optional logo, seasons and flat episode list."""
    def __init__(self, name):
        self.name = name
        # No logo is known until one is assigned from outside
        self.logo = self.logo_path = None
        # Season name -> Season
        self.seasons = dict()
        # Every episode of the series, in insertion order
        self.episodes = list()
82
class Season():
    """One season of a series, holding its episodes keyed by episode name."""
    def __init__(self, name):
        # Episode name -> episode entry
        self.episodes = dict()
        self.name = name
87
class Channel():
    """
    A single playlist entry built from one ``#EXTINF`` line.

    The constructor fills name, logo, group_title and title from the line;
    id and url are left as None for the caller to set.
    """
    def __init__(self, provider, info):
        """
        provider: object whose ``name`` is used to build the logo path.
        info: the raw ``#EXTINF`` line.
        """
        self.info = info
        self.id = None
        self.name = None
        self.logo = None
        self.logo_path = None
        self.group_title = None
        self.title = None
        self.url = None
        match = EXTINF.fullmatch(info)
        if match is not None:
            # Both named groups always take part in a successful match.
            params = dict(PARAMS.findall(match.group('params')))
            name = params.get('tvg-name', "").strip()
            if name != "":
                self.name = name
            logo = params.get('tvg-logo', "").strip()
            if logo != "":
                self.logo = logo
            group_title = params.get('group-title', "").strip()
            if group_title != "":
                self.group_title = group_title.replace(";", " ").replace("  ", " ")
            self.title = match.group('title')
        # Without a usable tvg-name, fall back to the text after the last comma.
        if self.name is None and "," in info:
            self.name = info.split(",")[-1].strip()
        if self.logo is not None:
            lowered_logo = self.logo.lower()
            # An unrecognised extension contributes nothing to the file name
            # (previously the literal text "None" was appended).
            ext = ""
            for known_ext in (".png", ".jpg", ".gif", ".jpeg"):
                if lowered_logo.endswith(known_ext):
                    ext = known_ext
                    break
            if ext == ".jpeg":
                ext = ".jpg"
            self.logo_path = os.path.join(PROVIDERS_PATH, "%s-%s%s" % (slugify(provider.name), slugify(self.name), ext))
122
123class Manager():
124
125    def __init__(self, settings):
126        os.system("mkdir -p '%s'" % PROVIDERS_PATH)
127        self.verbose = False
128        self.settings = settings
129
130    def debug(self, *args):
131        if self.verbose:
132            print(args)
133
134    def get_playlist(self, provider, refresh=False):
135        try:
136            if "file://" in provider.url:
137                # local file
138                provider.path = provider.url.replace("file://", "")
139            elif "://" in provider.url:
140                # Other protocol, assume it's http
141                if refresh or not os.path.exists(provider.path):
142                    headers = {
143                        'User-Agent': self.settings.get_string("user-agent"),
144                        'Referer': self.settings.get_string("http-referer")
145                    }
146                    response = requests.get(provider.url, headers=headers, timeout=10)
147                    if response.status_code == 200:
148                        try:
149                            source = response.content.decode("UTF-8")
150                        except UnicodeDecodeError as e:
151                            source = response.content.decode("latin1")
152                        with open(provider.path, "w") as file:
153                            file.write(source)
154                    else:
155                        print("HTTP error %d while retrieving from %s!" % (response.status_code, provider.url))
156            else:
157                # No protocol, assume it's local
158                provider.path = provider.url
159        except Exception as e:
160            print(e)
161
162    def check_playlist(self, provider):
163        legit = False
164        if os.path.exists(provider.path):
165            with open(provider.path, "r") as file:
166                content = file.read()
167                if ("#EXTM3U" in content and "#EXTINF" in content):
168                    legit = True
169                    self.debug("Content looks legit: %s" % provider.name)
170                else:
171                    self.debug("Nope: %s" % provider.path)
172        return legit
173
174    def load_channels(self, provider):
175        with open(provider.path, "r") as file:
176            channel = None
177            group = None
178            groups = {}
179            series = {}
180            for line in file:
181                line = line.strip()
182                if line.startswith("#EXTM3U"):
183                    continue
184                if line.startswith("#EXTINF"):
185                    channel = Channel(provider, line)
186                    self.debug("New channel: ", line)
187                    continue
188                if "://" in line and not (line.startswith("#")):
189                    self.debug("    ", line)
190                    if channel == None:
191                        self.debug("    --> channel is None")
192                        continue
193                    if channel.url != None:
194                        # We already found the URL, skip the line
195                        self.debug("    --> channel URL was already found")
196                        continue
197                    if channel.name == None or "***" in channel.name:
198                        self.debug("    --> channel name is None")
199                        continue
200                    channel.url = line
201                    self.debug("    --> URL found: ", line)
202
203                    serie = None
204                    f = SERIES.fullmatch(channel.name)
205                    if f != None:
206                        res = f.groupdict()
207                        series_name = res['series']
208                        if series_name in series.keys():
209                            serie = series[series_name]
210                        else:
211                            serie = Serie(series_name)
212                            #todo put in group
213                            provider.series.append(serie)
214                            series[series_name] = serie
215                            serie.logo = channel.logo
216                            serie.logo_path = channel.logo_path
217                        season_name = res['season']
218                        if season_name in serie.seasons.keys():
219                            season = serie.seasons[season_name]
220                        else:
221                            season = Season(season_name)
222                            serie.seasons[season_name] = season
223
224                        episode_name = res['episode']
225                        season.episodes[episode_name] = channel
226                        serie.episodes.append(channel)
227
228                    if channel.group_title != None and channel.group_title.strip() != "":
229                        if group == None or group.name != channel.group_title:
230                            if channel.group_title in groups.keys():
231                                group = groups[channel.group_title]
232                            else:
233                                group = Group(channel.group_title)
234                                provider.groups.append(group)
235                                groups[channel.group_title] = group
236                        if serie != None and serie not in group.series:
237                            group.series.append(serie)
238                        group.channels.append(channel)
239                        if group.group_type == TV_GROUP:
240                            provider.channels.append(channel)
241                        elif group.group_type == MOVIES_GROUP:
242                            provider.movies.append(channel)
243                    else:
244                        provider.channels.append(channel)
245