1 /**
2  *  Streaming helpers
3  *  Copyright (C) 2008 Andreas Öman
4  *
5  *  This program is free software: you can redistribute it and/or modify
6  *  it under the terms of the GNU General Public License as published by
7  *  the Free Software Foundation, either version 3 of the License, or
8  *  (at your option) any later version.
9  *
10  *  This program is distributed in the hope that it will be useful,
11  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
12  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  *  GNU General Public License for more details.
14  *
15  *  You should have received a copy of the GNU General Public License
16  *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
17  */
18 
19 #include <string.h>
20 #include <assert.h>
21 
22 #include "tvheadend.h"
23 #include "streaming.h"
24 #include "packet.h"
25 #include "atomic.h"
26 #include "service.h"
27 #include "timeshift.h"
28 
29 static memoryinfo_t streaming_msg_memoryinfo = { .my_name = "Streaming message" };
30 
31 void
streaming_pad_init(streaming_pad_t * sp)32 streaming_pad_init(streaming_pad_t *sp)
33 {
34   LIST_INIT(&sp->sp_targets);
35   sp->sp_ntargets = 0;
36   sp->sp_reject_filter = ~0;
37 }
38 
39 /**
40  *
41  */
42 void
streaming_target_init(streaming_target_t * st,streaming_ops_t * ops,void * opaque,int reject_filter)43 streaming_target_init(streaming_target_t *st, streaming_ops_t *ops,
44                       void *opaque, int reject_filter)
45 {
46   st->st_ops = *ops;
47   st->st_opaque = opaque;
48   st->st_reject_filter = reject_filter;
49 }
50 
51 /**
52  *
53  */
54 static size_t
streaming_message_data_size(streaming_message_t * sm)55 streaming_message_data_size(streaming_message_t *sm)
56 {
57   if (sm->sm_type == SMT_PACKET) {
58     th_pkt_t *pkt = sm->sm_data;
59     if (pkt && pkt->pkt_payload)
60       return pktbuf_len(pkt->pkt_payload);
61   } else if (sm->sm_type == SMT_MPEGTS) {
62     pktbuf_t *pkt_payload = sm->sm_data;
63     if (pkt_payload)
64       return pktbuf_len(pkt_payload);
65   }
66   return 0;
67 }
68 
69 /**
70  *
71  */
72 static void
streaming_queue_deliver(void * opauqe,streaming_message_t * sm)73 streaming_queue_deliver(void *opauqe, streaming_message_t *sm)
74 {
75   streaming_queue_t *sq = opauqe;
76 
77   pthread_mutex_lock(&sq->sq_mutex);
78 
79   /* queue size protection */
80   if (sq->sq_maxsize && sq->sq_maxsize < sq->sq_size) {
81     streaming_msg_free(sm);
82   } else {
83     TAILQ_INSERT_TAIL(&sq->sq_queue, sm, sm_link);
84     sq->sq_size += streaming_message_data_size(sm);
85   }
86 
87   tvh_cond_signal(&sq->sq_cond, 0);
88   pthread_mutex_unlock(&sq->sq_mutex);
89 }
90 
91 /**
92  *
93  */
94 static htsmsg_t *
streaming_queue_info(void * opaque,htsmsg_t * list)95 streaming_queue_info(void *opaque, htsmsg_t *list)
96 {
97   streaming_queue_t *sq = opaque;
98   char buf[256];
99   snprintf(buf, sizeof(buf), "streaming queue %p size %zd", sq, sq->sq_size);
100   htsmsg_add_str(list, NULL, buf);
101   return list;
102 }
103 
104 /**
105  *
106  */
107 void
streaming_queue_remove(streaming_queue_t * sq,streaming_message_t * sm)108 streaming_queue_remove(streaming_queue_t *sq, streaming_message_t *sm)
109 {
110   sq->sq_size -= streaming_message_data_size(sm);
111   TAILQ_REMOVE(&sq->sq_queue, sm, sm_link);
112 }
113 
114 /**
115  *
116  */
117 void
streaming_queue_init(streaming_queue_t * sq,int reject_filter,size_t maxsize)118 streaming_queue_init(streaming_queue_t *sq, int reject_filter, size_t maxsize)
119 {
120   static streaming_ops_t ops = {
121     .st_cb   = streaming_queue_deliver,
122     .st_info = streaming_queue_info
123   };
124 
125   streaming_target_init(&sq->sq_st, &ops, sq, reject_filter);
126 
127   pthread_mutex_init(&sq->sq_mutex, NULL);
128   tvh_cond_init(&sq->sq_cond);
129   TAILQ_INIT(&sq->sq_queue);
130 
131   sq->sq_maxsize = maxsize;
132   sq->sq_size = 0;
133 }
134 
135 /**
136  *
137  */
138 void
streaming_queue_deinit(streaming_queue_t * sq)139 streaming_queue_deinit(streaming_queue_t *sq)
140 {
141   sq->sq_size = 0;
142   streaming_queue_clear(&sq->sq_queue);
143   pthread_mutex_destroy(&sq->sq_mutex);
144   tvh_cond_destroy(&sq->sq_cond);
145 }
146 
147 /**
148  *
149  */
150 void
streaming_queue_clear(struct streaming_message_queue * q)151 streaming_queue_clear(struct streaming_message_queue *q)
152 {
153   streaming_message_t *sm;
154 
155   while((sm = TAILQ_FIRST(q)) != NULL) {
156     TAILQ_REMOVE(q, sm, sm_link);
157     streaming_msg_free(sm);
158   }
159 }
160 
161 /**
162  *
163  */
164 void
streaming_target_connect(streaming_pad_t * sp,streaming_target_t * st)165 streaming_target_connect(streaming_pad_t *sp, streaming_target_t *st)
166 {
167   sp->sp_ntargets++;
168   st->st_pad = sp;
169   LIST_INSERT_HEAD(&sp->sp_targets, st, st_link);
170   sp->sp_reject_filter &= st->st_reject_filter;
171 }
172 
173 
174 /**
175  *
176  */
177 void
streaming_target_disconnect(streaming_pad_t * sp,streaming_target_t * st)178 streaming_target_disconnect(streaming_pad_t *sp, streaming_target_t *st)
179 {
180   int filter;
181 
182   sp->sp_ntargets--;
183   st->st_pad = NULL;
184 
185   LIST_REMOVE(st, st_link);
186 
187   filter = ~0;
188   LIST_FOREACH(st, &sp->sp_targets, st_link)
189     filter &= st->st_reject_filter;
190   sp->sp_reject_filter = filter;
191 }
192 
193 
194 /**
195  *
196  */
197 streaming_message_t *
streaming_msg_create(streaming_message_type_t type)198 streaming_msg_create(streaming_message_type_t type)
199 {
200   streaming_message_t *sm = malloc(sizeof(streaming_message_t));
201   memoryinfo_alloc(&streaming_msg_memoryinfo, sizeof(*sm));
202   sm->sm_type = type;
203 #if ENABLE_TIMESHIFT
204   sm->sm_time = 0;
205 #endif
206   return sm;
207 }
208 
209 
210 /**
211  *
212  */
213 streaming_message_t *
streaming_msg_create_pkt(th_pkt_t * pkt)214 streaming_msg_create_pkt(th_pkt_t *pkt)
215 {
216   streaming_message_t *sm = streaming_msg_create(SMT_PACKET);
217   sm->sm_data = pkt;
218   pkt_ref_inc(pkt);
219   return sm;
220 }
221 
222 
223 /**
224  *
225  */
226 streaming_message_t *
streaming_msg_create_data(streaming_message_type_t type,void * data)227 streaming_msg_create_data(streaming_message_type_t type, void *data)
228 {
229   streaming_message_t *sm = streaming_msg_create(type);
230   sm->sm_data = data;
231   return sm;
232 }
233 
234 
235 /**
236  *
237  */
238 streaming_message_t *
streaming_msg_create_code(streaming_message_type_t type,int code)239 streaming_msg_create_code(streaming_message_type_t type, int code)
240 {
241   streaming_message_t *sm = streaming_msg_create(type);
242   sm->sm_code = code;
243   return sm;
244 }
245 
246 
247 
248 /**
249  *
250  */
251 streaming_message_t *
streaming_msg_clone(streaming_message_t * src)252 streaming_msg_clone(streaming_message_t *src)
253 {
254   streaming_message_t *dst = malloc(sizeof(streaming_message_t));
255   streaming_start_t *ss;
256 
257   memoryinfo_alloc(&streaming_msg_memoryinfo, sizeof(*dst));
258 
259   dst->sm_type      = src->sm_type;
260 #if ENABLE_TIMESHIFT
261   dst->sm_time      = src->sm_time;
262 #endif
263 
264   switch(src->sm_type) {
265 
266   case SMT_PACKET:
267     pkt_ref_inc(src->sm_data);
268     dst->sm_data = src->sm_data;
269     break;
270 
271   case SMT_START:
272     ss = dst->sm_data = src->sm_data;
273     streaming_start_ref(ss);
274     break;
275 
276   case SMT_SKIP:
277     dst->sm_data = malloc(sizeof(streaming_skip_t));
278     memcpy(dst->sm_data, src->sm_data, sizeof(streaming_skip_t));
279     break;
280 
281   case SMT_SIGNAL_STATUS:
282     dst->sm_data = malloc(sizeof(signal_status_t));
283     memcpy(dst->sm_data, src->sm_data, sizeof(signal_status_t));
284     break;
285 
286   case SMT_DESCRAMBLE_INFO:
287     dst->sm_data = malloc(sizeof(descramble_info_t));
288     memcpy(dst->sm_data, src->sm_data, sizeof(descramble_info_t));
289     break;
290 
291   case SMT_TIMESHIFT_STATUS:
292     dst->sm_data = malloc(sizeof(timeshift_status_t));
293     memcpy(dst->sm_data, src->sm_data, sizeof(timeshift_status_t));
294     break;
295 
296   case SMT_GRACE:
297   case SMT_SPEED:
298   case SMT_STOP:
299   case SMT_SERVICE_STATUS:
300   case SMT_NOSTART:
301   case SMT_NOSTART_WARN:
302     dst->sm_code = src->sm_code;
303     break;
304 
305   case SMT_EXIT:
306     break;
307 
308   case SMT_MPEGTS:
309     pktbuf_ref_inc(src->sm_data);
310     dst->sm_data = src->sm_data;
311     break;
312 
313   default:
314     abort();
315   }
316   return dst;
317 }
318 
319 
320 /**
321  *
322  */
323 void
streaming_start_unref(streaming_start_t * ss)324 streaming_start_unref(streaming_start_t *ss)
325 {
326   int i;
327 
328   if((atomic_add(&ss->ss_refcount, -1)) != 1)
329     return;
330 
331   service_source_info_free(&ss->ss_si);
332   for(i = 0; i < ss->ss_num_components; i++)
333     if(ss->ss_components[i].ssc_gh)
334       pktbuf_ref_dec(ss->ss_components[i].ssc_gh);
335   free(ss);
336 }
337 
338 /**
339  *
340  */
341 void
streaming_msg_free(streaming_message_t * sm)342 streaming_msg_free(streaming_message_t *sm)
343 {
344   if (!sm)
345     return;
346 
347   switch(sm->sm_type) {
348   case SMT_PACKET:
349     if(sm->sm_data)
350       pkt_ref_dec(sm->sm_data);
351     break;
352 
353   case SMT_START:
354     if(sm->sm_data)
355       streaming_start_unref(sm->sm_data);
356     break;
357 
358   case SMT_GRACE:
359   case SMT_STOP:
360   case SMT_EXIT:
361   case SMT_SERVICE_STATUS:
362   case SMT_NOSTART:
363   case SMT_NOSTART_WARN:
364   case SMT_SPEED:
365     break;
366 
367   case SMT_SKIP:
368   case SMT_SIGNAL_STATUS:
369   case SMT_DESCRAMBLE_INFO:
370 #if ENABLE_TIMESHIFT
371   case SMT_TIMESHIFT_STATUS:
372 #endif
373     free(sm->sm_data);
374     break;
375 
376   case SMT_MPEGTS:
377     if(sm->sm_data)
378       pktbuf_ref_dec(sm->sm_data);
379     break;
380 
381   default:
382     abort();
383   }
384   memoryinfo_free(&streaming_msg_memoryinfo, sizeof(*sm));
385   free(sm);
386 }
387 
388 /**
389  *
390  */
391 void
streaming_target_deliver2(streaming_target_t * st,streaming_message_t * sm)392 streaming_target_deliver2(streaming_target_t *st, streaming_message_t *sm)
393 {
394   if (st->st_reject_filter & SMT_TO_MASK(sm->sm_type))
395     streaming_msg_free(sm);
396   else
397     streaming_target_deliver(st, sm);
398 }
399 
400 /**
401  *
402  */
403 void
streaming_pad_deliver(streaming_pad_t * sp,streaming_message_t * sm)404 streaming_pad_deliver(streaming_pad_t *sp, streaming_message_t *sm)
405 {
406   streaming_target_t *st, *next, *run = NULL;
407 
408   for (st = LIST_FIRST(&sp->sp_targets); st; st = next) {
409     next = LIST_NEXT(st, st_link);
410     assert(next != st);
411     if (st->st_reject_filter & SMT_TO_MASK(sm->sm_type))
412       continue;
413     if (run)
414       streaming_target_deliver(run, streaming_msg_clone(sm));
415     run = st;
416   }
417   if (run)
418     streaming_target_deliver(run, sm);
419   else
420     streaming_msg_free(sm);
421 }
422 
423 /**
424  *
425  */
426 const char *
streaming_code2txt(int code)427 streaming_code2txt(int code)
428 {
429   static __thread char ret[64];
430 
431   switch(code) {
432   case SM_CODE_OK:
433     return N_("OK");
434   case SM_CODE_FORCE_OK:
435     return N_("Forced OK");
436 
437   case SM_CODE_SOURCE_RECONFIGURED:
438     return N_("Source reconfigured");
439   case SM_CODE_BAD_SOURCE:
440     return N_("Source quality is bad");
441   case SM_CODE_SOURCE_DELETED:
442     return N_("Source deleted");
443   case SM_CODE_SUBSCRIPTION_OVERRIDDEN:
444     return N_("Subscription overridden");
445   case SM_CODE_INVALID_TARGET:
446     return N_("Invalid target");
447   case SM_CODE_USER_ACCESS:
448     return N_("User access error");
449   case SM_CODE_USER_LIMIT:
450     return N_("User limit reached");
451   case SM_CODE_WEAK_STREAM:
452     return N_("Weak stream");
453   case SM_CODE_USER_REQUEST:
454     return N_("User request");
455 
456   case SM_CODE_NO_FREE_ADAPTER:
457     return N_("No free adapter");
458   case SM_CODE_MUX_NOT_ENABLED:
459     return N_("Mux not enabled");
460   case SM_CODE_NOT_FREE:
461     return N_("Adapter in use by another subscription");
462   case SM_CODE_TUNING_FAILED:
463     return N_("Tuning failed");
464   case SM_CODE_SVC_NOT_ENABLED:
465     return N_("No service enabled");
466   case SM_CODE_BAD_SIGNAL:
467     return N_("Signal quality too poor");
468   case SM_CODE_NO_SOURCE:
469     return N_("No source available");
470   case SM_CODE_NO_SERVICE:
471     return N_("No service assigned to channel");
472   case SM_CODE_NO_ADAPTERS:
473     return N_("No assigned adapters");
474   case SM_CODE_INVALID_SERVICE:
475     return N_("Invalid service");
476 
477   case SM_CODE_ABORTED:
478     return N_("Aborted by user");
479 
480   case SM_CODE_NO_DESCRAMBLER:
481     return N_("No descrambler");
482   case SM_CODE_NO_ACCESS:
483     return N_("No access");
484   case SM_CODE_NO_INPUT:
485     return N_("No input detected");
486   case SM_CODE_NO_SPACE:
487     return N_("Not enough disk space");
488 
489   default:
490     snprintf(ret, sizeof(ret), _("Unknown reason (%i)"), code);
491     return ret;
492   }
493 }
494 
495 
496 /**
497  *
498  */
499 streaming_start_t *
streaming_start_copy(const streaming_start_t * src)500 streaming_start_copy(const streaming_start_t *src)
501 {
502   int i;
503   size_t siz = sizeof(streaming_start_t) +
504     sizeof(streaming_start_component_t) * src->ss_num_components;
505 
506   streaming_start_t *dst = malloc(siz);
507 
508   memcpy(dst, src, siz);
509   service_source_info_copy(&dst->ss_si, &src->ss_si);
510 
511   for(i = 0; i < dst->ss_num_components; i++) {
512     streaming_start_component_t *ssc = &dst->ss_components[i];
513     if(ssc->ssc_gh != NULL)
514       pktbuf_ref_inc(ssc->ssc_gh);
515   }
516 
517   dst->ss_refcount = 1;
518   return dst;
519 }
520 
521 
522 /**
523  *
524  */
525 streaming_start_component_t *
streaming_start_component_find_by_index(streaming_start_t * ss,int idx)526 streaming_start_component_find_by_index(streaming_start_t *ss, int idx)
527 {
528   int i;
529   for(i = 0; i < ss->ss_num_components; i++) {
530     streaming_start_component_t *ssc = &ss->ss_components[i];
531     if(ssc->ssc_index == idx)
532       return ssc;
533   }
534   return NULL;
535 }
536 
537 /**
538  *
539  */
540 static struct strtab streamtypetab[] = {
541   { "NONE",       SCT_NONE },
542   { "UNKNOWN",    SCT_UNKNOWN },
543   { "MPEG2VIDEO", SCT_MPEG2VIDEO },
544   { "MPEG2AUDIO", SCT_MPEG2AUDIO },
545   { "H264",       SCT_H264 },
546   { "AC3",        SCT_AC3 },
547   { "TELETEXT",   SCT_TELETEXT },
548   { "DVBSUB",     SCT_DVBSUB },
549   { "CA",         SCT_CA },
550   { "AAC",        SCT_AAC },
551   { "MPEGTS",     SCT_MPEGTS },
552   { "TEXTSUB",    SCT_TEXTSUB },
553   { "EAC3",       SCT_EAC3 },
554   { "AAC-LATM",   SCT_MP4A },
555   { "VP8",        SCT_VP8 },
556   { "VORBIS",     SCT_VORBIS },
557   { "HEVC",       SCT_HEVC },
558   { "VP9",        SCT_VP9 },
559 };
560 
561 /**
562  *
563  */
564 const char *
streaming_component_type2txt(streaming_component_type_t s)565 streaming_component_type2txt(streaming_component_type_t s)
566 {
567   return val2str(s, streamtypetab) ?: "INVALID";
568 }
569 
570 streaming_component_type_t
streaming_component_txt2type(const char * s)571 streaming_component_txt2type(const char *s)
572 {
573   return s ? str2val(s, streamtypetab) : SCT_UNKNOWN;
574 }
575 
/**
 * Describe an audio type value.
 *
 * Values 0 to 3 have dedicated texts (0 maps to the empty string); every
 * other value is reported as "Reserved".
 */
const char *
streaming_component_audio_type2desc(int audio_type)
{
  const char *desc;

  /* From ISO 13818-1 - ISO 639 language descriptor */
  switch(audio_type) {
  case 0:
    /* "Undefined" in the standard, but used for normal audio */
    desc = "";
    break;
  case 1:
    desc = N_("Clean effects");
    break;
  case 2:
    desc = N_("Hearing impaired");
    break;
  case 3:
    desc = N_("Visually impaired commentary/audio description");
    break;
  default:
    desc = N_("Reserved");
    break;
  }

  return desc;
}
589 
590 /*
591  *
592  */
streaming_init(void)593 void streaming_init(void)
594 {
595   memoryinfo_register(&streaming_msg_memoryinfo);
596 }
597 
streaming_done(void)598 void streaming_done(void)
599 {
600   pthread_mutex_lock(&global_lock);
601   memoryinfo_unregister(&streaming_msg_memoryinfo);
602   pthread_mutex_unlock(&global_lock);
603 }
604