1 /**
2 * Global header modification
3 * Copyright (C) 2010 Andreas Öman
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 #include <assert.h>
20 #include "tvheadend.h"
21 #include "streaming.h"
22 #include "globalheaders.h"
23
24 typedef struct globalheaders {
25 streaming_target_t gh_input;
26
27 streaming_target_t *gh_output;
28
29 struct th_pktref_queue gh_holdq;
30
31 streaming_start_t *gh_ss;
32
33 int gh_passthru;
34
35 } globalheaders_t;
36
37 /* note: there up to 2.5 sec diffs in some sources! */
38 #define MAX_SCAN_TIME 5000 // in ms
39
40 /**
41 *
42 */
43 static inline int
gh_require_meta(int type)44 gh_require_meta(int type)
45 {
46 return type == SCT_HEVC ||
47 type == SCT_H264 ||
48 type == SCT_MPEG2VIDEO ||
49 type == SCT_MP4A ||
50 type == SCT_AAC ||
51 type == SCT_VORBIS;
52 }
53
54 /**
55 *
56 */
57 static inline int
gh_is_audiovideo(int type)58 gh_is_audiovideo(int type)
59 {
60 return SCT_ISVIDEO(type) || SCT_ISAUDIO(type);
61 }
62
63 /**
64 *
65 */
66 static void
gh_flush(globalheaders_t * gh)67 gh_flush(globalheaders_t *gh)
68 {
69 if(gh->gh_ss != NULL) {
70 streaming_start_unref(gh->gh_ss);
71 gh->gh_ss = NULL;
72 }
73
74 pktref_clear_queue(&gh->gh_holdq);
75 }
76
77
78 /**
79 *
80 */
81 static void
apply_header(streaming_start_component_t * ssc,th_pkt_t * pkt)82 apply_header(streaming_start_component_t *ssc, th_pkt_t *pkt)
83 {
84 if(ssc->ssc_frameduration == 0 && pkt->pkt_duration != 0)
85 ssc->ssc_frameduration = pkt->pkt_duration;
86
87 if(SCT_ISAUDIO(ssc->ssc_type) && !ssc->ssc_channels && !ssc->ssc_sri) {
88 ssc->ssc_channels = pkt->a.pkt_channels;
89 ssc->ssc_sri = pkt->a.pkt_sri;
90 ssc->ssc_ext_sri = pkt->a.pkt_ext_sri;
91 }
92
93 if(SCT_ISVIDEO(ssc->ssc_type)) {
94 if(pkt->v.pkt_aspect_num && pkt->v.pkt_aspect_den) {
95 ssc->ssc_aspect_num = pkt->v.pkt_aspect_num;
96 ssc->ssc_aspect_den = pkt->v.pkt_aspect_den;
97 }
98 }
99
100 if(ssc->ssc_gh != NULL)
101 return;
102
103 if(pkt->pkt_meta != NULL) {
104 ssc->ssc_gh = pkt->pkt_meta;
105 pktbuf_ref_inc(ssc->ssc_gh);
106 return;
107 }
108
109 if (ssc->ssc_type == SCT_MP4A || ssc->ssc_type == SCT_AAC) {
110 ssc->ssc_gh = pktbuf_alloc(NULL, pkt->a.pkt_ext_sri ? 5 : 2);
111 uint8_t *d = pktbuf_ptr(ssc->ssc_gh);
112
113 const int profile = 2; /* AAC LC */
114 d[0] = (profile << 3) | ((pkt->a.pkt_sri & 0xe) >> 1);
115 d[1] = ((pkt->a.pkt_sri & 0x1) << 7) | (pkt->a.pkt_channels << 3);
116 if (pkt->a.pkt_ext_sri) { /* SBR extension */
117 d[2] = 0x56;
118 d[3] = 0xe5;
119 d[4] = 0x80 | ((pkt->a.pkt_ext_sri - 1) << 3);
120 }
121 }
122 }
123
124
125 /**
126 *
127 */
128 static int
header_complete(streaming_start_component_t * ssc,int not_so_picky)129 header_complete(streaming_start_component_t *ssc, int not_so_picky)
130 {
131 int is_video = SCT_ISVIDEO(ssc->ssc_type);
132 int is_audio = !is_video && SCT_ISAUDIO(ssc->ssc_type);
133
134 if((is_audio || is_video) && ssc->ssc_frameduration == 0)
135 return 0;
136
137 if(is_video) {
138 if(!not_so_picky && (ssc->ssc_aspect_num == 0 || ssc->ssc_aspect_den == 0 ||
139 ssc->ssc_width == 0 || ssc->ssc_height == 0))
140 return 0;
141 }
142
143 if(is_audio && (ssc->ssc_sri == 0 || ssc->ssc_channels == 0))
144 return 0;
145
146 if(ssc->ssc_gh == NULL && gh_require_meta(ssc->ssc_type))
147 return 0;
148
149 return 1;
150 }
151
152
153 /**
154 *
155 */
156 static int64_t
gh_queue_delay(globalheaders_t * gh,int index)157 gh_queue_delay(globalheaders_t *gh, int index)
158 {
159 th_pktref_t *f = TAILQ_FIRST(&gh->gh_holdq);
160 th_pktref_t *l = TAILQ_LAST(&gh->gh_holdq, th_pktref_queue);
161 streaming_start_component_t *ssc;
162 int64_t diff;
163
164 /*
165 * Find only packets which require the meta data. Ignore others.
166 */
167 while (f != l) {
168 if (f->pr_pkt->pkt_dts != PTS_UNSET) {
169 ssc = streaming_start_component_find_by_index
170 (gh->gh_ss, f->pr_pkt->pkt_componentindex);
171 if (ssc && ssc->ssc_index == index)
172 break;
173 }
174 f = TAILQ_NEXT(f, pr_link);
175 }
176 while (l != f) {
177 if (l->pr_pkt->pkt_dts != PTS_UNSET) {
178 ssc = streaming_start_component_find_by_index
179 (gh->gh_ss, l->pr_pkt->pkt_componentindex);
180 if (ssc && ssc->ssc_index == index)
181 break;
182 }
183 l = TAILQ_PREV(l, th_pktref_queue, pr_link);
184 }
185
186 if (l->pr_pkt->pkt_dts != PTS_UNSET && f->pr_pkt->pkt_dts != PTS_UNSET) {
187 diff = (l->pr_pkt->pkt_dts & PTS_MASK) - (f->pr_pkt->pkt_dts & PTS_MASK);
188 if (diff < 0)
189 diff += PTS_MASK;
190 } else {
191 diff = 0;
192 }
193
194 /* special noop packet from transcoder, increase decision limit */
195 if (l == f && l->pr_pkt->pkt_payload == NULL)
196 return 1;
197
198 return diff;
199 }
200
201
202 /**
203 *
204 */
205 static int
headers_complete(globalheaders_t * gh)206 headers_complete(globalheaders_t *gh)
207 {
208 streaming_start_t *ss = gh->gh_ss;
209 streaming_start_component_t *ssc;
210 int64_t *qd = alloca(ss->ss_num_components * sizeof(int64_t));
211 int64_t qd_max = 0;
212 int i, threshold = 0;
213
214 assert(ss != NULL);
215
216 for(i = 0; i < ss->ss_num_components; i++) {
217 ssc = &ss->ss_components[i];
218 qd[i] = gh_is_audiovideo(ssc->ssc_type) ?
219 gh_queue_delay(gh, ssc->ssc_index) : 0;
220 if (qd[i] > qd_max)
221 qd_max = qd[i];
222 }
223
224 if (qd_max <= 0)
225 return 0;
226
227 threshold = qd_max > MAX_SCAN_TIME * 90;
228
229 for(i = 0; i < ss->ss_num_components; i++) {
230 ssc = &ss->ss_components[i];
231
232 if(!header_complete(ssc, threshold)) {
233 /*
234 * disable stream only when
235 * - half timeout is reached without any packets seen
236 * - maximal timeout is reached without metadata
237 */
238 if(threshold || (qd[i] <= 0 && qd_max > (MAX_SCAN_TIME * 90) / 2)) {
239 ssc->ssc_disabled = 1;
240 tvhdebug(LS_GLOBALHEADERS, "gh disable stream %d %s%s%s (PID %i) threshold %d qd %"PRId64" qd_max %"PRId64,
241 ssc->ssc_index, streaming_component_type2txt(ssc->ssc_type),
242 ssc->ssc_lang[0] ? " " : "", ssc->ssc_lang, ssc->ssc_pid,
243 threshold, qd[i], qd_max);
244 } else {
245 return 0;
246 }
247 } else {
248 ssc->ssc_disabled = 0;
249 }
250 }
251
252 if (tvhtrace_enabled()) {
253 for(i = 0; i < ss->ss_num_components; i++) {
254 ssc = &ss->ss_components[i];
255 tvhtrace(LS_GLOBALHEADERS, "stream %d %s%s%s (PID %i) complete time %"PRId64"%s",
256 ssc->ssc_index, streaming_component_type2txt(ssc->ssc_type),
257 ssc->ssc_lang[0] ? " " : "", ssc->ssc_lang, ssc->ssc_pid,
258 gh_queue_delay(gh, ssc->ssc_index),
259 ssc->ssc_disabled ? " disabled" : "");
260 }
261 }
262
263 return 1;
264 }
265
266
267 /**
268 *
269 */
270 static void
gh_start(globalheaders_t * gh,streaming_message_t * sm)271 gh_start(globalheaders_t *gh, streaming_message_t *sm)
272 {
273 gh->gh_ss = streaming_start_copy(sm->sm_data);
274 streaming_msg_free(sm);
275 }
276
277
278 /**
279 *
280 */
281 static void
gh_hold(globalheaders_t * gh,streaming_message_t * sm)282 gh_hold(globalheaders_t *gh, streaming_message_t *sm)
283 {
284 th_pkt_t *pkt;
285 streaming_start_component_t *ssc;
286
287 switch(sm->sm_type) {
288 case SMT_PACKET:
289 pkt = sm->sm_data;
290 ssc = streaming_start_component_find_by_index(gh->gh_ss,
291 pkt->pkt_componentindex);
292 if (ssc == NULL) {
293 tvherror(LS_GLOBALHEADERS, "Unable to find component %d", pkt->pkt_componentindex);
294 streaming_msg_free(sm);
295 return;
296 }
297
298 pkt_trace(LS_GLOBALHEADERS, pkt, "hold receive");
299
300 pkt_ref_inc(pkt);
301
302 if (pkt->pkt_err == 0)
303 apply_header(ssc, pkt);
304
305 pktref_enqueue(&gh->gh_holdq, pkt);
306
307 streaming_msg_free(sm);
308
309 if(!gh_is_audiovideo(ssc->ssc_type))
310 break;
311
312 if(!headers_complete(gh))
313 break;
314
315 // Send our modified start
316 sm = streaming_msg_create_data(SMT_START,
317 streaming_start_copy(gh->gh_ss));
318 streaming_target_deliver2(gh->gh_output, sm);
319
320 // Send all pending packets
321 while((pkt = pktref_get_first(&gh->gh_holdq)) != NULL) {
322 if (pkt->pkt_payload) {
323 sm = streaming_msg_create_pkt(pkt);
324 streaming_target_deliver2(gh->gh_output, sm);
325 }
326 pkt_ref_dec(pkt);
327 }
328 gh->gh_passthru = 1;
329 break;
330
331 case SMT_START:
332 if (gh->gh_ss)
333 gh_flush(gh);
334 gh_start(gh, sm);
335 break;
336
337 case SMT_STOP:
338 gh_flush(gh);
339 streaming_msg_free(sm);
340 break;
341
342 case SMT_GRACE:
343 case SMT_EXIT:
344 case SMT_SERVICE_STATUS:
345 case SMT_SIGNAL_STATUS:
346 case SMT_DESCRAMBLE_INFO:
347 case SMT_NOSTART:
348 case SMT_NOSTART_WARN:
349 case SMT_MPEGTS:
350 case SMT_SPEED:
351 case SMT_SKIP:
352 case SMT_TIMESHIFT_STATUS:
353 streaming_target_deliver2(gh->gh_output, sm);
354 break;
355 }
356 }
357
358
359 /**
360 *
361 */
362 static void
gh_pass(globalheaders_t * gh,streaming_message_t * sm)363 gh_pass(globalheaders_t *gh, streaming_message_t *sm)
364 {
365 th_pkt_t *pkt;
366 switch(sm->sm_type) {
367 case SMT_START:
368 /* stop */
369 gh->gh_passthru = 0;
370 gh_flush(gh);
371 /* restart */
372 gh_start(gh, sm);
373 break;
374
375 case SMT_STOP:
376 gh->gh_passthru = 0;
377 gh_flush(gh);
378 // FALLTHRU
379 case SMT_GRACE:
380 case SMT_EXIT:
381 case SMT_SERVICE_STATUS:
382 case SMT_SIGNAL_STATUS:
383 case SMT_DESCRAMBLE_INFO:
384 case SMT_NOSTART:
385 case SMT_NOSTART_WARN:
386 case SMT_MPEGTS:
387 case SMT_SKIP:
388 case SMT_SPEED:
389 case SMT_TIMESHIFT_STATUS:
390 streaming_target_deliver2(gh->gh_output, sm);
391 break;
392 case SMT_PACKET:
393 pkt = sm->sm_data;
394 if (pkt->pkt_payload || pkt->pkt_err)
395 streaming_target_deliver2(gh->gh_output, sm);
396 else
397 streaming_msg_free(sm);
398 break;
399 }
400 }
401
402
403 /**
404 *
405 */
406 static void
globalheaders_input(void * opaque,streaming_message_t * sm)407 globalheaders_input(void *opaque, streaming_message_t *sm)
408 {
409 globalheaders_t *gh = opaque;
410
411 if(gh->gh_passthru)
412 gh_pass(gh, sm);
413 else
414 gh_hold(gh, sm);
415 }
416
417 static htsmsg_t *
globalheaders_input_info(void * opaque,htsmsg_t * list)418 globalheaders_input_info(void *opaque, htsmsg_t *list)
419 {
420 globalheaders_t *gh = opaque;
421 streaming_target_t *st = gh->gh_output;
422 htsmsg_add_str(list, NULL, "globalheaders input");
423 return st->st_ops.st_info(st->st_opaque, list);
424 }
425
426 static streaming_ops_t globalheaders_input_ops = {
427 .st_cb = globalheaders_input,
428 .st_info = globalheaders_input_info
429 };
430
431
432 /**
433 *
434 */
435 streaming_target_t *
globalheaders_create(streaming_target_t * output)436 globalheaders_create(streaming_target_t *output)
437 {
438 globalheaders_t *gh = calloc(1, sizeof(globalheaders_t));
439
440 TAILQ_INIT(&gh->gh_holdq);
441
442 gh->gh_output = output;
443 streaming_target_init(&gh->gh_input, &globalheaders_input_ops, gh, 0);
444 return &gh->gh_input;
445 }
446
447
448 /**
449 *
450 */
451 void
globalheaders_destroy(streaming_target_t * pad)452 globalheaders_destroy(streaming_target_t *pad)
453 {
454 globalheaders_t *gh = (globalheaders_t *)pad;
455 gh_flush(gh);
456 free(gh);
457 }
458