1 /**
2  *  Global header modification
3  *  Copyright (C) 2010 Andreas Öman
4  *
5  *  This program is free software: you can redistribute it and/or modify
6  *  it under the terms of the GNU General Public License as published by
7  *  the Free Software Foundation, either version 3 of the License, or
8  *  (at your option) any later version.
9  *
10  *  This program is distributed in the hope that it will be useful,
11  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
12  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  *  GNU General Public License for more details.
14  *
15  *  You should have received a copy of the GNU General Public License
16  *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
17  */
18 
19 #include <assert.h>
20 #include "tvheadend.h"
21 #include "streaming.h"
22 #include "globalheaders.h"
23 
24 typedef struct globalheaders {
25   streaming_target_t gh_input;
26 
27   streaming_target_t *gh_output;
28 
29   struct th_pktref_queue gh_holdq;
30 
31   streaming_start_t *gh_ss;
32 
33   int gh_passthru;
34 
35 } globalheaders_t;
36 
/*
 * Longest span of held packets, in milliseconds, that headers_complete()
 * tolerates before it stops waiting for streams whose headers are still
 * incomplete.
 * Note: there are up to 2.5 sec diffs in some sources!
 */
#define MAX_SCAN_TIME 5000 /* in ms */
39 
40 /**
41  *
42  */
43 static inline int
gh_require_meta(int type)44 gh_require_meta(int type)
45 {
46   return type == SCT_HEVC ||
47          type == SCT_H264 ||
48          type == SCT_MPEG2VIDEO ||
49          type == SCT_MP4A ||
50          type == SCT_AAC ||
51          type == SCT_VORBIS;
52 }
53 
/**
 * Tell whether a stream type is classified as video or audio by the
 * SCT_ISVIDEO / SCT_ISAUDIO macros.
 *
 * @param type  streaming component type (SCT_* value)
 * @return 1 when the type is video or audio, otherwise 0
 */
static inline int
gh_is_audiovideo(int type)
{
  if(SCT_ISVIDEO(type))
    return 1;

  return SCT_ISAUDIO(type) ? 1 : 0;
}
62 
63 /**
64  *
65  */
66 static void
gh_flush(globalheaders_t * gh)67 gh_flush(globalheaders_t *gh)
68 {
69   if(gh->gh_ss != NULL) {
70     streaming_start_unref(gh->gh_ss);
71     gh->gh_ss = NULL;
72   }
73 
74   pktref_clear_queue(&gh->gh_holdq);
75 }
76 
77 
78 /**
79  *
80  */
81 static void
apply_header(streaming_start_component_t * ssc,th_pkt_t * pkt)82 apply_header(streaming_start_component_t *ssc, th_pkt_t *pkt)
83 {
84   if(ssc->ssc_frameduration == 0 && pkt->pkt_duration != 0)
85     ssc->ssc_frameduration = pkt->pkt_duration;
86 
87   if(SCT_ISAUDIO(ssc->ssc_type) && !ssc->ssc_channels && !ssc->ssc_sri) {
88     ssc->ssc_channels = pkt->a.pkt_channels;
89     ssc->ssc_sri      = pkt->a.pkt_sri;
90     ssc->ssc_ext_sri  = pkt->a.pkt_ext_sri;
91   }
92 
93   if(SCT_ISVIDEO(ssc->ssc_type)) {
94     if(pkt->v.pkt_aspect_num && pkt->v.pkt_aspect_den) {
95       ssc->ssc_aspect_num = pkt->v.pkt_aspect_num;
96       ssc->ssc_aspect_den = pkt->v.pkt_aspect_den;
97     }
98   }
99 
100   if(ssc->ssc_gh != NULL)
101     return;
102 
103   if(pkt->pkt_meta != NULL) {
104     ssc->ssc_gh = pkt->pkt_meta;
105     pktbuf_ref_inc(ssc->ssc_gh);
106     return;
107   }
108 
109   if (ssc->ssc_type == SCT_MP4A || ssc->ssc_type == SCT_AAC) {
110     ssc->ssc_gh = pktbuf_alloc(NULL, pkt->a.pkt_ext_sri ? 5 : 2);
111     uint8_t *d = pktbuf_ptr(ssc->ssc_gh);
112 
113     const int profile = 2; /* AAC LC */
114     d[0] = (profile << 3) | ((pkt->a.pkt_sri & 0xe) >> 1);
115     d[1] = ((pkt->a.pkt_sri & 0x1) << 7) | (pkt->a.pkt_channels << 3);
116     if (pkt->a.pkt_ext_sri) { /* SBR extension */
117       d[2] = 0x56;
118       d[3] = 0xe5;
119       d[4] = 0x80 | ((pkt->a.pkt_ext_sri - 1) << 3);
120     }
121   }
122 }
123 
124 
125 /**
126  *
127  */
128 static int
header_complete(streaming_start_component_t * ssc,int not_so_picky)129 header_complete(streaming_start_component_t *ssc, int not_so_picky)
130 {
131   int is_video = SCT_ISVIDEO(ssc->ssc_type);
132   int is_audio = !is_video && SCT_ISAUDIO(ssc->ssc_type);
133 
134   if((is_audio || is_video) && ssc->ssc_frameduration == 0)
135     return 0;
136 
137   if(is_video) {
138     if(!not_so_picky && (ssc->ssc_aspect_num == 0 || ssc->ssc_aspect_den == 0 ||
139                          ssc->ssc_width == 0 || ssc->ssc_height == 0))
140       return 0;
141   }
142 
143   if(is_audio && (ssc->ssc_sri == 0 || ssc->ssc_channels == 0))
144     return 0;
145 
146   if(ssc->ssc_gh == NULL && gh_require_meta(ssc->ssc_type))
147     return 0;
148 
149   return 1;
150 }
151 
152 
153 /**
154  *
155  */
156 static int64_t
gh_queue_delay(globalheaders_t * gh,int index)157 gh_queue_delay(globalheaders_t *gh, int index)
158 {
159   th_pktref_t *f = TAILQ_FIRST(&gh->gh_holdq);
160   th_pktref_t *l = TAILQ_LAST(&gh->gh_holdq, th_pktref_queue);
161   streaming_start_component_t *ssc;
162   int64_t diff;
163 
164   /*
165    * Find only packets which require the meta data. Ignore others.
166    */
167   while (f != l) {
168     if (f->pr_pkt->pkt_dts != PTS_UNSET) {
169       ssc = streaming_start_component_find_by_index
170               (gh->gh_ss, f->pr_pkt->pkt_componentindex);
171       if (ssc && ssc->ssc_index == index)
172         break;
173     }
174     f = TAILQ_NEXT(f, pr_link);
175   }
176   while (l != f) {
177     if (l->pr_pkt->pkt_dts != PTS_UNSET) {
178       ssc = streaming_start_component_find_by_index
179               (gh->gh_ss, l->pr_pkt->pkt_componentindex);
180       if (ssc && ssc->ssc_index == index)
181         break;
182     }
183     l = TAILQ_PREV(l, th_pktref_queue, pr_link);
184   }
185 
186   if (l->pr_pkt->pkt_dts != PTS_UNSET && f->pr_pkt->pkt_dts != PTS_UNSET) {
187     diff = (l->pr_pkt->pkt_dts & PTS_MASK) - (f->pr_pkt->pkt_dts & PTS_MASK);
188     if (diff < 0)
189       diff += PTS_MASK;
190   } else {
191     diff = 0;
192   }
193 
194   /* special noop packet from transcoder, increase decision limit */
195   if (l == f && l->pr_pkt->pkt_payload == NULL)
196     return 1;
197 
198   return diff;
199 }
200 
201 
202 /**
203  *
204  */
205 static int
headers_complete(globalheaders_t * gh)206 headers_complete(globalheaders_t *gh)
207 {
208   streaming_start_t *ss = gh->gh_ss;
209   streaming_start_component_t *ssc;
210   int64_t *qd = alloca(ss->ss_num_components * sizeof(int64_t));
211   int64_t qd_max = 0;
212   int i, threshold = 0;
213 
214   assert(ss != NULL);
215 
216   for(i = 0; i < ss->ss_num_components; i++) {
217     ssc = &ss->ss_components[i];
218     qd[i] = gh_is_audiovideo(ssc->ssc_type) ?
219               gh_queue_delay(gh, ssc->ssc_index) : 0;
220     if (qd[i] > qd_max)
221       qd_max = qd[i];
222   }
223 
224   if (qd_max <= 0)
225     return 0;
226 
227   threshold = qd_max > MAX_SCAN_TIME * 90;
228 
229   for(i = 0; i < ss->ss_num_components; i++) {
230     ssc = &ss->ss_components[i];
231 
232     if(!header_complete(ssc, threshold)) {
233       /*
234        * disable stream only when
235        * - half timeout is reached without any packets seen
236        * - maximal timeout is reached without metadata
237        */
238       if(threshold || (qd[i] <= 0 && qd_max > (MAX_SCAN_TIME * 90) / 2)) {
239 	ssc->ssc_disabled = 1;
240         tvhdebug(LS_GLOBALHEADERS, "gh disable stream %d %s%s%s (PID %i) threshold %d qd %"PRId64" qd_max %"PRId64,
241              ssc->ssc_index, streaming_component_type2txt(ssc->ssc_type),
242              ssc->ssc_lang[0] ? " " : "", ssc->ssc_lang, ssc->ssc_pid,
243              threshold, qd[i], qd_max);
244       } else {
245 	return 0;
246       }
247     } else {
248       ssc->ssc_disabled = 0;
249     }
250   }
251 
252   if (tvhtrace_enabled()) {
253     for(i = 0; i < ss->ss_num_components; i++) {
254       ssc = &ss->ss_components[i];
255       tvhtrace(LS_GLOBALHEADERS, "stream %d %s%s%s (PID %i) complete time %"PRId64"%s",
256                ssc->ssc_index, streaming_component_type2txt(ssc->ssc_type),
257                ssc->ssc_lang[0] ? " " : "", ssc->ssc_lang, ssc->ssc_pid,
258                gh_queue_delay(gh, ssc->ssc_index),
259                ssc->ssc_disabled ? " disabled" : "");
260     }
261   }
262 
263   return 1;
264 }
265 
266 
267 /**
268  *
269  */
270 static void
gh_start(globalheaders_t * gh,streaming_message_t * sm)271 gh_start(globalheaders_t *gh, streaming_message_t *sm)
272 {
273   gh->gh_ss = streaming_start_copy(sm->sm_data);
274   streaming_msg_free(sm);
275 }
276 
277 
278 /**
279  *
280  */
281 static void
gh_hold(globalheaders_t * gh,streaming_message_t * sm)282 gh_hold(globalheaders_t *gh, streaming_message_t *sm)
283 {
284   th_pkt_t *pkt;
285   streaming_start_component_t *ssc;
286 
287   switch(sm->sm_type) {
288   case SMT_PACKET:
289     pkt = sm->sm_data;
290     ssc = streaming_start_component_find_by_index(gh->gh_ss,
291 						  pkt->pkt_componentindex);
292     if (ssc == NULL) {
293       tvherror(LS_GLOBALHEADERS, "Unable to find component %d", pkt->pkt_componentindex);
294       streaming_msg_free(sm);
295       return;
296     }
297 
298     pkt_trace(LS_GLOBALHEADERS, pkt, "hold receive");
299 
300     pkt_ref_inc(pkt);
301 
302     if (pkt->pkt_err == 0)
303       apply_header(ssc, pkt);
304 
305     pktref_enqueue(&gh->gh_holdq, pkt);
306 
307     streaming_msg_free(sm);
308 
309     if(!gh_is_audiovideo(ssc->ssc_type))
310       break;
311 
312     if(!headers_complete(gh))
313       break;
314 
315     // Send our modified start
316     sm = streaming_msg_create_data(SMT_START,
317 				   streaming_start_copy(gh->gh_ss));
318     streaming_target_deliver2(gh->gh_output, sm);
319 
320     // Send all pending packets
321     while((pkt = pktref_get_first(&gh->gh_holdq)) != NULL) {
322       if (pkt->pkt_payload) {
323         sm = streaming_msg_create_pkt(pkt);
324         streaming_target_deliver2(gh->gh_output, sm);
325       }
326       pkt_ref_dec(pkt);
327     }
328     gh->gh_passthru = 1;
329     break;
330 
331   case SMT_START:
332     if (gh->gh_ss)
333       gh_flush(gh);
334     gh_start(gh, sm);
335     break;
336 
337   case SMT_STOP:
338     gh_flush(gh);
339     streaming_msg_free(sm);
340     break;
341 
342   case SMT_GRACE:
343   case SMT_EXIT:
344   case SMT_SERVICE_STATUS:
345   case SMT_SIGNAL_STATUS:
346   case SMT_DESCRAMBLE_INFO:
347   case SMT_NOSTART:
348   case SMT_NOSTART_WARN:
349   case SMT_MPEGTS:
350   case SMT_SPEED:
351   case SMT_SKIP:
352   case SMT_TIMESHIFT_STATUS:
353     streaming_target_deliver2(gh->gh_output, sm);
354     break;
355   }
356 }
357 
358 
359 /**
360  *
361  */
362 static void
gh_pass(globalheaders_t * gh,streaming_message_t * sm)363 gh_pass(globalheaders_t *gh, streaming_message_t *sm)
364 {
365   th_pkt_t *pkt;
366   switch(sm->sm_type) {
367   case SMT_START:
368     /* stop */
369     gh->gh_passthru = 0;
370     gh_flush(gh);
371     /* restart */
372     gh_start(gh, sm);
373     break;
374 
375   case SMT_STOP:
376     gh->gh_passthru = 0;
377     gh_flush(gh);
378     // FALLTHRU
379   case SMT_GRACE:
380   case SMT_EXIT:
381   case SMT_SERVICE_STATUS:
382   case SMT_SIGNAL_STATUS:
383   case SMT_DESCRAMBLE_INFO:
384   case SMT_NOSTART:
385   case SMT_NOSTART_WARN:
386   case SMT_MPEGTS:
387   case SMT_SKIP:
388   case SMT_SPEED:
389   case SMT_TIMESHIFT_STATUS:
390     streaming_target_deliver2(gh->gh_output, sm);
391     break;
392   case SMT_PACKET:
393     pkt = sm->sm_data;
394     if (pkt->pkt_payload || pkt->pkt_err)
395       streaming_target_deliver2(gh->gh_output, sm);
396     else
397       streaming_msg_free(sm);
398     break;
399   }
400 }
401 
402 
403 /**
404  *
405  */
406 static void
globalheaders_input(void * opaque,streaming_message_t * sm)407 globalheaders_input(void *opaque, streaming_message_t *sm)
408 {
409   globalheaders_t *gh = opaque;
410 
411   if(gh->gh_passthru)
412     gh_pass(gh, sm);
413   else
414     gh_hold(gh, sm);
415 }
416 
417 static htsmsg_t *
globalheaders_input_info(void * opaque,htsmsg_t * list)418 globalheaders_input_info(void *opaque, htsmsg_t *list)
419 {
420   globalheaders_t *gh = opaque;
421   streaming_target_t *st = gh->gh_output;
422   htsmsg_add_str(list, NULL, "globalheaders input");
423   return st->st_ops.st_info(st->st_opaque, list);
424 }
425 
426 static streaming_ops_t globalheaders_input_ops = {
427   .st_cb   = globalheaders_input,
428   .st_info = globalheaders_input_info
429 };
430 
431 
432 /**
433  *
434  */
435 streaming_target_t *
globalheaders_create(streaming_target_t * output)436 globalheaders_create(streaming_target_t *output)
437 {
438   globalheaders_t *gh = calloc(1, sizeof(globalheaders_t));
439 
440   TAILQ_INIT(&gh->gh_holdq);
441 
442   gh->gh_output = output;
443   streaming_target_init(&gh->gh_input, &globalheaders_input_ops, gh, 0);
444   return &gh->gh_input;
445 }
446 
447 
448 /**
449  *
450  */
451 void
globalheaders_destroy(streaming_target_t * pad)452 globalheaders_destroy(streaming_target_t *pad)
453 {
454   globalheaders_t *gh = (globalheaders_t *)pad;
455   gh_flush(gh);
456   free(gh);
457 }
458