1 /*
2  * mpegtsparse.c -
3  * Copyright (C) 2007 Alessandro Decina
4  *
5  * Authors:
6  *   Alessandro Decina <alessandro@nnva.org>
7  *   Zaheer Abbas Merali <zaheerabbas at merali dot org>
8  *
9  * This library is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Library General Public
11  * License as published by the Free Software Foundation; either
12  * version 2 of the License, or (at your option) any later version.
13  *
14  * This library is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17  * Library General Public License for more details.
18  *
19  * You should have received a copy of the GNU Library General Public
20  * License along with this library; if not, write to the
21  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
22  * Boston, MA 02110-1301, USA.
23  */
24 
25 #ifdef HAVE_CONFIG_H
26 #include "config.h"
27 #endif
28 
29 #include <stdio.h>
30 #include <stdlib.h>
31 #include <string.h>
32 
33 #include "mpegtsbase.h"
34 #include "mpegtsparse.h"
35 #include "gstmpegdesc.h"
36 
/* Latency in milliseconds: at most 100 ms may elapse between two PCRs */
#define TS_LATENCY 100

#define TABLE_ID_UNSET 0xFF
#define RUNNING_STATUS_RUNNING 4

/* Debug category used by every GST_* logging macro in this file */
GST_DEBUG_CATEGORY_STATIC (mpegts_parse_debug);
#define GST_CAT_DEFAULT mpegts_parse_debug
45 
46 typedef struct _MpegTSParsePad MpegTSParsePad;
47 
48 typedef struct
49 {
50   MpegTSBaseProgram program;
51   MpegTSParsePad *tspad;
52 } MpegTSParseProgram;
53 
54 struct _MpegTSParsePad
55 {
56   GstPad *pad;
57 
58   /* the program number that the peer wants on this pad */
59   gint program_number;
60   MpegTSParseProgram *program;
61 
62   /* set to FALSE before a push and TRUE after */
63   gboolean pushed;
64 
65   /* the return of the latest push */
66   GstFlowReturn flow_return;
67 };
68 
69 static GstStaticPadTemplate src_template =
70 GST_STATIC_PAD_TEMPLATE ("src", GST_PAD_SRC,
71     GST_PAD_ALWAYS,
72     GST_STATIC_CAPS ("video/mpegts, " "systemstream = (boolean) true ")
73     );
74 
75 static GstStaticPadTemplate program_template =
76 GST_STATIC_PAD_TEMPLATE ("program_%u", GST_PAD_SRC,
77     GST_PAD_REQUEST,
78     GST_STATIC_CAPS ("video/mpegts, " "systemstream = (boolean) true ")
79     );
80 
/* GObject property identifiers installed in mpegts_parse_class_init() */
enum
{
  PROP_0,
  PROP_SET_TIMESTAMPS,          /* "set-timestamps" */
  PROP_SMOOTHING_LATENCY,       /* "smoothing-latency" */
  PROP_PCR_PID,                 /* "pcr-pid" */
  /* FILL ME */
};
89 
90 static void mpegts_parse_set_property (GObject * object, guint prop_id,
91     const GValue * value, GParamSpec * pspec);
92 static void mpegts_parse_get_property (GObject * object, guint prop_id,
93     GValue * value, GParamSpec * pspec);
94 
95 static void
96 mpegts_parse_program_started (MpegTSBase * base, MpegTSBaseProgram * program);
97 static void
98 mpegts_parse_program_stopped (MpegTSBase * base, MpegTSBaseProgram * program);
99 
100 static GstFlowReturn
101 mpegts_parse_push (MpegTSBase * base, MpegTSPacketizerPacket * packet,
102     GstMpegtsSection * section);
103 static void mpegts_parse_inspect_packet (MpegTSBase * base,
104     MpegTSPacketizerPacket * packet);
105 
106 static MpegTSParsePad *mpegts_parse_create_tspad (MpegTSParse2 * parse,
107     const gchar * name);
108 static void mpegts_parse_destroy_tspad (MpegTSParse2 * parse,
109     MpegTSParsePad * tspad);
110 
111 static void mpegts_parse_pad_removed (GstElement * element, GstPad * pad);
112 static GstPad *mpegts_parse_request_new_pad (GstElement * element,
113     GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
114 static void mpegts_parse_release_pad (GstElement * element, GstPad * pad);
115 static gboolean mpegts_parse_src_pad_query (GstPad * pad, GstObject * parent,
116     GstQuery * query);
117 static gboolean push_event (MpegTSBase * base, GstEvent * event);
118 
119 #define mpegts_parse_parent_class parent_class
120 G_DEFINE_TYPE (MpegTSParse2, mpegts_parse, GST_TYPE_MPEGTS_BASE);
121 static void mpegts_parse_reset (MpegTSBase * base);
122 static GstFlowReturn mpegts_parse_input_done (MpegTSBase * base,
123     GstBuffer * buffer);
124 static GstFlowReturn
125 drain_pending_buffers (MpegTSParse2 * parse, gboolean drain_all);
126 
127 static void
mpegts_parse_dispose(GObject * object)128 mpegts_parse_dispose (GObject * object)
129 {
130   MpegTSParse2 *parse = (MpegTSParse2 *) object;
131 
132   gst_flow_combiner_free (parse->flowcombiner);
133 
134   GST_CALL_PARENT (G_OBJECT_CLASS, dispose, (object));
135 }
136 
137 static void
mpegts_parse_class_init(MpegTSParse2Class * klass)138 mpegts_parse_class_init (MpegTSParse2Class * klass)
139 {
140   GObjectClass *gobject_class = (GObjectClass *) (klass);
141   GstElementClass *element_class;
142   MpegTSBaseClass *ts_class;
143 
144   gobject_class->set_property = mpegts_parse_set_property;
145   gobject_class->get_property = mpegts_parse_get_property;
146   gobject_class->dispose = mpegts_parse_dispose;
147 
148   g_object_class_install_property (gobject_class, PROP_SET_TIMESTAMPS,
149       g_param_spec_boolean ("set-timestamps",
150           "Timestamp (or re-timestamp) the output stream",
151           "If set, timestamps will be set on the output buffers using "
152           "PCRs and smoothed over the smoothing-latency period", FALSE,
153           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
154   g_object_class_install_property (gobject_class, PROP_SMOOTHING_LATENCY,
155       g_param_spec_uint ("smoothing-latency", "Smoothing Latency",
156           "Additional latency in microseconds for smoothing jitter in input timestamps on live capture",
157           0, G_MAXUINT, 0, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
158   g_object_class_install_property (gobject_class, PROP_PCR_PID,
159       g_param_spec_int ("pcr-pid", "PID containing PCR",
160           "Set the PID to use for PCR values (-1 for auto)",
161           -1, G_MAXINT, -1, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
162 
163   element_class = GST_ELEMENT_CLASS (klass);
164   element_class->pad_removed = mpegts_parse_pad_removed;
165   element_class->request_new_pad = mpegts_parse_request_new_pad;
166   element_class->release_pad = mpegts_parse_release_pad;
167 
168   gst_element_class_add_static_pad_template (element_class, &src_template);
169   gst_element_class_add_static_pad_template (element_class, &program_template);
170 
171   gst_element_class_set_static_metadata (element_class,
172       "MPEG transport stream parser", "Codec/Parser",
173       "Parses MPEG2 transport streams",
174       "Alessandro Decina <alessandro@nnva.org>, "
175       "Zaheer Abbas Merali <zaheerabbas at merali dot org>");
176 
177   ts_class = GST_MPEGTS_BASE_CLASS (klass);
178   ts_class->push = GST_DEBUG_FUNCPTR (mpegts_parse_push);
179   ts_class->push_event = GST_DEBUG_FUNCPTR (push_event);
180   ts_class->program_started = GST_DEBUG_FUNCPTR (mpegts_parse_program_started);
181   ts_class->program_stopped = GST_DEBUG_FUNCPTR (mpegts_parse_program_stopped);
182   ts_class->reset = GST_DEBUG_FUNCPTR (mpegts_parse_reset);
183   ts_class->input_done = GST_DEBUG_FUNCPTR (mpegts_parse_input_done);
184   ts_class->inspect_packet = GST_DEBUG_FUNCPTR (mpegts_parse_inspect_packet);
185 }
186 
187 static void
mpegts_parse_init(MpegTSParse2 * parse)188 mpegts_parse_init (MpegTSParse2 * parse)
189 {
190   MpegTSBase *base = (MpegTSBase *) parse;
191 
192   base->program_size = sizeof (MpegTSParseProgram);
193   /* We will only need to handle data/section if we have request pads */
194   base->push_data = FALSE;
195   base->push_section = FALSE;
196 
197   parse->user_pcr_pid = parse->pcr_pid = -1;
198 
199   parse->flowcombiner = gst_flow_combiner_new ();
200 
201   parse->srcpad = gst_pad_new_from_static_template (&src_template, "src");
202   gst_flow_combiner_add_pad (parse->flowcombiner, parse->srcpad);
203   parse->first = TRUE;
204   gst_pad_set_query_function (parse->srcpad,
205       GST_DEBUG_FUNCPTR (mpegts_parse_src_pad_query));
206   gst_element_add_pad (GST_ELEMENT (parse), parse->srcpad);
207 
208   parse->have_group_id = FALSE;
209   parse->group_id = G_MAXUINT;
210 }
211 
212 static void
mpegts_parse_reset(MpegTSBase * base)213 mpegts_parse_reset (MpegTSBase * base)
214 {
215   MpegTSParse2 *parse = (MpegTSParse2 *) base;
216 
217   /* Set the various know PIDs we are interested in */
218 
219   /* CAT */
220   MPEGTS_BIT_SET (base->known_psi, 1);
221   /* NIT, ST */
222   MPEGTS_BIT_SET (base->known_psi, 0x10);
223   /* SDT, BAT, ST */
224   MPEGTS_BIT_SET (base->known_psi, 0x11);
225   /* EIT, ST, CIT (TS 102 323) */
226   MPEGTS_BIT_SET (base->known_psi, 0x12);
227   /* RST, ST */
228   MPEGTS_BIT_SET (base->known_psi, 0x13);
229   /* RNT (TS 102 323) */
230   MPEGTS_BIT_SET (base->known_psi, 0x16);
231   /* inband signalling */
232   MPEGTS_BIT_SET (base->known_psi, 0x1c);
233   /* measurement */
234   MPEGTS_BIT_SET (base->known_psi, 0x1d);
235   /* DIT */
236   MPEGTS_BIT_SET (base->known_psi, 0x1e);
237   /* SIT */
238   MPEGTS_BIT_SET (base->known_psi, 0x1f);
239 
240   parse->first = TRUE;
241   parse->have_group_id = FALSE;
242   parse->group_id = G_MAXUINT;
243 
244   g_list_free_full (parse->pending_buffers, (GDestroyNotify) gst_buffer_unref);
245   parse->pending_buffers = NULL;
246 
247   parse->current_pcr = GST_CLOCK_TIME_NONE;
248   parse->previous_pcr = GST_CLOCK_TIME_NONE;
249   parse->base_pcr = GST_CLOCK_TIME_NONE;
250   parse->bytes_since_pcr = 0;
251   parse->pcr_pid = parse->user_pcr_pid;
252   parse->ts_offset = 0;
253 }
254 
255 static void
mpegts_parse_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)256 mpegts_parse_set_property (GObject * object, guint prop_id,
257     const GValue * value, GParamSpec * pspec)
258 {
259   MpegTSParse2 *parse = (MpegTSParse2 *) object;
260 
261   switch (prop_id) {
262     case PROP_SET_TIMESTAMPS:
263       parse->set_timestamps = g_value_get_boolean (value);
264       break;
265     case PROP_SMOOTHING_LATENCY:
266       parse->smoothing_latency = GST_USECOND * g_value_get_uint (value);
267       mpegts_packetizer_set_pcr_discont_threshold (GST_MPEGTS_BASE
268           (parse)->packetizer, parse->smoothing_latency);
269       break;
270     case PROP_PCR_PID:
271       parse->pcr_pid = parse->user_pcr_pid = g_value_get_int (value);
272       break;
273     default:
274       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
275   }
276 }
277 
278 static void
mpegts_parse_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)279 mpegts_parse_get_property (GObject * object, guint prop_id,
280     GValue * value, GParamSpec * pspec)
281 {
282   MpegTSParse2 *parse = (MpegTSParse2 *) object;
283 
284   switch (prop_id) {
285     case PROP_SET_TIMESTAMPS:
286       g_value_set_boolean (value, parse->set_timestamps);
287       break;
288     case PROP_SMOOTHING_LATENCY:
289       g_value_set_uint (value, parse->smoothing_latency / GST_USECOND);
290       break;
291     case PROP_PCR_PID:
292       g_value_set_int (value, parse->pcr_pid);
293       break;
294     default:
295       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
296   }
297 }
298 
299 static gboolean
prepare_src_pad(MpegTSBase * base,MpegTSParse2 * parse)300 prepare_src_pad (MpegTSBase * base, MpegTSParse2 * parse)
301 {
302   GstEvent *event;
303   gchar *stream_id;
304   GstCaps *caps;
305 
306   if (!parse->first)
307     return TRUE;
308 
309   /* If there's no packet_size yet, we can't set caps yet */
310   if (G_UNLIKELY (base->packetizer->packet_size == 0))
311     return FALSE;
312 
313   stream_id =
314       gst_pad_create_stream_id (parse->srcpad, GST_ELEMENT_CAST (base),
315       "multi-program");
316 
317   event =
318       gst_pad_get_sticky_event (parse->parent.sinkpad, GST_EVENT_STREAM_START,
319       0);
320   if (event) {
321     if (gst_event_parse_group_id (event, &parse->group_id))
322       parse->have_group_id = TRUE;
323     else
324       parse->have_group_id = FALSE;
325     gst_event_unref (event);
326   } else if (!parse->have_group_id) {
327     parse->have_group_id = TRUE;
328     parse->group_id = gst_util_group_id_next ();
329   }
330   event = gst_event_new_stream_start (stream_id);
331   if (parse->have_group_id)
332     gst_event_set_group_id (event, parse->group_id);
333 
334   gst_pad_push_event (parse->srcpad, event);
335   g_free (stream_id);
336 
337   caps = gst_caps_new_simple ("video/mpegts",
338       "systemstream", G_TYPE_BOOLEAN, TRUE,
339       "packetsize", G_TYPE_INT, base->packetizer->packet_size, NULL);
340 
341   gst_pad_set_caps (parse->srcpad, caps);
342   gst_caps_unref (caps);
343 
344   /* If setting output timestamps, ensure that the output segment is TIME */
345   if (parse->set_timestamps == FALSE || base->segment.format == GST_FORMAT_TIME)
346     gst_pad_push_event (parse->srcpad, gst_event_new_segment (&base->segment));
347   else {
348     GstSegment seg;
349     gst_segment_init (&seg, GST_FORMAT_TIME);
350     GST_DEBUG_OBJECT (parse,
351         "Generating time output segment %" GST_SEGMENT_FORMAT, &seg);
352     gst_pad_push_event (parse->srcpad, gst_event_new_segment (&seg));
353   }
354 
355   parse->first = FALSE;
356 
357   return TRUE;
358 }
359 
360 static gboolean
push_event(MpegTSBase * base,GstEvent * event)361 push_event (MpegTSBase * base, GstEvent * event)
362 {
363   MpegTSParse2 *parse = (MpegTSParse2 *) base;
364   GList *tmp;
365 
366   if (G_UNLIKELY (parse->first)) {
367     /* We will send the segment when really starting  */
368     if (G_UNLIKELY (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT)) {
369       gst_event_unref (event);
370       return TRUE;
371     }
372     prepare_src_pad (base, parse);
373   }
374   if (G_UNLIKELY (GST_EVENT_TYPE (event) == GST_EVENT_EOS))
375     drain_pending_buffers (parse, TRUE);
376 
377   if (G_UNLIKELY (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT))
378     parse->ts_offset = 0;
379 
380   for (tmp = parse->srcpads; tmp; tmp = tmp->next) {
381     GstPad *pad = (GstPad *) tmp->data;
382     if (pad) {
383       gst_event_ref (event);
384       gst_pad_push_event (pad, event);
385     }
386   }
387 
388   gst_pad_push_event (parse->srcpad, event);
389 
390   return TRUE;
391 }
392 
393 static MpegTSParsePad *
mpegts_parse_create_tspad(MpegTSParse2 * parse,const gchar * pad_name)394 mpegts_parse_create_tspad (MpegTSParse2 * parse, const gchar * pad_name)
395 {
396   GstPad *pad;
397   MpegTSParsePad *tspad;
398 
399   pad = gst_pad_new_from_static_template (&program_template, pad_name);
400   gst_pad_set_query_function (pad,
401       GST_DEBUG_FUNCPTR (mpegts_parse_src_pad_query));
402 
403   /* create our wrapper */
404   tspad = g_new0 (MpegTSParsePad, 1);
405   tspad->pad = pad;
406   tspad->program_number = -1;
407   tspad->program = NULL;
408   tspad->pushed = FALSE;
409   tspad->flow_return = GST_FLOW_NOT_LINKED;
410   gst_pad_set_element_private (pad, tspad);
411   gst_flow_combiner_add_pad (parse->flowcombiner, pad);
412 
413   return tspad;
414 }
415 
416 static void
mpegts_parse_destroy_tspad(MpegTSParse2 * parse,MpegTSParsePad * tspad)417 mpegts_parse_destroy_tspad (MpegTSParse2 * parse, MpegTSParsePad * tspad)
418 {
419   /* free the wrapper */
420   g_free (tspad);
421 }
422 
423 static void
mpegts_parse_pad_removed(GstElement * element,GstPad * pad)424 mpegts_parse_pad_removed (GstElement * element, GstPad * pad)
425 {
426   MpegTSParsePad *tspad;
427   MpegTSBase *base = (MpegTSBase *) element;
428   MpegTSParse2 *parse = GST_MPEGTS_PARSE (element);
429 
430   if (gst_pad_get_direction (pad) == GST_PAD_SINK)
431     return;
432 
433   tspad = (MpegTSParsePad *) gst_pad_get_element_private (pad);
434   if (tspad) {
435     mpegts_parse_destroy_tspad (parse, tspad);
436 
437     parse->srcpads = g_list_remove_all (parse->srcpads, pad);
438   }
439   if (parse->srcpads == NULL) {
440     base->push_data = FALSE;
441     base->push_section = FALSE;
442   }
443 
444   if (GST_ELEMENT_CLASS (parent_class)->pad_removed)
445     GST_ELEMENT_CLASS (parent_class)->pad_removed (element, pad);
446 }
447 
448 static GstPad *
mpegts_parse_request_new_pad(GstElement * element,GstPadTemplate * template,const gchar * padname,const GstCaps * caps)449 mpegts_parse_request_new_pad (GstElement * element, GstPadTemplate * template,
450     const gchar * padname, const GstCaps * caps)
451 {
452   MpegTSBase *base = (MpegTSBase *) element;
453   MpegTSParse2 *parse;
454   MpegTSParsePad *tspad;
455   MpegTSParseProgram *parseprogram;
456   GstPad *pad;
457   gint program_num = -1;
458   GstEvent *event;
459   gchar *stream_id;
460 
461   g_return_val_if_fail (template != NULL, NULL);
462   g_return_val_if_fail (GST_IS_MPEGTS_PARSE (element), NULL);
463   g_return_val_if_fail (padname != NULL, NULL);
464 
465   sscanf (padname + 8, "%d", &program_num);
466 
467   GST_DEBUG_OBJECT (element, "padname:%s, program:%d", padname, program_num);
468 
469   parse = GST_MPEGTS_PARSE (element);
470 
471   tspad = mpegts_parse_create_tspad (parse, padname);
472   tspad->program_number = program_num;
473 
474   /* Find if the program is already active */
475   parseprogram =
476       (MpegTSParseProgram *) mpegts_base_get_program (GST_MPEGTS_BASE (parse),
477       program_num);
478   if (parseprogram) {
479     tspad->program = parseprogram;
480     parseprogram->tspad = tspad;
481   }
482 
483   pad = tspad->pad;
484   parse->srcpads = g_list_append (parse->srcpads, pad);
485   base->push_data = TRUE;
486   base->push_section = TRUE;
487 
488   gst_pad_set_active (pad, TRUE);
489 
490   stream_id = gst_pad_create_stream_id (pad, element, padname + 8);
491 
492   event =
493       gst_pad_get_sticky_event (parse->parent.sinkpad, GST_EVENT_STREAM_START,
494       0);
495   if (event) {
496     if (gst_event_parse_group_id (event, &parse->group_id))
497       parse->have_group_id = TRUE;
498     else
499       parse->have_group_id = FALSE;
500     gst_event_unref (event);
501   } else if (!parse->have_group_id) {
502     parse->have_group_id = TRUE;
503     parse->group_id = gst_util_group_id_next ();
504   }
505   event = gst_event_new_stream_start (stream_id);
506   if (parse->have_group_id)
507     gst_event_set_group_id (event, parse->group_id);
508 
509   gst_pad_push_event (pad, event);
510   g_free (stream_id);
511 
512   gst_element_add_pad (element, pad);
513 
514   return pad;
515 }
516 
517 static void
mpegts_parse_release_pad(GstElement * element,GstPad * pad)518 mpegts_parse_release_pad (GstElement * element, GstPad * pad)
519 {
520   MpegTSParse2 *parse = (MpegTSParse2 *) element;
521 
522   gst_pad_set_active (pad, FALSE);
523   /* we do the cleanup in GstElement::pad-removed */
524   gst_flow_combiner_remove_pad (parse->flowcombiner, pad);
525   gst_element_remove_pad (element, pad);
526 }
527 
528 static GstFlowReturn
mpegts_parse_tspad_push_section(MpegTSParse2 * parse,MpegTSParsePad * tspad,GstMpegtsSection * section,MpegTSPacketizerPacket * packet)529 mpegts_parse_tspad_push_section (MpegTSParse2 * parse, MpegTSParsePad * tspad,
530     GstMpegtsSection * section, MpegTSPacketizerPacket * packet)
531 {
532   GstFlowReturn ret = GST_FLOW_OK;
533   gboolean to_push = TRUE;
534 
535   if (tspad->program_number != -1) {
536     if (tspad->program) {
537       /* we push all sections to all pads except PMTs which we
538        * only push to pads meant to receive that program number */
539       if (section->table_id == 0x02) {
540         /* PMT */
541         if (section->subtable_extension != tspad->program_number)
542           to_push = FALSE;
543       }
544     } else if (section->table_id != 0x00) {
545       /* there's a program filter on the pad but the PMT for the program has not
546        * been parsed yet, ignore the pad until we get a PMT.
547        * But we always allow PAT to go through */
548       to_push = FALSE;
549     }
550   }
551 
552   GST_DEBUG_OBJECT (parse,
553       "pushing section: %d program number: %d table_id: %d", to_push,
554       tspad->program_number, section->table_id);
555 
556   if (to_push) {
557     GstBuffer *buf =
558         gst_buffer_new_and_alloc (packet->data_end - packet->data_start);
559     gst_buffer_fill (buf, 0, packet->data_start,
560         packet->data_end - packet->data_start);
561     ret = gst_pad_push (tspad->pad, buf);
562     ret = gst_flow_combiner_update_flow (parse->flowcombiner, ret);
563   }
564 
565   GST_LOG_OBJECT (parse, "Returning %s", gst_flow_get_name (ret));
566   return ret;
567 }
568 
569 static GstFlowReturn
mpegts_parse_tspad_push(MpegTSParse2 * parse,MpegTSParsePad * tspad,MpegTSPacketizerPacket * packet)570 mpegts_parse_tspad_push (MpegTSParse2 * parse, MpegTSParsePad * tspad,
571     MpegTSPacketizerPacket * packet)
572 {
573   GstFlowReturn ret = GST_FLOW_OK;
574   MpegTSBaseProgram *bp = NULL;
575 
576   if (tspad->program_number != -1) {
577     if (tspad->program)
578       bp = (MpegTSBaseProgram *) tspad->program;
579     else
580       bp = mpegts_base_get_program ((MpegTSBase *) parse,
581           tspad->program_number);
582   }
583 
584   if (bp) {
585     if (packet->pid == bp->pmt_pid || bp->streams == NULL
586         || bp->streams[packet->pid]) {
587       GstBuffer *buf =
588           gst_buffer_new_and_alloc (packet->data_end - packet->data_start);
589       gst_buffer_fill (buf, 0, packet->data_start,
590           packet->data_end - packet->data_start);
591       /* push if there's no filter or if the pid is in the filter */
592       ret = gst_pad_push (tspad->pad, buf);
593       ret = gst_flow_combiner_update_flow (parse->flowcombiner, ret);
594     }
595   }
596   GST_DEBUG_OBJECT (parse, "Returning %s", gst_flow_get_name (ret));
597 
598   return ret;
599 }
600 
601 static void
pad_clear_for_push(GstPad * pad,MpegTSParse2 * parse)602 pad_clear_for_push (GstPad * pad, MpegTSParse2 * parse)
603 {
604   MpegTSParsePad *tspad = (MpegTSParsePad *) gst_pad_get_element_private (pad);
605 
606   tspad->flow_return = GST_FLOW_NOT_LINKED;
607   tspad->pushed = FALSE;
608 }
609 
610 static GstFlowReturn
mpegts_parse_push(MpegTSBase * base,MpegTSPacketizerPacket * packet,GstMpegtsSection * section)611 mpegts_parse_push (MpegTSBase * base, MpegTSPacketizerPacket * packet,
612     GstMpegtsSection * section)
613 {
614   MpegTSParse2 *parse = (MpegTSParse2 *) base;
615   guint32 pads_cookie;
616   gboolean done = FALSE;
617   GstPad *pad = NULL;
618   MpegTSParsePad *tspad;
619   GstFlowReturn ret;
620   GList *srcpads;
621 
622   GST_OBJECT_LOCK (parse);
623   srcpads = parse->srcpads;
624 
625   /* clear tspad->pushed on pads */
626   g_list_foreach (srcpads, (GFunc) pad_clear_for_push, parse);
627   if (srcpads)
628     ret = GST_FLOW_NOT_LINKED;
629   else
630     ret = GST_FLOW_OK;
631 
632   /* Get cookie and source pads list */
633   pads_cookie = GST_ELEMENT_CAST (parse)->pads_cookie;
634   if (G_LIKELY (srcpads)) {
635     pad = GST_PAD_CAST (srcpads->data);
636     g_object_ref (pad);
637   }
638   GST_OBJECT_UNLOCK (parse);
639 
640   while (pad && !done) {
641     tspad = gst_pad_get_element_private (pad);
642 
643     if (G_LIKELY (!tspad->pushed)) {
644       if (section) {
645         tspad->flow_return =
646             mpegts_parse_tspad_push_section (parse, tspad, section, packet);
647       } else {
648         tspad->flow_return = mpegts_parse_tspad_push (parse, tspad, packet);
649       }
650       tspad->pushed = TRUE;
651 
652       if (G_UNLIKELY (tspad->flow_return != GST_FLOW_OK
653               && tspad->flow_return != GST_FLOW_NOT_LINKED)) {
654         /* return the error upstream */
655         ret = tspad->flow_return;
656         done = TRUE;
657       }
658 
659     }
660 
661     if (ret == GST_FLOW_NOT_LINKED)
662       ret = tspad->flow_return;
663 
664     g_object_unref (pad);
665 
666     if (G_UNLIKELY (!done)) {
667       GST_OBJECT_LOCK (parse);
668       if (G_UNLIKELY (pads_cookie != GST_ELEMENT_CAST (parse)->pads_cookie)) {
669         /* resync */
670         GST_DEBUG ("resync");
671         pads_cookie = GST_ELEMENT_CAST (parse)->pads_cookie;
672         srcpads = parse->srcpads;
673       } else {
674         GST_DEBUG ("getting next pad");
675         /* Get next pad */
676         srcpads = g_list_next (srcpads);
677       }
678 
679       if (srcpads) {
680         pad = GST_PAD_CAST (srcpads->data);
681         g_object_ref (pad);
682       } else
683         done = TRUE;
684       GST_OBJECT_UNLOCK (parse);
685     }
686   }
687 
688   return ret;
689 }
690 
691 static void
mpegts_parse_inspect_packet(MpegTSBase * base,MpegTSPacketizerPacket * packet)692 mpegts_parse_inspect_packet (MpegTSBase * base, MpegTSPacketizerPacket * packet)
693 {
694   MpegTSParse2 *parse = GST_MPEGTS_PARSE (base);
695   GST_LOG ("pid 0x%04x pusi:%d, afc:%d, cont:%d, payload:%p PCR %"
696       G_GUINT64_FORMAT, packet->pid, packet->payload_unit_start_indicator,
697       packet->scram_afc_cc & 0x30,
698       FLAGS_CONTINUITY_COUNTER (packet->scram_afc_cc), packet->payload,
699       packet->pcr);
700 
701   /* Store the PCR if desired */
702   if (parse->current_pcr == GST_CLOCK_TIME_NONE &&
703       packet->afc_flags & MPEGTS_AFC_PCR_FLAG) {
704     /* Take this as the pcr_pid if set to auto-select */
705     if (parse->pcr_pid == -1)
706       parse->pcr_pid = packet->pid;
707     /* Check the PCR-PID matches the program we want for multiple programs */
708     if (parse->pcr_pid == packet->pid) {
709       parse->current_pcr = mpegts_packetizer_pts_to_ts (base->packetizer,
710           PCRTIME_TO_GSTTIME (packet->pcr), parse->pcr_pid);
711       GST_DEBUG ("Got new PCR %" GST_TIME_FORMAT " raw %" G_GUINT64_FORMAT,
712           GST_TIME_ARGS (parse->current_pcr), packet->pcr);
713       if (parse->base_pcr == GST_CLOCK_TIME_NONE) {
714         parse->base_pcr = parse->current_pcr;
715       }
716     }
717   }
718 }
719 
720 static GstClockTime
get_pending_timestamp_diff(MpegTSParse2 * parse)721 get_pending_timestamp_diff (MpegTSParse2 * parse)
722 {
723   GList *l;
724   GstClockTime first_ts, last_ts;
725 
726   if (parse->pending_buffers == NULL)
727     return GST_CLOCK_TIME_NONE;
728 
729   l = g_list_last (parse->pending_buffers);
730   first_ts = GST_BUFFER_PTS (l->data);
731   if (first_ts == GST_CLOCK_TIME_NONE)
732     return GST_CLOCK_TIME_NONE;
733 
734   l = g_list_first (parse->pending_buffers);
735   last_ts = GST_BUFFER_PTS (l->data);
736   if (last_ts == GST_CLOCK_TIME_NONE)
737     return GST_CLOCK_TIME_NONE;
738 
739   return last_ts - first_ts;
740 }
741 
742 static GstFlowReturn
drain_pending_buffers(MpegTSParse2 * parse,gboolean drain_all)743 drain_pending_buffers (MpegTSParse2 * parse, gboolean drain_all)
744 {
745   GstFlowReturn ret = GST_FLOW_OK;
746   GstClockTime start_ts;
747   GstClockTime pcr = GST_CLOCK_TIME_NONE;
748   GstClockTime pcr_diff = 0;
749   gsize pcr_bytes, bytes_since_pcr, pos;
750   GstBuffer *buffer;
751   GList *l, *end = NULL;
752 
753   if (parse->pending_buffers == NULL)
754     return GST_FLOW_OK;         /* Nothing to push */
755 
756   /*
757    * There are 4 cases:
758    *  1 We get a buffer with no PCR -> it's the head of the list
759    *      -> Do nothing, unless it's EOS
760    *  2 We get a buffer with a PCR, it's the first PCR we've seen, and belongs
761    *    to the buffer at the head of the list
762    *    -> Push any buffers in the list except the head,
763    *       using a smoothing of their timestamps to land at the PCR
764    *    -> store new PCR as the previous PCR, bytes_since_pcr = sizeof (buffer);
765    *  3 It's EOS (drain_all == TRUE, current_pcr == NONE)
766    *    -> Push any buffers in the list using a smoothing of their timestamps
767    *       starting at the previous PCR or first TS
768    *  4 We get a buffer with a PCR, and have a previous PCR
769    *    -> If distance > smoothing_latency,
770    *       output buffers except the last in the pending queue using
771    *       piecewise-linear timestamps
772    *    -> store new PCR as the previous PCR, bytes_since_pcr = sizeof (buffer);
773    */
774 
775   /* Case 1 */
776   if (!GST_CLOCK_TIME_IS_VALID (parse->current_pcr) && !drain_all)
777     return GST_FLOW_OK;
778 
779   if (GST_CLOCK_TIME_IS_VALID (parse->current_pcr)) {
780     pcr = parse->current_pcr;
781     parse->current_pcr = GST_CLOCK_TIME_NONE;
782   }
783 
784   /* The bytes of the last buffer are after the PCR */
785   buffer = GST_BUFFER (g_list_nth_data (parse->pending_buffers, 0));
786   bytes_since_pcr = gst_buffer_get_size (buffer);
787 
788   pcr_bytes = parse->bytes_since_pcr - bytes_since_pcr;
789 
790   if (!drain_all)
791     end = g_list_first (parse->pending_buffers);
792 
793   /* Case 2 */
794   if (!GST_CLOCK_TIME_IS_VALID (parse->previous_pcr)) {
795     pcr_diff = get_pending_timestamp_diff (parse);
796 
797     /* Calculate the start_ts that ends at the end timestamp */
798     start_ts = GST_CLOCK_TIME_NONE;
799     if (end) {
800       start_ts = GST_BUFFER_PTS (GST_BUFFER (end->data));
801       if (start_ts > pcr_diff)
802         start_ts -= pcr_diff;
803     }
804   } else if (drain_all) {       /* Case 3 */
805     start_ts = parse->previous_pcr;
806     pcr_diff = get_pending_timestamp_diff (parse);
807   } else {                      /* Case 4 */
808     start_ts = parse->previous_pcr;
809     if (GST_CLOCK_TIME_IS_VALID (pcr) && pcr > start_ts)
810       pcr_diff = GST_CLOCK_DIFF (start_ts, pcr);
811 
812     /* Make sure PCR observations are sufficiently far apart */
813     if (drain_all == FALSE && pcr_diff < parse->smoothing_latency)
814       return GST_FLOW_OK;
815   }
816 
817   GST_INFO_OBJECT (parse, "Pushing buffers - startTS %" GST_TIME_FORMAT
818       " duration %" GST_TIME_FORMAT " %" G_GSIZE_FORMAT " bytes",
819       GST_TIME_ARGS (start_ts), GST_TIME_ARGS (pcr_diff), pcr_bytes);
820 
821   /* Now, push buffers out pacing timestamps over pcr_diff time and pcr_bytes */
822   pos = 0;
823   l = g_list_last (parse->pending_buffers);
824   while (l != end) {
825     GList *p;
826     GstClockTime out_ts = start_ts;
827 
828     buffer = gst_buffer_make_writable (GST_BUFFER (l->data));
829 
830     if (out_ts != GST_CLOCK_TIME_NONE && pcr_diff != GST_CLOCK_TIME_NONE &&
831         pcr_bytes && pos)
832       out_ts += gst_util_uint64_scale (pcr_diff, pos, pcr_bytes);
833 
834     pos += gst_buffer_get_size (buffer);
835 
836     GST_DEBUG_OBJECT (parse,
837         "InputTS %" GST_TIME_FORMAT " out %" GST_TIME_FORMAT,
838         GST_TIME_ARGS (GST_BUFFER_PTS (buffer)), GST_TIME_ARGS (out_ts));
839 
840     GST_BUFFER_PTS (buffer) = out_ts + parse->ts_offset;
841     GST_BUFFER_DTS (buffer) = out_ts + parse->ts_offset;
842     if (ret == GST_FLOW_OK) {
843       ret = gst_pad_push (parse->srcpad, buffer);
844       ret = gst_flow_combiner_update_flow (parse->flowcombiner, ret);
845     } else
846       gst_buffer_unref (buffer);
847 
848     /* Free this list node and move to the next */
849     p = g_list_previous (l);
850     parse->pending_buffers = g_list_delete_link (parse->pending_buffers, l);
851     l = p;
852   }
853 
854   parse->pending_buffers = end;
855   parse->bytes_since_pcr = bytes_since_pcr;
856   parse->previous_pcr = pcr;
857   return ret;
858 }
859 
860 static GstFlowReturn
mpegts_parse_input_done(MpegTSBase * base,GstBuffer * buffer)861 mpegts_parse_input_done (MpegTSBase * base, GstBuffer * buffer)
862 {
863   MpegTSParse2 *parse = GST_MPEGTS_PARSE (base);
864   GstFlowReturn ret = GST_FLOW_OK;
865 
866   GST_LOG_OBJECT (parse, "Received buffer %" GST_PTR_FORMAT, buffer);
867 
868   if (parse->current_pcr != GST_CLOCK_TIME_NONE) {
869     GST_DEBUG_OBJECT (parse,
870         "InputTS %" GST_TIME_FORMAT " PCR %" GST_TIME_FORMAT,
871         GST_TIME_ARGS (GST_BUFFER_PTS (buffer)),
872         GST_TIME_ARGS (parse->current_pcr));
873   }
874 
875   if (parse->set_timestamps || parse->first) {
876     parse->pending_buffers = g_list_prepend (parse->pending_buffers, buffer);
877     parse->bytes_since_pcr += gst_buffer_get_size (buffer);
878     buffer = NULL;
879   }
880 
881   if (!prepare_src_pad (base, parse))
882     return GST_FLOW_OK;
883 
884   if (parse->pending_buffers != NULL) {
885     /* Don't keep pending_buffers if not setting output timestamps */
886     gboolean drain_all = (parse->set_timestamps == FALSE);
887     ret = drain_pending_buffers (parse, drain_all);
888     if (ret != GST_FLOW_OK) {
889       if (buffer)
890         gst_buffer_unref (buffer);
891       return ret;
892     }
893   }
894 
895   if (buffer != NULL) {
896     ret = gst_pad_push (parse->srcpad, buffer);
897     ret = gst_flow_combiner_update_flow (parse->flowcombiner, ret);
898   }
899 
900   return ret;
901 }
902 
903 static MpegTSParsePad *
find_pad_for_program(MpegTSParse2 * parse,guint program_number)904 find_pad_for_program (MpegTSParse2 * parse, guint program_number)
905 {
906   GList *tmp;
907 
908   for (tmp = parse->srcpads; tmp; tmp = tmp->next) {
909     MpegTSParsePad *tspad = gst_pad_get_element_private ((GstPad *) tmp->data);
910 
911     if (tspad->program_number == program_number)
912       return tspad;
913   }
914 
915   return NULL;
916 }
917 
918 static void
mpegts_parse_program_started(MpegTSBase * base,MpegTSBaseProgram * program)919 mpegts_parse_program_started (MpegTSBase * base, MpegTSBaseProgram * program)
920 {
921   MpegTSParse2 *parse = GST_MPEGTS_PARSE (base);
922   MpegTSParseProgram *parseprogram = (MpegTSParseProgram *) program;
923   MpegTSParsePad *tspad;
924 
925   /* If we have a request pad for that program, activate it */
926   tspad = find_pad_for_program (parse, program->program_number);
927 
928   if (tspad) {
929     tspad->program = parseprogram;
930     parseprogram->tspad = tspad;
931   }
932 }
933 
934 static void
mpegts_parse_program_stopped(MpegTSBase * base,MpegTSBaseProgram * program)935 mpegts_parse_program_stopped (MpegTSBase * base, MpegTSBaseProgram * program)
936 {
937   MpegTSParse2 *parse = GST_MPEGTS_PARSE (base);
938   MpegTSParseProgram *parseprogram = (MpegTSParseProgram *) program;
939   MpegTSParsePad *tspad;
940 
941   /* If we have a request pad for that program, activate it */
942   tspad = find_pad_for_program (parse, program->program_number);
943 
944   if (tspad) {
945     tspad->program = NULL;
946     parseprogram->tspad = NULL;
947   }
948 
949   parse->pcr_pid = -1;
950   parse->ts_offset += parse->current_pcr - parse->base_pcr;
951   parse->base_pcr = GST_CLOCK_TIME_NONE;
952 }
953 
954 static gboolean
mpegts_parse_src_pad_query(GstPad * pad,GstObject * parent,GstQuery * query)955 mpegts_parse_src_pad_query (GstPad * pad, GstObject * parent, GstQuery * query)
956 {
957   MpegTSParse2 *parse = GST_MPEGTS_PARSE (parent);
958   gboolean res;
959 
960   switch (GST_QUERY_TYPE (query)) {
961     case GST_QUERY_LATENCY:
962     {
963       if ((res = gst_pad_peer_query (((MpegTSBase *) parse)->sinkpad, query))) {
964         gboolean is_live;
965         GstClockTime min_latency, max_latency;
966 
967         gst_query_parse_latency (query, &is_live, &min_latency, &max_latency);
968         if (is_live) {
969           GstClockTime extra_latency = TS_LATENCY * GST_MSECOND;
970           if (parse->set_timestamps) {
971             extra_latency = MAX (extra_latency, parse->smoothing_latency);
972           }
973           min_latency += extra_latency;
974           if (max_latency != GST_CLOCK_TIME_NONE)
975             max_latency += extra_latency;
976         }
977 
978         gst_query_set_latency (query, is_live, min_latency, max_latency);
979       }
980       break;
981     }
982     default:
983       res = gst_pad_query_default (pad, parent, query);
984   }
985   return res;
986 }
987 
988 gboolean
gst_mpegtsparse_plugin_init(GstPlugin * plugin)989 gst_mpegtsparse_plugin_init (GstPlugin * plugin)
990 {
991   GST_DEBUG_CATEGORY_INIT (mpegts_parse_debug, "tsparse", 0,
992       "MPEG transport stream parser");
993 
994   return gst_element_register (plugin, "tsparse",
995       GST_RANK_NONE, GST_TYPE_MPEGTS_PARSE);
996 }
997