1 /*
2 * mpegtsparse.c -
3 * Copyright (C) 2007 Alessandro Decina
4 *
5 * Authors:
6 * Alessandro Decina <alessandro@nnva.org>
7 * Zaheer Abbas Merali <zaheerabbas at merali dot org>
8 *
9 * This library is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Library General Public
11 * License as published by the Free Software Foundation; either
12 * version 2 of the License, or (at your option) any later version.
13 *
14 * This library is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 * Library General Public License for more details.
18 *
19 * You should have received a copy of the GNU Library General Public
20 * License along with this library; if not, write to the
21 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
22 * Boston, MA 02110-1301, USA.
23 */
24
25 #ifdef HAVE_CONFIG_H
26 #include "config.h"
27 #endif
28
29 #include <stdio.h>
30 #include <stdlib.h>
31 #include <string.h>
32
33 #include "mpegtsbase.h"
34 #include "mpegtsparse.h"
35 #include "gstmpegdesc.h"
36
/* Extra latency, in milliseconds: at most 100 ms may elapse between two PCRs */
38 #define TS_LATENCY 100
39
40 #define TABLE_ID_UNSET 0xFF
41 #define RUNNING_STATUS_RUNNING 4
42
43 GST_DEBUG_CATEGORY_STATIC (mpegts_parse_debug);
44 #define GST_CAT_DEFAULT mpegts_parse_debug
45
/* Forward declaration: the pad wrapper and the program record point at
 * each other. */
typedef struct _MpegTSParsePad MpegTSParsePad;

/* Per-program record used by this element. Its size is registered as the
 * base class' program_size in mpegts_parse_init(). */
typedef struct
{
  /* Base program data. Kept as the first member: this file casts between
   * MpegTSBaseProgram * and MpegTSParseProgram *. */
  MpegTSBaseProgram program;
  /* Request-pad wrapper bound to this program, or NULL when none is */
  MpegTSParsePad *tspad;
} MpegTSParseProgram;
53
/* Wrapper attached to every "program_%u" request pad through the pad's
 * element-private pointer. */
struct _MpegTSParsePad
{
  /* the request pad this wrapper belongs to */
  GstPad *pad;

  /* the program number that the peer wants on this pad (-1 when unset) */
  gint program_number;
  /* the matching program once it has been started, NULL otherwise */
  MpegTSParseProgram *program;

  /* set to FALSE before a push and TRUE after */
  gboolean pushed;

  /* the return of the latest push */
  GstFlowReturn flow_return;
};
68
/* Always-present source pad; mpegts_parse_input_done() pushes the incoming
 * buffers on it. */
static GstStaticPadTemplate src_template =
GST_STATIC_PAD_TEMPLATE ("src", GST_PAD_SRC,
    GST_PAD_ALWAYS,
    GST_STATIC_CAPS ("video/mpegts, " "systemstream = (boolean) true ")
    );
74
/* Request source pads, one per program; the numeric suffix of the pad name
 * is parsed as the program number in mpegts_parse_request_new_pad(). */
static GstStaticPadTemplate program_template =
GST_STATIC_PAD_TEMPLATE ("program_%u", GST_PAD_SRC,
    GST_PAD_REQUEST,
    GST_STATIC_CAPS ("video/mpegts, " "systemstream = (boolean) true ")
    );
80
/* GObject property identifiers installed in mpegts_parse_class_init() */
enum
{
  PROP_0,
  PROP_SET_TIMESTAMPS,          /* "set-timestamps" (boolean) */
  PROP_SMOOTHING_LATENCY,       /* "smoothing-latency" (uint, microseconds) */
  PROP_PCR_PID,                 /* "pcr-pid" (int, -1 for auto) */
  /* FILL ME */
};
89
/* GObject property accessors */
static void mpegts_parse_set_property (GObject * object, guint prop_id,
    const GValue * value, GParamSpec * pspec);
static void mpegts_parse_get_property (GObject * object, guint prop_id,
    GValue * value, GParamSpec * pspec);

/* MpegTSBase program lifecycle hooks */
static void
mpegts_parse_program_started (MpegTSBase * base, MpegTSBaseProgram * program);
static void
mpegts_parse_program_stopped (MpegTSBase * base, MpegTSBaseProgram * program);

/* MpegTSBase per-packet hooks */
static GstFlowReturn
mpegts_parse_push (MpegTSBase * base, MpegTSPacketizerPacket * packet,
    GstMpegtsSection * section);
static void mpegts_parse_inspect_packet (MpegTSBase * base,
    MpegTSPacketizerPacket * packet);

/* Request-pad wrapper management */
static MpegTSParsePad *mpegts_parse_create_tspad (MpegTSParse2 * parse,
    const gchar * name);
static void mpegts_parse_destroy_tspad (MpegTSParse2 * parse,
    MpegTSParsePad * tspad);

/* GstElement pad handling */
static void mpegts_parse_pad_removed (GstElement * element, GstPad * pad);
static GstPad *mpegts_parse_request_new_pad (GstElement * element,
    GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
static void mpegts_parse_release_pad (GstElement * element, GstPad * pad);
static gboolean mpegts_parse_src_pad_query (GstPad * pad, GstObject * parent,
    GstQuery * query);
static gboolean push_event (MpegTSBase * base, GstEvent * event);

/* The rest of this file refers to the parent class as "parent_class" */
#define mpegts_parse_parent_class parent_class
G_DEFINE_TYPE (MpegTSParse2, mpegts_parse, GST_TYPE_MPEGTS_BASE);
static void mpegts_parse_reset (MpegTSBase * base);
static GstFlowReturn mpegts_parse_input_done (MpegTSBase * base,
    GstBuffer * buffer);
static GstFlowReturn
drain_pending_buffers (MpegTSParse2 * parse, gboolean drain_all);
127 static void
mpegts_parse_dispose(GObject * object)128 mpegts_parse_dispose (GObject * object)
129 {
130 MpegTSParse2 *parse = (MpegTSParse2 *) object;
131
132 gst_flow_combiner_free (parse->flowcombiner);
133
134 GST_CALL_PARENT (G_OBJECT_CLASS, dispose, (object));
135 }
136
137 static void
mpegts_parse_class_init(MpegTSParse2Class * klass)138 mpegts_parse_class_init (MpegTSParse2Class * klass)
139 {
140 GObjectClass *gobject_class = (GObjectClass *) (klass);
141 GstElementClass *element_class;
142 MpegTSBaseClass *ts_class;
143
144 gobject_class->set_property = mpegts_parse_set_property;
145 gobject_class->get_property = mpegts_parse_get_property;
146 gobject_class->dispose = mpegts_parse_dispose;
147
148 g_object_class_install_property (gobject_class, PROP_SET_TIMESTAMPS,
149 g_param_spec_boolean ("set-timestamps",
150 "Timestamp (or re-timestamp) the output stream",
151 "If set, timestamps will be set on the output buffers using "
152 "PCRs and smoothed over the smoothing-latency period", FALSE,
153 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
154 g_object_class_install_property (gobject_class, PROP_SMOOTHING_LATENCY,
155 g_param_spec_uint ("smoothing-latency", "Smoothing Latency",
156 "Additional latency in microseconds for smoothing jitter in input timestamps on live capture",
157 0, G_MAXUINT, 0, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
158 g_object_class_install_property (gobject_class, PROP_PCR_PID,
159 g_param_spec_int ("pcr-pid", "PID containing PCR",
160 "Set the PID to use for PCR values (-1 for auto)",
161 -1, G_MAXINT, -1, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
162
163 element_class = GST_ELEMENT_CLASS (klass);
164 element_class->pad_removed = mpegts_parse_pad_removed;
165 element_class->request_new_pad = mpegts_parse_request_new_pad;
166 element_class->release_pad = mpegts_parse_release_pad;
167
168 gst_element_class_add_static_pad_template (element_class, &src_template);
169 gst_element_class_add_static_pad_template (element_class, &program_template);
170
171 gst_element_class_set_static_metadata (element_class,
172 "MPEG transport stream parser", "Codec/Parser",
173 "Parses MPEG2 transport streams",
174 "Alessandro Decina <alessandro@nnva.org>, "
175 "Zaheer Abbas Merali <zaheerabbas at merali dot org>");
176
177 ts_class = GST_MPEGTS_BASE_CLASS (klass);
178 ts_class->push = GST_DEBUG_FUNCPTR (mpegts_parse_push);
179 ts_class->push_event = GST_DEBUG_FUNCPTR (push_event);
180 ts_class->program_started = GST_DEBUG_FUNCPTR (mpegts_parse_program_started);
181 ts_class->program_stopped = GST_DEBUG_FUNCPTR (mpegts_parse_program_stopped);
182 ts_class->reset = GST_DEBUG_FUNCPTR (mpegts_parse_reset);
183 ts_class->input_done = GST_DEBUG_FUNCPTR (mpegts_parse_input_done);
184 ts_class->inspect_packet = GST_DEBUG_FUNCPTR (mpegts_parse_inspect_packet);
185 }
186
187 static void
mpegts_parse_init(MpegTSParse2 * parse)188 mpegts_parse_init (MpegTSParse2 * parse)
189 {
190 MpegTSBase *base = (MpegTSBase *) parse;
191
192 base->program_size = sizeof (MpegTSParseProgram);
193 /* We will only need to handle data/section if we have request pads */
194 base->push_data = FALSE;
195 base->push_section = FALSE;
196
197 parse->user_pcr_pid = parse->pcr_pid = -1;
198
199 parse->flowcombiner = gst_flow_combiner_new ();
200
201 parse->srcpad = gst_pad_new_from_static_template (&src_template, "src");
202 gst_flow_combiner_add_pad (parse->flowcombiner, parse->srcpad);
203 parse->first = TRUE;
204 gst_pad_set_query_function (parse->srcpad,
205 GST_DEBUG_FUNCPTR (mpegts_parse_src_pad_query));
206 gst_element_add_pad (GST_ELEMENT (parse), parse->srcpad);
207
208 parse->have_group_id = FALSE;
209 parse->group_id = G_MAXUINT;
210 }
211
212 static void
mpegts_parse_reset(MpegTSBase * base)213 mpegts_parse_reset (MpegTSBase * base)
214 {
215 MpegTSParse2 *parse = (MpegTSParse2 *) base;
216
217 /* Set the various know PIDs we are interested in */
218
219 /* CAT */
220 MPEGTS_BIT_SET (base->known_psi, 1);
221 /* NIT, ST */
222 MPEGTS_BIT_SET (base->known_psi, 0x10);
223 /* SDT, BAT, ST */
224 MPEGTS_BIT_SET (base->known_psi, 0x11);
225 /* EIT, ST, CIT (TS 102 323) */
226 MPEGTS_BIT_SET (base->known_psi, 0x12);
227 /* RST, ST */
228 MPEGTS_BIT_SET (base->known_psi, 0x13);
229 /* RNT (TS 102 323) */
230 MPEGTS_BIT_SET (base->known_psi, 0x16);
231 /* inband signalling */
232 MPEGTS_BIT_SET (base->known_psi, 0x1c);
233 /* measurement */
234 MPEGTS_BIT_SET (base->known_psi, 0x1d);
235 /* DIT */
236 MPEGTS_BIT_SET (base->known_psi, 0x1e);
237 /* SIT */
238 MPEGTS_BIT_SET (base->known_psi, 0x1f);
239
240 parse->first = TRUE;
241 parse->have_group_id = FALSE;
242 parse->group_id = G_MAXUINT;
243
244 g_list_free_full (parse->pending_buffers, (GDestroyNotify) gst_buffer_unref);
245 parse->pending_buffers = NULL;
246
247 parse->current_pcr = GST_CLOCK_TIME_NONE;
248 parse->previous_pcr = GST_CLOCK_TIME_NONE;
249 parse->base_pcr = GST_CLOCK_TIME_NONE;
250 parse->bytes_since_pcr = 0;
251 parse->pcr_pid = parse->user_pcr_pid;
252 parse->ts_offset = 0;
253 }
254
255 static void
mpegts_parse_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)256 mpegts_parse_set_property (GObject * object, guint prop_id,
257 const GValue * value, GParamSpec * pspec)
258 {
259 MpegTSParse2 *parse = (MpegTSParse2 *) object;
260
261 switch (prop_id) {
262 case PROP_SET_TIMESTAMPS:
263 parse->set_timestamps = g_value_get_boolean (value);
264 break;
265 case PROP_SMOOTHING_LATENCY:
266 parse->smoothing_latency = GST_USECOND * g_value_get_uint (value);
267 mpegts_packetizer_set_pcr_discont_threshold (GST_MPEGTS_BASE
268 (parse)->packetizer, parse->smoothing_latency);
269 break;
270 case PROP_PCR_PID:
271 parse->pcr_pid = parse->user_pcr_pid = g_value_get_int (value);
272 break;
273 default:
274 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
275 }
276 }
277
278 static void
mpegts_parse_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)279 mpegts_parse_get_property (GObject * object, guint prop_id,
280 GValue * value, GParamSpec * pspec)
281 {
282 MpegTSParse2 *parse = (MpegTSParse2 *) object;
283
284 switch (prop_id) {
285 case PROP_SET_TIMESTAMPS:
286 g_value_set_boolean (value, parse->set_timestamps);
287 break;
288 case PROP_SMOOTHING_LATENCY:
289 g_value_set_uint (value, parse->smoothing_latency / GST_USECOND);
290 break;
291 case PROP_PCR_PID:
292 g_value_set_int (value, parse->pcr_pid);
293 break;
294 default:
295 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
296 }
297 }
298
299 static gboolean
prepare_src_pad(MpegTSBase * base,MpegTSParse2 * parse)300 prepare_src_pad (MpegTSBase * base, MpegTSParse2 * parse)
301 {
302 GstEvent *event;
303 gchar *stream_id;
304 GstCaps *caps;
305
306 if (!parse->first)
307 return TRUE;
308
309 /* If there's no packet_size yet, we can't set caps yet */
310 if (G_UNLIKELY (base->packetizer->packet_size == 0))
311 return FALSE;
312
313 stream_id =
314 gst_pad_create_stream_id (parse->srcpad, GST_ELEMENT_CAST (base),
315 "multi-program");
316
317 event =
318 gst_pad_get_sticky_event (parse->parent.sinkpad, GST_EVENT_STREAM_START,
319 0);
320 if (event) {
321 if (gst_event_parse_group_id (event, &parse->group_id))
322 parse->have_group_id = TRUE;
323 else
324 parse->have_group_id = FALSE;
325 gst_event_unref (event);
326 } else if (!parse->have_group_id) {
327 parse->have_group_id = TRUE;
328 parse->group_id = gst_util_group_id_next ();
329 }
330 event = gst_event_new_stream_start (stream_id);
331 if (parse->have_group_id)
332 gst_event_set_group_id (event, parse->group_id);
333
334 gst_pad_push_event (parse->srcpad, event);
335 g_free (stream_id);
336
337 caps = gst_caps_new_simple ("video/mpegts",
338 "systemstream", G_TYPE_BOOLEAN, TRUE,
339 "packetsize", G_TYPE_INT, base->packetizer->packet_size, NULL);
340
341 gst_pad_set_caps (parse->srcpad, caps);
342 gst_caps_unref (caps);
343
344 /* If setting output timestamps, ensure that the output segment is TIME */
345 if (parse->set_timestamps == FALSE || base->segment.format == GST_FORMAT_TIME)
346 gst_pad_push_event (parse->srcpad, gst_event_new_segment (&base->segment));
347 else {
348 GstSegment seg;
349 gst_segment_init (&seg, GST_FORMAT_TIME);
350 GST_DEBUG_OBJECT (parse,
351 "Generating time output segment %" GST_SEGMENT_FORMAT, &seg);
352 gst_pad_push_event (parse->srcpad, gst_event_new_segment (&seg));
353 }
354
355 parse->first = FALSE;
356
357 return TRUE;
358 }
359
360 static gboolean
push_event(MpegTSBase * base,GstEvent * event)361 push_event (MpegTSBase * base, GstEvent * event)
362 {
363 MpegTSParse2 *parse = (MpegTSParse2 *) base;
364 GList *tmp;
365
366 if (G_UNLIKELY (parse->first)) {
367 /* We will send the segment when really starting */
368 if (G_UNLIKELY (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT)) {
369 gst_event_unref (event);
370 return TRUE;
371 }
372 prepare_src_pad (base, parse);
373 }
374 if (G_UNLIKELY (GST_EVENT_TYPE (event) == GST_EVENT_EOS))
375 drain_pending_buffers (parse, TRUE);
376
377 if (G_UNLIKELY (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT))
378 parse->ts_offset = 0;
379
380 for (tmp = parse->srcpads; tmp; tmp = tmp->next) {
381 GstPad *pad = (GstPad *) tmp->data;
382 if (pad) {
383 gst_event_ref (event);
384 gst_pad_push_event (pad, event);
385 }
386 }
387
388 gst_pad_push_event (parse->srcpad, event);
389
390 return TRUE;
391 }
392
393 static MpegTSParsePad *
mpegts_parse_create_tspad(MpegTSParse2 * parse,const gchar * pad_name)394 mpegts_parse_create_tspad (MpegTSParse2 * parse, const gchar * pad_name)
395 {
396 GstPad *pad;
397 MpegTSParsePad *tspad;
398
399 pad = gst_pad_new_from_static_template (&program_template, pad_name);
400 gst_pad_set_query_function (pad,
401 GST_DEBUG_FUNCPTR (mpegts_parse_src_pad_query));
402
403 /* create our wrapper */
404 tspad = g_new0 (MpegTSParsePad, 1);
405 tspad->pad = pad;
406 tspad->program_number = -1;
407 tspad->program = NULL;
408 tspad->pushed = FALSE;
409 tspad->flow_return = GST_FLOW_NOT_LINKED;
410 gst_pad_set_element_private (pad, tspad);
411 gst_flow_combiner_add_pad (parse->flowcombiner, pad);
412
413 return tspad;
414 }
415
416 static void
mpegts_parse_destroy_tspad(MpegTSParse2 * parse,MpegTSParsePad * tspad)417 mpegts_parse_destroy_tspad (MpegTSParse2 * parse, MpegTSParsePad * tspad)
418 {
419 /* free the wrapper */
420 g_free (tspad);
421 }
422
423 static void
mpegts_parse_pad_removed(GstElement * element,GstPad * pad)424 mpegts_parse_pad_removed (GstElement * element, GstPad * pad)
425 {
426 MpegTSParsePad *tspad;
427 MpegTSBase *base = (MpegTSBase *) element;
428 MpegTSParse2 *parse = GST_MPEGTS_PARSE (element);
429
430 if (gst_pad_get_direction (pad) == GST_PAD_SINK)
431 return;
432
433 tspad = (MpegTSParsePad *) gst_pad_get_element_private (pad);
434 if (tspad) {
435 mpegts_parse_destroy_tspad (parse, tspad);
436
437 parse->srcpads = g_list_remove_all (parse->srcpads, pad);
438 }
439 if (parse->srcpads == NULL) {
440 base->push_data = FALSE;
441 base->push_section = FALSE;
442 }
443
444 if (GST_ELEMENT_CLASS (parent_class)->pad_removed)
445 GST_ELEMENT_CLASS (parent_class)->pad_removed (element, pad);
446 }
447
448 static GstPad *
mpegts_parse_request_new_pad(GstElement * element,GstPadTemplate * template,const gchar * padname,const GstCaps * caps)449 mpegts_parse_request_new_pad (GstElement * element, GstPadTemplate * template,
450 const gchar * padname, const GstCaps * caps)
451 {
452 MpegTSBase *base = (MpegTSBase *) element;
453 MpegTSParse2 *parse;
454 MpegTSParsePad *tspad;
455 MpegTSParseProgram *parseprogram;
456 GstPad *pad;
457 gint program_num = -1;
458 GstEvent *event;
459 gchar *stream_id;
460
461 g_return_val_if_fail (template != NULL, NULL);
462 g_return_val_if_fail (GST_IS_MPEGTS_PARSE (element), NULL);
463 g_return_val_if_fail (padname != NULL, NULL);
464
465 sscanf (padname + 8, "%d", &program_num);
466
467 GST_DEBUG_OBJECT (element, "padname:%s, program:%d", padname, program_num);
468
469 parse = GST_MPEGTS_PARSE (element);
470
471 tspad = mpegts_parse_create_tspad (parse, padname);
472 tspad->program_number = program_num;
473
474 /* Find if the program is already active */
475 parseprogram =
476 (MpegTSParseProgram *) mpegts_base_get_program (GST_MPEGTS_BASE (parse),
477 program_num);
478 if (parseprogram) {
479 tspad->program = parseprogram;
480 parseprogram->tspad = tspad;
481 }
482
483 pad = tspad->pad;
484 parse->srcpads = g_list_append (parse->srcpads, pad);
485 base->push_data = TRUE;
486 base->push_section = TRUE;
487
488 gst_pad_set_active (pad, TRUE);
489
490 stream_id = gst_pad_create_stream_id (pad, element, padname + 8);
491
492 event =
493 gst_pad_get_sticky_event (parse->parent.sinkpad, GST_EVENT_STREAM_START,
494 0);
495 if (event) {
496 if (gst_event_parse_group_id (event, &parse->group_id))
497 parse->have_group_id = TRUE;
498 else
499 parse->have_group_id = FALSE;
500 gst_event_unref (event);
501 } else if (!parse->have_group_id) {
502 parse->have_group_id = TRUE;
503 parse->group_id = gst_util_group_id_next ();
504 }
505 event = gst_event_new_stream_start (stream_id);
506 if (parse->have_group_id)
507 gst_event_set_group_id (event, parse->group_id);
508
509 gst_pad_push_event (pad, event);
510 g_free (stream_id);
511
512 gst_element_add_pad (element, pad);
513
514 return pad;
515 }
516
517 static void
mpegts_parse_release_pad(GstElement * element,GstPad * pad)518 mpegts_parse_release_pad (GstElement * element, GstPad * pad)
519 {
520 MpegTSParse2 *parse = (MpegTSParse2 *) element;
521
522 gst_pad_set_active (pad, FALSE);
523 /* we do the cleanup in GstElement::pad-removed */
524 gst_flow_combiner_remove_pad (parse->flowcombiner, pad);
525 gst_element_remove_pad (element, pad);
526 }
527
528 static GstFlowReturn
mpegts_parse_tspad_push_section(MpegTSParse2 * parse,MpegTSParsePad * tspad,GstMpegtsSection * section,MpegTSPacketizerPacket * packet)529 mpegts_parse_tspad_push_section (MpegTSParse2 * parse, MpegTSParsePad * tspad,
530 GstMpegtsSection * section, MpegTSPacketizerPacket * packet)
531 {
532 GstFlowReturn ret = GST_FLOW_OK;
533 gboolean to_push = TRUE;
534
535 if (tspad->program_number != -1) {
536 if (tspad->program) {
537 /* we push all sections to all pads except PMTs which we
538 * only push to pads meant to receive that program number */
539 if (section->table_id == 0x02) {
540 /* PMT */
541 if (section->subtable_extension != tspad->program_number)
542 to_push = FALSE;
543 }
544 } else if (section->table_id != 0x00) {
545 /* there's a program filter on the pad but the PMT for the program has not
546 * been parsed yet, ignore the pad until we get a PMT.
547 * But we always allow PAT to go through */
548 to_push = FALSE;
549 }
550 }
551
552 GST_DEBUG_OBJECT (parse,
553 "pushing section: %d program number: %d table_id: %d", to_push,
554 tspad->program_number, section->table_id);
555
556 if (to_push) {
557 GstBuffer *buf =
558 gst_buffer_new_and_alloc (packet->data_end - packet->data_start);
559 gst_buffer_fill (buf, 0, packet->data_start,
560 packet->data_end - packet->data_start);
561 ret = gst_pad_push (tspad->pad, buf);
562 ret = gst_flow_combiner_update_flow (parse->flowcombiner, ret);
563 }
564
565 GST_LOG_OBJECT (parse, "Returning %s", gst_flow_get_name (ret));
566 return ret;
567 }
568
569 static GstFlowReturn
mpegts_parse_tspad_push(MpegTSParse2 * parse,MpegTSParsePad * tspad,MpegTSPacketizerPacket * packet)570 mpegts_parse_tspad_push (MpegTSParse2 * parse, MpegTSParsePad * tspad,
571 MpegTSPacketizerPacket * packet)
572 {
573 GstFlowReturn ret = GST_FLOW_OK;
574 MpegTSBaseProgram *bp = NULL;
575
576 if (tspad->program_number != -1) {
577 if (tspad->program)
578 bp = (MpegTSBaseProgram *) tspad->program;
579 else
580 bp = mpegts_base_get_program ((MpegTSBase *) parse,
581 tspad->program_number);
582 }
583
584 if (bp) {
585 if (packet->pid == bp->pmt_pid || bp->streams == NULL
586 || bp->streams[packet->pid]) {
587 GstBuffer *buf =
588 gst_buffer_new_and_alloc (packet->data_end - packet->data_start);
589 gst_buffer_fill (buf, 0, packet->data_start,
590 packet->data_end - packet->data_start);
591 /* push if there's no filter or if the pid is in the filter */
592 ret = gst_pad_push (tspad->pad, buf);
593 ret = gst_flow_combiner_update_flow (parse->flowcombiner, ret);
594 }
595 }
596 GST_DEBUG_OBJECT (parse, "Returning %s", gst_flow_get_name (ret));
597
598 return ret;
599 }
600
601 static void
pad_clear_for_push(GstPad * pad,MpegTSParse2 * parse)602 pad_clear_for_push (GstPad * pad, MpegTSParse2 * parse)
603 {
604 MpegTSParsePad *tspad = (MpegTSParsePad *) gst_pad_get_element_private (pad);
605
606 tspad->flow_return = GST_FLOW_NOT_LINKED;
607 tspad->pushed = FALSE;
608 }
609
610 static GstFlowReturn
mpegts_parse_push(MpegTSBase * base,MpegTSPacketizerPacket * packet,GstMpegtsSection * section)611 mpegts_parse_push (MpegTSBase * base, MpegTSPacketizerPacket * packet,
612 GstMpegtsSection * section)
613 {
614 MpegTSParse2 *parse = (MpegTSParse2 *) base;
615 guint32 pads_cookie;
616 gboolean done = FALSE;
617 GstPad *pad = NULL;
618 MpegTSParsePad *tspad;
619 GstFlowReturn ret;
620 GList *srcpads;
621
622 GST_OBJECT_LOCK (parse);
623 srcpads = parse->srcpads;
624
625 /* clear tspad->pushed on pads */
626 g_list_foreach (srcpads, (GFunc) pad_clear_for_push, parse);
627 if (srcpads)
628 ret = GST_FLOW_NOT_LINKED;
629 else
630 ret = GST_FLOW_OK;
631
632 /* Get cookie and source pads list */
633 pads_cookie = GST_ELEMENT_CAST (parse)->pads_cookie;
634 if (G_LIKELY (srcpads)) {
635 pad = GST_PAD_CAST (srcpads->data);
636 g_object_ref (pad);
637 }
638 GST_OBJECT_UNLOCK (parse);
639
640 while (pad && !done) {
641 tspad = gst_pad_get_element_private (pad);
642
643 if (G_LIKELY (!tspad->pushed)) {
644 if (section) {
645 tspad->flow_return =
646 mpegts_parse_tspad_push_section (parse, tspad, section, packet);
647 } else {
648 tspad->flow_return = mpegts_parse_tspad_push (parse, tspad, packet);
649 }
650 tspad->pushed = TRUE;
651
652 if (G_UNLIKELY (tspad->flow_return != GST_FLOW_OK
653 && tspad->flow_return != GST_FLOW_NOT_LINKED)) {
654 /* return the error upstream */
655 ret = tspad->flow_return;
656 done = TRUE;
657 }
658
659 }
660
661 if (ret == GST_FLOW_NOT_LINKED)
662 ret = tspad->flow_return;
663
664 g_object_unref (pad);
665
666 if (G_UNLIKELY (!done)) {
667 GST_OBJECT_LOCK (parse);
668 if (G_UNLIKELY (pads_cookie != GST_ELEMENT_CAST (parse)->pads_cookie)) {
669 /* resync */
670 GST_DEBUG ("resync");
671 pads_cookie = GST_ELEMENT_CAST (parse)->pads_cookie;
672 srcpads = parse->srcpads;
673 } else {
674 GST_DEBUG ("getting next pad");
675 /* Get next pad */
676 srcpads = g_list_next (srcpads);
677 }
678
679 if (srcpads) {
680 pad = GST_PAD_CAST (srcpads->data);
681 g_object_ref (pad);
682 } else
683 done = TRUE;
684 GST_OBJECT_UNLOCK (parse);
685 }
686 }
687
688 return ret;
689 }
690
691 static void
mpegts_parse_inspect_packet(MpegTSBase * base,MpegTSPacketizerPacket * packet)692 mpegts_parse_inspect_packet (MpegTSBase * base, MpegTSPacketizerPacket * packet)
693 {
694 MpegTSParse2 *parse = GST_MPEGTS_PARSE (base);
695 GST_LOG ("pid 0x%04x pusi:%d, afc:%d, cont:%d, payload:%p PCR %"
696 G_GUINT64_FORMAT, packet->pid, packet->payload_unit_start_indicator,
697 packet->scram_afc_cc & 0x30,
698 FLAGS_CONTINUITY_COUNTER (packet->scram_afc_cc), packet->payload,
699 packet->pcr);
700
701 /* Store the PCR if desired */
702 if (parse->current_pcr == GST_CLOCK_TIME_NONE &&
703 packet->afc_flags & MPEGTS_AFC_PCR_FLAG) {
704 /* Take this as the pcr_pid if set to auto-select */
705 if (parse->pcr_pid == -1)
706 parse->pcr_pid = packet->pid;
707 /* Check the PCR-PID matches the program we want for multiple programs */
708 if (parse->pcr_pid == packet->pid) {
709 parse->current_pcr = mpegts_packetizer_pts_to_ts (base->packetizer,
710 PCRTIME_TO_GSTTIME (packet->pcr), parse->pcr_pid);
711 GST_DEBUG ("Got new PCR %" GST_TIME_FORMAT " raw %" G_GUINT64_FORMAT,
712 GST_TIME_ARGS (parse->current_pcr), packet->pcr);
713 if (parse->base_pcr == GST_CLOCK_TIME_NONE) {
714 parse->base_pcr = parse->current_pcr;
715 }
716 }
717 }
718 }
719
720 static GstClockTime
get_pending_timestamp_diff(MpegTSParse2 * parse)721 get_pending_timestamp_diff (MpegTSParse2 * parse)
722 {
723 GList *l;
724 GstClockTime first_ts, last_ts;
725
726 if (parse->pending_buffers == NULL)
727 return GST_CLOCK_TIME_NONE;
728
729 l = g_list_last (parse->pending_buffers);
730 first_ts = GST_BUFFER_PTS (l->data);
731 if (first_ts == GST_CLOCK_TIME_NONE)
732 return GST_CLOCK_TIME_NONE;
733
734 l = g_list_first (parse->pending_buffers);
735 last_ts = GST_BUFFER_PTS (l->data);
736 if (last_ts == GST_CLOCK_TIME_NONE)
737 return GST_CLOCK_TIME_NONE;
738
739 return last_ts - first_ts;
740 }
741
742 static GstFlowReturn
drain_pending_buffers(MpegTSParse2 * parse,gboolean drain_all)743 drain_pending_buffers (MpegTSParse2 * parse, gboolean drain_all)
744 {
745 GstFlowReturn ret = GST_FLOW_OK;
746 GstClockTime start_ts;
747 GstClockTime pcr = GST_CLOCK_TIME_NONE;
748 GstClockTime pcr_diff = 0;
749 gsize pcr_bytes, bytes_since_pcr, pos;
750 GstBuffer *buffer;
751 GList *l, *end = NULL;
752
753 if (parse->pending_buffers == NULL)
754 return GST_FLOW_OK; /* Nothing to push */
755
756 /*
757 * There are 4 cases:
758 * 1 We get a buffer with no PCR -> it's the head of the list
759 * -> Do nothing, unless it's EOS
760 * 2 We get a buffer with a PCR, it's the first PCR we've seen, and belongs
761 * to the buffer at the head of the list
762 * -> Push any buffers in the list except the head,
763 * using a smoothing of their timestamps to land at the PCR
764 * -> store new PCR as the previous PCR, bytes_since_pcr = sizeof (buffer);
765 * 3 It's EOS (drain_all == TRUE, current_pcr == NONE)
766 * -> Push any buffers in the list using a smoothing of their timestamps
767 * starting at the previous PCR or first TS
768 * 4 We get a buffer with a PCR, and have a previous PCR
769 * -> If distance > smoothing_latency,
770 * output buffers except the last in the pending queue using
771 * piecewise-linear timestamps
772 * -> store new PCR as the previous PCR, bytes_since_pcr = sizeof (buffer);
773 */
774
775 /* Case 1 */
776 if (!GST_CLOCK_TIME_IS_VALID (parse->current_pcr) && !drain_all)
777 return GST_FLOW_OK;
778
779 if (GST_CLOCK_TIME_IS_VALID (parse->current_pcr)) {
780 pcr = parse->current_pcr;
781 parse->current_pcr = GST_CLOCK_TIME_NONE;
782 }
783
784 /* The bytes of the last buffer are after the PCR */
785 buffer = GST_BUFFER (g_list_nth_data (parse->pending_buffers, 0));
786 bytes_since_pcr = gst_buffer_get_size (buffer);
787
788 pcr_bytes = parse->bytes_since_pcr - bytes_since_pcr;
789
790 if (!drain_all)
791 end = g_list_first (parse->pending_buffers);
792
793 /* Case 2 */
794 if (!GST_CLOCK_TIME_IS_VALID (parse->previous_pcr)) {
795 pcr_diff = get_pending_timestamp_diff (parse);
796
797 /* Calculate the start_ts that ends at the end timestamp */
798 start_ts = GST_CLOCK_TIME_NONE;
799 if (end) {
800 start_ts = GST_BUFFER_PTS (GST_BUFFER (end->data));
801 if (start_ts > pcr_diff)
802 start_ts -= pcr_diff;
803 }
804 } else if (drain_all) { /* Case 3 */
805 start_ts = parse->previous_pcr;
806 pcr_diff = get_pending_timestamp_diff (parse);
807 } else { /* Case 4 */
808 start_ts = parse->previous_pcr;
809 if (GST_CLOCK_TIME_IS_VALID (pcr) && pcr > start_ts)
810 pcr_diff = GST_CLOCK_DIFF (start_ts, pcr);
811
812 /* Make sure PCR observations are sufficiently far apart */
813 if (drain_all == FALSE && pcr_diff < parse->smoothing_latency)
814 return GST_FLOW_OK;
815 }
816
817 GST_INFO_OBJECT (parse, "Pushing buffers - startTS %" GST_TIME_FORMAT
818 " duration %" GST_TIME_FORMAT " %" G_GSIZE_FORMAT " bytes",
819 GST_TIME_ARGS (start_ts), GST_TIME_ARGS (pcr_diff), pcr_bytes);
820
821 /* Now, push buffers out pacing timestamps over pcr_diff time and pcr_bytes */
822 pos = 0;
823 l = g_list_last (parse->pending_buffers);
824 while (l != end) {
825 GList *p;
826 GstClockTime out_ts = start_ts;
827
828 buffer = gst_buffer_make_writable (GST_BUFFER (l->data));
829
830 if (out_ts != GST_CLOCK_TIME_NONE && pcr_diff != GST_CLOCK_TIME_NONE &&
831 pcr_bytes && pos)
832 out_ts += gst_util_uint64_scale (pcr_diff, pos, pcr_bytes);
833
834 pos += gst_buffer_get_size (buffer);
835
836 GST_DEBUG_OBJECT (parse,
837 "InputTS %" GST_TIME_FORMAT " out %" GST_TIME_FORMAT,
838 GST_TIME_ARGS (GST_BUFFER_PTS (buffer)), GST_TIME_ARGS (out_ts));
839
840 GST_BUFFER_PTS (buffer) = out_ts + parse->ts_offset;
841 GST_BUFFER_DTS (buffer) = out_ts + parse->ts_offset;
842 if (ret == GST_FLOW_OK) {
843 ret = gst_pad_push (parse->srcpad, buffer);
844 ret = gst_flow_combiner_update_flow (parse->flowcombiner, ret);
845 } else
846 gst_buffer_unref (buffer);
847
848 /* Free this list node and move to the next */
849 p = g_list_previous (l);
850 parse->pending_buffers = g_list_delete_link (parse->pending_buffers, l);
851 l = p;
852 }
853
854 parse->pending_buffers = end;
855 parse->bytes_since_pcr = bytes_since_pcr;
856 parse->previous_pcr = pcr;
857 return ret;
858 }
859
860 static GstFlowReturn
mpegts_parse_input_done(MpegTSBase * base,GstBuffer * buffer)861 mpegts_parse_input_done (MpegTSBase * base, GstBuffer * buffer)
862 {
863 MpegTSParse2 *parse = GST_MPEGTS_PARSE (base);
864 GstFlowReturn ret = GST_FLOW_OK;
865
866 GST_LOG_OBJECT (parse, "Received buffer %" GST_PTR_FORMAT, buffer);
867
868 if (parse->current_pcr != GST_CLOCK_TIME_NONE) {
869 GST_DEBUG_OBJECT (parse,
870 "InputTS %" GST_TIME_FORMAT " PCR %" GST_TIME_FORMAT,
871 GST_TIME_ARGS (GST_BUFFER_PTS (buffer)),
872 GST_TIME_ARGS (parse->current_pcr));
873 }
874
875 if (parse->set_timestamps || parse->first) {
876 parse->pending_buffers = g_list_prepend (parse->pending_buffers, buffer);
877 parse->bytes_since_pcr += gst_buffer_get_size (buffer);
878 buffer = NULL;
879 }
880
881 if (!prepare_src_pad (base, parse))
882 return GST_FLOW_OK;
883
884 if (parse->pending_buffers != NULL) {
885 /* Don't keep pending_buffers if not setting output timestamps */
886 gboolean drain_all = (parse->set_timestamps == FALSE);
887 ret = drain_pending_buffers (parse, drain_all);
888 if (ret != GST_FLOW_OK) {
889 if (buffer)
890 gst_buffer_unref (buffer);
891 return ret;
892 }
893 }
894
895 if (buffer != NULL) {
896 ret = gst_pad_push (parse->srcpad, buffer);
897 ret = gst_flow_combiner_update_flow (parse->flowcombiner, ret);
898 }
899
900 return ret;
901 }
902
903 static MpegTSParsePad *
find_pad_for_program(MpegTSParse2 * parse,guint program_number)904 find_pad_for_program (MpegTSParse2 * parse, guint program_number)
905 {
906 GList *tmp;
907
908 for (tmp = parse->srcpads; tmp; tmp = tmp->next) {
909 MpegTSParsePad *tspad = gst_pad_get_element_private ((GstPad *) tmp->data);
910
911 if (tspad->program_number == program_number)
912 return tspad;
913 }
914
915 return NULL;
916 }
917
918 static void
mpegts_parse_program_started(MpegTSBase * base,MpegTSBaseProgram * program)919 mpegts_parse_program_started (MpegTSBase * base, MpegTSBaseProgram * program)
920 {
921 MpegTSParse2 *parse = GST_MPEGTS_PARSE (base);
922 MpegTSParseProgram *parseprogram = (MpegTSParseProgram *) program;
923 MpegTSParsePad *tspad;
924
925 /* If we have a request pad for that program, activate it */
926 tspad = find_pad_for_program (parse, program->program_number);
927
928 if (tspad) {
929 tspad->program = parseprogram;
930 parseprogram->tspad = tspad;
931 }
932 }
933
934 static void
mpegts_parse_program_stopped(MpegTSBase * base,MpegTSBaseProgram * program)935 mpegts_parse_program_stopped (MpegTSBase * base, MpegTSBaseProgram * program)
936 {
937 MpegTSParse2 *parse = GST_MPEGTS_PARSE (base);
938 MpegTSParseProgram *parseprogram = (MpegTSParseProgram *) program;
939 MpegTSParsePad *tspad;
940
941 /* If we have a request pad for that program, activate it */
942 tspad = find_pad_for_program (parse, program->program_number);
943
944 if (tspad) {
945 tspad->program = NULL;
946 parseprogram->tspad = NULL;
947 }
948
949 parse->pcr_pid = -1;
950 parse->ts_offset += parse->current_pcr - parse->base_pcr;
951 parse->base_pcr = GST_CLOCK_TIME_NONE;
952 }
953
954 static gboolean
mpegts_parse_src_pad_query(GstPad * pad,GstObject * parent,GstQuery * query)955 mpegts_parse_src_pad_query (GstPad * pad, GstObject * parent, GstQuery * query)
956 {
957 MpegTSParse2 *parse = GST_MPEGTS_PARSE (parent);
958 gboolean res;
959
960 switch (GST_QUERY_TYPE (query)) {
961 case GST_QUERY_LATENCY:
962 {
963 if ((res = gst_pad_peer_query (((MpegTSBase *) parse)->sinkpad, query))) {
964 gboolean is_live;
965 GstClockTime min_latency, max_latency;
966
967 gst_query_parse_latency (query, &is_live, &min_latency, &max_latency);
968 if (is_live) {
969 GstClockTime extra_latency = TS_LATENCY * GST_MSECOND;
970 if (parse->set_timestamps) {
971 extra_latency = MAX (extra_latency, parse->smoothing_latency);
972 }
973 min_latency += extra_latency;
974 if (max_latency != GST_CLOCK_TIME_NONE)
975 max_latency += extra_latency;
976 }
977
978 gst_query_set_latency (query, is_live, min_latency, max_latency);
979 }
980 break;
981 }
982 default:
983 res = gst_pad_query_default (pad, parent, query);
984 }
985 return res;
986 }
987
988 gboolean
gst_mpegtsparse_plugin_init(GstPlugin * plugin)989 gst_mpegtsparse_plugin_init (GstPlugin * plugin)
990 {
991 GST_DEBUG_CATEGORY_INIT (mpegts_parse_debug, "tsparse", 0,
992 "MPEG transport stream parser");
993
994 return gst_element_register (plugin, "tsparse",
995 GST_RANK_NONE, GST_TYPE_MPEGTS_PARSE);
996 }
997