1 /*
2  *  Tvheadend - MPEGTS input source
3  *  Copyright (C) 2013 Adam Sutton
4  *
5  *  This program is free software: you can redistribute it and/or modify
6  *  it under the terms of the GNU General Public License as published by
7  *  the Free Software Foundation, either version 3 of the License, or
8  *  (at your option) any later version.
9  *
10  *  This program is distributed in the hope that it will be useful,
11  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
12  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  *  GNU General Public License for more details.
14  *
15  *  You should have received a copy of the GNU General Public License
16  *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
17  */
18 
19 #include "input.h"
20 #include "tsdemux.h"
21 #include "packet.h"
22 #include "streaming.h"
23 #include "subscriptions.h"
24 #include "access.h"
25 #include "atomic.h"
26 #include "notify.h"
27 #include "idnode.h"
28 #include "dbus.h"
29 #include "memoryinfo.h"
30 
31 #include <pthread.h>
32 #include <assert.h>
33 #include <fcntl.h>
34 #include <sys/stat.h>
35 
/* memoryinfo entries for the input and table queues; .my_name is the label */
memoryinfo_t mpegts_input_queue_memoryinfo = { .my_name = "MPEG-TS input queue" };
memoryinfo_t mpegts_input_table_memoryinfo = { .my_name = "MPEG-TS table queue" };

/* Forward declaration (the definition is not in this part of the file) */
static void
mpegts_input_del_network ( mpegts_network_link_t *mnl );
41 
42 /*
43  * DBUS
44  */
45 
46 static void
mpegts_input_dbus_notify(mpegts_input_t * mi,int64_t subs)47 mpegts_input_dbus_notify(mpegts_input_t *mi, int64_t subs)
48 {
49 #if ENABLE_DBUS_1
50   char buf[256], ubuf[UUID_HEX_SIZE];
51   htsmsg_t *msg;
52 
53   if (mi->mi_dbus_subs == subs)
54     return;
55   mi->mi_dbus_subs = subs;
56   msg = htsmsg_create_list();
57   mi->mi_display_name(mi, buf, sizeof(buf));
58   htsmsg_add_str(msg, NULL, buf);
59   htsmsg_add_s64(msg, NULL, subs);
60   snprintf(buf, sizeof(buf), "/input/mpegts/%s", idnode_uuid_as_str(&mi->ti_id, ubuf));
61   dbus_emit_signal(buf, "status", msg);
62 #endif
63 }
64 
65 /* **************************************************************************
66  * Class definition
67  * *************************************************************************/
68 
69 static const char *
mpegts_input_class_get_title(idnode_t * in,const char * lang)70 mpegts_input_class_get_title ( idnode_t *in, const char *lang )
71 {
72   static char buf[512];
73   mpegts_input_t *mi = (mpegts_input_t*)in;
74   mi->mi_display_name(mi, buf, sizeof(buf));
75   return buf;
76 }
77 
78 const void *
mpegts_input_class_active_get(void * obj)79 mpegts_input_class_active_get ( void *obj )
80 {
81   static int active;
82   mpegts_input_t *mi = obj;
83   active = mi->mi_is_enabled((mpegts_input_t*)mi, NULL, 0, -1) != MI_IS_ENABLED_NEVER;
84   return &active;
85 }
86 
87 const void *
mpegts_input_class_network_get(void * obj)88 mpegts_input_class_network_get ( void *obj )
89 {
90   mpegts_network_link_t *mnl;
91   mpegts_input_t *mi = obj;
92   htsmsg_t       *l  = htsmsg_create_list();
93   char ubuf[UUID_HEX_SIZE];
94 
95   LIST_FOREACH(mnl, &mi->mi_networks, mnl_mi_link)
96     htsmsg_add_str(l, NULL, idnode_uuid_as_str(&mnl->mnl_network->mn_id, ubuf));
97 
98   return l;
99 }
100 
101 int
mpegts_input_class_network_set(void * obj,const void * p)102 mpegts_input_class_network_set ( void *obj, const void *p )
103 {
104   return mpegts_input_set_networks(obj, (htsmsg_t*)p);
105 }
106 
107 htsmsg_t *
mpegts_input_class_network_enum(void * obj,const char * lang)108 mpegts_input_class_network_enum ( void *obj, const char *lang )
109 {
110   htsmsg_t *p, *m;
111   char ubuf[UUID_HEX_SIZE];
112 
113   if (!obj)
114     return NULL;
115 
116   p = htsmsg_create_map();
117   htsmsg_add_str (p, "uuid",    idnode_uuid_as_str((idnode_t*)obj, ubuf));
118   htsmsg_add_bool(p, "enum",    1);
119 
120   m = htsmsg_create_map();
121   htsmsg_add_str (m, "type",    "api");
122   htsmsg_add_str (m, "uri",     "mpegts/input/network_list");
123   htsmsg_add_str (m, "event",   "mpegts_network");
124   htsmsg_add_msg (m, "params",  p);
125   return m;
126 }
127 
128 char *
mpegts_input_class_network_rend(void * obj,const char * lang)129 mpegts_input_class_network_rend ( void *obj, const char *lang )
130 {
131   char *str;
132   mpegts_network_link_t *mnl;
133   mpegts_input_t *mi = obj;
134   htsmsg_t        *l = htsmsg_create_list();
135 
136   LIST_FOREACH(mnl, &mi->mi_networks, mnl_mi_link)
137     htsmsg_add_str(l, NULL, idnode_get_title(&mnl->mnl_network->mn_id, lang));
138 
139   str = htsmsg_list_2_csv(l, ',', 1);
140   htsmsg_destroy(l);
141 
142   return str;
143 }
144 
145 static void
mpegts_input_enabled_notify(void * p,const char * lang)146 mpegts_input_enabled_notify ( void *p, const char *lang )
147 {
148   mpegts_input_t *mi = p;
149   mpegts_mux_instance_t *mmi;
150 
151   /* Stop */
152   LIST_FOREACH(mmi, &mi->mi_mux_active, mmi_active_link)
153     mmi->mmi_mux->mm_stop(mmi->mmi_mux, 1, SM_CODE_ABORTED);
154 
155   /* Alert */
156   if (mi->mi_enabled_updated)
157     mi->mi_enabled_updated(mi);
158 }
159 
160 static int
mpegts_input_class_linked_set(void * self,const void * val)161 mpegts_input_class_linked_set ( void *self, const void *val )
162 {
163   mpegts_input_t *mi = self, *mi2;
164   char ubuf[UUID_HEX_SIZE];
165 
166   if (strcmp(val ?: "", mi->mi_linked ?: "")) {
167     mi2 = mpegts_input_find(mi->mi_linked);
168     free(mi->mi_linked);
169     mi->mi_linked = NULL;
170     if (mi2) {
171       free(mi2->mi_linked);
172       mi2->mi_linked = NULL;
173       mpegts_mux_unsubscribe_linked(mi2, NULL);
174     }
175     mpegts_mux_unsubscribe_linked(mi, NULL);
176     if (val && ((char *)val)[0]) {
177       mi->mi_linked = strdup((char *)val);
178       mi2 = mpegts_input_find((char *)val);
179       if (mi2) {
180         free(mi2->mi_linked);
181         mi2->mi_linked = strdup(idnode_uuid_as_str(&mi->ti_id, ubuf));
182       }
183     }
184     if (mi2)
185       idnode_changed(&mi2->ti_id);
186     return 1;
187   }
188   return 0;
189 }
190 
191 static const void *
mpegts_input_class_linked_get(void * self)192 mpegts_input_class_linked_get ( void *self )
193 {
194   mpegts_input_t *mi = self;
195   prop_sbuf[0] = '\0';
196   if (mi->mi_linked) {
197     mi = mpegts_input_find(mi->mi_linked);
198     if (mi)
199       idnode_uuid_as_str(&mi->ti_id, prop_sbuf);
200   }
201   return &prop_sbuf_ptr;
202 }
203 
204 static htsmsg_t *
mpegts_input_class_linked_enum(void * self,const char * lang)205 mpegts_input_class_linked_enum( void * self, const char *lang )
206 {
207   mpegts_input_t *mi = self, *mi2;
208   tvh_input_t *ti;
209   char ubuf[UUID_HEX_SIZE];
210   htsmsg_t *m = htsmsg_create_list();
211   htsmsg_t *e = htsmsg_create_key_val("", tvh_gettext_lang(lang, N_("Not linked")));
212   htsmsg_add_msg(m, NULL, e);
213   TVH_INPUT_FOREACH(ti)
214     if (idnode_is_instance(&ti->ti_id, &mpegts_input_class)) {
215       mi2 = (mpegts_input_t *)ti;
216       if (mi2 != mi) {
217         e = htsmsg_create_key_val(idnode_uuid_as_str(&ti->ti_id, ubuf),
218                                   idnode_get_title(&mi2->ti_id, lang));
219         htsmsg_add_msg(m, NULL, e);
220       }
221   }
222   return m;
223 }
224 
/* Documentation symbols referenced below as prop_doc_priority and
 * prop_doc_streaming_priority (presumably declared by PROP_DOC - confirm) */
PROP_DOC(priority)
PROP_DOC(streaming_priority)
227 
/*
 * idnode class of an MPEG-TS input.  It derives from tvh_input_class and
 * lists the properties of mpegts_input_t.  Properties with .off are bound
 * to a struct field; the others are served by the get/set/list/rend
 * callbacks defined above.
 */
const idclass_t mpegts_input_class =
{
  .ic_super      = &tvh_input_class,
  .ic_class      = "mpegts_input",
  .ic_caption    = N_("MPEG-TS input"),
  .ic_event      = "mpegts_input",
  .ic_perm_def   = ACCESS_ADMIN,
  .ic_get_title  = mpegts_input_class_get_title,
  .ic_properties = (const property_t[]){
    {
      /* computed via mi_is_enabled(); read-only, not saved, no UI */
      .type     = PT_BOOL,
      .id       = "active",
      .name     = N_("Active"),
      .opts     = PO_RDONLY | PO_NOSAVE | PO_NOUI,
      .get      = mpegts_input_class_active_get,
    },
    {
      /* a change stops the active muxes, see mpegts_input_enabled_notify() */
      .type     = PT_BOOL,
      .id       = "enabled",
      .name     = N_("Enabled"),
      .desc     = N_("Enable/disable tuner/adapter."),
      .off      = offsetof(mpegts_input_t, mi_enabled),
      .notify   = mpegts_input_enabled_notify,
      .def.i    = 1,
    },
    {
      .type     = PT_INT,
      .id       = "priority",
      .name     = N_("Priority"),
      .desc     = N_("The tuner priority value (a higher value means to "
                     "use this tuner out of preference). See Help for details."),
      .doc      = prop_doc_priority,
      .off      = offsetof(mpegts_input_t, mi_priority),
      .def.i    = 1,
      .opts     = PO_ADVANCED
    },
    {
      /* used by mpegts_input_get_priority() for SUBSCRIPTION_STREAMING */
      .type     = PT_INT,
      .id       = "spriority",
      .name     = N_("Streaming priority"),
      .desc     = N_("The tuner priority value for streamed channels "
                     "through HTTP or HTSP (a higher value means to use "
                     "this tuner out of preference). If not set (zero), "
                     "the standard priority value is used. See Help for details."),
      .doc      = prop_doc_streaming_priority,
      .off      = offsetof(mpegts_input_t, mi_streaming_priority),
      .def.i    = 1,
      .opts     = PO_ADVANCED
    },
    {
      .type     = PT_STR,
      .id       = "displayname",
      .name     = N_("Name"),
      .desc     = N_("Name of the tuner/adapter."),
      .off      = offsetof(mpegts_input_t, mi_name),
      .notify   = idnode_notify_title_changed,
    },
    {
      /* checked by mpegts_input_is_enabled() for SUBSCRIPTION_EPG */
      .type     = PT_BOOL,
      .id       = "ota_epg",
      .name     = N_("Over-the-air EPG"),
      .desc     = N_("Enable over-the-air program guide (EPG) scanning "
                     "on this input device."),
      .off      = offsetof(mpegts_input_t, mi_ota_epg),
      .def.i    = 1,
    },
    {
      /* checked by mpegts_input_is_enabled() for SUBSCRIPTION_INITSCAN */
      .type     = PT_BOOL,
      .id       = "initscan",
      .name     = N_("Initial scan"),
      .desc     = N_("Allow the initial scan tuning on this device "
                     "(scan when Tvheadend starts or when a new multiplex "
                     "is added automatically). At least one tuner or input "
                     "should have this settings turned on. "
                     "See also 'Skip Startup Scan' in the network settings "
                     "for further details."),
      .off      = offsetof(mpegts_input_t, mi_initscan),
      .def.i    = 1,
      .opts     = PO_ADVANCED,
    },
    {
      /* checked by mpegts_input_is_enabled() for SUBSCRIPTION_IDLESCAN */
      .type     = PT_BOOL,
      .id       = "idlescan",
      .name     = N_("Idle scan"),
      .desc     = N_("Allow idle scan tuning on this device."),
      .off      = offsetof(mpegts_input_t, mi_idlescan),
      .def.i    = 1,
      .opts     = PO_ADVANCED,
    },
    {
      .type     = PT_U32,
      .id       = "free_weight",
      .name     = N_("Free subscription weight"),
      .desc     = N_("If the subscription weight for the input is below "
                     "the specified threshold, the tuner is handled as free "
                     "(according the priority settings). Otherwise, the next "
                     "tuner (without any subscriptions) is used. Set this value "
                     "to 10, if you are willing to override scan and epggrab "
                     "subscriptions."),
      .off      = offsetof(mpegts_input_t, mi_free_weight),
      .def.i    = 1,
      .opts     = PO_ADVANCED,
    },
    {
      .type     = PT_BOOL,
      .id       = "remove_scrambled",
      .name     = N_("Remove scrambled bits"),
      .desc     = N_("The scrambled bits in MPEG-TS packets are always cleared. "
                     "It is a workaround for the special streams which are "
                     "descrambled, but these bits are not touched."),
      .off      = offsetof(mpegts_input_t, mi_remove_scrambled_bits),
      .def.i    = 1,
      .opts     = PO_EXPERT,
    },
    {
      /* list of network UUIDs, fully handled by callbacks */
      .type     = PT_STR,
      .id       = "networks",
      .name     = N_("Networks"),
      .desc     = N_("Associate this device with one or more networks."),
      .islist   = 1,
      .set      = mpegts_input_class_network_set,
      .get      = mpegts_input_class_network_get,
      .list     = mpegts_input_class_network_enum,
      .rend     = mpegts_input_class_network_rend,
    },
    {
      /* UUID of the peer input, kept symmetric by the setter */
      .type     = PT_STR,
      .id       = "linked",
      .name     = N_("Linked input"),
      .desc     = N_("Wake up the linked input whenever this adapter "
                     "is used. The subscriptions are named as \"keep\". "
                     "Note that this isn't normally needed, and is here "
                     "simply as a workaround to driver bugs in certain "
                     "dual tuner cards that otherwise lock the second tuner."),
      .set      = mpegts_input_class_linked_set,
      .get      = mpegts_input_class_linked_get,
      .list     = mpegts_input_class_linked_enum,
      .opts     = PO_ADVANCED,
    },
    {}
  }
};
370 
371 /* **************************************************************************
372  * Class methods
373  * *************************************************************************/
374 
375 int
mpegts_input_is_enabled(mpegts_input_t * mi,mpegts_mux_t * mm,int flags,int weight)376 mpegts_input_is_enabled
377   ( mpegts_input_t *mi, mpegts_mux_t *mm, int flags, int weight )
378 {
379   if ((flags & SUBSCRIPTION_EPG) != 0 && !mi->mi_ota_epg)
380     return MI_IS_ENABLED_NEVER;
381   if ((flags & SUBSCRIPTION_USERSCAN) == 0) {
382     if ((flags & SUBSCRIPTION_INITSCAN) != 0 && !mi->mi_initscan)
383       return MI_IS_ENABLED_NEVER;
384     if ((flags & SUBSCRIPTION_IDLESCAN) != 0 && !mi->mi_idlescan)
385       return MI_IS_ENABLED_NEVER;
386   }
387   return mi->mi_enabled ? MI_IS_ENABLED_OK : MI_IS_ENABLED_NEVER;
388 }
389 
390 void
mpegts_input_set_enabled(mpegts_input_t * mi,int enabled)391 mpegts_input_set_enabled ( mpegts_input_t *mi, int enabled )
392 {
393   enabled = !!enabled;
394   if (mi->mi_enabled != enabled) {
395     htsmsg_t *conf = htsmsg_create_map();
396     htsmsg_add_bool(conf, "enabled", enabled);
397     idnode_update(&mi->ti_id, conf);
398     htsmsg_destroy(conf);
399   }
400 }
401 
402 static void
mpegts_input_display_name(mpegts_input_t * mi,char * buf,size_t len)403 mpegts_input_display_name ( mpegts_input_t *mi, char *buf, size_t len )
404 {
405   if (mi->mi_name) {
406     strlcpy(buf, mi->mi_name, len);
407   } else {
408     *buf = 0;
409   }
410 }
411 
412 int
mpegts_input_get_weight(mpegts_input_t * mi,mpegts_mux_t * mm,int flags,int weight)413 mpegts_input_get_weight ( mpegts_input_t *mi, mpegts_mux_t *mm, int flags, int weight )
414 {
415   const mpegts_mux_instance_t *mmi;
416   const service_t *s;
417   const th_subscription_t *ths;
418   int w = 0, count = 0;
419 
420   /* Service subs */
421   pthread_mutex_lock(&mi->mi_output_lock);
422   LIST_FOREACH(mmi, &mi->mi_mux_active, mmi_active_link)
423     LIST_FOREACH(s, &mmi->mmi_mux->mm_transports, s_active_link)
424       LIST_FOREACH(ths, &s->s_subscriptions, ths_service_link) {
425         w = MAX(w, ths->ths_weight);
426         count++;
427       }
428   pthread_mutex_unlock(&mi->mi_output_lock);
429   return w > 0 ? w + count - 1 : 0;
430 }
431 
432 int
mpegts_input_get_priority(mpegts_input_t * mi,mpegts_mux_t * mm,int flags)433 mpegts_input_get_priority ( mpegts_input_t *mi, mpegts_mux_t *mm, int flags )
434 {
435   if (flags & SUBSCRIPTION_STREAMING) {
436     if (mi->mi_streaming_priority > 0)
437       return mi->mi_streaming_priority;
438   }
439   return mi->mi_priority;
440 }
441 
442 int
mpegts_input_warm_mux(mpegts_input_t * mi,mpegts_mux_instance_t * mmi)443 mpegts_input_warm_mux ( mpegts_input_t *mi, mpegts_mux_instance_t *mmi )
444 {
445   mpegts_mux_instance_t *cur;
446 
447   cur = LIST_FIRST(&mi->mi_mux_active);
448   if (cur != NULL) {
449     /* Already tuned */
450     if (mmi == cur)
451       return 0;
452 
453     /* Stop current */
454     cur->mmi_mux->mm_stop(cur->mmi_mux, 1, SM_CODE_SUBSCRIPTION_OVERRIDDEN);
455   }
456   if (LIST_FIRST(&mi->mi_mux_active))
457     return SM_CODE_TUNING_FAILED;
458   return 0;
459 }
460 
/*
 * Stub start handler: performs no tuning and always reports
 * SM_CODE_TUNING_FAILED.  Presumably installed as a default that concrete
 * input types replace - confirm where the callback is assigned.
 */
static int
mpegts_input_start_mux ( mpegts_input_t *mi, mpegts_mux_instance_t *mmi, int weight )
{
  return SM_CODE_TUNING_FAILED;
}
466 
/* Stub stop handler: intentionally does nothing. */
static void
mpegts_input_stop_mux ( mpegts_input_t *mi, mpegts_mux_instance_t *mmi )
{
}
471 
472 int
mpegts_mps_cmp(mpegts_pid_sub_t * a,mpegts_pid_sub_t * b)473 mpegts_mps_cmp ( mpegts_pid_sub_t *a, mpegts_pid_sub_t *b )
474 {
475   if (a->mps_type != b->mps_type) {
476     if (a->mps_type & MPS_SERVICE)
477       return 1;
478     else
479       return -1;
480   }
481   if (a->mps_owner < b->mps_owner) return -1;
482   if (a->mps_owner > b->mps_owner) return 1;
483   return 0;
484 }
485 
/*
 * Close every PID subscription on mux `mm` that belongs to `owner`.
 *
 * When `all` is non-zero the owner's entries in mm_all_subs (full-mux and
 * tables subscriptions) are closed first; afterwards all per-PID
 * subscriptions of the owner are closed.  Each close goes through
 * mpegts_input_close_pid(), which may free the subscription and the PID
 * entry, hence the successor is always fetched before the call.
 */
void
mpegts_input_close_pids
  ( mpegts_input_t *mi, mpegts_mux_t *mm, void *owner, int all )
{
  mpegts_pid_t *mp, *mp_next;
  mpegts_pid_sub_t *mps, *mps_next;
  int pid;

  if (all)
    for (mps = LIST_FIRST(&mm->mm_all_subs); mps; mps = mps_next) {
      mps_next = LIST_NEXT(mps, mps_svcraw_link);
      if (mps->mps_owner != owner) continue;
      /* the pseudo PID is derived from the subscription type */
      pid = MPEGTS_FULLMUX_PID;
      if (mps->mps_type & MPS_TABLES) pid = MPEGTS_TABLES_PID;
      mpegts_input_close_pid(mi, mm, pid, mps->mps_type, mps->mps_weight, mps->mps_owner);
    }
  for (mp = RB_FIRST(&mm->mm_pids); mp; mp = mp_next) {
    mp_next = RB_NEXT(mp, mp_link);
    for (mps = RB_FIRST(&mp->mp_subs); mps; mps = mps_next) {
      mps_next = RB_NEXT(mps, mps_link);
      if (mps->mps_owner != owner) continue;
      mpegts_input_close_pid(mi, mm, mp->mp_pid, mps->mps_type, mps->mps_weight, mps->mps_owner);
    }
  }
}
511 
/*
 * Register a subscription (type/weight/owner) for `pid` on mux `mm`.
 *
 * The caller must hold mi->mi_output_lock and `owner` must not be NULL
 * (both asserted).  `type` may carry at most one of MPS_STREAM,
 * MPS_SERVICE and MPS_RAW.
 *
 * MPEGTS_FULLMUX_PID and MPEGTS_TABLES_PID are tracked in mm_all_subs with
 * one entry per owner; for MPEGTS_FULLMUX_PID all existing subscriptions
 * of the owner are closed first.  Other PIDs are tracked in the PID's
 * mp_subs tree and, depending on the type, in mp_raw_subs / mp_svc_subs.
 *
 * Returns the PID entry on success.  Returns NULL when
 * mpegts_mux_find_pid() yields nothing or when the same subscription
 * already exists; the duplicate is logged as an error unless `reopen`
 * is set.
 */
mpegts_pid_t *
mpegts_input_open_pid
  ( mpegts_input_t *mi, mpegts_mux_t *mm, int pid, int type, int weight,
    void *owner, int reopen )
{
  char buf[512];
  mpegts_pid_t *mp;
  mpegts_pid_sub_t *mps, *mps2;

  assert(owner != NULL);
  assert((type & (MPS_STREAM|MPS_SERVICE|MPS_RAW)) == 0 ||
         (((type & MPS_STREAM) ? 1 : 0) +
          ((type & MPS_SERVICE) ? 1 : 0) +
          ((type & MPS_RAW) ? 1 : 0)) == 1);
  lock_assert(&mi->mi_output_lock);

  /* a full-mux subscription replaces everything the owner had open */
  if (pid == MPEGTS_FULLMUX_PID)
    mpegts_input_close_pids(mi, mm, owner, 1);

  if ((mp = mpegts_mux_find_pid(mm, pid, 1))) {
    mps = calloc(1, sizeof(*mps));
    mps->mps_type   = type;
    mps->mps_weight = weight;
    mps->mps_owner  = owner;
    if (pid == MPEGTS_FULLMUX_PID || pid == MPEGTS_TABLES_PID) {
      mp->mp_type |= type;
      /* look for an existing entry of this owner */
      LIST_FOREACH(mps2, &mm->mm_all_subs, mps_svcraw_link)
        if (mps2->mps_owner == owner) break;
      if (mps2 == NULL) {
        LIST_INSERT_HEAD(&mm->mm_all_subs, mps, mps_svcraw_link);
        mpegts_mux_nice_name(mm, buf, sizeof(buf));
        tvhdebug(LS_MPEGTS, "%s - open PID %s subscription [%04x/%p]",
                 buf, (type & MPS_TABLES) ? "tables" : "fullmux", type, owner);
        mm->mm_update_pids_flag = 1;
      } else {
        if (!reopen) {
          mpegts_mux_nice_name(mm, buf, sizeof(buf));
          tvherror(LS_MPEGTS,
                   "%s - open PID %04x (%d) failed, dupe sub (owner %p)",
                   buf, mp->mp_pid, mp->mp_pid, owner);
        }
        free(mps);
        mp = NULL;
      }
    } else if (!RB_INSERT_SORTED(&mp->mp_subs, mps, mps_link, mpegts_mps_cmp)) {
      /* inserted: no equal subscription existed in the tree */
      mp->mp_type |= type;
      if (type & MPS_RAW)
        LIST_INSERT_HEAD(&mp->mp_raw_subs, mps, mps_raw_link);
      if (type & MPS_SERVICE)
        LIST_INSERT_HEAD(&mp->mp_svc_subs, mps, mps_svcraw_link);
      mpegts_mux_nice_name(mm, buf, sizeof(buf));
      tvhdebug(LS_MPEGTS, "%s - open PID %04X (%d) [%d/%p]",
               buf, mp->mp_pid, mp->mp_pid, type, owner);
      mm->mm_update_pids_flag = 1;
    } else {
      /* duplicate: drop the freshly allocated subscription */
      if (!reopen) {
        mpegts_mux_nice_name(mm, buf, sizeof(buf));
        tvherror(LS_MPEGTS, "%s - open PID %04x (%d) failed, dupe sub (owner %p)",
                 buf, mp->mp_pid, mp->mp_pid, owner);
      }
      free(mps);
      mp = NULL;
    }
  }
  return mp;
}
578 
/*
 * Remove the subscription (type/owner) for `pid` on mux `mm`.
 *
 * The caller must hold mi->mi_output_lock and `owner` must not be NULL
 * (both asserted).
 *
 * Returns
 *   -1  when the PID is unknown, or (full-mux/tables PID) when the owner
 *       has no entry in mm_all_subs,
 *    1  when the PID entry had no subscriptions left and was freed,
 *    0  otherwise (the PID entry stays; its mp_type is recomputed from the
 *       remaining subscriptions, except for the early return in the
 *       full-mux/tables branch).
 */
int
mpegts_input_close_pid
  ( mpegts_input_t *mi, mpegts_mux_t *mm, int pid, int type, int weight, void *owner )
{
  char buf[512];
  mpegts_pid_sub_t *mps, skel;
  mpegts_pid_t *mp;
  int mask;
  assert(owner != NULL);
  lock_assert(&mi->mi_output_lock);
  if (!(mp = mpegts_mux_find_pid(mm, pid, 0)))
    return -1;
  if (pid == MPEGTS_FULLMUX_PID || pid == MPEGTS_TABLES_PID) {
    mpegts_mux_nice_name(mm, buf, sizeof(buf));
    LIST_FOREACH(mps, &mm->mm_all_subs, mps_svcraw_link)
      if (mps->mps_owner == owner) break;
    if (mps == NULL) return -1;
    tvhdebug(LS_MPEGTS, "%s - close PID %s subscription [%04x/%p]",
             buf, pid == MPEGTS_TABLES_PID ? "tables" : "fullmux",
             type, owner);
    /* a full-mux close also drops the owner's per-PID subscriptions */
    if (pid == MPEGTS_FULLMUX_PID)
      mpegts_input_close_pids(mi, mm, owner, 0);
    LIST_REMOVE(mps, mps_svcraw_link);
    free(mps);
    mm->mm_update_pids_flag = 1;
    /* keep the pseudo PID while another subscription of that kind exists */
    mask = pid == MPEGTS_FULLMUX_PID ? MPS_ALL : MPS_TABLES;
    LIST_FOREACH(mps, &mm->mm_all_subs, mps_svcraw_link)
      if (mps->mps_type & mask) break;
    if (mps) return 0;
  } else {
    /* lookup key: mpegts_mps_cmp() only looks at type and owner */
    skel.mps_type   = type;
    skel.mps_weight = weight;
    skel.mps_owner  = owner;
    mps = RB_FIND(&mp->mp_subs, &skel, mps_link, mpegts_mps_cmp);
    /* invalidate the cached last-PID lookup */
    if (pid == mm->mm_last_pid) {
      mm->mm_last_pid = -1;
      mm->mm_last_mp = NULL;
    }
    if (mps) {
      mpegts_mux_nice_name(mm, buf, sizeof(buf));
      tvhdebug(LS_MPEGTS, "%s - close PID %04X (%d) [%d/%p]",
               buf, mp->mp_pid, mp->mp_pid, type, owner);
      if (type & MPS_RAW)
        LIST_REMOVE(mps, mps_raw_link);
      if (type & MPS_SERVICE)
        LIST_REMOVE(mps, mps_svcraw_link);
      RB_REMOVE(&mp->mp_subs, mps, mps_link);
      free(mps);
      mm->mm_update_pids_flag = 1;
    }
  }
  if (!RB_FIRST(&mp->mp_subs)) {
    /* last subscription gone: drop the PID entry itself */
    if (mm->mm_last_pid == mp->mp_pid) {
      mm->mm_last_pid = -1;
      mm->mm_last_mp = NULL;
    }
    RB_REMOVE(&mm->mm_pids, mp, mp_link);
    free(mp);
    return 1;
  } else {
    /* rebuild the type mask from what is still subscribed */
    type = 0;
    RB_FOREACH(mps, &mp->mp_subs, mps_link)
      type |= mps->mps_type;
    mp->mp_type = type;
  }
  return 0;
}
646 
/*
 * Stub PID-update handler: does nothing.  The original comment marks it as
 * something to be overridden.
 */
static void
mpegts_input_update_pids
  ( mpegts_input_t *mi, mpegts_mux_t *mm )
{
  /* nothing - override */
}
653 
/*
 * Subscription weight for an elementary stream: a base value chosen by the
 * stream class (video, audio, subtitle, other) plus the stream index,
 * with the index capped at 49.
 */
int mpegts_mps_weight(elementary_stream_t *st)
{
  const int idx = MIN(st->es_index, 49);
  int base;

  if (SCT_ISVIDEO(st->es_type))
    base = MPS_WEIGHT_VIDEO;
  else if (SCT_ISAUDIO(st->es_type))
    base = MPS_WEIGHT_AUDIO;
  else if (SCT_ISSUBTITLE(st->es_type))
    base = MPS_WEIGHT_SUBTITLE;
  else
    base = MPS_WEIGHT_ESOTHER;

  return base + idx;
}
665 
666 static int
mpegts_input_cat_pass_callback(mpegts_table_t * mt,const uint8_t * ptr,int len,int tableid)667 mpegts_input_cat_pass_callback
668   (mpegts_table_t *mt, const uint8_t *ptr, int len, int tableid)
669 {
670   int r, sect, last, ver;
671   uint8_t dtag, dlen;
672   uint16_t pid;
673   uintptr_t caid;
674   mpegts_mux_t             *mm  = mt->mt_mux;
675   mpegts_psi_table_state_t *st  = NULL;
676   service_t                *s   = mt->mt_opaque;
677   elementary_stream_t      *es;
678   mpegts_input_t           *mi;
679 
680   /* Start */
681   r = dvb_table_begin((mpegts_psi_table_t *)mt, ptr, len,
682                       tableid, 0, 5, &st, &sect, &last, &ver);
683   if (r != 1) return r;
684   ptr += 5;
685   len -= 5;
686 
687   /* Send CAT data for descramblers */
688   descrambler_cat_data(mm, ptr, len);
689 
690   while(len > 2) {
691     dtag = *ptr++;
692     dlen = *ptr++;
693     len -= 2;
694 
695     switch(dtag) {
696       case DVB_DESC_CA:
697         if (len >= 4 && dlen >= 4 && mm->mm_active) {
698           caid = ( ptr[0]         << 8) | ptr[1];
699           pid  = ((ptr[2] & 0x1f) << 8) | ptr[3];
700           tvhdebug(LS_TBL_BASE, "cat:  pass: caid %04X (%d) pid %04X (%d)",
701                    (uint16_t)caid, (uint16_t)caid, pid, pid);
702           pthread_mutex_lock(&s->s_stream_mutex);
703           es = NULL;
704           if (service_stream_find((service_t *)s, pid) == NULL) {
705             es = service_stream_create(s, pid, SCT_CA);
706             es->es_pid_opened = 1;
707           }
708           pthread_mutex_unlock(&s->s_stream_mutex);
709           if (es && mm->mm_active && (mi = mm->mm_active->mmi_input) != NULL) {
710             pthread_mutex_lock(&mi->mi_output_lock);
711             if ((mi = mm->mm_active->mmi_input) != NULL)
712               mpegts_input_open_pid(mi, mm, pid,
713                                     MPS_SERVICE, MPS_WEIGHT_CAT, s, 0);
714             pthread_mutex_unlock(&mi->mi_output_lock);
715           }
716         }
717         break;
718       default:
719         break;
720     }
721 
722     ptr += dlen;
723     len -= dlen;
724   }
725 
726   /* Finish */
727   return dvb_table_end((mpegts_psi_table_t *)mt, st, sect);
728 }
729 
/*
 * Start delivering service `s` on input `mi`.
 *
 * Under mi_output_lock (outer) and s_stream_mutex (inner) the service is
 * put on the mux's mm_transports list (unless it already has an active
 * input) and its PIDs are opened:
 *  - STYPE_STD: PMT, PCR, optionally the CAT PID, and all filtered
 *    components; nothing is opened while the PMT PID is SERVICE_PMT_AUTO.
 *  - other types: the PID list in s_pids (or the full mux), else the
 *    tables pseudo PID / the PAT PID depending on `flags`.
 * With `init` == 0 the opens are treated as re-opens, i.e. duplicates are
 * not logged as errors by mpegts_input_open_pid().
 *
 * After the locks are released, PMT (and, for scrambled pass-through with
 * SUBSCRIPTION_EMM, CAT) table monitors are added for STYPE_STD services
 * and mpegts_mux_update_pids() is called.  `weight` is not used here.
 */
void
mpegts_input_open_service
  ( mpegts_input_t *mi, mpegts_service_t *s, int flags, int init, int weight )
{
  mpegts_mux_t *mm = s->s_dvb_mux;
  elementary_stream_t *st;
  mpegts_apids_t *pids;
  mpegts_apid_t *p;
  int i, reopen = !init;

  /* Add to list */
  pthread_mutex_lock(&mi->mi_output_lock);
  if (!s->s_dvb_active_input) {
    LIST_INSERT_HEAD(&mm->mm_transports, ((service_t*)s), s_active_link);
    s->s_dvb_active_input = mi;
  }
  /* Register PIDs */
  pthread_mutex_lock(&s->s_stream_mutex);
  if (s->s_type == STYPE_STD) {

    /* PMT PID not known yet: nothing to open */
    if (s->s_pmt_pid == SERVICE_PMT_AUTO)
      goto no_pids;

    mpegts_input_open_pid(mi, mm, s->s_pmt_pid, MPS_SERVICE, MPS_WEIGHT_PMT, s, reopen);
    mpegts_input_open_pid(mi, mm, s->s_pcr_pid, MPS_SERVICE, MPS_WEIGHT_PCR, s, reopen);
    if (s->s_scrambled_pass)
      mpegts_input_open_pid(mi, mm, DVB_CAT_PID, MPS_SERVICE, MPS_WEIGHT_CAT, s, reopen);
    /* Open only filtered components here */
    TAILQ_FOREACH(st, &s->s_filt_components, es_filt_link)
      if ((s->s_scrambled_pass || st->es_type != SCT_CA) &&
          st->es_pid != s->s_pmt_pid && st->es_pid != s->s_pcr_pid) {
        /* remembered so that close_service can close it again */
        st->es_pid_opened = 1;
        mpegts_input_open_pid(mi, mm, st->es_pid, MPS_SERVICE, mpegts_mps_weight(st), s, reopen);
      }

    mpegts_service_update_slave_pids(s, 0);

  } else {
    if ((pids = s->s_pids) != NULL) {
      if (pids->all) {
        mpegts_input_open_pid(mi, mm, MPEGTS_FULLMUX_PID, MPS_RAW | MPS_ALL, MPS_WEIGHT_RAW, s, reopen);
      } else {
        for (i = 0; i < pids->count; i++) {
          p = &pids->pids[i];
          mpegts_input_open_pid(mi, mm, p->pid, MPS_RAW, p->weight, s, reopen);
        }
      }
    } else if (flags & SUBSCRIPTION_TABLES) {
      mpegts_input_open_pid(mi, mm, MPEGTS_TABLES_PID, MPS_RAW | MPS_TABLES, MPS_WEIGHT_PAT, s, reopen);
    } else if (flags & SUBSCRIPTION_MINIMAL) {
      mpegts_input_open_pid(mi, mm, DVB_PAT_PID, MPS_RAW, MPS_WEIGHT_PAT, s, reopen);
    }
  }

no_pids:
  /* release in reverse order of acquisition */
  pthread_mutex_unlock(&s->s_stream_mutex);
  pthread_mutex_unlock(&mi->mi_output_lock);

  /* Add PMT monitor */
  if(s->s_type == STYPE_STD) {
    s->s_pmt_mon =
      mpegts_table_add(mm, DVB_PMT_BASE, DVB_PMT_MASK,
                       dvb_pmt_callback, s, "pmt", LS_TBL_BASE,
                       MT_CRC, s->s_pmt_pid, MPS_WEIGHT_PMT);
    if (s->s_scrambled_pass && (flags & SUBSCRIPTION_EMM) != 0) {
      s->s_cat_mon =
        mpegts_table_add(mm, DVB_CAT_BASE, DVB_CAT_MASK,
                         mpegts_input_cat_pass_callback, s, "cat",
                         LS_TBL_BASE, MT_QUICKREQ | MT_CRC, DVB_CAT_PID,
                         MPS_WEIGHT_CAT);
    }
  }

  mpegts_mux_update_pids(mm);
}
805 
806 void
mpegts_input_close_service(mpegts_input_t * mi,mpegts_service_t * s)807 mpegts_input_close_service ( mpegts_input_t *mi, mpegts_service_t *s )
808 {
809   mpegts_mux_t *mm = s->s_dvb_mux;
810   elementary_stream_t *st;
811 
812   /* Close PMT/CAT tables */
813   if (s->s_type == STYPE_STD) {
814     if (s->s_pmt_mon)
815       mpegts_table_destroy(s->s_pmt_mon);
816     if (s->s_cat_mon)
817       mpegts_table_destroy(s->s_cat_mon);
818   }
819   s->s_pmt_mon = NULL;
820 
821   /* Remove from list */
822   pthread_mutex_lock(&mi->mi_output_lock);
823   if (s->s_dvb_active_input != NULL) {
824     LIST_REMOVE(((service_t*)s), s_active_link);
825     s->s_dvb_active_input = NULL;
826   }
827   /* Close PID */
828   pthread_mutex_lock(&s->s_stream_mutex);
829   if (s->s_type == STYPE_STD) {
830 
831     if (s->s_pmt_pid == SERVICE_PMT_AUTO)
832       goto no_pids;
833 
834     mpegts_input_close_pid(mi, mm, s->s_pmt_pid, MPS_SERVICE, MPS_WEIGHT_PMT, s);
835     mpegts_input_close_pid(mi, mm, s->s_pcr_pid, MPS_SERVICE, MPS_WEIGHT_PCR, s);
836     if (s->s_scrambled_pass)
837       mpegts_input_close_pid(mi, mm, DVB_CAT_PID, MPS_SERVICE, MPS_WEIGHT_CAT, s);
838     /* Close all opened PIDs (the component filter may be changed at runtime) */
839     TAILQ_FOREACH(st, &s->s_components, es_link) {
840       if (st->es_pid_opened) {
841         st->es_pid_opened = 0;
842         mpegts_input_close_pid(mi, mm, st->es_pid, MPS_SERVICE, mpegts_mps_weight(st), s);
843       }
844     }
845 
846     mpegts_service_update_slave_pids(s, 1);
847 
848   } else {
849     mpegts_input_close_pids(mi, mm, s, 1);
850   }
851 
852 no_pids:
853   pthread_mutex_unlock(&s->s_stream_mutex);
854   pthread_mutex_unlock(&mi->mi_output_lock);
855 
856   mpegts_mux_update_pids(mm);
857 
858   /* Stop mux? */
859   s->s_dvb_mux->mm_stop(s->s_dvb_mux, 0, SM_CODE_OK);
860 }
861 
862 static void
mpegts_input_create_mux_instance(mpegts_input_t * mi,mpegts_mux_t * mm)863 mpegts_input_create_mux_instance
864   ( mpegts_input_t *mi, mpegts_mux_t *mm )
865 {
866   extern const idclass_t mpegts_mux_instance_class;
867   tvh_input_instance_t *tii;
868   LIST_FOREACH(tii, &mi->mi_mux_instances, tii_input_link)
869     if (((mpegts_mux_instance_t *)tii)->mmi_mux == mm) break;
870   if (!tii)
871     (void)mpegts_mux_instance_create(mpegts_mux_instance, NULL, mi, mm);
872 }
873 
874 static void
mpegts_input_started_mux(mpegts_input_t * mi,mpegts_mux_instance_t * mmi)875 mpegts_input_started_mux
876   ( mpegts_input_t *mi, mpegts_mux_instance_t *mmi )
877 {
878 #if ENABLE_TSDEBUG
879   extern char *tvheadend_tsdebug;
880   static const char *tmpdir = "/tmp/tvheadend.tsdebug/";
881   char buf[128];
882   char path[PATH_MAX];
883   struct stat st;
884   if (!tvheadend_tsdebug && !stat(tmpdir, &st) && (st.st_mode & S_IFDIR) != 0)
885     tvheadend_tsdebug = (char *)tmpdir;
886   if (tvheadend_tsdebug && !strcmp(tvheadend_tsdebug, tmpdir) && stat(tmpdir, &st))
887     tvheadend_tsdebug = NULL;
888   if (tvheadend_tsdebug) {
889     mpegts_mux_nice_name(mmi->mmi_mux, buf, sizeof(buf));
890     snprintf(path, sizeof(path), "%s/%s-%li-%p-mux.ts", tvheadend_tsdebug,
891              buf, (long)mono2sec(mclk()), mi);
892     mmi->mmi_mux->mm_tsdebug_fd = tvh_open(path, O_WRONLY | O_CREAT, S_IRUSR | S_IWUSR);
893     if (mmi->mmi_mux->mm_tsdebug_fd < 0)
894       tvherror(LS_TSDEBUG, "unable to create file '%s' (%i)", path, errno);
895     snprintf(path, sizeof(path), "%s/%s-%li-%p-input.ts", tvheadend_tsdebug,
896              buf, (long)mono2sec(mclk()), mi);
897     mmi->mmi_mux->mm_tsdebug_fd2 = tvh_open(path, O_WRONLY | O_CREAT, S_IRUSR | S_IWUSR);
898     if (mmi->mmi_mux->mm_tsdebug_fd2 < 0)
899       tvherror(LS_TSDEBUG, "unable to create file '%s' (%i)", path, errno);
900   } else {
901     mmi->mmi_mux->mm_tsdebug_fd = -1;
902     mmi->mmi_mux->mm_tsdebug_fd2 = -1;
903   }
904 #endif
905 
906   /* Deliver first TS packets as fast as possible */
907   mi->mi_last_dispatch = 0;
908 
909   /* Arm timer */
910   if (LIST_FIRST(&mi->mi_mux_active) == NULL)
911     mtimer_arm_rel(&mi->mi_status_timer, mpegts_input_status_timer, mi, sec2mono(1));
912 
913   /* Update */
914   mmi->mmi_mux->mm_active = mmi;
915 
916   /* Accept packets */
917   LIST_INSERT_HEAD(&mi->mi_mux_active, mmi, mmi_active_link);
918   notify_reload("input_status");
919   mpegts_input_dbus_notify(mi, 1);
920 }
921 
922 static void
mpegts_input_stopping_mux(mpegts_input_t * mi,mpegts_mux_instance_t * mmi)923 mpegts_input_stopping_mux
924   ( mpegts_input_t *mi, mpegts_mux_instance_t *mmi )
925 {
926   assert(mmi->mmi_mux->mm_active);
927 
928   pthread_mutex_lock(&mi->mi_input_lock);
929   mmi->mmi_mux->mm_active = NULL;
930   pthread_mutex_unlock(&mi->mi_input_lock);
931   pthread_mutex_lock(&mi->mi_output_lock);
932   mmi->mmi_mux->mm_active = NULL;
933   pthread_mutex_unlock(&mi->mi_output_lock);
934 }
935 
936 static void
mpegts_input_stopped_mux(mpegts_input_t * mi,mpegts_mux_instance_t * mmi)937 mpegts_input_stopped_mux
938   ( mpegts_input_t *mi, mpegts_mux_instance_t *mmi )
939 {
940   char buf[256];
941   service_t *s, *s_next;
942   mpegts_mux_t *mm = mmi->mmi_mux;
943 
944   /* no longer active */
945   LIST_REMOVE(mmi, mmi_active_link);
946 
947   /* Disarm timer */
948   if (LIST_FIRST(&mi->mi_mux_active) == NULL)
949     mtimer_disarm(&mi->mi_status_timer);
950 
951   mi->mi_display_name(mi, buf, sizeof(buf));
952   tvhtrace(LS_MPEGTS, "%s - flush subscribers", buf);
953   for (s = LIST_FIRST(&mm->mm_transports); s; s = s_next) {
954     s_next = LIST_NEXT(s, s_active_link);
955     service_remove_subscriber(s, NULL, SM_CODE_SUBSCRIPTION_OVERRIDDEN);
956   }
957   notify_reload("input_status");
958   mpegts_input_dbus_notify(mi, 0);
959 
960 #if ENABLE_TSDEBUG
961   tsdebug_packet_t *tp;
962   if (mm->mm_tsdebug_fd >= 0)
963     close(mm->mm_tsdebug_fd);
964   if (mm->mm_tsdebug_fd2 >= 0)
965     close(mm->mm_tsdebug_fd2);
966   mm->mm_tsdebug_fd = -1;
967   mm->mm_tsdebug_fd2 = -1;
968   mm->mm_tsdebug_pos = 0;
969   while ((tp = TAILQ_FIRST(&mm->mm_tsdebug_packets)) != NULL) {
970     TAILQ_REMOVE(&mm->mm_tsdebug_packets, tp, link);
971     free(tp);
972   }
973 #endif
974 }
975 
976 static int
mpegts_input_has_subscription(mpegts_input_t * mi,mpegts_mux_t * mm)977 mpegts_input_has_subscription ( mpegts_input_t *mi, mpegts_mux_t *mm )
978 {
979   int ret = 0;
980   const service_t *t;
981   const th_subscription_t *ths;
982   pthread_mutex_lock(&mi->mi_output_lock);
983   LIST_FOREACH(t, &mm->mm_transports, s_active_link) {
984     if (t->s_type == STYPE_RAW) {
985       LIST_FOREACH(ths, &t->s_subscriptions, ths_service_link)
986         if (!strcmp(ths->ths_title, "keep")) break;
987       if (ths) continue;
988     }
989     ret = 1;
990     break;
991   }
992   pthread_mutex_unlock(&mi->mi_output_lock);
993   return ret;
994 }
995 
996 static void
mpegts_input_tuning_error(mpegts_input_t * mi,mpegts_mux_t * mm)997 mpegts_input_tuning_error ( mpegts_input_t *mi, mpegts_mux_t *mm )
998 {
999   service_t *t, *t_next;
1000   pthread_mutex_lock(&mi->mi_output_lock);
1001   for (t = LIST_FIRST(&mm->mm_transports); t; t = t_next) {
1002     t_next = LIST_NEXT(t, s_active_link);
1003     pthread_mutex_lock(&t->s_stream_mutex);
1004     service_set_streaming_status_flags(t, TSS_TUNING);
1005     pthread_mutex_unlock(&t->s_stream_mutex);
1006   }
1007   pthread_mutex_unlock(&mi->mi_output_lock);
1008 }
1009 
1010 /* **************************************************************************
1011  * Data processing
1012  * *************************************************************************/
1013 
/*
 * data_noise() - optional fault injection for the input path.
 *
 * The variant under "#if 0" is compiled out.  When enabled it keeps a
 * running byte offset across calls and, inside a window of that offset,
 * either removes one 188-byte packet from the buffer (returning 1) or
 * XORs bytes of the packet; outside the window it returns 0.
 * The variant that is actually built does nothing and returns 0.
 */
#if 0
static int data_noise ( mpegts_packet_t *mp )
{
  /* State is kept across calls: running offset, window size, window start */
  static uint64_t off = 0, win = 4096, limit = 2*1024*1024;
  uint8_t *data = mp->mp_data;
  uint32_t i, p, s, len = mp->mp_len;
  for (p = 0; p < len; p += 188) {
    off += 188;
    if (off >= limit && off < limit + win) {
      if ((off & 3) == 1) {
        /* Drop this packet: shift the rest of the buffer down over it */
        memmove(data + p, data + p + 188, len - (p + 188));
        p -= 188;
        mp->mp_len -= 188;
        return 1;
      }
      /* Corrupt the packet: XOR every s-th byte with data-derived values */
      s = ((data[2] + data[3] + data[4]) & 3) + 1;
      for (i = 0; i < 188; i += s)
        ((char *)data)[p+i] ^= data[10] + data[12] + data[i];
    } else if (off >= limit + win) {
      /* Window passed: restart and derive the next window from the data */
      off = 0;
      limit = (uint64_t)data[15] * 4 * 1024;
      win   = (uint64_t)data[16] * 16;
    }
  }
  return 0;
}
#else
/* Fault injection disabled: the packet is left untouched */
static inline int data_noise( mpegts_packet_t *mp ) { return 0; }
#endif
1043 
/*
 * Read the PCR value carried by a single TS packet.
 *
 * tsb  - start of the packet (bytes 0..10 are inspected)
 * rpcr - receives a 33-bit value assembled from bytes 6..9 and the top
 *        bit of byte 10; written only on success
 *
 * Returns 1 when a value was stored, 0 when the packet has the
 * transport error bit set, has no adaptation field, the adaptation
 * field length is 5 or less, or the PCR flag is not set.
 */
static inline int
get_pcr ( const uint8_t *tsb, int64_t *rpcr )
{
  uint64_t acc = 0;
  int i;

  /* transport error indicator */
  if (tsb[1] & 0x80)
    return 0;
  /* adaptation field present? */
  if (!(tsb[3] & 0x20))
    return 0;
  /* adaptation field too short */
  if (tsb[4] <= 5)
    return 0;
  /* PCR flag */
  if (!(tsb[5] & 0x10))
    return 0;

  /* Four full bytes followed by the most significant bit of the fifth */
  for (i = 6; i <= 9; i++)
    acc = (acc << 8) | tsb[i];
  acc = (acc << 1) | (uint64_t)(tsb[10] >> 7);

  *rpcr = (int64_t)acc;
  return 1;
}
1065 
/*
 * Count how many bytes at the start of the buffer form consecutive
 * 188-byte packets that begin with the sync byte 0x47.
 *
 * tsb - buffer start
 * len - number of bytes available
 *
 * Returns the length in bytes of that leading run (a multiple of 188,
 * possibly 0).  Trailing bytes that do not make a full packet are not
 * counted.
 */
static inline int
ts_sync_count ( const uint8_t *tsb, int len )
{
  const uint8_t *cur = tsb;
  int k;

  while (len >= 188) {
    if (len >= 1880) {
      /* Fast path: try to accept ten packets in one step */
      for (k = 0; k < 10 && cur[k * 188] == 0x47; k++)
        ;
      if (k == 10) {
        cur += 1880;
        len -= 1880;
        continue;
      }
    }
    /* Slow path: one packet at a time */
    if (*cur != 0x47)
      break;
    cur += 188;
    len -= 188;
  }
  return cur - tsb;
}
1088 
/*
 * Hand buffered transport stream data over to the input thread.
 *
 * mi    - input the data was received on
 * mmi   - mux instance the data belongs to
 * sb    - receive buffer; consumed bytes are cut off, or the buffer is
 *         cleared, before returning
 * flags - MPEGTS_DATA_CC_RESTART forces a packet to be queued even when
 *         no synchronised data was found and clears the buffer afterwards;
 *         MPEGTS_DATA_REMOVE_SCRAMBLED clears the scrambling bits of
 *         every queued TS packet
 * pcr   - optional; when given, the first and last PCR found in the
 *         synchronised data are stored in it
 *
 * The synchronised part of the buffer is copied into a freshly allocated
 * mpegts_packet_t and appended to mi->mi_input_queue, unless the mux is
 * no longer active on mmi or the queue already holds 50MB or more.
 */
void
mpegts_input_recv_packets
  ( mpegts_input_t *mi, mpegts_mux_instance_t *mmi, sbuf_t *sb,
    int flags, mpegts_pcr_t *pcr )
{
  int len, len2, off;
  mpegts_packet_t *mp;
  uint8_t *tsb;
#define MIN_TS_PKT 100
#define MIN_TS_SYN (5*188)

retry:
  len2 = 0;
  off  = 0;
  tsb  = sb->sb_data;
  len  = sb->sb_ptr;
  if (len < (MIN_TS_PKT * 188) && (flags & MPEGTS_DATA_CC_RESTART) == 0) {
    /* For slow streams, check also against the clock */
    /* NOTE(review): presumably non-zero while the last dispatch is still
     * recent, so small buffers keep accumulating - confirm against
     * monocmpfastsec() */
    if (monocmpfastsec(mclk(), mi->mi_last_dispatch))
      return;
  }
  mi->mi_last_dispatch = mclk();

  /* Check for sync */
  /* Slide forward one byte at a time until at least MIN_TS_SYN bytes of
   * consecutive synchronised packets are found; every skipped byte is
   * counted in the "unc" statistic */
  while ( (len >= MIN_TS_SYN) &&
          ((len2 = ts_sync_count(tsb, len)) < MIN_TS_SYN) ) {
    atomic_add(&mmi->tii_stats.unc, 1);
    --len;
    ++tsb;
    ++off;
  }

  // Note: we check for sync here so that the buffer can always be
  //       processed in its entirety inside the processing thread
  //       without the potential need to buffer data (since that would
  //       require per mmi buffers, where this is generally not required)

  /* Extract PCR on demand */
  if (pcr) {
    uint8_t *tmp, *end;
    uint16_t pid;
    /* Forward scan: first PCR; also locks onto a PID when none is set */
    for (tmp = tsb, end = tsb + len2; tmp < end; tmp += 188) {
      pid = ((tmp[1] & 0x1f) << 8) | tmp[2];
      if (pcr->pcr_pid == MPEGTS_PID_NONE || pcr->pcr_pid == pid) {
        if (get_pcr(tmp, &pcr->pcr_first)) {
          pcr->pcr_pid = pid;
          break;
        }
      }
    }
    /* Backward scan: last PCR on the selected PID */
    if (pcr->pcr_pid != MPEGTS_PID_NONE) {
      for (tmp = tsb + len2 - 188; tmp >= tsb; tmp -= 188) {
        pid = ((tmp[1] & 0x1f) << 8) | tmp[2];
        if (pcr->pcr_pid == pid) {
          if (get_pcr(tmp, &pcr->pcr_last)) {
            pcr->pcr_pid = pid;
            break;
          }
        }
      }
    }
  }

  /* Pass */
  if (len2 >= MIN_TS_SYN || (flags & MPEGTS_DATA_CC_RESTART)) {
    /* With MPEGTS_DATA_CC_RESTART len2 may be 0: the packet then only
     * carries the restart flag */
    mp = malloc(sizeof(mpegts_packet_t) + len2);
    mp->mp_mux        = mmi->mmi_mux;
    mp->mp_len        = len2;
    mp->mp_cc_restart = (flags & MPEGTS_DATA_CC_RESTART) ? 1 : 0;

    memcpy(mp->mp_data, tsb, len2);
    if (mi->mi_remove_scrambled_bits || (flags & MPEGTS_DATA_REMOVE_SCRAMBLED) != 0) {
      uint8_t *tmp, *end;
      /* Clear the two scrambling control bits in the copy */
      for (tmp = mp->mp_data, end = mp->mp_data + len2; tmp < end; tmp += 188)
        tmp[3] &= ~0xc0;
    }

    len -= len2;
    off += len2;

    if ((flags & MPEGTS_DATA_CC_RESTART) == 0 && data_noise(mp)) {
      free(mp);
      goto end;
    }

    pthread_mutex_lock(&mi->mi_input_lock);
    /* Queue only while this instance is still the active one for the mux */
    if (mmi->mmi_mux->mm_active == mmi) {
      if (mi->mi_input_queue_size < 50*1024*1024) {
        mi->mi_input_queue_size += len2;
        memoryinfo_alloc(&mpegts_input_queue_memoryinfo, sizeof(mpegts_packet_t) + len2);
        /* The queued packet holds a reference on the mux */
        mpegts_mux_grab(mp->mp_mux);
        TAILQ_INSERT_TAIL(&mi->mi_input_queue, mp, mp_link);
        tvh_cond_signal(&mi->mi_input_cond, 0);
      } else {
        if (tvhlog_limit(&mi->mi_input_queue_loglimit, 10))
          tvhwarn(LS_MPEGTS, "too much queued input data (over 50MB), discarding new");
        free(mp);
      }
    } else {
      free(mp);
    }
    pthread_mutex_unlock(&mi->mi_input_lock);
  }

  /* Adjust buffer */
end:
  if (len && (flags & MPEGTS_DATA_CC_RESTART) == 0) {
    sbuf_cut(sb, off); // cut off the bottom
    /* Enough data left for another full pass */
    if (sb->sb_ptr >= MIN_TS_PKT * 188)
      goto retry;
  } else
    sb->sb_ptr = 0;    // clear
}
1202 
1203 static void
mpegts_input_table_dispatch(mpegts_mux_t * mm,const char * logprefix,const uint8_t * tsb,int tsb_len)1204 mpegts_input_table_dispatch
1205   ( mpegts_mux_t *mm, const char *logprefix, const uint8_t *tsb, int tsb_len )
1206 {
1207   int i, len = 0, c = 0;
1208   const uint8_t *tsb2, *tsb2_end;
1209   uint16_t pid = ((tsb[1] & 0x1f) << 8) | tsb[2];
1210   mpegts_table_t *mt, **vec;
1211 
1212   /* Collate - tables may be removed during callbacks */
1213   pthread_mutex_lock(&mm->mm_tables_lock);
1214   i = mm->mm_num_tables;
1215   vec = alloca(i * sizeof(mpegts_table_t *));
1216   LIST_FOREACH(mt, &mm->mm_tables, mt_link) {
1217     c++;
1218     if (mt->mt_destroyed || !mt->mt_subscribed || mt->mt_pid != pid)
1219       continue;
1220     mpegts_table_grab(mt);
1221     if (len < i)
1222       vec[len++] = mt;
1223   }
1224   pthread_mutex_unlock(&mm->mm_tables_lock);
1225   if (i != c) {
1226     tvherror(LS_TBL, "tables count inconsistency (num %d, list %d)", i, c);
1227     assert(0);
1228   }
1229 
1230   /* Process */
1231   for (i = 0; i < len; i++) {
1232     mt = vec[i];
1233     if (!mt->mt_destroyed && mt->mt_pid == pid)
1234       for (tsb2 = tsb, tsb2_end = tsb + tsb_len; tsb2 < tsb2_end; tsb2 += 188)
1235         mpegts_psi_section_reassemble((mpegts_psi_table_t *)mt, logprefix,
1236                                       tsb2, mt->mt_flags & MT_CRC,
1237                                       mpegts_table_dispatch, mt);
1238     mpegts_table_release(mt);
1239   }
1240 }
1241 
1242 static void
mpegts_input_table_waiting(mpegts_input_t * mi,mpegts_mux_t * mm)1243 mpegts_input_table_waiting ( mpegts_input_t *mi, mpegts_mux_t *mm )
1244 {
1245   mpegts_table_t *mt;
1246 
1247   if (!mm || !mm->mm_active)
1248     return;
1249   pthread_mutex_lock(&mm->mm_tables_lock);
1250   while ((mt = TAILQ_FIRST(&mm->mm_defer_tables)) != NULL) {
1251     mpegts_table_consistency_check(mm);
1252     TAILQ_REMOVE(&mm->mm_defer_tables, mt, mt_defer_link);
1253     if (mt->mt_defer_cmd == MT_DEFER_OPEN_PID && !mt->mt_destroyed) {
1254       mt->mt_defer_cmd = 0;
1255       if (!mt->mt_subscribed) {
1256         mt->mt_subscribed = 1;
1257         pthread_mutex_unlock(&mm->mm_tables_lock);
1258         mpegts_input_open_pid(mi, mm, mt->mt_pid, mpegts_table_type(mt), mt->mt_weight, mt, 0);
1259       } else {
1260         pthread_mutex_unlock(&mm->mm_tables_lock);
1261       }
1262     } else if (mt->mt_defer_cmd == MT_DEFER_CLOSE_PID) {
1263       mt->mt_defer_cmd = 0;
1264       if (mt->mt_subscribed) {
1265         mt->mt_subscribed = 0;
1266         pthread_mutex_unlock(&mm->mm_tables_lock);
1267         mpegts_input_close_pid(mi, mm, mt->mt_pid, mpegts_table_type(mt), mt->mt_weight, mt);
1268       } else {
1269         pthread_mutex_unlock(&mm->mm_tables_lock);
1270       }
1271     } else {
1272       pthread_mutex_unlock(&mm->mm_tables_lock);
1273     }
1274     mpegts_table_release(mt);
1275     pthread_mutex_lock(&mm->mm_tables_lock);
1276   }
1277   mpegts_table_consistency_check(mm);
1278   pthread_mutex_unlock(&mm->mm_tables_lock);
1279 }
1280 
1281 #if ENABLE_TSDEBUG
1282 static void
tsdebug_check_tspkt(mpegts_mux_t * mm,uint8_t * pkt,int len)1283 tsdebug_check_tspkt( mpegts_mux_t *mm, uint8_t *pkt, int len )
1284 {
1285   void tsdebugcw_new_keys(service_t *t, int type, uint8_t *odd, uint8_t *even);
1286   uint32_t pos, type, keylen, sid, crc;
1287   mpegts_service_t *t;
1288 
1289   for ( ; len > 0; pkt += 188, len -= 188) {
1290     if (memcmp(pkt + 4, "TVHeadendDescramblerKeys", 24))
1291       continue;
1292     pos = 4 + 24;
1293     type = pkt[pos + 0];
1294     keylen = pkt[pos + 1];
1295     sid = (pkt[pos + 2] << 8) | pkt[pos + 3];
1296     pos += 4 + 2 * keylen;
1297     if (pos > 184)
1298       return;
1299     crc = (pkt[pos + 0] << 24) | (pkt[pos + 1] << 16) |
1300           (pkt[pos + 2] << 8) | pkt[pos + 3];
1301     if (crc != tvh_crc32(pkt, pos, 0x859aa5ba))
1302       return;
1303     LIST_FOREACH(t, &mm->mm_services, s_dvb_mux_link)
1304       if (t->s_dvb_service_id == sid) break;
1305     if (!t)
1306       return;
1307     pos =  4 + 24 + 4;
1308     tvhdebug(LS_DESCRAMBLER, "Keys from MPEG-TS source (PID 0x1FFF)!");
1309     tsdebugcw_new_keys((service_t *)t, type, pkt + pos, pkt + pos + keylen);
1310   }
1311 }
1312 #endif
1313 
/*
 * Demultiplex one queued block of TS data.
 *
 * mi   - input the data arrived on
 * mpkt - queued packet; mp_data holds mp_len bytes (asserted to be a
 *        multiple of 188)
 *
 * The data is split into runs (see mpegts_word_count below) and each run
 * is delivered, depending on the PID's subscription type, to raw and
 * full-mux subscribers, to services and to the table handling (directly
 * for MPS_FTABLE, via mi_table_queue for MPS_TABLE).  Transport error
 * and continuity counter errors are counted in the instance statistics.
 * Afterwards the whole block is offered to the instance's streaming pad.
 *
 * Returns the number of bytes consumed, or 0 when the packet has no mux
 * or the mux is not active.
 */
static int
mpegts_input_process
  ( mpegts_input_t *mi, mpegts_packet_t *mpkt )
{
  uint16_t pid;
  uint8_t cc, cc2;
  uint8_t *tsb = mpkt->mp_data, *tsb2, *tsb2_end;
  int len = mpkt->mp_len, llen;
  int type = 0, f;
  mpegts_pid_t *mp;
  mpegts_pid_sub_t *mps;
  service_t *s;
  elementary_stream_t *st;
  int table_wakeup = 0;
  mpegts_mux_t *mm = mpkt->mp_mux;
  mpegts_mux_instance_t *mmi;
#if ENABLE_TSDEBUG
  off_t tsdebug_pos;
#endif
  char muxname[256];

  /* The mux pointer is cleared when the mux was flushed meanwhile */
  if (mm == NULL || (mmi = mm->mm_active) == NULL)
    return 0;

  mpegts_mux_nice_name(mm, muxname, sizeof(muxname));

  assert(mm == mmi->mmi_mux);

#if ENABLE_TSDEBUG
  /* Stream position of the first byte of this block */
  tsdebug_pos = mm->mm_tsdebug_pos;
#endif

  /* Process */
  assert((len % 188) == 0);
  while (len > 0) {

    /*
     * mask
     *  0 - 0xFF - sync word 0x47
     *  1 - 0x80 - transport error
     *  1 - 0x1F - pid high
     *  2 - 0xFF - pid low
     *  3 - 0xC0 - scrambled
     *  3 - 0x10 - CC check
     */
    /* NOTE(review): presumably the byte length of the leading run of
     * packets whose masked header matches the first packet - confirm
     * against mpegts_word_count() */
    llen = mpegts_word_count(tsb, len, 0xFF9FFFD0);

    pid = (tsb[1] << 8) | tsb[2];

    /* Transport error */
    if (pid & 0x8000) {
      if ((pid & 0x1FFF) != 0x1FFF)
        atomic_add(&mmi->tii_stats.te, 1);
    }

    pid &= 0x1FFF;

    /* Ignore NUL packets */
    if (pid == 0x1FFF) {
#if ENABLE_TSDEBUG
      tsdebug_check_tspkt(mm, tsb, llen);
#endif
      goto done;
    }

    /* Find PID */
    if ((mp = mpegts_mux_find_pid(mm, pid, 0))) {

      /* Low level CC check */
      if (tsb[3] & 0x10) {
        /* cc2 is the expected counter; 0xff means "no expectation" */
        for (tsb2 = tsb, tsb2_end = tsb + llen, cc2 = mp->mp_cc;
             tsb2 < tsb2_end; tsb2 += 188) {
          cc = tsb2[3] & 0x0f;
          if (cc2 != 0xff && cc2 != cc) {
            tvhtrace(LS_MPEGTS, "%s: pid %04X cc err %2d != %2d", muxname, pid, cc, cc2);
            atomic_add(&mmi->tii_stats.cc, 1);
          }
          cc2 = (cc + 1) & 0xF;
        }
        mp->mp_cc = cc2;
      }

      type = mp->mp_type;

      /* Stream all PIDs */
      LIST_FOREACH(mps, &mm->mm_all_subs, mps_svcraw_link)
        if ((mps->mps_type & MPS_ALL) || (type & (MPS_TABLE|MPS_FTABLE)))
          ts_recv_raw((mpegts_service_t *)mps->mps_owner, tsb, llen);

      /* Stream raw PIDs */
      if (type & MPS_RAW) {
        LIST_FOREACH(mps, &mp->mp_raw_subs, mps_raw_link)
          ts_recv_raw((mpegts_service_t *)mps->mps_owner, tsb, llen);
      }

      /* Stream service data */
      if (type & MPS_SERVICE) {
        LIST_FOREACH(mps, &mp->mp_svc_subs, mps_svcraw_link) {
          s = mps->mps_owner;
          /* f is set for table PIDs and for the service's PMT/PCR PID */
          f = (type & (MPS_TABLE|MPS_FTABLE)) ||
              (pid == s->s_pmt_pid) || (pid == s->s_pcr_pid);
          ts_recv_packet1((mpegts_service_t*)s, tsb, llen, f);
        }
      } else
      /* Stream table data */
      /* MPS_STREAM without MPS_SERVICE: deliver to every standard
       * service on the mux's transport list */
      if (type & MPS_STREAM) {
        LIST_FOREACH(s, &mm->mm_transports, s_active_link) {
          if (s->s_type != STYPE_STD) continue;
          f = (type & (MPS_TABLE|MPS_FTABLE)) ||
              (pid == s->s_pmt_pid) || (pid == s->s_pcr_pid);
          ts_recv_packet1((mpegts_service_t*)s, tsb, llen, f);
        }
      }

      /* Table data */
      if (type & (MPS_TABLE | MPS_FTABLE)) {
        /* Runs flagged with a transport error are not fed to tables */
        if (!(tsb[1] & 0x80)) {
          /* MPS_FTABLE is dispatched right here, in this thread */
          if (type & MPS_FTABLE)
            mpegts_input_table_dispatch(mm, muxname, tsb, llen);
          /* MPS_TABLE is copied and queued for the table thread */
          if (type & MPS_TABLE) {
            if (mi->mi_table_queue_size >= 2*1024*1024) {
              if (tvhlog_limit(&mi->mi_input_queue_loglimit, 10))
                tvhwarn(LS_MPEGTS, "too much queued table input data (over 2MB), discarding new");
            } else {
              mpegts_table_feed_t *mtf = malloc(sizeof(mpegts_table_feed_t)+llen);
              mtf->mtf_len = llen;
              memcpy(mtf->mtf_tsb, tsb, llen);
              mtf->mtf_mux = mm;
              mi->mi_table_queue_size += llen;
              memoryinfo_alloc(&mpegts_input_table_memoryinfo, sizeof(mpegts_table_feed_t) + llen);
              TAILQ_INSERT_TAIL(&mi->mi_table_queue, mtf, mtf_link);
              table_wakeup = 1;
            }
          }
        } else {
          //tvhdebug("tsdemux", "%s - SI packet had errors", name);
        }
      }

    } else {

      /* Stream to all fullmux subscribers */
      LIST_FOREACH(mps, &mm->mm_all_subs, mps_svcraw_link)
        ts_recv_raw((mpegts_service_t *)mps->mps_owner, tsb, llen);

    }

done:
    tsb += llen;
    len -= llen;
#if ENABLE_TSDEBUG
    mm->mm_tsdebug_pos += llen;
#endif
  }

  /* Raw stream */
  /* Only when data was consumed and somebody listens on the pad */
  if (tsb != mpkt->mp_data &&
      LIST_FIRST(&mmi->mmi_streaming_pad.sp_targets) != NULL) {

    streaming_message_t sm;
    pktbuf_t *pb = pktbuf_alloc(mpkt->mp_data, tsb - mpkt->mp_data);
    memset(&sm, 0, sizeof(sm));
    sm.sm_type = SMT_MPEGTS;
    sm.sm_data = pb;
    streaming_pad_deliver(&mmi->mmi_streaming_pad, streaming_msg_clone(&sm));
    pktbuf_ref_dec(pb);
  }
#if ENABLE_TSDEBUG
  {
    /* Write the block to the mux dump file, with each pending debug
     * packet written at its recorded stream position */
    tsdebug_packet_t *tp, *tp_next;
    off_t pos = 0;
    size_t used = tsb - mpkt->mp_data;
    for (tp = TAILQ_FIRST(&mm->mm_tsdebug_packets); tp; tp = tp_next) {
      tp_next = TAILQ_NEXT(tp, link);
      assert((tp->pos % 188) == 0);
      assert(tp->pos >= tsdebug_pos && tp->pos < tsdebug_pos + used);
      if (mm->mm_tsdebug_fd >= 0) {
        tvh_write(mm->mm_tsdebug_fd, mpkt->mp_data + pos, tp->pos - tsdebug_pos - pos);
        tvh_write(mm->mm_tsdebug_fd, tp->pkt, 188);
      }
      pos = tp->pos - tsdebug_pos;
      TAILQ_REMOVE(&mm->mm_tsdebug_packets, tp, link);
      free(tp);
    }
    if (pos < used && mm->mm_tsdebug_fd >= 0)
      tvh_write(mm->mm_tsdebug_fd, mpkt->mp_data + pos, used - pos);
  }
#endif

  /* A CC restart resets the continuity counter of every component */
  if (mpkt->mp_cc_restart) {
    LIST_FOREACH(s, &mm->mm_transports, s_active_link)
      TAILQ_FOREACH(st, &s->s_components, es_link)
        st->es_cc = -1;
  }

  /* Wake table */
  if (table_wakeup)
    tvh_cond_signal(&mi->mi_table_cond, 0);

  /* Bandwidth monitoring */
  llen = tsb - mpkt->mp_data;
  atomic_add(&mmi->tii_stats.bps, llen);
  return llen;
}
1518 
1519 static void *
mpegts_input_thread(void * p)1520 mpegts_input_thread ( void * p )
1521 {
1522   mpegts_packet_t *mp;
1523   mpegts_input_t  *mi = p;
1524   size_t bytes = 0;
1525   int update_pids;
1526   char buf[256];
1527 
1528   mi->mi_display_name(mi, buf, sizeof(buf));
1529   pthread_mutex_lock(&mi->mi_input_lock);
1530   while (atomic_get(&mi->mi_running)) {
1531 
1532     /* Wait for a packet */
1533     if (!(mp = TAILQ_FIRST(&mi->mi_input_queue))) {
1534       if (bytes) {
1535         tvhtrace(LS_MPEGTS, "input %s got %zu bytes", buf, bytes);
1536         bytes = 0;
1537       }
1538       tvh_cond_wait(&mi->mi_input_cond, &mi->mi_input_lock);
1539       continue;
1540     }
1541     mi->mi_input_queue_size -= mp->mp_len;
1542     memoryinfo_free(&mpegts_input_queue_memoryinfo, sizeof(mpegts_packet_t) + mp->mp_len);
1543     TAILQ_REMOVE(&mi->mi_input_queue, mp, mp_link);
1544     pthread_mutex_unlock(&mi->mi_input_lock);
1545 
1546     /* Process */
1547     pthread_mutex_lock(&mi->mi_output_lock);
1548     mpegts_input_table_waiting(mi, mp->mp_mux);
1549     if (mp->mp_mux && mp->mp_mux->mm_update_pids_flag) {
1550       pthread_mutex_unlock(&mi->mi_output_lock);
1551       pthread_mutex_lock(&global_lock);
1552       mpegts_mux_update_pids(mp->mp_mux);
1553       pthread_mutex_unlock(&global_lock);
1554       pthread_mutex_lock(&mi->mi_output_lock);
1555     }
1556     bytes += mpegts_input_process(mi, mp);
1557     update_pids = mp->mp_mux && mp->mp_mux->mm_update_pids_flag;
1558     pthread_mutex_unlock(&mi->mi_output_lock);
1559     if (update_pids) {
1560       pthread_mutex_lock(&global_lock);
1561       mpegts_mux_update_pids(mp->mp_mux);
1562       pthread_mutex_unlock(&global_lock);
1563     }
1564 
1565     /* Cleanup */
1566     if (mp->mp_mux)
1567       mpegts_mux_release(mp->mp_mux);
1568     free(mp);
1569 
1570 #if ENABLE_TSDEBUG
1571     {
1572       extern void tsdebugcw_go(void);
1573       tsdebugcw_go();
1574     }
1575 #endif
1576 
1577     pthread_mutex_lock(&mi->mi_input_lock);
1578   }
1579 
1580   tvhtrace(LS_MPEGTS, "input %s got %zu bytes (finish)", buf, bytes);
1581 
1582   /* Flush */
1583   while ((mp = TAILQ_FIRST(&mi->mi_input_queue))) {
1584     memoryinfo_free(&mpegts_input_queue_memoryinfo, sizeof(mpegts_packet_t) + mp->mp_len);
1585     TAILQ_REMOVE(&mi->mi_input_queue, mp, mp_link);
1586     if (mp->mp_mux)
1587       mpegts_mux_release(mp->mp_mux);
1588     free(mp);
1589   }
1590   mi->mi_input_queue_size = 0;
1591   pthread_mutex_unlock(&mi->mi_input_lock);
1592 
1593   return NULL;
1594 }
1595 
1596 static void *
mpegts_input_table_thread(void * aux)1597 mpegts_input_table_thread ( void *aux )
1598 {
1599   mpegts_table_feed_t   *mtf;
1600   mpegts_input_t        *mi = aux;
1601   mpegts_mux_t          *mm = NULL;
1602   char                   muxname[256];
1603 
1604   pthread_mutex_lock(&mi->mi_output_lock);
1605   while (atomic_get(&mi->mi_running)) {
1606 
1607     /* Wait for data */
1608     if (!(mtf = TAILQ_FIRST(&mi->mi_table_queue))) {
1609       tvh_cond_wait(&mi->mi_table_cond, &mi->mi_output_lock);
1610       continue;
1611     }
1612     mi->mi_table_queue_size -= mtf->mtf_len;
1613     memoryinfo_free(&mpegts_input_table_memoryinfo, sizeof(mpegts_table_feed_t) + mtf->mtf_len);
1614     TAILQ_REMOVE(&mi->mi_table_queue, mtf, mtf_link);
1615     pthread_mutex_unlock(&mi->mi_output_lock);
1616 
1617     /* Process */
1618     pthread_mutex_lock(&global_lock);
1619     if (atomic_get(&mi->mi_running)) {
1620       if (mm != mtf->mtf_mux) {
1621         mm = mtf->mtf_mux;
1622         if (mm)
1623           mpegts_mux_nice_name(mm, muxname, sizeof(muxname));
1624       }
1625       if (mm && mm->mm_active)
1626         mpegts_input_table_dispatch(mm, muxname, mtf->mtf_tsb, mtf->mtf_len);
1627     }
1628     pthread_mutex_unlock(&global_lock);
1629 
1630     /* Cleanup */
1631     free(mtf);
1632     pthread_mutex_lock(&mi->mi_output_lock);
1633   }
1634 
1635   /* Flush */
1636   while ((mtf = TAILQ_FIRST(&mi->mi_table_queue)) != NULL) {
1637     memoryinfo_free(&mpegts_input_table_memoryinfo, sizeof(mpegts_table_feed_t) + mtf->mtf_len);
1638     TAILQ_REMOVE(&mi->mi_table_queue, mtf, mtf_link);
1639     free(mtf);
1640   }
1641   mi->mi_table_queue_size = 0;
1642   pthread_mutex_unlock(&mi->mi_output_lock);
1643 
1644   return NULL;
1645 }
1646 
1647 void
mpegts_input_flush_mux(mpegts_input_t * mi,mpegts_mux_t * mm)1648 mpegts_input_flush_mux
1649   ( mpegts_input_t *mi, mpegts_mux_t *mm )
1650 {
1651   mpegts_table_feed_t *mtf;
1652   mpegts_packet_t *mp;
1653 
1654   lock_assert(&global_lock);
1655 
1656   // Note: to avoid long delays in here, rather than actually
1657   //       remove things from the Q, we simply invalidate by clearing
1658   //       the mux pointer and allow the threads to deal with the deletion
1659 
1660   /* Flush input Q */
1661   pthread_mutex_lock(&mi->mi_input_lock);
1662   TAILQ_FOREACH(mp, &mi->mi_input_queue, mp_link) {
1663     if (mp->mp_mux == mm) {
1664       mpegts_mux_release(mm);
1665       mp->mp_mux = NULL;
1666     }
1667   }
1668   pthread_mutex_unlock(&mi->mi_input_lock);
1669 
1670   /* Flush table Q */
1671   pthread_mutex_lock(&mi->mi_output_lock);
1672   TAILQ_FOREACH(mtf, &mi->mi_table_queue, mtf_link) {
1673     if (mtf->mtf_mux == mm)
1674       mtf->mtf_mux = NULL;
1675   }
1676   pthread_mutex_unlock(&mi->mi_output_lock);
1677   /* mux active must be NULL here */
1678   /* otherwise the picked mtf might be processed after mux deactivation */
1679   assert(mm->mm_active == NULL);
1680 }
1681 
1682 static void
mpegts_input_stream_status(mpegts_mux_instance_t * mmi,tvh_input_stream_t * st)1683 mpegts_input_stream_status
1684   ( mpegts_mux_instance_t *mmi, tvh_input_stream_t *st )
1685 {
1686   int s = 0, w = 0;
1687   char buf[512], ubuf[UUID_HEX_SIZE];
1688   th_subscription_t *ths;
1689   const service_t *t;
1690   mpegts_mux_t *mm = mmi->mmi_mux;
1691   mpegts_input_t *mi = mmi->mmi_input;
1692 
1693   LIST_FOREACH(t, &mm->mm_transports, s_active_link)
1694     if (((mpegts_service_t *)t)->s_dvb_mux == mm)
1695       LIST_FOREACH(ths, &t->s_subscriptions, ths_service_link) {
1696         s++;
1697         w = MAX(w, ths->ths_weight);
1698       }
1699 
1700   st->uuid        = strdup(idnode_uuid_as_str(&mmi->tii_id, ubuf));
1701   mi->mi_display_name(mi, buf, sizeof(buf));
1702   st->input_name  = strdup(buf);
1703   mpegts_mux_nice_name(mm, buf, sizeof(buf));
1704   st->stream_name = strdup(buf);
1705   st->subs_count  = s;
1706   st->max_weight  = w;
1707   pthread_mutex_lock(&mmi->tii_stats_mutex);
1708   st->stats.signal = mmi->tii_stats.signal;
1709   st->stats.snr    = mmi->tii_stats.snr;
1710   st->stats.ber    = mmi->tii_stats.ber;
1711   st->stats.signal_scale = mmi->tii_stats.signal_scale;
1712   st->stats.snr_scale    = mmi->tii_stats.snr_scale;
1713   st->stats.ec_bit   = mmi->tii_stats.ec_bit;
1714   st->stats.tc_bit   = mmi->tii_stats.tc_bit;
1715   st->stats.ec_block = mmi->tii_stats.ec_block;
1716   st->stats.tc_block = mmi->tii_stats.tc_block;
1717   pthread_mutex_unlock(&mmi->tii_stats_mutex);
1718   st->stats.unc   = atomic_get(&mmi->tii_stats.unc);
1719   st->stats.cc    = atomic_get(&mmi->tii_stats.cc);
1720   st->stats.te    = atomic_get(&mmi->tii_stats.te);
1721   st->stats.bps   = atomic_exchange(&mmi->tii_stats.bps, 0) * 8;
1722 }
1723 
1724 void
mpegts_input_empty_status(mpegts_input_t * mi,tvh_input_stream_t * st)1725 mpegts_input_empty_status
1726   ( mpegts_input_t *mi, tvh_input_stream_t *st )
1727 {
1728   char buf[512], ubuf[UUID_HEX_SIZE];
1729   tvh_input_instance_t *mmi_;
1730   mpegts_mux_instance_t *mmi;
1731 
1732   st->uuid        = strdup(idnode_uuid_as_str(&mi->ti_id, ubuf));
1733   mi->mi_display_name(mi, buf, sizeof(buf));
1734   st->input_name  = strdup(buf);
1735   LIST_FOREACH(mmi_, &mi->mi_mux_instances, tii_input_link) {
1736     mmi = (mpegts_mux_instance_t *)mmi_;
1737     st->stats.unc += atomic_get(&mmi->tii_stats.unc);
1738     st->stats.cc += atomic_get(&mmi->tii_stats.cc);
1739     pthread_mutex_lock(&mmi->tii_stats_mutex);
1740     st->stats.te += mmi->tii_stats.te;
1741     st->stats.ec_block += mmi->tii_stats.ec_block;
1742     st->stats.tc_block += mmi->tii_stats.tc_block;
1743     pthread_mutex_unlock(&mmi->tii_stats_mutex);
1744   }
1745 }
1746 
1747 static void
mpegts_input_get_streams(tvh_input_t * i,tvh_input_stream_list_t * isl)1748 mpegts_input_get_streams
1749   ( tvh_input_t *i, tvh_input_stream_list_t *isl )
1750 {
1751   tvh_input_stream_t *st = NULL;
1752   mpegts_input_t *mi = (mpegts_input_t*)i;
1753   mpegts_mux_instance_t *mmi;
1754 
1755   pthread_mutex_lock(&mi->mi_output_lock);
1756   LIST_FOREACH(mmi, &mi->mi_mux_active, mmi_active_link) {
1757     st = calloc(1, sizeof(tvh_input_stream_t));
1758     mpegts_input_stream_status(mmi, st);
1759     LIST_INSERT_HEAD(isl, st, link);
1760   }
1761   if (st == NULL && mi->mi_empty_status && mi->mi_enabled) {
1762     st = calloc(1, sizeof(tvh_input_stream_t));
1763     mi->mi_empty_status(mi, st);
1764     LIST_INSERT_HEAD(isl, st, link);
1765   }
1766   pthread_mutex_unlock(&mi->mi_output_lock);
1767 }
1768 
1769 static void
mpegts_input_clear_stats(tvh_input_t * i)1770 mpegts_input_clear_stats ( tvh_input_t *i )
1771 {
1772   mpegts_input_t *mi = (mpegts_input_t*)i;
1773   tvh_input_instance_t *mmi_;
1774   mpegts_mux_instance_t *mmi;
1775 
1776   pthread_mutex_lock(&mi->mi_output_lock);
1777   LIST_FOREACH(mmi_, &mi->mi_mux_instances, tii_input_link) {
1778     mmi = (mpegts_mux_instance_t *)mmi_;
1779     atomic_set(&mmi->tii_stats.unc, 0);
1780     atomic_set(&mmi->tii_stats.cc, 0);
1781     pthread_mutex_lock(&mmi->tii_stats_mutex);
1782     mmi->tii_stats.te = 0;
1783     mmi->tii_stats.ec_block = 0;
1784     mmi->tii_stats.tc_block = 0;
1785     pthread_mutex_unlock(&mmi->tii_stats_mutex);
1786   }
1787   pthread_mutex_unlock(&mi->mi_output_lock);
1788   notify_reload("input_status");
1789 }
1790 
1791 static void
mpegts_input_thread_start(void * aux)1792 mpegts_input_thread_start ( void *aux )
1793 {
1794   mpegts_input_t *mi = aux;
1795   atomic_set(&mi->mi_running, 1);
1796 
1797   tvhthread_create(&mi->mi_table_tid, NULL,
1798                    mpegts_input_table_thread, mi, "mi-table");
1799   tvhthread_create(&mi->mi_input_tid, NULL,
1800                    mpegts_input_thread, mi, "mi-main");
1801 }
1802 
1803 static void
mpegts_input_thread_stop(mpegts_input_t * mi)1804 mpegts_input_thread_stop ( mpegts_input_t *mi )
1805 {
1806   atomic_set(&mi->mi_running, 0);
1807   mtimer_disarm(&mi->mi_input_thread_start);
1808 
1809   /* Stop input thread */
1810   pthread_mutex_lock(&mi->mi_input_lock);
1811   tvh_cond_signal(&mi->mi_input_cond, 0);
1812   pthread_mutex_unlock(&mi->mi_input_lock);
1813 
1814   /* Stop table thread */
1815   pthread_mutex_lock(&mi->mi_output_lock);
1816   tvh_cond_signal(&mi->mi_table_cond, 0);
1817   pthread_mutex_unlock(&mi->mi_output_lock);
1818 
1819   /* Join threads (relinquish lock due to potential deadlock) */
1820   pthread_mutex_unlock(&global_lock);
1821   if (mi->mi_input_tid)
1822     pthread_join(mi->mi_input_tid, NULL);
1823   if (mi->mi_table_tid)
1824     pthread_join(mi->mi_table_tid, NULL);
1825   pthread_mutex_lock(&global_lock);
1826 }
1827 
1828 /* **************************************************************************
1829  * Status monitoring
1830  * *************************************************************************/
1831 
1832 void
mpegts_input_status_timer(void * p)1833 mpegts_input_status_timer ( void *p )
1834 {
1835   tvh_input_stream_t st;
1836   mpegts_input_t *mi = p;
1837   mpegts_mux_instance_t *mmi;
1838   htsmsg_t *e;
1839   int64_t subs = 0;
1840 
1841   pthread_mutex_lock(&mi->mi_output_lock);
1842   LIST_FOREACH(mmi, &mi->mi_mux_active, mmi_active_link) {
1843     memset(&st, 0, sizeof(st));
1844     mpegts_input_stream_status(mmi, &st);
1845     e = tvh_input_stream_create_msg(&st);
1846     htsmsg_add_u32(e, "update", 1);
1847     notify_by_msg("input_status", e, 0);
1848     subs += st.subs_count;
1849     tvh_input_stream_destroy(&st);
1850   }
1851   pthread_mutex_unlock(&mi->mi_output_lock);
1852   mtimer_arm_rel(&mi->mi_status_timer, mpegts_input_status_timer, mi, sec2mono(1));
1853   mpegts_input_dbus_notify(mi, subs);
1854 }
1855 
1856 /* **************************************************************************
1857  * Creation/Config
1858  * *************************************************************************/
1859 
1860 static int mpegts_input_idx = 0;
1861 mpegts_input_list_t mpegts_input_all;
1862 
1863 mpegts_input_t*
mpegts_input_create0(mpegts_input_t * mi,const idclass_t * class,const char * uuid,htsmsg_t * c)1864 mpegts_input_create0
1865   ( mpegts_input_t *mi, const idclass_t *class, const char *uuid,
1866     htsmsg_t *c )
1867 {
1868   if (idnode_insert(&mi->ti_id, uuid, class, 0)) {
1869     if (uuid)
1870       tvherror(LS_MPEGTS, "invalid input uuid '%s'", uuid);
1871     free(mi);
1872     return NULL;
1873   }
1874   LIST_INSERT_HEAD(&tvh_inputs, (tvh_input_t*)mi, ti_link);
1875 
1876   /* Defaults */
1877   mi->mi_is_enabled           = mpegts_input_is_enabled;
1878   mi->mi_display_name         = mpegts_input_display_name;
1879   mi->mi_get_weight           = mpegts_input_get_weight;
1880   mi->mi_get_priority         = mpegts_input_get_priority;
1881   mi->mi_warm_mux             = mpegts_input_warm_mux;
1882   mi->mi_start_mux            = mpegts_input_start_mux;
1883   mi->mi_stop_mux             = mpegts_input_stop_mux;
1884   mi->mi_open_service         = mpegts_input_open_service;
1885   mi->mi_close_service        = mpegts_input_close_service;
1886   mi->mi_update_pids          = mpegts_input_update_pids;
1887   mi->mi_create_mux_instance  = mpegts_input_create_mux_instance;
1888   mi->mi_started_mux          = mpegts_input_started_mux;
1889   mi->mi_stopping_mux         = mpegts_input_stopping_mux;
1890   mi->mi_stopped_mux          = mpegts_input_stopped_mux;
1891   mi->mi_has_subscription     = mpegts_input_has_subscription;
1892   mi->mi_tuning_error         = mpegts_input_tuning_error;
1893   mi->ti_get_streams          = mpegts_input_get_streams;
1894   mi->ti_clear_stats          = mpegts_input_clear_stats;
1895 
1896   /* Index */
1897   mi->mi_instance             = ++mpegts_input_idx;
1898 
1899   /* Init input/output structures */
1900   pthread_mutex_init(&mi->mi_input_lock, NULL);
1901   tvh_cond_init(&mi->mi_input_cond);
1902   TAILQ_INIT(&mi->mi_input_queue);
1903 
1904   pthread_mutex_init(&mi->mi_output_lock, NULL);
1905   tvh_cond_init(&mi->mi_table_cond);
1906   TAILQ_INIT(&mi->mi_table_queue);
1907 
1908   /* Defaults */
1909   mi->mi_ota_epg = 1;
1910   mi->mi_initscan = 1;
1911   mi->mi_idlescan = 1;
1912 
1913   /* Add to global list */
1914   LIST_INSERT_HEAD(&mpegts_input_all, mi, mi_global_link);
1915 
1916   /* Load config */
1917   if (c)
1918     idnode_load(&mi->ti_id, c);
1919 
1920   /* Start threads */
1921   mtimer_arm_rel(&mi->mi_input_thread_start, mpegts_input_thread_start, mi, 0);
1922 
1923   return mi;
1924 }
1925 
1926 void
mpegts_input_stop_all(mpegts_input_t * mi)1927 mpegts_input_stop_all ( mpegts_input_t *mi )
1928 {
1929   mpegts_mux_instance_t *mmi;
1930   while ((mmi = LIST_FIRST(&mi->mi_mux_active)))
1931     mmi->mmi_mux->mm_stop(mmi->mmi_mux, 1, SM_CODE_OK);
1932 }
1933 
1934 void
mpegts_input_delete(mpegts_input_t * mi,int delconf)1935 mpegts_input_delete ( mpegts_input_t *mi, int delconf )
1936 {
1937   mpegts_network_link_t *mnl;
1938   tvh_input_instance_t *tii, *tii_next;
1939 
1940   /* Early shutdown flag */
1941   atomic_set(&mi->mi_running, 0);
1942 
1943   idnode_save_check(&mi->ti_id, delconf);
1944 
1945   /* Remove networks */
1946   while ((mnl = LIST_FIRST(&mi->mi_networks)))
1947     mpegts_input_del_network(mnl);
1948 
1949   /* Remove mux instances assigned to this input */
1950   tii = LIST_FIRST(&mi->mi_mux_instances);
1951   while (tii) {
1952     tii_next = LIST_NEXT(tii, tii_input_link);
1953     if (((mpegts_mux_instance_t *)tii)->mmi_input == mi)
1954       tii->tii_delete(tii);
1955     tii = tii_next;
1956   }
1957 
1958   /* Remove global refs */
1959   idnode_unlink(&mi->ti_id);
1960   LIST_REMOVE(mi, ti_link);
1961   LIST_REMOVE(mi, mi_global_link);
1962 
1963   /* Stop threads (will unlock global_lock to join) */
1964   mpegts_input_thread_stop(mi);
1965 
1966   pthread_mutex_destroy(&mi->mi_output_lock);
1967   tvh_cond_destroy(&mi->mi_table_cond);
1968   free(mi->mi_name);
1969   free(mi->mi_linked);
1970   free(mi);
1971 }
1972 
1973 void
mpegts_input_save(mpegts_input_t * mi,htsmsg_t * m)1974 mpegts_input_save ( mpegts_input_t *mi, htsmsg_t *m )
1975 {
1976   idnode_save(&mi->ti_id, m);
1977 }
1978 
1979 int
mpegts_input_add_network(mpegts_input_t * mi,mpegts_network_t * mn)1980 mpegts_input_add_network ( mpegts_input_t *mi, mpegts_network_t *mn )
1981 {
1982   mpegts_network_link_t *mnl;
1983 
1984   /* Find existing */
1985   LIST_FOREACH(mnl, &mi->mi_networks, mnl_mi_link)
1986     if (mnl->mnl_network == mn)
1987       break;
1988 
1989   /* Clear mark */
1990   if (mnl) {
1991     mnl->mnl_mark = 0;
1992     return 0;
1993   }
1994 
1995   /* Create new */
1996   mnl = calloc(1, sizeof(mpegts_network_link_t));
1997   mnl->mnl_input    = mi;
1998   mnl->mnl_network  = mn;
1999   LIST_INSERT_HEAD(&mi->mi_networks, mnl, mnl_mi_link);
2000   LIST_INSERT_HEAD(&mn->mn_inputs,   mnl, mnl_mn_link);
2001   idnode_notify_changed(&mnl->mnl_network->mn_id);
2002   return 1;
2003 }
2004 
2005 static void
mpegts_input_del_network(mpegts_network_link_t * mnl)2006 mpegts_input_del_network ( mpegts_network_link_t *mnl )
2007 {
2008   idnode_notify_changed(&mnl->mnl_network->mn_id);
2009   LIST_REMOVE(mnl, mnl_mn_link);
2010   LIST_REMOVE(mnl, mnl_mi_link);
2011   free(mnl);
2012 }
2013 
2014 int
mpegts_input_set_networks(mpegts_input_t * mi,htsmsg_t * msg)2015 mpegts_input_set_networks ( mpegts_input_t *mi, htsmsg_t *msg )
2016 {
2017   int save = 0;
2018   const char *str;
2019   htsmsg_field_t *f;
2020   mpegts_network_t *mn;
2021   mpegts_network_link_t *mnl, *nxt;
2022 
2023   /* Mark for deletion */
2024   LIST_FOREACH(mnl, &mi->mi_networks, mnl_mi_link)
2025     mnl->mnl_mark = 1;
2026 
2027   /* Link */
2028   HTSMSG_FOREACH(f, msg) {
2029     if (!(str = htsmsg_field_get_str(f))) continue;
2030     if (!(mn = mpegts_network_find(str))) continue;
2031     save |= mpegts_input_add_network(mi, mn);
2032   }
2033 
2034   /* Unlink */
2035   for (mnl = LIST_FIRST(&mi->mi_networks); mnl != NULL; mnl = nxt) {
2036     nxt = LIST_NEXT(mnl, mnl_mi_link);
2037     if (mnl->mnl_mark) {
2038       mpegts_input_del_network(mnl);
2039       save = 1;
2040     }
2041   }
2042 
2043   return save;
2044 }
2045 
2046 int
mpegts_input_grace(mpegts_input_t * mi,mpegts_mux_t * mm)2047 mpegts_input_grace( mpegts_input_t *mi, mpegts_mux_t *mm )
2048 {
2049   /* Get timeout */
2050   int t = 0;
2051   if (mi && mi->mi_get_grace)
2052     t = mi->mi_get_grace(mi, mm);
2053   if (t < 5) t = 5; // lower bound
2054   return t;
2055 }
2056 
2057 /******************************************************************************
2058  * Editor Configuration
2059  *
2060  * vim:sts=2:ts=2:sw=2:et
2061  *****************************************************************************/
2062