1 /*
2 * Tvheadend - MPEGTS input source
3 * Copyright (C) 2013 Adam Sutton
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 #include "input.h"
20 #include "tsdemux.h"
21 #include "packet.h"
22 #include "streaming.h"
23 #include "subscriptions.h"
24 #include "access.h"
25 #include "atomic.h"
26 #include "notify.h"
27 #include "idnode.h"
28 #include "dbus.h"
29 #include "memoryinfo.h"
30
31 #include <pthread.h>
32 #include <assert.h>
33 #include <fcntl.h>
34 #include <sys/stat.h>
35
36 memoryinfo_t mpegts_input_queue_memoryinfo = { .my_name = "MPEG-TS input queue" };
37 memoryinfo_t mpegts_input_table_memoryinfo = { .my_name = "MPEG-TS table queue" };
38
39 static void
40 mpegts_input_del_network ( mpegts_network_link_t *mnl );
41
42 /*
43 * DBUS
44 */
45
46 static void
mpegts_input_dbus_notify(mpegts_input_t * mi,int64_t subs)47 mpegts_input_dbus_notify(mpegts_input_t *mi, int64_t subs)
48 {
49 #if ENABLE_DBUS_1
50 char buf[256], ubuf[UUID_HEX_SIZE];
51 htsmsg_t *msg;
52
53 if (mi->mi_dbus_subs == subs)
54 return;
55 mi->mi_dbus_subs = subs;
56 msg = htsmsg_create_list();
57 mi->mi_display_name(mi, buf, sizeof(buf));
58 htsmsg_add_str(msg, NULL, buf);
59 htsmsg_add_s64(msg, NULL, subs);
60 snprintf(buf, sizeof(buf), "/input/mpegts/%s", idnode_uuid_as_str(&mi->ti_id, ubuf));
61 dbus_emit_signal(buf, "status", msg);
62 #endif
63 }
64
65 /* **************************************************************************
66 * Class definition
67 * *************************************************************************/
68
69 static const char *
mpegts_input_class_get_title(idnode_t * in,const char * lang)70 mpegts_input_class_get_title ( idnode_t *in, const char *lang )
71 {
72 static char buf[512];
73 mpegts_input_t *mi = (mpegts_input_t*)in;
74 mi->mi_display_name(mi, buf, sizeof(buf));
75 return buf;
76 }
77
78 const void *
mpegts_input_class_active_get(void * obj)79 mpegts_input_class_active_get ( void *obj )
80 {
81 static int active;
82 mpegts_input_t *mi = obj;
83 active = mi->mi_is_enabled((mpegts_input_t*)mi, NULL, 0, -1) != MI_IS_ENABLED_NEVER;
84 return &active;
85 }
86
87 const void *
mpegts_input_class_network_get(void * obj)88 mpegts_input_class_network_get ( void *obj )
89 {
90 mpegts_network_link_t *mnl;
91 mpegts_input_t *mi = obj;
92 htsmsg_t *l = htsmsg_create_list();
93 char ubuf[UUID_HEX_SIZE];
94
95 LIST_FOREACH(mnl, &mi->mi_networks, mnl_mi_link)
96 htsmsg_add_str(l, NULL, idnode_uuid_as_str(&mnl->mnl_network->mn_id, ubuf));
97
98 return l;
99 }
100
101 int
mpegts_input_class_network_set(void * obj,const void * p)102 mpegts_input_class_network_set ( void *obj, const void *p )
103 {
104 return mpegts_input_set_networks(obj, (htsmsg_t*)p);
105 }
106
107 htsmsg_t *
mpegts_input_class_network_enum(void * obj,const char * lang)108 mpegts_input_class_network_enum ( void *obj, const char *lang )
109 {
110 htsmsg_t *p, *m;
111 char ubuf[UUID_HEX_SIZE];
112
113 if (!obj)
114 return NULL;
115
116 p = htsmsg_create_map();
117 htsmsg_add_str (p, "uuid", idnode_uuid_as_str((idnode_t*)obj, ubuf));
118 htsmsg_add_bool(p, "enum", 1);
119
120 m = htsmsg_create_map();
121 htsmsg_add_str (m, "type", "api");
122 htsmsg_add_str (m, "uri", "mpegts/input/network_list");
123 htsmsg_add_str (m, "event", "mpegts_network");
124 htsmsg_add_msg (m, "params", p);
125 return m;
126 }
127
128 char *
mpegts_input_class_network_rend(void * obj,const char * lang)129 mpegts_input_class_network_rend ( void *obj, const char *lang )
130 {
131 char *str;
132 mpegts_network_link_t *mnl;
133 mpegts_input_t *mi = obj;
134 htsmsg_t *l = htsmsg_create_list();
135
136 LIST_FOREACH(mnl, &mi->mi_networks, mnl_mi_link)
137 htsmsg_add_str(l, NULL, idnode_get_title(&mnl->mnl_network->mn_id, lang));
138
139 str = htsmsg_list_2_csv(l, ',', 1);
140 htsmsg_destroy(l);
141
142 return str;
143 }
144
145 static void
mpegts_input_enabled_notify(void * p,const char * lang)146 mpegts_input_enabled_notify ( void *p, const char *lang )
147 {
148 mpegts_input_t *mi = p;
149 mpegts_mux_instance_t *mmi;
150
151 /* Stop */
152 LIST_FOREACH(mmi, &mi->mi_mux_active, mmi_active_link)
153 mmi->mmi_mux->mm_stop(mmi->mmi_mux, 1, SM_CODE_ABORTED);
154
155 /* Alert */
156 if (mi->mi_enabled_updated)
157 mi->mi_enabled_updated(mi);
158 }
159
160 static int
mpegts_input_class_linked_set(void * self,const void * val)161 mpegts_input_class_linked_set ( void *self, const void *val )
162 {
163 mpegts_input_t *mi = self, *mi2;
164 char ubuf[UUID_HEX_SIZE];
165
166 if (strcmp(val ?: "", mi->mi_linked ?: "")) {
167 mi2 = mpegts_input_find(mi->mi_linked);
168 free(mi->mi_linked);
169 mi->mi_linked = NULL;
170 if (mi2) {
171 free(mi2->mi_linked);
172 mi2->mi_linked = NULL;
173 mpegts_mux_unsubscribe_linked(mi2, NULL);
174 }
175 mpegts_mux_unsubscribe_linked(mi, NULL);
176 if (val && ((char *)val)[0]) {
177 mi->mi_linked = strdup((char *)val);
178 mi2 = mpegts_input_find((char *)val);
179 if (mi2) {
180 free(mi2->mi_linked);
181 mi2->mi_linked = strdup(idnode_uuid_as_str(&mi->ti_id, ubuf));
182 }
183 }
184 if (mi2)
185 idnode_changed(&mi2->ti_id);
186 return 1;
187 }
188 return 0;
189 }
190
191 static const void *
mpegts_input_class_linked_get(void * self)192 mpegts_input_class_linked_get ( void *self )
193 {
194 mpegts_input_t *mi = self;
195 prop_sbuf[0] = '\0';
196 if (mi->mi_linked) {
197 mi = mpegts_input_find(mi->mi_linked);
198 if (mi)
199 idnode_uuid_as_str(&mi->ti_id, prop_sbuf);
200 }
201 return &prop_sbuf_ptr;
202 }
203
204 static htsmsg_t *
mpegts_input_class_linked_enum(void * self,const char * lang)205 mpegts_input_class_linked_enum( void * self, const char *lang )
206 {
207 mpegts_input_t *mi = self, *mi2;
208 tvh_input_t *ti;
209 char ubuf[UUID_HEX_SIZE];
210 htsmsg_t *m = htsmsg_create_list();
211 htsmsg_t *e = htsmsg_create_key_val("", tvh_gettext_lang(lang, N_("Not linked")));
212 htsmsg_add_msg(m, NULL, e);
213 TVH_INPUT_FOREACH(ti)
214 if (idnode_is_instance(&ti->ti_id, &mpegts_input_class)) {
215 mi2 = (mpegts_input_t *)ti;
216 if (mi2 != mi) {
217 e = htsmsg_create_key_val(idnode_uuid_as_str(&ti->ti_id, ubuf),
218 idnode_get_title(&mi2->ti_id, lang));
219 htsmsg_add_msg(m, NULL, e);
220 }
221 }
222 return m;
223 }
224
225 PROP_DOC(priority)
226 PROP_DOC(streaming_priority)
227
228 const idclass_t mpegts_input_class =
229 {
230 .ic_super = &tvh_input_class,
231 .ic_class = "mpegts_input",
232 .ic_caption = N_("MPEG-TS input"),
233 .ic_event = "mpegts_input",
234 .ic_perm_def = ACCESS_ADMIN,
235 .ic_get_title = mpegts_input_class_get_title,
236 .ic_properties = (const property_t[]){
237 {
238 .type = PT_BOOL,
239 .id = "active",
240 .name = N_("Active"),
241 .opts = PO_RDONLY | PO_NOSAVE | PO_NOUI,
242 .get = mpegts_input_class_active_get,
243 },
244 {
245 .type = PT_BOOL,
246 .id = "enabled",
247 .name = N_("Enabled"),
248 .desc = N_("Enable/disable tuner/adapter."),
249 .off = offsetof(mpegts_input_t, mi_enabled),
250 .notify = mpegts_input_enabled_notify,
251 .def.i = 1,
252 },
253 {
254 .type = PT_INT,
255 .id = "priority",
256 .name = N_("Priority"),
257 .desc = N_("The tuner priority value (a higher value means to "
258 "use this tuner out of preference). See Help for details."),
259 .doc = prop_doc_priority,
260 .off = offsetof(mpegts_input_t, mi_priority),
261 .def.i = 1,
262 .opts = PO_ADVANCED
263 },
264 {
265 .type = PT_INT,
266 .id = "spriority",
267 .name = N_("Streaming priority"),
268 .desc = N_("The tuner priority value for streamed channels "
269 "through HTTP or HTSP (a higher value means to use "
270 "this tuner out of preference). If not set (zero), "
271 "the standard priority value is used. See Help for details."),
272 .doc = prop_doc_streaming_priority,
273 .off = offsetof(mpegts_input_t, mi_streaming_priority),
274 .def.i = 1,
275 .opts = PO_ADVANCED
276 },
277 {
278 .type = PT_STR,
279 .id = "displayname",
280 .name = N_("Name"),
281 .desc = N_("Name of the tuner/adapter."),
282 .off = offsetof(mpegts_input_t, mi_name),
283 .notify = idnode_notify_title_changed,
284 },
285 {
286 .type = PT_BOOL,
287 .id = "ota_epg",
288 .name = N_("Over-the-air EPG"),
289 .desc = N_("Enable over-the-air program guide (EPG) scanning "
290 "on this input device."),
291 .off = offsetof(mpegts_input_t, mi_ota_epg),
292 .def.i = 1,
293 },
294 {
295 .type = PT_BOOL,
296 .id = "initscan",
297 .name = N_("Initial scan"),
298 .desc = N_("Allow the initial scan tuning on this device "
299 "(scan when Tvheadend starts or when a new multiplex "
300 "is added automatically). At least one tuner or input "
301 "should have this settings turned on. "
302 "See also 'Skip Startup Scan' in the network settings "
303 "for further details."),
304 .off = offsetof(mpegts_input_t, mi_initscan),
305 .def.i = 1,
306 .opts = PO_ADVANCED,
307 },
308 {
309 .type = PT_BOOL,
310 .id = "idlescan",
311 .name = N_("Idle scan"),
312 .desc = N_("Allow idle scan tuning on this device."),
313 .off = offsetof(mpegts_input_t, mi_idlescan),
314 .def.i = 1,
315 .opts = PO_ADVANCED,
316 },
317 {
318 .type = PT_U32,
319 .id = "free_weight",
320 .name = N_("Free subscription weight"),
321 .desc = N_("If the subscription weight for the input is below "
322 "the specified threshold, the tuner is handled as free "
323 "(according the priority settings). Otherwise, the next "
324 "tuner (without any subscriptions) is used. Set this value "
325 "to 10, if you are willing to override scan and epggrab "
326 "subscriptions."),
327 .off = offsetof(mpegts_input_t, mi_free_weight),
328 .def.i = 1,
329 .opts = PO_ADVANCED,
330 },
331 {
332 .type = PT_BOOL,
333 .id = "remove_scrambled",
334 .name = N_("Remove scrambled bits"),
335 .desc = N_("The scrambled bits in MPEG-TS packets are always cleared. "
336 "It is a workaround for the special streams which are "
337 "descrambled, but these bits are not touched."),
338 .off = offsetof(mpegts_input_t, mi_remove_scrambled_bits),
339 .def.i = 1,
340 .opts = PO_EXPERT,
341 },
342 {
343 .type = PT_STR,
344 .id = "networks",
345 .name = N_("Networks"),
346 .desc = N_("Associate this device with one or more networks."),
347 .islist = 1,
348 .set = mpegts_input_class_network_set,
349 .get = mpegts_input_class_network_get,
350 .list = mpegts_input_class_network_enum,
351 .rend = mpegts_input_class_network_rend,
352 },
353 {
354 .type = PT_STR,
355 .id = "linked",
356 .name = N_("Linked input"),
357 .desc = N_("Wake up the linked input whenever this adapter "
358 "is used. The subscriptions are named as \"keep\". "
359 "Note that this isn't normally needed, and is here "
360 "simply as a workaround to driver bugs in certain "
361 "dual tuner cards that otherwise lock the second tuner."),
362 .set = mpegts_input_class_linked_set,
363 .get = mpegts_input_class_linked_get,
364 .list = mpegts_input_class_linked_enum,
365 .opts = PO_ADVANCED,
366 },
367 {}
368 }
369 };
370
371 /* **************************************************************************
372 * Class methods
373 * *************************************************************************/
374
375 int
mpegts_input_is_enabled(mpegts_input_t * mi,mpegts_mux_t * mm,int flags,int weight)376 mpegts_input_is_enabled
377 ( mpegts_input_t *mi, mpegts_mux_t *mm, int flags, int weight )
378 {
379 if ((flags & SUBSCRIPTION_EPG) != 0 && !mi->mi_ota_epg)
380 return MI_IS_ENABLED_NEVER;
381 if ((flags & SUBSCRIPTION_USERSCAN) == 0) {
382 if ((flags & SUBSCRIPTION_INITSCAN) != 0 && !mi->mi_initscan)
383 return MI_IS_ENABLED_NEVER;
384 if ((flags & SUBSCRIPTION_IDLESCAN) != 0 && !mi->mi_idlescan)
385 return MI_IS_ENABLED_NEVER;
386 }
387 return mi->mi_enabled ? MI_IS_ENABLED_OK : MI_IS_ENABLED_NEVER;
388 }
389
390 void
mpegts_input_set_enabled(mpegts_input_t * mi,int enabled)391 mpegts_input_set_enabled ( mpegts_input_t *mi, int enabled )
392 {
393 enabled = !!enabled;
394 if (mi->mi_enabled != enabled) {
395 htsmsg_t *conf = htsmsg_create_map();
396 htsmsg_add_bool(conf, "enabled", enabled);
397 idnode_update(&mi->ti_id, conf);
398 htsmsg_destroy(conf);
399 }
400 }
401
402 static void
mpegts_input_display_name(mpegts_input_t * mi,char * buf,size_t len)403 mpegts_input_display_name ( mpegts_input_t *mi, char *buf, size_t len )
404 {
405 if (mi->mi_name) {
406 strlcpy(buf, mi->mi_name, len);
407 } else {
408 *buf = 0;
409 }
410 }
411
412 int
mpegts_input_get_weight(mpegts_input_t * mi,mpegts_mux_t * mm,int flags,int weight)413 mpegts_input_get_weight ( mpegts_input_t *mi, mpegts_mux_t *mm, int flags, int weight )
414 {
415 const mpegts_mux_instance_t *mmi;
416 const service_t *s;
417 const th_subscription_t *ths;
418 int w = 0, count = 0;
419
420 /* Service subs */
421 pthread_mutex_lock(&mi->mi_output_lock);
422 LIST_FOREACH(mmi, &mi->mi_mux_active, mmi_active_link)
423 LIST_FOREACH(s, &mmi->mmi_mux->mm_transports, s_active_link)
424 LIST_FOREACH(ths, &s->s_subscriptions, ths_service_link) {
425 w = MAX(w, ths->ths_weight);
426 count++;
427 }
428 pthread_mutex_unlock(&mi->mi_output_lock);
429 return w > 0 ? w + count - 1 : 0;
430 }
431
432 int
mpegts_input_get_priority(mpegts_input_t * mi,mpegts_mux_t * mm,int flags)433 mpegts_input_get_priority ( mpegts_input_t *mi, mpegts_mux_t *mm, int flags )
434 {
435 if (flags & SUBSCRIPTION_STREAMING) {
436 if (mi->mi_streaming_priority > 0)
437 return mi->mi_streaming_priority;
438 }
439 return mi->mi_priority;
440 }
441
442 int
mpegts_input_warm_mux(mpegts_input_t * mi,mpegts_mux_instance_t * mmi)443 mpegts_input_warm_mux ( mpegts_input_t *mi, mpegts_mux_instance_t *mmi )
444 {
445 mpegts_mux_instance_t *cur;
446
447 cur = LIST_FIRST(&mi->mi_mux_active);
448 if (cur != NULL) {
449 /* Already tuned */
450 if (mmi == cur)
451 return 0;
452
453 /* Stop current */
454 cur->mmi_mux->mm_stop(cur->mmi_mux, 1, SM_CODE_SUBSCRIPTION_OVERRIDDEN);
455 }
456 if (LIST_FIRST(&mi->mi_mux_active))
457 return SM_CODE_TUNING_FAILED;
458 return 0;
459 }
460
461 static int
mpegts_input_start_mux(mpegts_input_t * mi,mpegts_mux_instance_t * mmi,int weight)462 mpegts_input_start_mux ( mpegts_input_t *mi, mpegts_mux_instance_t *mmi, int weight )
463 {
464 return SM_CODE_TUNING_FAILED;
465 }
466
467 static void
mpegts_input_stop_mux(mpegts_input_t * mi,mpegts_mux_instance_t * mmi)468 mpegts_input_stop_mux ( mpegts_input_t *mi, mpegts_mux_instance_t *mmi )
469 {
470 }
471
472 int
mpegts_mps_cmp(mpegts_pid_sub_t * a,mpegts_pid_sub_t * b)473 mpegts_mps_cmp ( mpegts_pid_sub_t *a, mpegts_pid_sub_t *b )
474 {
475 if (a->mps_type != b->mps_type) {
476 if (a->mps_type & MPS_SERVICE)
477 return 1;
478 else
479 return -1;
480 }
481 if (a->mps_owner < b->mps_owner) return -1;
482 if (a->mps_owner > b->mps_owner) return 1;
483 return 0;
484 }
485
486 void
mpegts_input_close_pids(mpegts_input_t * mi,mpegts_mux_t * mm,void * owner,int all)487 mpegts_input_close_pids
488 ( mpegts_input_t *mi, mpegts_mux_t *mm, void *owner, int all )
489 {
490 mpegts_pid_t *mp, *mp_next;
491 mpegts_pid_sub_t *mps, *mps_next;
492 int pid;
493
494 if (all)
495 for (mps = LIST_FIRST(&mm->mm_all_subs); mps; mps = mps_next) {
496 mps_next = LIST_NEXT(mps, mps_svcraw_link);
497 if (mps->mps_owner != owner) continue;
498 pid = MPEGTS_FULLMUX_PID;
499 if (mps->mps_type & MPS_TABLES) pid = MPEGTS_TABLES_PID;
500 mpegts_input_close_pid(mi, mm, pid, mps->mps_type, mps->mps_weight, mps->mps_owner);
501 }
502 for (mp = RB_FIRST(&mm->mm_pids); mp; mp = mp_next) {
503 mp_next = RB_NEXT(mp, mp_link);
504 for (mps = RB_FIRST(&mp->mp_subs); mps; mps = mps_next) {
505 mps_next = RB_NEXT(mps, mps_link);
506 if (mps->mps_owner != owner) continue;
507 mpegts_input_close_pid(mi, mm, mp->mp_pid, mps->mps_type, mps->mps_weight, mps->mps_owner);
508 }
509 }
510 }
511
512 mpegts_pid_t *
mpegts_input_open_pid(mpegts_input_t * mi,mpegts_mux_t * mm,int pid,int type,int weight,void * owner,int reopen)513 mpegts_input_open_pid
514 ( mpegts_input_t *mi, mpegts_mux_t *mm, int pid, int type, int weight,
515 void *owner, int reopen )
516 {
517 char buf[512];
518 mpegts_pid_t *mp;
519 mpegts_pid_sub_t *mps, *mps2;
520
521 assert(owner != NULL);
522 assert((type & (MPS_STREAM|MPS_SERVICE|MPS_RAW)) == 0 ||
523 (((type & MPS_STREAM) ? 1 : 0) +
524 ((type & MPS_SERVICE) ? 1 : 0) +
525 ((type & MPS_RAW) ? 1 : 0)) == 1);
526 lock_assert(&mi->mi_output_lock);
527
528 if (pid == MPEGTS_FULLMUX_PID)
529 mpegts_input_close_pids(mi, mm, owner, 1);
530
531 if ((mp = mpegts_mux_find_pid(mm, pid, 1))) {
532 mps = calloc(1, sizeof(*mps));
533 mps->mps_type = type;
534 mps->mps_weight = weight;
535 mps->mps_owner = owner;
536 if (pid == MPEGTS_FULLMUX_PID || pid == MPEGTS_TABLES_PID) {
537 mp->mp_type |= type;
538 LIST_FOREACH(mps2, &mm->mm_all_subs, mps_svcraw_link)
539 if (mps2->mps_owner == owner) break;
540 if (mps2 == NULL) {
541 LIST_INSERT_HEAD(&mm->mm_all_subs, mps, mps_svcraw_link);
542 mpegts_mux_nice_name(mm, buf, sizeof(buf));
543 tvhdebug(LS_MPEGTS, "%s - open PID %s subscription [%04x/%p]",
544 buf, (type & MPS_TABLES) ? "tables" : "fullmux", type, owner);
545 mm->mm_update_pids_flag = 1;
546 } else {
547 if (!reopen) {
548 mpegts_mux_nice_name(mm, buf, sizeof(buf));
549 tvherror(LS_MPEGTS,
550 "%s - open PID %04x (%d) failed, dupe sub (owner %p)",
551 buf, mp->mp_pid, mp->mp_pid, owner);
552 }
553 free(mps);
554 mp = NULL;
555 }
556 } else if (!RB_INSERT_SORTED(&mp->mp_subs, mps, mps_link, mpegts_mps_cmp)) {
557 mp->mp_type |= type;
558 if (type & MPS_RAW)
559 LIST_INSERT_HEAD(&mp->mp_raw_subs, mps, mps_raw_link);
560 if (type & MPS_SERVICE)
561 LIST_INSERT_HEAD(&mp->mp_svc_subs, mps, mps_svcraw_link);
562 mpegts_mux_nice_name(mm, buf, sizeof(buf));
563 tvhdebug(LS_MPEGTS, "%s - open PID %04X (%d) [%d/%p]",
564 buf, mp->mp_pid, mp->mp_pid, type, owner);
565 mm->mm_update_pids_flag = 1;
566 } else {
567 if (!reopen) {
568 mpegts_mux_nice_name(mm, buf, sizeof(buf));
569 tvherror(LS_MPEGTS, "%s - open PID %04x (%d) failed, dupe sub (owner %p)",
570 buf, mp->mp_pid, mp->mp_pid, owner);
571 }
572 free(mps);
573 mp = NULL;
574 }
575 }
576 return mp;
577 }
578
579 int
mpegts_input_close_pid(mpegts_input_t * mi,mpegts_mux_t * mm,int pid,int type,int weight,void * owner)580 mpegts_input_close_pid
581 ( mpegts_input_t *mi, mpegts_mux_t *mm, int pid, int type, int weight, void *owner )
582 {
583 char buf[512];
584 mpegts_pid_sub_t *mps, skel;
585 mpegts_pid_t *mp;
586 int mask;
587 assert(owner != NULL);
588 lock_assert(&mi->mi_output_lock);
589 if (!(mp = mpegts_mux_find_pid(mm, pid, 0)))
590 return -1;
591 if (pid == MPEGTS_FULLMUX_PID || pid == MPEGTS_TABLES_PID) {
592 mpegts_mux_nice_name(mm, buf, sizeof(buf));
593 LIST_FOREACH(mps, &mm->mm_all_subs, mps_svcraw_link)
594 if (mps->mps_owner == owner) break;
595 if (mps == NULL) return -1;
596 tvhdebug(LS_MPEGTS, "%s - close PID %s subscription [%04x/%p]",
597 buf, pid == MPEGTS_TABLES_PID ? "tables" : "fullmux",
598 type, owner);
599 if (pid == MPEGTS_FULLMUX_PID)
600 mpegts_input_close_pids(mi, mm, owner, 0);
601 LIST_REMOVE(mps, mps_svcraw_link);
602 free(mps);
603 mm->mm_update_pids_flag = 1;
604 mask = pid == MPEGTS_FULLMUX_PID ? MPS_ALL : MPS_TABLES;
605 LIST_FOREACH(mps, &mm->mm_all_subs, mps_svcraw_link)
606 if (mps->mps_type & mask) break;
607 if (mps) return 0;
608 } else {
609 skel.mps_type = type;
610 skel.mps_weight = weight;
611 skel.mps_owner = owner;
612 mps = RB_FIND(&mp->mp_subs, &skel, mps_link, mpegts_mps_cmp);
613 if (pid == mm->mm_last_pid) {
614 mm->mm_last_pid = -1;
615 mm->mm_last_mp = NULL;
616 }
617 if (mps) {
618 mpegts_mux_nice_name(mm, buf, sizeof(buf));
619 tvhdebug(LS_MPEGTS, "%s - close PID %04X (%d) [%d/%p]",
620 buf, mp->mp_pid, mp->mp_pid, type, owner);
621 if (type & MPS_RAW)
622 LIST_REMOVE(mps, mps_raw_link);
623 if (type & MPS_SERVICE)
624 LIST_REMOVE(mps, mps_svcraw_link);
625 RB_REMOVE(&mp->mp_subs, mps, mps_link);
626 free(mps);
627 mm->mm_update_pids_flag = 1;
628 }
629 }
630 if (!RB_FIRST(&mp->mp_subs)) {
631 if (mm->mm_last_pid == mp->mp_pid) {
632 mm->mm_last_pid = -1;
633 mm->mm_last_mp = NULL;
634 }
635 RB_REMOVE(&mm->mm_pids, mp, mp_link);
636 free(mp);
637 return 1;
638 } else {
639 type = 0;
640 RB_FOREACH(mps, &mp->mp_subs, mps_link)
641 type |= mps->mps_type;
642 mp->mp_type = type;
643 }
644 return 0;
645 }
646
647 static void
mpegts_input_update_pids(mpegts_input_t * mi,mpegts_mux_t * mm)648 mpegts_input_update_pids
649 ( mpegts_input_t *mi, mpegts_mux_t *mm )
650 {
651 /* nothing - override */
652 }
653
/*
 * Subscription weight for an elementary stream: a base chosen by stream
 * class (video, audio, subtitle, other) plus the stream index, where the
 * index contribution is capped at 49.
 */
int mpegts_mps_weight(elementary_stream_t *st)
{
  int base;

  if (SCT_ISVIDEO(st->es_type))
    base = MPS_WEIGHT_VIDEO;
  else if (SCT_ISAUDIO(st->es_type))
    base = MPS_WEIGHT_AUDIO;
  else if (SCT_ISSUBTITLE(st->es_type))
    base = MPS_WEIGHT_SUBTITLE;
  else
    base = MPS_WEIGHT_ESOTHER;

  return base + MIN(st->es_index, 49);
}
665
666 static int
mpegts_input_cat_pass_callback(mpegts_table_t * mt,const uint8_t * ptr,int len,int tableid)667 mpegts_input_cat_pass_callback
668 (mpegts_table_t *mt, const uint8_t *ptr, int len, int tableid)
669 {
670 int r, sect, last, ver;
671 uint8_t dtag, dlen;
672 uint16_t pid;
673 uintptr_t caid;
674 mpegts_mux_t *mm = mt->mt_mux;
675 mpegts_psi_table_state_t *st = NULL;
676 service_t *s = mt->mt_opaque;
677 elementary_stream_t *es;
678 mpegts_input_t *mi;
679
680 /* Start */
681 r = dvb_table_begin((mpegts_psi_table_t *)mt, ptr, len,
682 tableid, 0, 5, &st, §, &last, &ver);
683 if (r != 1) return r;
684 ptr += 5;
685 len -= 5;
686
687 /* Send CAT data for descramblers */
688 descrambler_cat_data(mm, ptr, len);
689
690 while(len > 2) {
691 dtag = *ptr++;
692 dlen = *ptr++;
693 len -= 2;
694
695 switch(dtag) {
696 case DVB_DESC_CA:
697 if (len >= 4 && dlen >= 4 && mm->mm_active) {
698 caid = ( ptr[0] << 8) | ptr[1];
699 pid = ((ptr[2] & 0x1f) << 8) | ptr[3];
700 tvhdebug(LS_TBL_BASE, "cat: pass: caid %04X (%d) pid %04X (%d)",
701 (uint16_t)caid, (uint16_t)caid, pid, pid);
702 pthread_mutex_lock(&s->s_stream_mutex);
703 es = NULL;
704 if (service_stream_find((service_t *)s, pid) == NULL) {
705 es = service_stream_create(s, pid, SCT_CA);
706 es->es_pid_opened = 1;
707 }
708 pthread_mutex_unlock(&s->s_stream_mutex);
709 if (es && mm->mm_active && (mi = mm->mm_active->mmi_input) != NULL) {
710 pthread_mutex_lock(&mi->mi_output_lock);
711 if ((mi = mm->mm_active->mmi_input) != NULL)
712 mpegts_input_open_pid(mi, mm, pid,
713 MPS_SERVICE, MPS_WEIGHT_CAT, s, 0);
714 pthread_mutex_unlock(&mi->mi_output_lock);
715 }
716 }
717 break;
718 default:
719 break;
720 }
721
722 ptr += dlen;
723 len -= dlen;
724 }
725
726 /* Finish */
727 return dvb_table_end((mpegts_psi_table_t *)mt, st, sect);
728 }
729
730 void
mpegts_input_open_service(mpegts_input_t * mi,mpegts_service_t * s,int flags,int init,int weight)731 mpegts_input_open_service
732 ( mpegts_input_t *mi, mpegts_service_t *s, int flags, int init, int weight )
733 {
734 mpegts_mux_t *mm = s->s_dvb_mux;
735 elementary_stream_t *st;
736 mpegts_apids_t *pids;
737 mpegts_apid_t *p;
738 int i, reopen = !init;
739
740 /* Add to list */
741 pthread_mutex_lock(&mi->mi_output_lock);
742 if (!s->s_dvb_active_input) {
743 LIST_INSERT_HEAD(&mm->mm_transports, ((service_t*)s), s_active_link);
744 s->s_dvb_active_input = mi;
745 }
746 /* Register PIDs */
747 pthread_mutex_lock(&s->s_stream_mutex);
748 if (s->s_type == STYPE_STD) {
749
750 if (s->s_pmt_pid == SERVICE_PMT_AUTO)
751 goto no_pids;
752
753 mpegts_input_open_pid(mi, mm, s->s_pmt_pid, MPS_SERVICE, MPS_WEIGHT_PMT, s, reopen);
754 mpegts_input_open_pid(mi, mm, s->s_pcr_pid, MPS_SERVICE, MPS_WEIGHT_PCR, s, reopen);
755 if (s->s_scrambled_pass)
756 mpegts_input_open_pid(mi, mm, DVB_CAT_PID, MPS_SERVICE, MPS_WEIGHT_CAT, s, reopen);
757 /* Open only filtered components here */
758 TAILQ_FOREACH(st, &s->s_filt_components, es_filt_link)
759 if ((s->s_scrambled_pass || st->es_type != SCT_CA) &&
760 st->es_pid != s->s_pmt_pid && st->es_pid != s->s_pcr_pid) {
761 st->es_pid_opened = 1;
762 mpegts_input_open_pid(mi, mm, st->es_pid, MPS_SERVICE, mpegts_mps_weight(st), s, reopen);
763 }
764
765 mpegts_service_update_slave_pids(s, 0);
766
767 } else {
768 if ((pids = s->s_pids) != NULL) {
769 if (pids->all) {
770 mpegts_input_open_pid(mi, mm, MPEGTS_FULLMUX_PID, MPS_RAW | MPS_ALL, MPS_WEIGHT_RAW, s, reopen);
771 } else {
772 for (i = 0; i < pids->count; i++) {
773 p = &pids->pids[i];
774 mpegts_input_open_pid(mi, mm, p->pid, MPS_RAW, p->weight, s, reopen);
775 }
776 }
777 } else if (flags & SUBSCRIPTION_TABLES) {
778 mpegts_input_open_pid(mi, mm, MPEGTS_TABLES_PID, MPS_RAW | MPS_TABLES, MPS_WEIGHT_PAT, s, reopen);
779 } else if (flags & SUBSCRIPTION_MINIMAL) {
780 mpegts_input_open_pid(mi, mm, DVB_PAT_PID, MPS_RAW, MPS_WEIGHT_PAT, s, reopen);
781 }
782 }
783
784 no_pids:
785 pthread_mutex_unlock(&s->s_stream_mutex);
786 pthread_mutex_unlock(&mi->mi_output_lock);
787
788 /* Add PMT monitor */
789 if(s->s_type == STYPE_STD) {
790 s->s_pmt_mon =
791 mpegts_table_add(mm, DVB_PMT_BASE, DVB_PMT_MASK,
792 dvb_pmt_callback, s, "pmt", LS_TBL_BASE,
793 MT_CRC, s->s_pmt_pid, MPS_WEIGHT_PMT);
794 if (s->s_scrambled_pass && (flags & SUBSCRIPTION_EMM) != 0) {
795 s->s_cat_mon =
796 mpegts_table_add(mm, DVB_CAT_BASE, DVB_CAT_MASK,
797 mpegts_input_cat_pass_callback, s, "cat",
798 LS_TBL_BASE, MT_QUICKREQ | MT_CRC, DVB_CAT_PID,
799 MPS_WEIGHT_CAT);
800 }
801 }
802
803 mpegts_mux_update_pids(mm);
804 }
805
806 void
mpegts_input_close_service(mpegts_input_t * mi,mpegts_service_t * s)807 mpegts_input_close_service ( mpegts_input_t *mi, mpegts_service_t *s )
808 {
809 mpegts_mux_t *mm = s->s_dvb_mux;
810 elementary_stream_t *st;
811
812 /* Close PMT/CAT tables */
813 if (s->s_type == STYPE_STD) {
814 if (s->s_pmt_mon)
815 mpegts_table_destroy(s->s_pmt_mon);
816 if (s->s_cat_mon)
817 mpegts_table_destroy(s->s_cat_mon);
818 }
819 s->s_pmt_mon = NULL;
820
821 /* Remove from list */
822 pthread_mutex_lock(&mi->mi_output_lock);
823 if (s->s_dvb_active_input != NULL) {
824 LIST_REMOVE(((service_t*)s), s_active_link);
825 s->s_dvb_active_input = NULL;
826 }
827 /* Close PID */
828 pthread_mutex_lock(&s->s_stream_mutex);
829 if (s->s_type == STYPE_STD) {
830
831 if (s->s_pmt_pid == SERVICE_PMT_AUTO)
832 goto no_pids;
833
834 mpegts_input_close_pid(mi, mm, s->s_pmt_pid, MPS_SERVICE, MPS_WEIGHT_PMT, s);
835 mpegts_input_close_pid(mi, mm, s->s_pcr_pid, MPS_SERVICE, MPS_WEIGHT_PCR, s);
836 if (s->s_scrambled_pass)
837 mpegts_input_close_pid(mi, mm, DVB_CAT_PID, MPS_SERVICE, MPS_WEIGHT_CAT, s);
838 /* Close all opened PIDs (the component filter may be changed at runtime) */
839 TAILQ_FOREACH(st, &s->s_components, es_link) {
840 if (st->es_pid_opened) {
841 st->es_pid_opened = 0;
842 mpegts_input_close_pid(mi, mm, st->es_pid, MPS_SERVICE, mpegts_mps_weight(st), s);
843 }
844 }
845
846 mpegts_service_update_slave_pids(s, 1);
847
848 } else {
849 mpegts_input_close_pids(mi, mm, s, 1);
850 }
851
852 no_pids:
853 pthread_mutex_unlock(&s->s_stream_mutex);
854 pthread_mutex_unlock(&mi->mi_output_lock);
855
856 mpegts_mux_update_pids(mm);
857
858 /* Stop mux? */
859 s->s_dvb_mux->mm_stop(s->s_dvb_mux, 0, SM_CODE_OK);
860 }
861
862 static void
mpegts_input_create_mux_instance(mpegts_input_t * mi,mpegts_mux_t * mm)863 mpegts_input_create_mux_instance
864 ( mpegts_input_t *mi, mpegts_mux_t *mm )
865 {
866 extern const idclass_t mpegts_mux_instance_class;
867 tvh_input_instance_t *tii;
868 LIST_FOREACH(tii, &mi->mi_mux_instances, tii_input_link)
869 if (((mpegts_mux_instance_t *)tii)->mmi_mux == mm) break;
870 if (!tii)
871 (void)mpegts_mux_instance_create(mpegts_mux_instance, NULL, mi, mm);
872 }
873
874 static void
mpegts_input_started_mux(mpegts_input_t * mi,mpegts_mux_instance_t * mmi)875 mpegts_input_started_mux
876 ( mpegts_input_t *mi, mpegts_mux_instance_t *mmi )
877 {
878 #if ENABLE_TSDEBUG
879 extern char *tvheadend_tsdebug;
880 static const char *tmpdir = "/tmp/tvheadend.tsdebug/";
881 char buf[128];
882 char path[PATH_MAX];
883 struct stat st;
884 if (!tvheadend_tsdebug && !stat(tmpdir, &st) && (st.st_mode & S_IFDIR) != 0)
885 tvheadend_tsdebug = (char *)tmpdir;
886 if (tvheadend_tsdebug && !strcmp(tvheadend_tsdebug, tmpdir) && stat(tmpdir, &st))
887 tvheadend_tsdebug = NULL;
888 if (tvheadend_tsdebug) {
889 mpegts_mux_nice_name(mmi->mmi_mux, buf, sizeof(buf));
890 snprintf(path, sizeof(path), "%s/%s-%li-%p-mux.ts", tvheadend_tsdebug,
891 buf, (long)mono2sec(mclk()), mi);
892 mmi->mmi_mux->mm_tsdebug_fd = tvh_open(path, O_WRONLY | O_CREAT, S_IRUSR | S_IWUSR);
893 if (mmi->mmi_mux->mm_tsdebug_fd < 0)
894 tvherror(LS_TSDEBUG, "unable to create file '%s' (%i)", path, errno);
895 snprintf(path, sizeof(path), "%s/%s-%li-%p-input.ts", tvheadend_tsdebug,
896 buf, (long)mono2sec(mclk()), mi);
897 mmi->mmi_mux->mm_tsdebug_fd2 = tvh_open(path, O_WRONLY | O_CREAT, S_IRUSR | S_IWUSR);
898 if (mmi->mmi_mux->mm_tsdebug_fd2 < 0)
899 tvherror(LS_TSDEBUG, "unable to create file '%s' (%i)", path, errno);
900 } else {
901 mmi->mmi_mux->mm_tsdebug_fd = -1;
902 mmi->mmi_mux->mm_tsdebug_fd2 = -1;
903 }
904 #endif
905
906 /* Deliver first TS packets as fast as possible */
907 mi->mi_last_dispatch = 0;
908
909 /* Arm timer */
910 if (LIST_FIRST(&mi->mi_mux_active) == NULL)
911 mtimer_arm_rel(&mi->mi_status_timer, mpegts_input_status_timer, mi, sec2mono(1));
912
913 /* Update */
914 mmi->mmi_mux->mm_active = mmi;
915
916 /* Accept packets */
917 LIST_INSERT_HEAD(&mi->mi_mux_active, mmi, mmi_active_link);
918 notify_reload("input_status");
919 mpegts_input_dbus_notify(mi, 1);
920 }
921
922 static void
mpegts_input_stopping_mux(mpegts_input_t * mi,mpegts_mux_instance_t * mmi)923 mpegts_input_stopping_mux
924 ( mpegts_input_t *mi, mpegts_mux_instance_t *mmi )
925 {
926 assert(mmi->mmi_mux->mm_active);
927
928 pthread_mutex_lock(&mi->mi_input_lock);
929 mmi->mmi_mux->mm_active = NULL;
930 pthread_mutex_unlock(&mi->mi_input_lock);
931 pthread_mutex_lock(&mi->mi_output_lock);
932 mmi->mmi_mux->mm_active = NULL;
933 pthread_mutex_unlock(&mi->mi_output_lock);
934 }
935
936 static void
mpegts_input_stopped_mux(mpegts_input_t * mi,mpegts_mux_instance_t * mmi)937 mpegts_input_stopped_mux
938 ( mpegts_input_t *mi, mpegts_mux_instance_t *mmi )
939 {
940 char buf[256];
941 service_t *s, *s_next;
942 mpegts_mux_t *mm = mmi->mmi_mux;
943
944 /* no longer active */
945 LIST_REMOVE(mmi, mmi_active_link);
946
947 /* Disarm timer */
948 if (LIST_FIRST(&mi->mi_mux_active) == NULL)
949 mtimer_disarm(&mi->mi_status_timer);
950
951 mi->mi_display_name(mi, buf, sizeof(buf));
952 tvhtrace(LS_MPEGTS, "%s - flush subscribers", buf);
953 for (s = LIST_FIRST(&mm->mm_transports); s; s = s_next) {
954 s_next = LIST_NEXT(s, s_active_link);
955 service_remove_subscriber(s, NULL, SM_CODE_SUBSCRIPTION_OVERRIDDEN);
956 }
957 notify_reload("input_status");
958 mpegts_input_dbus_notify(mi, 0);
959
960 #if ENABLE_TSDEBUG
961 tsdebug_packet_t *tp;
962 if (mm->mm_tsdebug_fd >= 0)
963 close(mm->mm_tsdebug_fd);
964 if (mm->mm_tsdebug_fd2 >= 0)
965 close(mm->mm_tsdebug_fd2);
966 mm->mm_tsdebug_fd = -1;
967 mm->mm_tsdebug_fd2 = -1;
968 mm->mm_tsdebug_pos = 0;
969 while ((tp = TAILQ_FIRST(&mm->mm_tsdebug_packets)) != NULL) {
970 TAILQ_REMOVE(&mm->mm_tsdebug_packets, tp, link);
971 free(tp);
972 }
973 #endif
974 }
975
976 static int
mpegts_input_has_subscription(mpegts_input_t * mi,mpegts_mux_t * mm)977 mpegts_input_has_subscription ( mpegts_input_t *mi, mpegts_mux_t *mm )
978 {
979 int ret = 0;
980 const service_t *t;
981 const th_subscription_t *ths;
982 pthread_mutex_lock(&mi->mi_output_lock);
983 LIST_FOREACH(t, &mm->mm_transports, s_active_link) {
984 if (t->s_type == STYPE_RAW) {
985 LIST_FOREACH(ths, &t->s_subscriptions, ths_service_link)
986 if (!strcmp(ths->ths_title, "keep")) break;
987 if (ths) continue;
988 }
989 ret = 1;
990 break;
991 }
992 pthread_mutex_unlock(&mi->mi_output_lock);
993 return ret;
994 }
995
996 static void
mpegts_input_tuning_error(mpegts_input_t * mi,mpegts_mux_t * mm)997 mpegts_input_tuning_error ( mpegts_input_t *mi, mpegts_mux_t *mm )
998 {
999 service_t *t, *t_next;
1000 pthread_mutex_lock(&mi->mi_output_lock);
1001 for (t = LIST_FIRST(&mm->mm_transports); t; t = t_next) {
1002 t_next = LIST_NEXT(t, s_active_link);
1003 pthread_mutex_lock(&t->s_stream_mutex);
1004 service_set_streaming_status_flags(t, TSS_TUNING);
1005 pthread_mutex_unlock(&t->s_stream_mutex);
1006 }
1007 pthread_mutex_unlock(&mi->mi_output_lock);
1008 }
1009
1010 /* **************************************************************************
1011 * Data processing
1012 * *************************************************************************/
1013
/*
 * data_noise() - optional fault injection for incoming TS data.
 *
 * The first variant is compiled out (#if 0). When enabled it keeps a
 * running byte offset across calls (static state) and, inside a window of
 * `win` bytes starting at `limit`, either removes one 188-byte packet from
 * the buffer and returns 1, or XORs bytes of the current packet with values
 * derived from the buffer contents. Once the window has passed, the offset
 * is reset and new limit/window sizes are derived from the data.
 *
 * The variant that is actually compiled is a no-op that always returns 0.
 * The caller in this file frees the packet instead of queueing it when a
 * non-zero value is returned.
 */
#if 0
static int data_noise ( mpegts_packet_t *mp )
{
  /* persistent across calls: stream offset, corruption window, window start */
  static uint64_t off = 0, win = 4096, limit = 2*1024*1024;
  uint8_t *data = mp->mp_data;
  uint32_t i, p, s, len = mp->mp_len;
  for (p = 0; p < len; p += 188) {
    off += 188;
    if (off >= limit && off < limit + win) {
      if ((off & 3) == 1) {
        /* cut the current packet out of the buffer and report it */
        memmove(data + p, data + p + 188, len - (p + 188));
        p -= 188;
        mp->mp_len -= 188;
        return 1;
      }
      /* corrupt every s-th byte (s in 1..4) of the current packet */
      s = ((data[2] + data[3] + data[4]) & 3) + 1;
      for (i = 0; i < 188; i += s)
        ((char *)data)[p+i] ^= data[10] + data[12] + data[i];
    } else if (off >= limit + win) {
      /* window passed: restart and pick new parameters from the data */
      off = 0;
      limit = (uint64_t)data[15] * 4 * 1024;
      win   = (uint64_t)data[16] * 16;
    }
  }
  return 0;
}
#else
/* Fault injection disabled: never alters or rejects a packet */
static inline int data_noise( mpegts_packet_t *mp ) { return 0; }
#endif
1043
/*
 * Extract the PCR base from a single TS packet.
 *
 * tsb  - start of the packet; bytes 0..10 are read at most
 * rpcr - receives the 33-bit value built from bytes 6..9 and the top bit
 *        of byte 10; left untouched when nothing is extracted
 *
 * Returns 1 when a value was stored, 0 when the packet has the transport
 * error bit set, has no adaptation field (byte 3, 0x20), an adaptation
 * field length byte of 5 or less, or no PCR flag (byte 5, 0x10).
 */
static inline int
get_pcr ( const uint8_t *tsb, int64_t *rpcr )
{
  int_fast64_t base = 0;
  int i;

  /* error bit */
  if ((tsb[1] & 0x80) != 0)
    return 0;

  /* adaptation field present? */
  if (!(tsb[3] & 0x20))
    return 0;
  /* adaptation field long enough? */
  if (tsb[4] < 6)
    return 0;
  /* PCR flag set? */
  if (!(tsb[5] & 0x10))
    return 0;

  /* four full bytes, most significant first ... */
  for (i = 6; i <= 9; i++)
    base = (base << 8) | tsb[i];
  /* ... followed by the single top bit of byte 10 */
  base = (base << 1) | (tsb[10] >> 7);

  *rpcr = base;
  return 1;
}
1065
/*
 * Count how many leading bytes of `tsb` consist of correctly synced
 * 188-byte packets (first byte 0x47).
 *
 * tsb - buffer start
 * len - number of valid bytes in the buffer
 *
 * Returns a multiple of 188; trailing bytes that do not form a whole
 * packet are never counted. Ten packets are checked per step while at
 * least 1880 bytes remain, otherwise one packet at a time.
 */
static inline int
ts_sync_count ( const uint8_t *tsb, int len )
{
  const uint8_t *cur = tsb;
  int k;

  for (;;) {
    if (len < 188)
      break;

    /* fast path: a run of ten synced packets */
    if (len >= 1880) {
      for (k = 0; k < 10 && cur[k * 188] == 0x47; k++)
        ;
      if (k == 10) {
        cur += 1880;
        len -= 1880;
        continue;
      }
    }

    /* slow path: single packet */
    if (cur[0] != 0x47)
      break;
    cur += 188;
    len -= 188;
  }

  return cur - tsb;
}
1088
/*
 * Hand freshly received raw TS data over to the input processing thread.
 *
 * mi    - input the data belongs to
 * mmi   - mux instance the data was received for
 * sb    - accumulation buffer; sb_data/sb_ptr describe the pending bytes.
 *         Consumed bytes are cut from it (or it is cleared) on return.
 * flags - MPEGTS_DATA_CC_RESTART and MPEGTS_DATA_REMOVE_SCRAMBLED are
 *         evaluated here
 * pcr   - optional; when non-NULL the first and last PCR found in the synced
 *         data are stored in pcr_first/pcr_last and pcr_pid is set to the
 *         PID they were taken from
 *
 * The synced part of the buffer is copied into a newly allocated
 * mpegts_packet_t and appended to mi_input_queue (under mi_input_lock),
 * provided the mux instance is still the active one and the queue holds
 * less than 50MB. Otherwise the copy is freed again.
 */
void
mpegts_input_recv_packets
  ( mpegts_input_t *mi, mpegts_mux_instance_t *mmi, sbuf_t *sb,
    int flags, mpegts_pcr_t *pcr )
{
  int len, len2, off;
  mpegts_packet_t *mp;
  uint8_t *tsb;
/* minimum number of buffered packets before data is dispatched right away */
#define MIN_TS_PKT 100
/* minimum number of synced bytes (five packets) required to accept data */
#define MIN_TS_SYN (5*188)

retry:
  len2 = 0;               /* synced bytes found at tsb */
  off  = 0;               /* bytes of sb consumed so far (skipped + passed) */
  tsb  = sb->sb_data;
  len  = sb->sb_ptr;
  if (len < (MIN_TS_PKT * 188) && (flags & MPEGTS_DATA_CC_RESTART) == 0) {
    /* For slow streams, check also against the clock */
    /* NOTE(review): presumably non-zero while still inside the same clock
     * period as the last dispatch - confirm monocmpfastsec semantics */
    if (monocmpfastsec(mclk(), mi->mi_last_dispatch))
      return;
  }
  mi->mi_last_dispatch = mclk();

  /* Check for sync */
  /* Advance one byte at a time until at least MIN_TS_SYN synced bytes are
   * found; every skipped byte is counted in the unc statistic */
  while ( (len >= MIN_TS_SYN) &&
          ((len2 = ts_sync_count(tsb, len)) < MIN_TS_SYN) ) {
    atomic_add(&mmi->tii_stats.unc, 1);
    --len;
    ++tsb;
    ++off;
  }

  // Note: we check for sync here so that the buffer can always be
  //       processed in its entirety inside the processing thread
  //       without the potential need to buffer data (since that would
  //       require per mmi buffers, where this is generally not required)

  /* Extract PCR on demand */
  if (pcr) {
    uint8_t *tmp, *end;
    uint16_t pid;
    /* forward scan: first PCR on the wanted PID (or on any PID when none
     * has been chosen yet, which then becomes the chosen one) */
    for (tmp = tsb, end = tsb + len2; tmp < end; tmp += 188) {
      pid = ((tmp[1] & 0x1f) << 8) | tmp[2];
      if (pcr->pcr_pid == MPEGTS_PID_NONE || pcr->pcr_pid == pid) {
        if (get_pcr(tmp, &pcr->pcr_first)) {
          pcr->pcr_pid = pid;
          break;
        }
      }
    }
    /* backward scan: last PCR on the chosen PID */
    if (pcr->pcr_pid != MPEGTS_PID_NONE) {
      for (tmp = tsb + len2 - 188; tmp >= tsb; tmp -= 188) {
        pid = ((tmp[1] & 0x1f) << 8) | tmp[2];
        if (pcr->pcr_pid == pid) {
          if (get_pcr(tmp, &pcr->pcr_last)) {
            pcr->pcr_pid = pid;
            break;
          }
        }
      }
    }
  }

  /* Pass */
  if (len2 >= MIN_TS_SYN || (flags & MPEGTS_DATA_CC_RESTART)) {
    /* packet header and payload live in one allocation */
    mp = malloc(sizeof(mpegts_packet_t) + len2);
    mp->mp_mux        = mmi->mmi_mux;
    mp->mp_len        = len2;
    mp->mp_cc_restart = (flags & MPEGTS_DATA_CC_RESTART) ? 1 : 0;

    memcpy(mp->mp_data, tsb, len2);
    if (mi->mi_remove_scrambled_bits || (flags & MPEGTS_DATA_REMOVE_SCRAMBLED) != 0) {
      uint8_t *tmp, *end;
      /* clear the two scrambling bits in the header of each copied packet */
      for (tmp = mp->mp_data, end = mp->mp_data + len2; tmp < end; tmp += 188)
        tmp[3] &= ~0xc0;
    }

    len -= len2;
    off += len2;

    /* data_noise() returning non-zero means: drop this packet */
    if ((flags & MPEGTS_DATA_CC_RESTART) == 0 && data_noise(mp)) {
      free(mp);
      goto end;
    }

    pthread_mutex_lock(&mi->mi_input_lock);
    /* only queue data for the instance that is still active on its mux */
    if (mmi->mmi_mux->mm_active == mmi) {
      if (mi->mi_input_queue_size < 50*1024*1024) {
        mi->mi_input_queue_size += len2;
        memoryinfo_alloc(&mpegts_input_queue_memoryinfo, sizeof(mpegts_packet_t) + len2);
        /* the queued packet holds its own reference on the mux */
        mpegts_mux_grab(mp->mp_mux);
        TAILQ_INSERT_TAIL(&mi->mi_input_queue, mp, mp_link);
        tvh_cond_signal(&mi->mi_input_cond, 0);
      } else {
        if (tvhlog_limit(&mi->mi_input_queue_loglimit, 10))
          tvhwarn(LS_MPEGTS, "too much queued input data (over 50MB), discarding new");
        free(mp);
      }
    } else {
      free(mp);
    }
    pthread_mutex_unlock(&mi->mi_input_lock);
  }

  /* Adjust buffer */
end:
  if (len && (flags & MPEGTS_DATA_CC_RESTART) == 0) {
    sbuf_cut(sb, off); // cut off the bottom
    /* enough data left for another full dispatch -> go round again */
    if (sb->sb_ptr >= MIN_TS_PKT * 188)
      goto retry;
  } else
    sb->sb_ptr = 0;    // clear
}
1202
1203 static void
mpegts_input_table_dispatch(mpegts_mux_t * mm,const char * logprefix,const uint8_t * tsb,int tsb_len)1204 mpegts_input_table_dispatch
1205 ( mpegts_mux_t *mm, const char *logprefix, const uint8_t *tsb, int tsb_len )
1206 {
1207 int i, len = 0, c = 0;
1208 const uint8_t *tsb2, *tsb2_end;
1209 uint16_t pid = ((tsb[1] & 0x1f) << 8) | tsb[2];
1210 mpegts_table_t *mt, **vec;
1211
1212 /* Collate - tables may be removed during callbacks */
1213 pthread_mutex_lock(&mm->mm_tables_lock);
1214 i = mm->mm_num_tables;
1215 vec = alloca(i * sizeof(mpegts_table_t *));
1216 LIST_FOREACH(mt, &mm->mm_tables, mt_link) {
1217 c++;
1218 if (mt->mt_destroyed || !mt->mt_subscribed || mt->mt_pid != pid)
1219 continue;
1220 mpegts_table_grab(mt);
1221 if (len < i)
1222 vec[len++] = mt;
1223 }
1224 pthread_mutex_unlock(&mm->mm_tables_lock);
1225 if (i != c) {
1226 tvherror(LS_TBL, "tables count inconsistency (num %d, list %d)", i, c);
1227 assert(0);
1228 }
1229
1230 /* Process */
1231 for (i = 0; i < len; i++) {
1232 mt = vec[i];
1233 if (!mt->mt_destroyed && mt->mt_pid == pid)
1234 for (tsb2 = tsb, tsb2_end = tsb + tsb_len; tsb2 < tsb2_end; tsb2 += 188)
1235 mpegts_psi_section_reassemble((mpegts_psi_table_t *)mt, logprefix,
1236 tsb2, mt->mt_flags & MT_CRC,
1237 mpegts_table_dispatch, mt);
1238 mpegts_table_release(mt);
1239 }
1240 }
1241
1242 static void
mpegts_input_table_waiting(mpegts_input_t * mi,mpegts_mux_t * mm)1243 mpegts_input_table_waiting ( mpegts_input_t *mi, mpegts_mux_t *mm )
1244 {
1245 mpegts_table_t *mt;
1246
1247 if (!mm || !mm->mm_active)
1248 return;
1249 pthread_mutex_lock(&mm->mm_tables_lock);
1250 while ((mt = TAILQ_FIRST(&mm->mm_defer_tables)) != NULL) {
1251 mpegts_table_consistency_check(mm);
1252 TAILQ_REMOVE(&mm->mm_defer_tables, mt, mt_defer_link);
1253 if (mt->mt_defer_cmd == MT_DEFER_OPEN_PID && !mt->mt_destroyed) {
1254 mt->mt_defer_cmd = 0;
1255 if (!mt->mt_subscribed) {
1256 mt->mt_subscribed = 1;
1257 pthread_mutex_unlock(&mm->mm_tables_lock);
1258 mpegts_input_open_pid(mi, mm, mt->mt_pid, mpegts_table_type(mt), mt->mt_weight, mt, 0);
1259 } else {
1260 pthread_mutex_unlock(&mm->mm_tables_lock);
1261 }
1262 } else if (mt->mt_defer_cmd == MT_DEFER_CLOSE_PID) {
1263 mt->mt_defer_cmd = 0;
1264 if (mt->mt_subscribed) {
1265 mt->mt_subscribed = 0;
1266 pthread_mutex_unlock(&mm->mm_tables_lock);
1267 mpegts_input_close_pid(mi, mm, mt->mt_pid, mpegts_table_type(mt), mt->mt_weight, mt);
1268 } else {
1269 pthread_mutex_unlock(&mm->mm_tables_lock);
1270 }
1271 } else {
1272 pthread_mutex_unlock(&mm->mm_tables_lock);
1273 }
1274 mpegts_table_release(mt);
1275 pthread_mutex_lock(&mm->mm_tables_lock);
1276 }
1277 mpegts_table_consistency_check(mm);
1278 pthread_mutex_unlock(&mm->mm_tables_lock);
1279 }
1280
#if ENABLE_TSDEBUG
/*
 * Scan a run of 188-byte packets for in-band descrambler keys (debug aid).
 *
 * mm  - mux whose service list is searched for the service id in the packet
 * pkt - first packet of the run
 * len - number of bytes at pkt
 *
 * A key packet carries the marker "TVHeadendDescramblerKeys" at offset 4,
 * followed by type (1 byte), key length (1 byte), service id (2 bytes),
 * two keys of that length and a 32-bit CRC. Packets without the marker are
 * skipped; a packet with the marker that is too long, fails the CRC or
 * names an unknown service ends the scan. Valid keys are handed to
 * tsdebugcw_new_keys().
 */
static void
tsdebug_check_tspkt( mpegts_mux_t *mm, uint8_t *pkt, int len )
{
  void tsdebugcw_new_keys(service_t *t, int type, uint8_t *odd, uint8_t *even);
  uint32_t pos, type, keylen, sid, crc;
  mpegts_service_t *t;

  for ( ; len > 0; pkt += 188, len -= 188) {
    if (memcmp(pkt + 4, "TVHeadendDescramblerKeys", 24))
      continue;
    pos = 4 + 24;
    type = pkt[pos + 0];
    keylen = pkt[pos + 1];
    sid = (pkt[pos + 2] << 8) | pkt[pos + 3];
    pos += 4 + 2 * keylen;
    /* the 4 CRC bytes must still fit into the 188-byte packet */
    if (pos > 184)
      return;
    /*
     * Widen each byte before shifting: a plain pkt[x] << 24 is done in
     * (signed) int and is undefined for byte values >= 0x80.
     */
    crc = ((uint32_t)pkt[pos + 0] << 24) | ((uint32_t)pkt[pos + 1] << 16) |
          ((uint32_t)pkt[pos + 2] << 8) | (uint32_t)pkt[pos + 3];
    if (crc != tvh_crc32(pkt, pos, 0x859aa5ba))
      return;
    LIST_FOREACH(t, &mm->mm_services, s_dvb_mux_link)
      if (t->s_dvb_service_id == sid) break;
    if (!t)
      return;
    /* keys start right after marker + type/keylen/sid header */
    pos = 4 + 24 + 4;
    tvhdebug(LS_DESCRAMBLER, "Keys from MPEG-TS source (PID 0x1FFF)!");
    tsdebugcw_new_keys((service_t *)t, type, pkt + pos, pkt + pos + keylen);
  }
}
#endif
1313
/*
 * Demultiplex one queued buffer of TS packets.
 *
 * mi   - input the buffer was received on
 * mpkt - buffer taken from the input queue; mp_len must be a multiple
 *        of 188 (asserted)
 *
 * The buffer is consumed in runs of packets (see mpegts_word_count below).
 * For each run the PID is looked up on the mux and the data is forwarded
 * to raw/service subscribers and to the table handling, according to the
 * type flags of the PID. Afterwards the consumed data is delivered to the
 * streaming pad of the active mux instance if it has targets.
 *
 * Returns the number of bytes consumed, or 0 when the packet has no mux or
 * the mux is not active. The consumed byte count is also added to the bps
 * statistic.
 *
 * The caller in this file (mpegts_input_thread) invokes this function with
 * mi_output_lock held.
 */
static int
mpegts_input_process
  ( mpegts_input_t *mi, mpegts_packet_t *mpkt )
{
  uint16_t pid;
  uint8_t cc, cc2;
  uint8_t *tsb = mpkt->mp_data, *tsb2, *tsb2_end;
  int len = mpkt->mp_len, llen;
  int type = 0, f;
  mpegts_pid_t *mp;
  mpegts_pid_sub_t *mps;
  service_t *s;
  elementary_stream_t *st;
  int table_wakeup = 0;
  mpegts_mux_t *mm = mpkt->mp_mux;
  mpegts_mux_instance_t *mmi;
#if ENABLE_TSDEBUG
  off_t tsdebug_pos;
#endif
  char muxname[256];

  /* mux was flushed (pointer cleared) or is no longer active */
  if (mm == NULL || (mmi = mm->mm_active) == NULL)
    return 0;

  mpegts_mux_nice_name(mm, muxname, sizeof(muxname));

  assert(mm == mmi->mmi_mux);

#if ENABLE_TSDEBUG
  /* stream position of the first byte of this buffer */
  tsdebug_pos = mm->mm_tsdebug_pos;
#endif

  /* Process */
  assert((len % 188) == 0);
  while (len > 0) {

    /*
     * mask
     *  0 - 0xFF - sync word 0x47
     *  1 - 0x80 - transport error
     *  1 - 0x1F - pid high
     *  2 - 0xFF - pid low
     *  3 - 0xC0 - scrambled
     *  3 - 0x10 - CC check
     */
    /* NOTE(review): presumably the byte length of the leading run of
     * packets whose masked header equals that of the first packet -
     * confirm against mpegts_word_count */
    llen = mpegts_word_count(tsb, len, 0xFF9FFFD0);

    /* bytes 1-2 as one word: error bit in 0x8000, PID in the low 13 bits */
    pid = (tsb[1] << 8) | tsb[2];

    /* Transport error */
    if (pid & 0x8000) {
      /* not counted for NUL packets */
      if ((pid & 0x1FFF) != 0x1FFF)
        atomic_add(&mmi->tii_stats.te, 1);
    }

    pid &= 0x1FFF;

    /* Ignore NUL packets */
    if (pid == 0x1FFF) {
#if ENABLE_TSDEBUG
      tsdebug_check_tspkt(mm, tsb, llen);
#endif
      goto done;
    }

    /* Find PID */
    if ((mp = mpegts_mux_find_pid(mm, pid, 0))) {

      /* Low level CC check */
      if (tsb[3] & 0x10) {
        /* cc2 is the expected counter; 0xff means "no expectation yet" */
        for (tsb2 = tsb, tsb2_end = tsb + llen, cc2 = mp->mp_cc;
             tsb2 < tsb2_end; tsb2 += 188) {
          cc = tsb2[3] & 0x0f;
          if (cc2 != 0xff && cc2 != cc) {
            tvhtrace(LS_MPEGTS, "%s: pid %04X cc err %2d != %2d", muxname, pid, cc, cc2);
            atomic_add(&mmi->tii_stats.cc, 1);
          }
          cc2 = (cc + 1) & 0xF;
        }
        mp->mp_cc = cc2;
      }

      type = mp->mp_type;

      /* Stream all PIDs */
      LIST_FOREACH(mps, &mm->mm_all_subs, mps_svcraw_link)
        if ((mps->mps_type & MPS_ALL) || (type & (MPS_TABLE|MPS_FTABLE)))
          ts_recv_raw((mpegts_service_t *)mps->mps_owner, tsb, llen);

      /* Stream raw PIDs */
      if (type & MPS_RAW) {
        LIST_FOREACH(mps, &mp->mp_raw_subs, mps_raw_link)
          ts_recv_raw((mpegts_service_t *)mps->mps_owner, tsb, llen);
      }

      /* Stream service data */
      if (type & MPS_SERVICE) {
        LIST_FOREACH(mps, &mp->mp_svc_subs, mps_svcraw_link) {
          s = mps->mps_owner;
          /* f is set for table PIDs and for the PMT/PCR PID of the service */
          f = (type & (MPS_TABLE|MPS_FTABLE)) ||
              (pid == s->s_pmt_pid) || (pid == s->s_pcr_pid);
          ts_recv_packet1((mpegts_service_t*)s, tsb, llen, f);
        }
      } else
      /* Stream table data */
      if (type & MPS_STREAM) {
        /* delivered to every STYPE_STD service on the mux */
        LIST_FOREACH(s, &mm->mm_transports, s_active_link) {
          if (s->s_type != STYPE_STD) continue;
          f = (type & (MPS_TABLE|MPS_FTABLE)) ||
              (pid == s->s_pmt_pid) || (pid == s->s_pcr_pid);
          ts_recv_packet1((mpegts_service_t*)s, tsb, llen, f);
        }
      }

      /* Table data */
      if (type & (MPS_TABLE | MPS_FTABLE)) {
        /* packets flagged with a transport error are not parsed */
        if (!(tsb[1] & 0x80)) {
          /* MPS_FTABLE: dispatched right here, in this thread */
          if (type & MPS_FTABLE)
            mpegts_input_table_dispatch(mm, muxname, tsb, llen);
          /* MPS_TABLE: copied and queued for the table thread */
          if (type & MPS_TABLE) {
            if (mi->mi_table_queue_size >= 2*1024*1024) {
              if (tvhlog_limit(&mi->mi_input_queue_loglimit, 10))
                tvhwarn(LS_MPEGTS, "too much queued table input data (over 2MB), discarding new");
            } else {
              mpegts_table_feed_t *mtf = malloc(sizeof(mpegts_table_feed_t)+llen);
              mtf->mtf_len = llen;
              memcpy(mtf->mtf_tsb, tsb, llen);
              mtf->mtf_mux = mm;
              mi->mi_table_queue_size += llen;
              memoryinfo_alloc(&mpegts_input_table_memoryinfo, sizeof(mpegts_table_feed_t) + llen);
              TAILQ_INSERT_TAIL(&mi->mi_table_queue, mtf, mtf_link);
              /* signal the table thread once, after the whole buffer */
              table_wakeup = 1;
            }
          }
        } else {
          //tvhdebug("tsdemux", "%s - SI packet had errors", name);
        }
      }

    } else {

      /* Stream to all fullmux subscribers */
      LIST_FOREACH(mps, &mm->mm_all_subs, mps_svcraw_link)
        ts_recv_raw((mpegts_service_t *)mps->mps_owner, tsb, llen);

    }

done:
    /* advance past the run that was just handled */
    tsb += llen;
    len -= llen;
#if ENABLE_TSDEBUG
    mm->mm_tsdebug_pos += llen;
#endif
  }

  /* Raw stream */
  if (tsb != mpkt->mp_data &&
      LIST_FIRST(&mmi->mmi_streaming_pad.sp_targets) != NULL) {

    streaming_message_t sm;
    pktbuf_t *pb = pktbuf_alloc(mpkt->mp_data, tsb - mpkt->mp_data);
    memset(&sm, 0, sizeof(sm));
    sm.sm_type = SMT_MPEGTS;
    sm.sm_data = pb;
    streaming_pad_deliver(&mmi->mmi_streaming_pad, streaming_msg_clone(&sm));
    /* drop the local reference taken by pktbuf_alloc */
    pktbuf_ref_dec(pb);
  }
#if ENABLE_TSDEBUG
  {
    /* Write the consumed data to the debug file, inserting each queued
     * debug packet at its recorded stream position */
    tsdebug_packet_t *tp, *tp_next;
    off_t pos = 0;
    size_t used = tsb - mpkt->mp_data;
    for (tp = TAILQ_FIRST(&mm->mm_tsdebug_packets); tp; tp = tp_next) {
      tp_next = TAILQ_NEXT(tp, link);
      assert((tp->pos % 188) == 0);
      assert(tp->pos >= tsdebug_pos && tp->pos < tsdebug_pos + used);
      if (mm->mm_tsdebug_fd >= 0) {
        tvh_write(mm->mm_tsdebug_fd, mpkt->mp_data + pos, tp->pos - tsdebug_pos - pos);
        tvh_write(mm->mm_tsdebug_fd, tp->pkt, 188);
      }
      pos = tp->pos - tsdebug_pos;
      TAILQ_REMOVE(&mm->mm_tsdebug_packets, tp, link);
      free(tp);
    }
    /* remainder after the last inserted packet */
    if (pos < used && mm->mm_tsdebug_fd >= 0)
      tvh_write(mm->mm_tsdebug_fd, mpkt->mp_data + pos, used - pos);
  }
#endif

  /* CC restart requested: reset es_cc of every stream to -1 */
  if (mpkt->mp_cc_restart) {
    LIST_FOREACH(s, &mm->mm_transports, s_active_link)
      TAILQ_FOREACH(st, &s->s_components, es_link)
        st->es_cc = -1;
  }

  /* Wake table */
  if (table_wakeup)
    tvh_cond_signal(&mi->mi_table_cond, 0);

  /* Bandwidth monitoring */
  llen = tsb - mpkt->mp_data;
  atomic_add(&mmi->tii_stats.bps, llen);
  return llen;
}
1518
1519 static void *
mpegts_input_thread(void * p)1520 mpegts_input_thread ( void * p )
1521 {
1522 mpegts_packet_t *mp;
1523 mpegts_input_t *mi = p;
1524 size_t bytes = 0;
1525 int update_pids;
1526 char buf[256];
1527
1528 mi->mi_display_name(mi, buf, sizeof(buf));
1529 pthread_mutex_lock(&mi->mi_input_lock);
1530 while (atomic_get(&mi->mi_running)) {
1531
1532 /* Wait for a packet */
1533 if (!(mp = TAILQ_FIRST(&mi->mi_input_queue))) {
1534 if (bytes) {
1535 tvhtrace(LS_MPEGTS, "input %s got %zu bytes", buf, bytes);
1536 bytes = 0;
1537 }
1538 tvh_cond_wait(&mi->mi_input_cond, &mi->mi_input_lock);
1539 continue;
1540 }
1541 mi->mi_input_queue_size -= mp->mp_len;
1542 memoryinfo_free(&mpegts_input_queue_memoryinfo, sizeof(mpegts_packet_t) + mp->mp_len);
1543 TAILQ_REMOVE(&mi->mi_input_queue, mp, mp_link);
1544 pthread_mutex_unlock(&mi->mi_input_lock);
1545
1546 /* Process */
1547 pthread_mutex_lock(&mi->mi_output_lock);
1548 mpegts_input_table_waiting(mi, mp->mp_mux);
1549 if (mp->mp_mux && mp->mp_mux->mm_update_pids_flag) {
1550 pthread_mutex_unlock(&mi->mi_output_lock);
1551 pthread_mutex_lock(&global_lock);
1552 mpegts_mux_update_pids(mp->mp_mux);
1553 pthread_mutex_unlock(&global_lock);
1554 pthread_mutex_lock(&mi->mi_output_lock);
1555 }
1556 bytes += mpegts_input_process(mi, mp);
1557 update_pids = mp->mp_mux && mp->mp_mux->mm_update_pids_flag;
1558 pthread_mutex_unlock(&mi->mi_output_lock);
1559 if (update_pids) {
1560 pthread_mutex_lock(&global_lock);
1561 mpegts_mux_update_pids(mp->mp_mux);
1562 pthread_mutex_unlock(&global_lock);
1563 }
1564
1565 /* Cleanup */
1566 if (mp->mp_mux)
1567 mpegts_mux_release(mp->mp_mux);
1568 free(mp);
1569
1570 #if ENABLE_TSDEBUG
1571 {
1572 extern void tsdebugcw_go(void);
1573 tsdebugcw_go();
1574 }
1575 #endif
1576
1577 pthread_mutex_lock(&mi->mi_input_lock);
1578 }
1579
1580 tvhtrace(LS_MPEGTS, "input %s got %zu bytes (finish)", buf, bytes);
1581
1582 /* Flush */
1583 while ((mp = TAILQ_FIRST(&mi->mi_input_queue))) {
1584 memoryinfo_free(&mpegts_input_queue_memoryinfo, sizeof(mpegts_packet_t) + mp->mp_len);
1585 TAILQ_REMOVE(&mi->mi_input_queue, mp, mp_link);
1586 if (mp->mp_mux)
1587 mpegts_mux_release(mp->mp_mux);
1588 free(mp);
1589 }
1590 mi->mi_input_queue_size = 0;
1591 pthread_mutex_unlock(&mi->mi_input_lock);
1592
1593 return NULL;
1594 }
1595
1596 static void *
mpegts_input_table_thread(void * aux)1597 mpegts_input_table_thread ( void *aux )
1598 {
1599 mpegts_table_feed_t *mtf;
1600 mpegts_input_t *mi = aux;
1601 mpegts_mux_t *mm = NULL;
1602 char muxname[256];
1603
1604 pthread_mutex_lock(&mi->mi_output_lock);
1605 while (atomic_get(&mi->mi_running)) {
1606
1607 /* Wait for data */
1608 if (!(mtf = TAILQ_FIRST(&mi->mi_table_queue))) {
1609 tvh_cond_wait(&mi->mi_table_cond, &mi->mi_output_lock);
1610 continue;
1611 }
1612 mi->mi_table_queue_size -= mtf->mtf_len;
1613 memoryinfo_free(&mpegts_input_table_memoryinfo, sizeof(mpegts_table_feed_t) + mtf->mtf_len);
1614 TAILQ_REMOVE(&mi->mi_table_queue, mtf, mtf_link);
1615 pthread_mutex_unlock(&mi->mi_output_lock);
1616
1617 /* Process */
1618 pthread_mutex_lock(&global_lock);
1619 if (atomic_get(&mi->mi_running)) {
1620 if (mm != mtf->mtf_mux) {
1621 mm = mtf->mtf_mux;
1622 if (mm)
1623 mpegts_mux_nice_name(mm, muxname, sizeof(muxname));
1624 }
1625 if (mm && mm->mm_active)
1626 mpegts_input_table_dispatch(mm, muxname, mtf->mtf_tsb, mtf->mtf_len);
1627 }
1628 pthread_mutex_unlock(&global_lock);
1629
1630 /* Cleanup */
1631 free(mtf);
1632 pthread_mutex_lock(&mi->mi_output_lock);
1633 }
1634
1635 /* Flush */
1636 while ((mtf = TAILQ_FIRST(&mi->mi_table_queue)) != NULL) {
1637 memoryinfo_free(&mpegts_input_table_memoryinfo, sizeof(mpegts_table_feed_t) + mtf->mtf_len);
1638 TAILQ_REMOVE(&mi->mi_table_queue, mtf, mtf_link);
1639 free(mtf);
1640 }
1641 mi->mi_table_queue_size = 0;
1642 pthread_mutex_unlock(&mi->mi_output_lock);
1643
1644 return NULL;
1645 }
1646
1647 void
mpegts_input_flush_mux(mpegts_input_t * mi,mpegts_mux_t * mm)1648 mpegts_input_flush_mux
1649 ( mpegts_input_t *mi, mpegts_mux_t *mm )
1650 {
1651 mpegts_table_feed_t *mtf;
1652 mpegts_packet_t *mp;
1653
1654 lock_assert(&global_lock);
1655
1656 // Note: to avoid long delays in here, rather than actually
1657 // remove things from the Q, we simply invalidate by clearing
1658 // the mux pointer and allow the threads to deal with the deletion
1659
1660 /* Flush input Q */
1661 pthread_mutex_lock(&mi->mi_input_lock);
1662 TAILQ_FOREACH(mp, &mi->mi_input_queue, mp_link) {
1663 if (mp->mp_mux == mm) {
1664 mpegts_mux_release(mm);
1665 mp->mp_mux = NULL;
1666 }
1667 }
1668 pthread_mutex_unlock(&mi->mi_input_lock);
1669
1670 /* Flush table Q */
1671 pthread_mutex_lock(&mi->mi_output_lock);
1672 TAILQ_FOREACH(mtf, &mi->mi_table_queue, mtf_link) {
1673 if (mtf->mtf_mux == mm)
1674 mtf->mtf_mux = NULL;
1675 }
1676 pthread_mutex_unlock(&mi->mi_output_lock);
1677 /* mux active must be NULL here */
1678 /* otherwise the picked mtf might be processed after mux deactivation */
1679 assert(mm->mm_active == NULL);
1680 }
1681
1682 static void
mpegts_input_stream_status(mpegts_mux_instance_t * mmi,tvh_input_stream_t * st)1683 mpegts_input_stream_status
1684 ( mpegts_mux_instance_t *mmi, tvh_input_stream_t *st )
1685 {
1686 int s = 0, w = 0;
1687 char buf[512], ubuf[UUID_HEX_SIZE];
1688 th_subscription_t *ths;
1689 const service_t *t;
1690 mpegts_mux_t *mm = mmi->mmi_mux;
1691 mpegts_input_t *mi = mmi->mmi_input;
1692
1693 LIST_FOREACH(t, &mm->mm_transports, s_active_link)
1694 if (((mpegts_service_t *)t)->s_dvb_mux == mm)
1695 LIST_FOREACH(ths, &t->s_subscriptions, ths_service_link) {
1696 s++;
1697 w = MAX(w, ths->ths_weight);
1698 }
1699
1700 st->uuid = strdup(idnode_uuid_as_str(&mmi->tii_id, ubuf));
1701 mi->mi_display_name(mi, buf, sizeof(buf));
1702 st->input_name = strdup(buf);
1703 mpegts_mux_nice_name(mm, buf, sizeof(buf));
1704 st->stream_name = strdup(buf);
1705 st->subs_count = s;
1706 st->max_weight = w;
1707 pthread_mutex_lock(&mmi->tii_stats_mutex);
1708 st->stats.signal = mmi->tii_stats.signal;
1709 st->stats.snr = mmi->tii_stats.snr;
1710 st->stats.ber = mmi->tii_stats.ber;
1711 st->stats.signal_scale = mmi->tii_stats.signal_scale;
1712 st->stats.snr_scale = mmi->tii_stats.snr_scale;
1713 st->stats.ec_bit = mmi->tii_stats.ec_bit;
1714 st->stats.tc_bit = mmi->tii_stats.tc_bit;
1715 st->stats.ec_block = mmi->tii_stats.ec_block;
1716 st->stats.tc_block = mmi->tii_stats.tc_block;
1717 pthread_mutex_unlock(&mmi->tii_stats_mutex);
1718 st->stats.unc = atomic_get(&mmi->tii_stats.unc);
1719 st->stats.cc = atomic_get(&mmi->tii_stats.cc);
1720 st->stats.te = atomic_get(&mmi->tii_stats.te);
1721 st->stats.bps = atomic_exchange(&mmi->tii_stats.bps, 0) * 8;
1722 }
1723
1724 void
mpegts_input_empty_status(mpegts_input_t * mi,tvh_input_stream_t * st)1725 mpegts_input_empty_status
1726 ( mpegts_input_t *mi, tvh_input_stream_t *st )
1727 {
1728 char buf[512], ubuf[UUID_HEX_SIZE];
1729 tvh_input_instance_t *mmi_;
1730 mpegts_mux_instance_t *mmi;
1731
1732 st->uuid = strdup(idnode_uuid_as_str(&mi->ti_id, ubuf));
1733 mi->mi_display_name(mi, buf, sizeof(buf));
1734 st->input_name = strdup(buf);
1735 LIST_FOREACH(mmi_, &mi->mi_mux_instances, tii_input_link) {
1736 mmi = (mpegts_mux_instance_t *)mmi_;
1737 st->stats.unc += atomic_get(&mmi->tii_stats.unc);
1738 st->stats.cc += atomic_get(&mmi->tii_stats.cc);
1739 pthread_mutex_lock(&mmi->tii_stats_mutex);
1740 st->stats.te += mmi->tii_stats.te;
1741 st->stats.ec_block += mmi->tii_stats.ec_block;
1742 st->stats.tc_block += mmi->tii_stats.tc_block;
1743 pthread_mutex_unlock(&mmi->tii_stats_mutex);
1744 }
1745 }
1746
1747 static void
mpegts_input_get_streams(tvh_input_t * i,tvh_input_stream_list_t * isl)1748 mpegts_input_get_streams
1749 ( tvh_input_t *i, tvh_input_stream_list_t *isl )
1750 {
1751 tvh_input_stream_t *st = NULL;
1752 mpegts_input_t *mi = (mpegts_input_t*)i;
1753 mpegts_mux_instance_t *mmi;
1754
1755 pthread_mutex_lock(&mi->mi_output_lock);
1756 LIST_FOREACH(mmi, &mi->mi_mux_active, mmi_active_link) {
1757 st = calloc(1, sizeof(tvh_input_stream_t));
1758 mpegts_input_stream_status(mmi, st);
1759 LIST_INSERT_HEAD(isl, st, link);
1760 }
1761 if (st == NULL && mi->mi_empty_status && mi->mi_enabled) {
1762 st = calloc(1, sizeof(tvh_input_stream_t));
1763 mi->mi_empty_status(mi, st);
1764 LIST_INSERT_HEAD(isl, st, link);
1765 }
1766 pthread_mutex_unlock(&mi->mi_output_lock);
1767 }
1768
1769 static void
mpegts_input_clear_stats(tvh_input_t * i)1770 mpegts_input_clear_stats ( tvh_input_t *i )
1771 {
1772 mpegts_input_t *mi = (mpegts_input_t*)i;
1773 tvh_input_instance_t *mmi_;
1774 mpegts_mux_instance_t *mmi;
1775
1776 pthread_mutex_lock(&mi->mi_output_lock);
1777 LIST_FOREACH(mmi_, &mi->mi_mux_instances, tii_input_link) {
1778 mmi = (mpegts_mux_instance_t *)mmi_;
1779 atomic_set(&mmi->tii_stats.unc, 0);
1780 atomic_set(&mmi->tii_stats.cc, 0);
1781 pthread_mutex_lock(&mmi->tii_stats_mutex);
1782 mmi->tii_stats.te = 0;
1783 mmi->tii_stats.ec_block = 0;
1784 mmi->tii_stats.tc_block = 0;
1785 pthread_mutex_unlock(&mmi->tii_stats_mutex);
1786 }
1787 pthread_mutex_unlock(&mi->mi_output_lock);
1788 notify_reload("input_status");
1789 }
1790
1791 static void
mpegts_input_thread_start(void * aux)1792 mpegts_input_thread_start ( void *aux )
1793 {
1794 mpegts_input_t *mi = aux;
1795 atomic_set(&mi->mi_running, 1);
1796
1797 tvhthread_create(&mi->mi_table_tid, NULL,
1798 mpegts_input_table_thread, mi, "mi-table");
1799 tvhthread_create(&mi->mi_input_tid, NULL,
1800 mpegts_input_thread, mi, "mi-main");
1801 }
1802
1803 static void
mpegts_input_thread_stop(mpegts_input_t * mi)1804 mpegts_input_thread_stop ( mpegts_input_t *mi )
1805 {
1806 atomic_set(&mi->mi_running, 0);
1807 mtimer_disarm(&mi->mi_input_thread_start);
1808
1809 /* Stop input thread */
1810 pthread_mutex_lock(&mi->mi_input_lock);
1811 tvh_cond_signal(&mi->mi_input_cond, 0);
1812 pthread_mutex_unlock(&mi->mi_input_lock);
1813
1814 /* Stop table thread */
1815 pthread_mutex_lock(&mi->mi_output_lock);
1816 tvh_cond_signal(&mi->mi_table_cond, 0);
1817 pthread_mutex_unlock(&mi->mi_output_lock);
1818
1819 /* Join threads (relinquish lock due to potential deadlock) */
1820 pthread_mutex_unlock(&global_lock);
1821 if (mi->mi_input_tid)
1822 pthread_join(mi->mi_input_tid, NULL);
1823 if (mi->mi_table_tid)
1824 pthread_join(mi->mi_table_tid, NULL);
1825 pthread_mutex_lock(&global_lock);
1826 }
1827
1828 /* **************************************************************************
1829 * Status monitoring
1830 * *************************************************************************/
1831
1832 void
mpegts_input_status_timer(void * p)1833 mpegts_input_status_timer ( void *p )
1834 {
1835 tvh_input_stream_t st;
1836 mpegts_input_t *mi = p;
1837 mpegts_mux_instance_t *mmi;
1838 htsmsg_t *e;
1839 int64_t subs = 0;
1840
1841 pthread_mutex_lock(&mi->mi_output_lock);
1842 LIST_FOREACH(mmi, &mi->mi_mux_active, mmi_active_link) {
1843 memset(&st, 0, sizeof(st));
1844 mpegts_input_stream_status(mmi, &st);
1845 e = tvh_input_stream_create_msg(&st);
1846 htsmsg_add_u32(e, "update", 1);
1847 notify_by_msg("input_status", e, 0);
1848 subs += st.subs_count;
1849 tvh_input_stream_destroy(&st);
1850 }
1851 pthread_mutex_unlock(&mi->mi_output_lock);
1852 mtimer_arm_rel(&mi->mi_status_timer, mpegts_input_status_timer, mi, sec2mono(1));
1853 mpegts_input_dbus_notify(mi, subs);
1854 }
1855
1856 /* **************************************************************************
1857 * Creation/Config
1858 * *************************************************************************/
1859
/* Running counter; pre-incremented by mpegts_input_create0() to assign mi_instance */
static int mpegts_input_idx = 0;
/* List of all inputs, linked through mi_global_link */
mpegts_input_list_t mpegts_input_all;
1862
1863 mpegts_input_t*
mpegts_input_create0(mpegts_input_t * mi,const idclass_t * class,const char * uuid,htsmsg_t * c)1864 mpegts_input_create0
1865 ( mpegts_input_t *mi, const idclass_t *class, const char *uuid,
1866 htsmsg_t *c )
1867 {
1868 if (idnode_insert(&mi->ti_id, uuid, class, 0)) {
1869 if (uuid)
1870 tvherror(LS_MPEGTS, "invalid input uuid '%s'", uuid);
1871 free(mi);
1872 return NULL;
1873 }
1874 LIST_INSERT_HEAD(&tvh_inputs, (tvh_input_t*)mi, ti_link);
1875
1876 /* Defaults */
1877 mi->mi_is_enabled = mpegts_input_is_enabled;
1878 mi->mi_display_name = mpegts_input_display_name;
1879 mi->mi_get_weight = mpegts_input_get_weight;
1880 mi->mi_get_priority = mpegts_input_get_priority;
1881 mi->mi_warm_mux = mpegts_input_warm_mux;
1882 mi->mi_start_mux = mpegts_input_start_mux;
1883 mi->mi_stop_mux = mpegts_input_stop_mux;
1884 mi->mi_open_service = mpegts_input_open_service;
1885 mi->mi_close_service = mpegts_input_close_service;
1886 mi->mi_update_pids = mpegts_input_update_pids;
1887 mi->mi_create_mux_instance = mpegts_input_create_mux_instance;
1888 mi->mi_started_mux = mpegts_input_started_mux;
1889 mi->mi_stopping_mux = mpegts_input_stopping_mux;
1890 mi->mi_stopped_mux = mpegts_input_stopped_mux;
1891 mi->mi_has_subscription = mpegts_input_has_subscription;
1892 mi->mi_tuning_error = mpegts_input_tuning_error;
1893 mi->ti_get_streams = mpegts_input_get_streams;
1894 mi->ti_clear_stats = mpegts_input_clear_stats;
1895
1896 /* Index */
1897 mi->mi_instance = ++mpegts_input_idx;
1898
1899 /* Init input/output structures */
1900 pthread_mutex_init(&mi->mi_input_lock, NULL);
1901 tvh_cond_init(&mi->mi_input_cond);
1902 TAILQ_INIT(&mi->mi_input_queue);
1903
1904 pthread_mutex_init(&mi->mi_output_lock, NULL);
1905 tvh_cond_init(&mi->mi_table_cond);
1906 TAILQ_INIT(&mi->mi_table_queue);
1907
1908 /* Defaults */
1909 mi->mi_ota_epg = 1;
1910 mi->mi_initscan = 1;
1911 mi->mi_idlescan = 1;
1912
1913 /* Add to global list */
1914 LIST_INSERT_HEAD(&mpegts_input_all, mi, mi_global_link);
1915
1916 /* Load config */
1917 if (c)
1918 idnode_load(&mi->ti_id, c);
1919
1920 /* Start threads */
1921 mtimer_arm_rel(&mi->mi_input_thread_start, mpegts_input_thread_start, mi, 0);
1922
1923 return mi;
1924 }
1925
1926 void
mpegts_input_stop_all(mpegts_input_t * mi)1927 mpegts_input_stop_all ( mpegts_input_t *mi )
1928 {
1929 mpegts_mux_instance_t *mmi;
1930 while ((mmi = LIST_FIRST(&mi->mi_mux_active)))
1931 mmi->mmi_mux->mm_stop(mmi->mmi_mux, 1, SM_CODE_OK);
1932 }
1933
1934 void
mpegts_input_delete(mpegts_input_t * mi,int delconf)1935 mpegts_input_delete ( mpegts_input_t *mi, int delconf )
1936 {
1937 mpegts_network_link_t *mnl;
1938 tvh_input_instance_t *tii, *tii_next;
1939
1940 /* Early shutdown flag */
1941 atomic_set(&mi->mi_running, 0);
1942
1943 idnode_save_check(&mi->ti_id, delconf);
1944
1945 /* Remove networks */
1946 while ((mnl = LIST_FIRST(&mi->mi_networks)))
1947 mpegts_input_del_network(mnl);
1948
1949 /* Remove mux instances assigned to this input */
1950 tii = LIST_FIRST(&mi->mi_mux_instances);
1951 while (tii) {
1952 tii_next = LIST_NEXT(tii, tii_input_link);
1953 if (((mpegts_mux_instance_t *)tii)->mmi_input == mi)
1954 tii->tii_delete(tii);
1955 tii = tii_next;
1956 }
1957
1958 /* Remove global refs */
1959 idnode_unlink(&mi->ti_id);
1960 LIST_REMOVE(mi, ti_link);
1961 LIST_REMOVE(mi, mi_global_link);
1962
1963 /* Stop threads (will unlock global_lock to join) */
1964 mpegts_input_thread_stop(mi);
1965
1966 pthread_mutex_destroy(&mi->mi_output_lock);
1967 tvh_cond_destroy(&mi->mi_table_cond);
1968 free(mi->mi_name);
1969 free(mi->mi_linked);
1970 free(mi);
1971 }
1972
1973 void
mpegts_input_save(mpegts_input_t * mi,htsmsg_t * m)1974 mpegts_input_save ( mpegts_input_t *mi, htsmsg_t *m )
1975 {
1976 idnode_save(&mi->ti_id, m);
1977 }
1978
1979 int
mpegts_input_add_network(mpegts_input_t * mi,mpegts_network_t * mn)1980 mpegts_input_add_network ( mpegts_input_t *mi, mpegts_network_t *mn )
1981 {
1982 mpegts_network_link_t *mnl;
1983
1984 /* Find existing */
1985 LIST_FOREACH(mnl, &mi->mi_networks, mnl_mi_link)
1986 if (mnl->mnl_network == mn)
1987 break;
1988
1989 /* Clear mark */
1990 if (mnl) {
1991 mnl->mnl_mark = 0;
1992 return 0;
1993 }
1994
1995 /* Create new */
1996 mnl = calloc(1, sizeof(mpegts_network_link_t));
1997 mnl->mnl_input = mi;
1998 mnl->mnl_network = mn;
1999 LIST_INSERT_HEAD(&mi->mi_networks, mnl, mnl_mi_link);
2000 LIST_INSERT_HEAD(&mn->mn_inputs, mnl, mnl_mn_link);
2001 idnode_notify_changed(&mnl->mnl_network->mn_id);
2002 return 1;
2003 }
2004
2005 static void
mpegts_input_del_network(mpegts_network_link_t * mnl)2006 mpegts_input_del_network ( mpegts_network_link_t *mnl )
2007 {
2008 idnode_notify_changed(&mnl->mnl_network->mn_id);
2009 LIST_REMOVE(mnl, mnl_mn_link);
2010 LIST_REMOVE(mnl, mnl_mi_link);
2011 free(mnl);
2012 }
2013
2014 int
mpegts_input_set_networks(mpegts_input_t * mi,htsmsg_t * msg)2015 mpegts_input_set_networks ( mpegts_input_t *mi, htsmsg_t *msg )
2016 {
2017 int save = 0;
2018 const char *str;
2019 htsmsg_field_t *f;
2020 mpegts_network_t *mn;
2021 mpegts_network_link_t *mnl, *nxt;
2022
2023 /* Mark for deletion */
2024 LIST_FOREACH(mnl, &mi->mi_networks, mnl_mi_link)
2025 mnl->mnl_mark = 1;
2026
2027 /* Link */
2028 HTSMSG_FOREACH(f, msg) {
2029 if (!(str = htsmsg_field_get_str(f))) continue;
2030 if (!(mn = mpegts_network_find(str))) continue;
2031 save |= mpegts_input_add_network(mi, mn);
2032 }
2033
2034 /* Unlink */
2035 for (mnl = LIST_FIRST(&mi->mi_networks); mnl != NULL; mnl = nxt) {
2036 nxt = LIST_NEXT(mnl, mnl_mi_link);
2037 if (mnl->mnl_mark) {
2038 mpegts_input_del_network(mnl);
2039 save = 1;
2040 }
2041 }
2042
2043 return save;
2044 }
2045
2046 int
mpegts_input_grace(mpegts_input_t * mi,mpegts_mux_t * mm)2047 mpegts_input_grace( mpegts_input_t *mi, mpegts_mux_t *mm )
2048 {
2049 /* Get timeout */
2050 int t = 0;
2051 if (mi && mi->mi_get_grace)
2052 t = mi->mi_get_grace(mi, mm);
2053 if (t < 5) t = 5; // lower bound
2054 return t;
2055 }
2056
2057 /******************************************************************************
2058 * Editor Configuration
2059 *
2060 * vim:sts=2:ts=2:sw=2:et
2061 *****************************************************************************/
2062