1 /**
2 * Streaming helpers
3 * Copyright (C) 2008 Andreas Öman
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 #include <string.h>
20 #include <assert.h>
21
22 #include "tvheadend.h"
23 #include "streaming.h"
24 #include "packet.h"
25 #include "atomic.h"
26 #include "service.h"
27 #include "timeshift.h"
28
/* Memory accounting record for heap-allocated streaming messages: registered
 * in streaming_init(), unregistered in streaming_done(), and updated by the
 * message create/clone/free paths in this file. */
static memoryinfo_t streaming_msg_memoryinfo = { .my_name = "Streaming message" };
30
31 void
streaming_pad_init(streaming_pad_t * sp)32 streaming_pad_init(streaming_pad_t *sp)
33 {
34 LIST_INIT(&sp->sp_targets);
35 sp->sp_ntargets = 0;
36 sp->sp_reject_filter = ~0;
37 }
38
39 /**
40 *
41 */
42 void
streaming_target_init(streaming_target_t * st,streaming_ops_t * ops,void * opaque,int reject_filter)43 streaming_target_init(streaming_target_t *st, streaming_ops_t *ops,
44 void *opaque, int reject_filter)
45 {
46 st->st_ops = *ops;
47 st->st_opaque = opaque;
48 st->st_reject_filter = reject_filter;
49 }
50
51 /**
52 *
53 */
54 static size_t
streaming_message_data_size(streaming_message_t * sm)55 streaming_message_data_size(streaming_message_t *sm)
56 {
57 if (sm->sm_type == SMT_PACKET) {
58 th_pkt_t *pkt = sm->sm_data;
59 if (pkt && pkt->pkt_payload)
60 return pktbuf_len(pkt->pkt_payload);
61 } else if (sm->sm_type == SMT_MPEGTS) {
62 pktbuf_t *pkt_payload = sm->sm_data;
63 if (pkt_payload)
64 return pktbuf_len(pkt_payload);
65 }
66 return 0;
67 }
68
69 /**
70 *
71 */
72 static void
streaming_queue_deliver(void * opauqe,streaming_message_t * sm)73 streaming_queue_deliver(void *opauqe, streaming_message_t *sm)
74 {
75 streaming_queue_t *sq = opauqe;
76
77 pthread_mutex_lock(&sq->sq_mutex);
78
79 /* queue size protection */
80 if (sq->sq_maxsize && sq->sq_maxsize < sq->sq_size) {
81 streaming_msg_free(sm);
82 } else {
83 TAILQ_INSERT_TAIL(&sq->sq_queue, sm, sm_link);
84 sq->sq_size += streaming_message_data_size(sm);
85 }
86
87 tvh_cond_signal(&sq->sq_cond, 0);
88 pthread_mutex_unlock(&sq->sq_mutex);
89 }
90
91 /**
92 *
93 */
94 static htsmsg_t *
streaming_queue_info(void * opaque,htsmsg_t * list)95 streaming_queue_info(void *opaque, htsmsg_t *list)
96 {
97 streaming_queue_t *sq = opaque;
98 char buf[256];
99 snprintf(buf, sizeof(buf), "streaming queue %p size %zd", sq, sq->sq_size);
100 htsmsg_add_str(list, NULL, buf);
101 return list;
102 }
103
104 /**
105 *
106 */
107 void
streaming_queue_remove(streaming_queue_t * sq,streaming_message_t * sm)108 streaming_queue_remove(streaming_queue_t *sq, streaming_message_t *sm)
109 {
110 sq->sq_size -= streaming_message_data_size(sm);
111 TAILQ_REMOVE(&sq->sq_queue, sm, sm_link);
112 }
113
114 /**
115 *
116 */
117 void
streaming_queue_init(streaming_queue_t * sq,int reject_filter,size_t maxsize)118 streaming_queue_init(streaming_queue_t *sq, int reject_filter, size_t maxsize)
119 {
120 static streaming_ops_t ops = {
121 .st_cb = streaming_queue_deliver,
122 .st_info = streaming_queue_info
123 };
124
125 streaming_target_init(&sq->sq_st, &ops, sq, reject_filter);
126
127 pthread_mutex_init(&sq->sq_mutex, NULL);
128 tvh_cond_init(&sq->sq_cond);
129 TAILQ_INIT(&sq->sq_queue);
130
131 sq->sq_maxsize = maxsize;
132 sq->sq_size = 0;
133 }
134
135 /**
136 *
137 */
138 void
streaming_queue_deinit(streaming_queue_t * sq)139 streaming_queue_deinit(streaming_queue_t *sq)
140 {
141 sq->sq_size = 0;
142 streaming_queue_clear(&sq->sq_queue);
143 pthread_mutex_destroy(&sq->sq_mutex);
144 tvh_cond_destroy(&sq->sq_cond);
145 }
146
147 /**
148 *
149 */
150 void
streaming_queue_clear(struct streaming_message_queue * q)151 streaming_queue_clear(struct streaming_message_queue *q)
152 {
153 streaming_message_t *sm;
154
155 while((sm = TAILQ_FIRST(q)) != NULL) {
156 TAILQ_REMOVE(q, sm, sm_link);
157 streaming_msg_free(sm);
158 }
159 }
160
161 /**
162 *
163 */
164 void
streaming_target_connect(streaming_pad_t * sp,streaming_target_t * st)165 streaming_target_connect(streaming_pad_t *sp, streaming_target_t *st)
166 {
167 sp->sp_ntargets++;
168 st->st_pad = sp;
169 LIST_INSERT_HEAD(&sp->sp_targets, st, st_link);
170 sp->sp_reject_filter &= st->st_reject_filter;
171 }
172
173
174 /**
175 *
176 */
177 void
streaming_target_disconnect(streaming_pad_t * sp,streaming_target_t * st)178 streaming_target_disconnect(streaming_pad_t *sp, streaming_target_t *st)
179 {
180 int filter;
181
182 sp->sp_ntargets--;
183 st->st_pad = NULL;
184
185 LIST_REMOVE(st, st_link);
186
187 filter = ~0;
188 LIST_FOREACH(st, &sp->sp_targets, st_link)
189 filter &= st->st_reject_filter;
190 sp->sp_reject_filter = filter;
191 }
192
193
194 /**
195 *
196 */
197 streaming_message_t *
streaming_msg_create(streaming_message_type_t type)198 streaming_msg_create(streaming_message_type_t type)
199 {
200 streaming_message_t *sm = malloc(sizeof(streaming_message_t));
201 memoryinfo_alloc(&streaming_msg_memoryinfo, sizeof(*sm));
202 sm->sm_type = type;
203 #if ENABLE_TIMESHIFT
204 sm->sm_time = 0;
205 #endif
206 return sm;
207 }
208
209
210 /**
211 *
212 */
213 streaming_message_t *
streaming_msg_create_pkt(th_pkt_t * pkt)214 streaming_msg_create_pkt(th_pkt_t *pkt)
215 {
216 streaming_message_t *sm = streaming_msg_create(SMT_PACKET);
217 sm->sm_data = pkt;
218 pkt_ref_inc(pkt);
219 return sm;
220 }
221
222
223 /**
224 *
225 */
226 streaming_message_t *
streaming_msg_create_data(streaming_message_type_t type,void * data)227 streaming_msg_create_data(streaming_message_type_t type, void *data)
228 {
229 streaming_message_t *sm = streaming_msg_create(type);
230 sm->sm_data = data;
231 return sm;
232 }
233
234
235 /**
236 *
237 */
238 streaming_message_t *
streaming_msg_create_code(streaming_message_type_t type,int code)239 streaming_msg_create_code(streaming_message_type_t type, int code)
240 {
241 streaming_message_t *sm = streaming_msg_create(type);
242 sm->sm_code = code;
243 return sm;
244 }
245
246
247
248 /**
249 *
250 */
251 streaming_message_t *
streaming_msg_clone(streaming_message_t * src)252 streaming_msg_clone(streaming_message_t *src)
253 {
254 streaming_message_t *dst = malloc(sizeof(streaming_message_t));
255 streaming_start_t *ss;
256
257 memoryinfo_alloc(&streaming_msg_memoryinfo, sizeof(*dst));
258
259 dst->sm_type = src->sm_type;
260 #if ENABLE_TIMESHIFT
261 dst->sm_time = src->sm_time;
262 #endif
263
264 switch(src->sm_type) {
265
266 case SMT_PACKET:
267 pkt_ref_inc(src->sm_data);
268 dst->sm_data = src->sm_data;
269 break;
270
271 case SMT_START:
272 ss = dst->sm_data = src->sm_data;
273 streaming_start_ref(ss);
274 break;
275
276 case SMT_SKIP:
277 dst->sm_data = malloc(sizeof(streaming_skip_t));
278 memcpy(dst->sm_data, src->sm_data, sizeof(streaming_skip_t));
279 break;
280
281 case SMT_SIGNAL_STATUS:
282 dst->sm_data = malloc(sizeof(signal_status_t));
283 memcpy(dst->sm_data, src->sm_data, sizeof(signal_status_t));
284 break;
285
286 case SMT_DESCRAMBLE_INFO:
287 dst->sm_data = malloc(sizeof(descramble_info_t));
288 memcpy(dst->sm_data, src->sm_data, sizeof(descramble_info_t));
289 break;
290
291 case SMT_TIMESHIFT_STATUS:
292 dst->sm_data = malloc(sizeof(timeshift_status_t));
293 memcpy(dst->sm_data, src->sm_data, sizeof(timeshift_status_t));
294 break;
295
296 case SMT_GRACE:
297 case SMT_SPEED:
298 case SMT_STOP:
299 case SMT_SERVICE_STATUS:
300 case SMT_NOSTART:
301 case SMT_NOSTART_WARN:
302 dst->sm_code = src->sm_code;
303 break;
304
305 case SMT_EXIT:
306 break;
307
308 case SMT_MPEGTS:
309 pktbuf_ref_inc(src->sm_data);
310 dst->sm_data = src->sm_data;
311 break;
312
313 default:
314 abort();
315 }
316 return dst;
317 }
318
319
320 /**
321 *
322 */
323 void
streaming_start_unref(streaming_start_t * ss)324 streaming_start_unref(streaming_start_t *ss)
325 {
326 int i;
327
328 if((atomic_add(&ss->ss_refcount, -1)) != 1)
329 return;
330
331 service_source_info_free(&ss->ss_si);
332 for(i = 0; i < ss->ss_num_components; i++)
333 if(ss->ss_components[i].ssc_gh)
334 pktbuf_ref_dec(ss->ss_components[i].ssc_gh);
335 free(ss);
336 }
337
338 /**
339 *
340 */
341 void
streaming_msg_free(streaming_message_t * sm)342 streaming_msg_free(streaming_message_t *sm)
343 {
344 if (!sm)
345 return;
346
347 switch(sm->sm_type) {
348 case SMT_PACKET:
349 if(sm->sm_data)
350 pkt_ref_dec(sm->sm_data);
351 break;
352
353 case SMT_START:
354 if(sm->sm_data)
355 streaming_start_unref(sm->sm_data);
356 break;
357
358 case SMT_GRACE:
359 case SMT_STOP:
360 case SMT_EXIT:
361 case SMT_SERVICE_STATUS:
362 case SMT_NOSTART:
363 case SMT_NOSTART_WARN:
364 case SMT_SPEED:
365 break;
366
367 case SMT_SKIP:
368 case SMT_SIGNAL_STATUS:
369 case SMT_DESCRAMBLE_INFO:
370 #if ENABLE_TIMESHIFT
371 case SMT_TIMESHIFT_STATUS:
372 #endif
373 free(sm->sm_data);
374 break;
375
376 case SMT_MPEGTS:
377 if(sm->sm_data)
378 pktbuf_ref_dec(sm->sm_data);
379 break;
380
381 default:
382 abort();
383 }
384 memoryinfo_free(&streaming_msg_memoryinfo, sizeof(*sm));
385 free(sm);
386 }
387
388 /**
389 *
390 */
391 void
streaming_target_deliver2(streaming_target_t * st,streaming_message_t * sm)392 streaming_target_deliver2(streaming_target_t *st, streaming_message_t *sm)
393 {
394 if (st->st_reject_filter & SMT_TO_MASK(sm->sm_type))
395 streaming_msg_free(sm);
396 else
397 streaming_target_deliver(st, sm);
398 }
399
400 /**
401 *
402 */
403 void
streaming_pad_deliver(streaming_pad_t * sp,streaming_message_t * sm)404 streaming_pad_deliver(streaming_pad_t *sp, streaming_message_t *sm)
405 {
406 streaming_target_t *st, *next, *run = NULL;
407
408 for (st = LIST_FIRST(&sp->sp_targets); st; st = next) {
409 next = LIST_NEXT(st, st_link);
410 assert(next != st);
411 if (st->st_reject_filter & SMT_TO_MASK(sm->sm_type))
412 continue;
413 if (run)
414 streaming_target_deliver(run, streaming_msg_clone(sm));
415 run = st;
416 }
417 if (run)
418 streaming_target_deliver(run, sm);
419 else
420 streaming_msg_free(sm);
421 }
422
423 /**
424 *
425 */
426 const char *
streaming_code2txt(int code)427 streaming_code2txt(int code)
428 {
429 static __thread char ret[64];
430
431 switch(code) {
432 case SM_CODE_OK:
433 return N_("OK");
434 case SM_CODE_FORCE_OK:
435 return N_("Forced OK");
436
437 case SM_CODE_SOURCE_RECONFIGURED:
438 return N_("Source reconfigured");
439 case SM_CODE_BAD_SOURCE:
440 return N_("Source quality is bad");
441 case SM_CODE_SOURCE_DELETED:
442 return N_("Source deleted");
443 case SM_CODE_SUBSCRIPTION_OVERRIDDEN:
444 return N_("Subscription overridden");
445 case SM_CODE_INVALID_TARGET:
446 return N_("Invalid target");
447 case SM_CODE_USER_ACCESS:
448 return N_("User access error");
449 case SM_CODE_USER_LIMIT:
450 return N_("User limit reached");
451 case SM_CODE_WEAK_STREAM:
452 return N_("Weak stream");
453 case SM_CODE_USER_REQUEST:
454 return N_("User request");
455
456 case SM_CODE_NO_FREE_ADAPTER:
457 return N_("No free adapter");
458 case SM_CODE_MUX_NOT_ENABLED:
459 return N_("Mux not enabled");
460 case SM_CODE_NOT_FREE:
461 return N_("Adapter in use by another subscription");
462 case SM_CODE_TUNING_FAILED:
463 return N_("Tuning failed");
464 case SM_CODE_SVC_NOT_ENABLED:
465 return N_("No service enabled");
466 case SM_CODE_BAD_SIGNAL:
467 return N_("Signal quality too poor");
468 case SM_CODE_NO_SOURCE:
469 return N_("No source available");
470 case SM_CODE_NO_SERVICE:
471 return N_("No service assigned to channel");
472 case SM_CODE_NO_ADAPTERS:
473 return N_("No assigned adapters");
474 case SM_CODE_INVALID_SERVICE:
475 return N_("Invalid service");
476
477 case SM_CODE_ABORTED:
478 return N_("Aborted by user");
479
480 case SM_CODE_NO_DESCRAMBLER:
481 return N_("No descrambler");
482 case SM_CODE_NO_ACCESS:
483 return N_("No access");
484 case SM_CODE_NO_INPUT:
485 return N_("No input detected");
486 case SM_CODE_NO_SPACE:
487 return N_("Not enough disk space");
488
489 default:
490 snprintf(ret, sizeof(ret), _("Unknown reason (%i)"), code);
491 return ret;
492 }
493 }
494
495
496 /**
497 *
498 */
499 streaming_start_t *
streaming_start_copy(const streaming_start_t * src)500 streaming_start_copy(const streaming_start_t *src)
501 {
502 int i;
503 size_t siz = sizeof(streaming_start_t) +
504 sizeof(streaming_start_component_t) * src->ss_num_components;
505
506 streaming_start_t *dst = malloc(siz);
507
508 memcpy(dst, src, siz);
509 service_source_info_copy(&dst->ss_si, &src->ss_si);
510
511 for(i = 0; i < dst->ss_num_components; i++) {
512 streaming_start_component_t *ssc = &dst->ss_components[i];
513 if(ssc->ssc_gh != NULL)
514 pktbuf_ref_inc(ssc->ssc_gh);
515 }
516
517 dst->ss_refcount = 1;
518 return dst;
519 }
520
521
522 /**
523 *
524 */
525 streaming_start_component_t *
streaming_start_component_find_by_index(streaming_start_t * ss,int idx)526 streaming_start_component_find_by_index(streaming_start_t *ss, int idx)
527 {
528 int i;
529 for(i = 0; i < ss->ss_num_components; i++) {
530 streaming_start_component_t *ssc = &ss->ss_components[i];
531 if(ssc->ssc_index == idx)
532 return ssc;
533 }
534 return NULL;
535 }
536
/**
 * Name <-> value table for streaming component types, used by
 * streaming_component_type2txt() and streaming_component_txt2type().
 *
 * Note that SCT_MP4A is spelled "AAC-LATM" here; all other names match the
 * enum constant without its SCT_ prefix.
 */
static struct strtab streamtypetab[] = {
  { "NONE", SCT_NONE },
  { "UNKNOWN", SCT_UNKNOWN },
  { "MPEG2VIDEO", SCT_MPEG2VIDEO },
  { "MPEG2AUDIO", SCT_MPEG2AUDIO },
  { "H264", SCT_H264 },
  { "AC3", SCT_AC3 },
  { "TELETEXT", SCT_TELETEXT },
  { "DVBSUB", SCT_DVBSUB },
  { "CA", SCT_CA },
  { "AAC", SCT_AAC },
  { "MPEGTS", SCT_MPEGTS },
  { "TEXTSUB", SCT_TEXTSUB },
  { "EAC3", SCT_EAC3 },
  { "AAC-LATM", SCT_MP4A },
  { "VP8", SCT_VP8 },
  { "VORBIS", SCT_VORBIS },
  { "HEVC", SCT_HEVC },
  { "VP9", SCT_VP9 },
};
560
561 /**
562 *
563 */
564 const char *
streaming_component_type2txt(streaming_component_type_t s)565 streaming_component_type2txt(streaming_component_type_t s)
566 {
567 return val2str(s, streamtypetab) ?: "INVALID";
568 }
569
570 streaming_component_type_t
streaming_component_txt2type(const char * s)571 streaming_component_txt2type(const char *s)
572 {
573 return s ? str2val(s, streamtypetab) : SCT_UNKNOWN;
574 }
575
/**
 * Description of an audio type value.
 *
 * Values 0-3 have their own text (0 is the empty string); every other value
 * is reported as "Reserved".
 */
const char *
streaming_component_audio_type2desc(int audio_type)
{
  const char *desc;

  /* From ISO 13818-1 - ISO 639 language descriptor */
  switch(audio_type) {
  case 0:
    /* "Undefined" in the standard, but used for normal audio */
    desc = "";
    break;
  case 1:
    desc = N_("Clean effects");
    break;
  case 2:
    desc = N_("Hearing impaired");
    break;
  case 3:
    desc = N_("Visually impaired commentary/audio description");
    break;
  default:
    desc = N_("Reserved");
    break;
  }

  return desc;
}
589
590 /*
591 *
592 */
streaming_init(void)593 void streaming_init(void)
594 {
595 memoryinfo_register(&streaming_msg_memoryinfo);
596 }
597
streaming_done(void)598 void streaming_done(void)
599 {
600 pthread_mutex_lock(&global_lock);
601 memoryinfo_unregister(&streaming_msg_memoryinfo);
602 pthread_mutex_unlock(&global_lock);
603 }
604