1 /*
2  * mpegtspacketizer.c -
3  * Copyright (C) 2007, 2008 Alessandro Decina, Zaheer Merali
4  *
5  * Authors:
6  *   Zaheer Merali <zaheerabbas at merali dot org>
7  *   Alessandro Decina <alessandro@nnva.org>
8  *
9  * This library is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Library General Public
11  * License as published by the Free Software Foundation; either
12  * version 2 of the License, or (at your option) any later version.
13  *
14  * This library is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17  * Library General Public License for more details.
18  *
19  * You should have received a copy of the GNU Library General Public
20  * License along with this library; if not, write to the
21  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
22  * Boston, MA 02110-1301, USA.
23  */
24 
25 #include <string.h>
26 #include <stdlib.h>
27 
/* Skew calculation parameters */
/* MAX_TIME is two seconds, expressed in GST_SECOND units */
#define MAX_TIME	(2 * GST_SECOND)

/* maximal PCR time */
/* PCR_MAX_VALUE: 2^33 multiplied by 300, plus 298 */
#define PCR_MAX_VALUE (((((guint64)1)<<33) * 300) + 298)
/* PCR_GST_MAX_VALUE: PCR_MAX_VALUE rescaled by GST_MSECOND / PCR_MSECOND
 * (PCR_MSECOND is defined outside this part of the file) */
#define PCR_GST_MAX_VALUE (PCR_MAX_VALUE * GST_MSECOND / (PCR_MSECOND))
/* PTS_DTS_MAX_VALUE: 2^33 */
#define PTS_DTS_MAX_VALUE (((guint64)1) << 33)
35 
36 #include "mpegtspacketizer.h"
37 #include "gstmpegdesc.h"
38 
/* Debug category used as the default by the GST_* logging macros below */
GST_DEBUG_CATEGORY_STATIC (mpegts_packetizer_debug);
#define GST_CAT_DEFAULT mpegts_packetizer_debug

/* _init_local () is handed to G_DEFINE_TYPE_EXTENDED as its last argument;
 * its definition is not in this part of the file. */
static void _init_local (void);
G_DEFINE_TYPE_EXTENDED (MpegTSPacketizer2, mpegts_packetizer, G_TYPE_OBJECT, 0,
    _init_local ());

/* Difference between a and b, always computed as larger minus smaller.
 * Both arguments are evaluated more than once. */
#define ABSDIFF(a,b) ((a) < (b) ? (b) - (a) : (a) - (b))

/* Take / release the packetizer's group_lock mutex */
#define PACKETIZER_GROUP_LOCK(p) g_mutex_lock(&((p)->group_lock))
#define PACKETIZER_GROUP_UNLOCK(p) g_mutex_unlock(&((p)->group_lock))

/* Forward declarations */
static void mpegts_packetizer_dispose (GObject * object);
static void mpegts_packetizer_finalize (GObject * object);
static GstClockTime calculate_skew (MpegTSPacketizer2 * packetizer,
    MpegTSPCR * pcr, guint64 pcrtime, GstClockTime time);
static void _close_current_group (MpegTSPCR * pcrtable);
static void record_pcr (MpegTSPacketizer2 * packetizer, MpegTSPCR * pcrtable,
    guint64 pcr, guint64 offset);

/* Sentinels stored in a stream/subtable when no real value is recorded:
 * continuity_counter, version_number and table_id respectively */
#define CONTINUITY_UNSET 255
#define VERSION_NUMBER_UNSET 255
#define TABLE_ID_UNSET 0xFF
/* Value expected at the sync position of every packet */
#define PACKET_SYNC_BYTE 0x47
63 
64 static inline MpegTSPCR *
get_pcr_table(MpegTSPacketizer2 * packetizer,guint16 pid)65 get_pcr_table (MpegTSPacketizer2 * packetizer, guint16 pid)
66 {
67   MpegTSPCR *res;
68 
69   res = packetizer->observations[packetizer->pcrtablelut[pid]];
70 
71   if (G_UNLIKELY (res == NULL)) {
72     /* If we don't have a PCR table for the requested PID, create one .. */
73     res = g_new0 (MpegTSPCR, 1);
74     /* Add it to the last table position */
75     packetizer->observations[packetizer->lastobsid] = res;
76     /* Update the pcrtablelut */
77     packetizer->pcrtablelut[pid] = packetizer->lastobsid;
78     /* And increment the last know slot */
79     packetizer->lastobsid++;
80 
81     /* Finally set the default values */
82     res->pid = pid;
83     res->base_time = GST_CLOCK_TIME_NONE;
84     res->base_pcrtime = GST_CLOCK_TIME_NONE;
85     res->last_pcrtime = GST_CLOCK_TIME_NONE;
86     res->window_pos = 0;
87     res->window_filling = TRUE;
88     res->window_min = 0;
89     res->skew = 0;
90     res->prev_send_diff = GST_CLOCK_TIME_NONE;
91     res->prev_out_time = GST_CLOCK_TIME_NONE;
92     res->pcroffset = 0;
93 
94     res->current = g_slice_new0 (PCROffsetCurrent);
95   }
96 
97   return res;
98 }
99 
100 static void
pcr_offset_group_free(PCROffsetGroup * group)101 pcr_offset_group_free (PCROffsetGroup * group)
102 {
103   g_free (group->values);
104   g_slice_free (PCROffsetGroup, group);
105 }
106 
107 static void
flush_observations(MpegTSPacketizer2 * packetizer)108 flush_observations (MpegTSPacketizer2 * packetizer)
109 {
110   gint i;
111 
112   for (i = 0; i < packetizer->lastobsid; i++) {
113     g_list_free_full (packetizer->observations[i]->groups,
114         (GDestroyNotify) pcr_offset_group_free);
115     if (packetizer->observations[i]->current)
116       g_slice_free (PCROffsetCurrent, packetizer->observations[i]->current);
117     g_free (packetizer->observations[i]);
118     packetizer->observations[i] = NULL;
119   }
120   memset (packetizer->pcrtablelut, 0xff, 0x2000);
121   packetizer->lastobsid = 0;
122 }
123 
124 GstClockTime
mpegts_packetizer_get_current_time(MpegTSPacketizer2 * packetizer,guint16 pcr_pid)125 mpegts_packetizer_get_current_time (MpegTSPacketizer2 * packetizer,
126     guint16 pcr_pid)
127 {
128   MpegTSPCR *pcrtable = get_pcr_table (packetizer, pcr_pid);
129 
130   if (pcrtable == NULL)
131     return GST_CLOCK_TIME_NONE;
132 
133   return mpegts_packetizer_pts_to_ts (packetizer, pcrtable->last_pcrtime,
134       pcr_pid);
135 }
136 
137 static inline MpegTSPacketizerStreamSubtable *
find_subtable(GSList * subtables,guint8 table_id,guint16 subtable_extension)138 find_subtable (GSList * subtables, guint8 table_id, guint16 subtable_extension)
139 {
140   GSList *tmp;
141 
142   /* FIXME: Make this an array ! */
143   for (tmp = subtables; tmp; tmp = tmp->next) {
144     MpegTSPacketizerStreamSubtable *sub =
145         (MpegTSPacketizerStreamSubtable *) tmp->data;
146     if (sub->table_id == table_id
147         && sub->subtable_extension == subtable_extension)
148       return sub;
149   }
150 
151   return NULL;
152 }
153 
154 static gboolean
seen_section_before(MpegTSPacketizerStream * stream,guint8 table_id,guint16 subtable_extension,guint8 version_number,guint8 section_number,guint8 last_section_number)155 seen_section_before (MpegTSPacketizerStream * stream, guint8 table_id,
156     guint16 subtable_extension, guint8 version_number, guint8 section_number,
157     guint8 last_section_number)
158 {
159   MpegTSPacketizerStreamSubtable *subtable;
160 
161   /* Check if we've seen this table_id/subtable_extension first */
162   subtable = find_subtable (stream->subtables, table_id, subtable_extension);
163   if (!subtable) {
164     GST_DEBUG ("Haven't seen subtable");
165     return FALSE;
166   }
167   /* If we have, check it has the same version_number */
168   if (subtable->version_number != version_number) {
169     GST_DEBUG ("Different version number");
170     return FALSE;
171   }
172   /* Did the number of sections change ? */
173   if (subtable->last_section_number != last_section_number) {
174     GST_DEBUG ("Different last_section_number");
175     return FALSE;
176   }
177   /* Finally return whether we saw that section or not */
178   return MPEGTS_BIT_IS_SET (subtable->seen_section, section_number);
179 }
180 
181 static MpegTSPacketizerStreamSubtable *
mpegts_packetizer_stream_subtable_new(guint8 table_id,guint16 subtable_extension,guint8 last_section_number)182 mpegts_packetizer_stream_subtable_new (guint8 table_id,
183     guint16 subtable_extension, guint8 last_section_number)
184 {
185   MpegTSPacketizerStreamSubtable *subtable;
186 
187   subtable = g_new0 (MpegTSPacketizerStreamSubtable, 1);
188   subtable->version_number = VERSION_NUMBER_UNSET;
189   subtable->table_id = table_id;
190   subtable->subtable_extension = subtable_extension;
191   subtable->last_section_number = last_section_number;
192   return subtable;
193 }
194 
195 static MpegTSPacketizerStream *
mpegts_packetizer_stream_new(guint16 pid)196 mpegts_packetizer_stream_new (guint16 pid)
197 {
198   MpegTSPacketizerStream *stream;
199 
200   stream = (MpegTSPacketizerStream *) g_new0 (MpegTSPacketizerStream, 1);
201   stream->continuity_counter = CONTINUITY_UNSET;
202   stream->subtables = NULL;
203   stream->table_id = TABLE_ID_UNSET;
204   stream->pid = pid;
205   return stream;
206 }
207 
208 static void
mpegts_packetizer_clear_section(MpegTSPacketizerStream * stream)209 mpegts_packetizer_clear_section (MpegTSPacketizerStream * stream)
210 {
211   stream->continuity_counter = CONTINUITY_UNSET;
212   stream->section_length = 0;
213   stream->section_offset = 0;
214   stream->table_id = TABLE_ID_UNSET;
215   g_free (stream->section_data);
216   stream->section_data = NULL;
217 }
218 
219 static void
mpegts_packetizer_stream_subtable_free(MpegTSPacketizerStreamSubtable * subtable)220 mpegts_packetizer_stream_subtable_free (MpegTSPacketizerStreamSubtable *
221     subtable)
222 {
223   g_free (subtable);
224 }
225 
226 static void
mpegts_packetizer_stream_free(MpegTSPacketizerStream * stream)227 mpegts_packetizer_stream_free (MpegTSPacketizerStream * stream)
228 {
229   mpegts_packetizer_clear_section (stream);
230   g_slist_foreach (stream->subtables,
231       (GFunc) mpegts_packetizer_stream_subtable_free, NULL);
232   g_slist_free (stream->subtables);
233   g_free (stream);
234 }
235 
236 static void
mpegts_packetizer_class_init(MpegTSPacketizer2Class * klass)237 mpegts_packetizer_class_init (MpegTSPacketizer2Class * klass)
238 {
239   GObjectClass *gobject_class;
240 
241   gobject_class = G_OBJECT_CLASS (klass);
242 
243   gobject_class->dispose = mpegts_packetizer_dispose;
244   gobject_class->finalize = mpegts_packetizer_finalize;
245 }
246 
247 static void
mpegts_packetizer_init(MpegTSPacketizer2 * packetizer)248 mpegts_packetizer_init (MpegTSPacketizer2 * packetizer)
249 {
250   g_mutex_init (&packetizer->group_lock);
251 
252   packetizer->adapter = gst_adapter_new ();
253   packetizer->offset = 0;
254   packetizer->empty = TRUE;
255   packetizer->streams = g_new0 (MpegTSPacketizerStream *, 8192);
256   packetizer->packet_size = 0;
257   packetizer->calculate_skew = FALSE;
258   packetizer->calculate_offset = FALSE;
259 
260   packetizer->map_data = NULL;
261   packetizer->map_size = 0;
262   packetizer->map_offset = 0;
263   packetizer->need_sync = FALSE;
264 
265   memset (packetizer->pcrtablelut, 0xff, 0x2000);
266   memset (packetizer->observations, 0x0, sizeof (packetizer->observations));
267   packetizer->lastobsid = 0;
268 
269   packetizer->nb_seen_offsets = 0;
270   packetizer->refoffset = -1;
271   packetizer->last_in_time = GST_CLOCK_TIME_NONE;
272   packetizer->pcr_discont_threshold = GST_SECOND;
273 }
274 
275 static void
mpegts_packetizer_dispose(GObject * object)276 mpegts_packetizer_dispose (GObject * object)
277 {
278   MpegTSPacketizer2 *packetizer = GST_MPEGTS_PACKETIZER (object);
279 
280   if (!packetizer->disposed) {
281     if (packetizer->packet_size)
282       packetizer->packet_size = 0;
283     if (packetizer->streams) {
284       int i;
285       for (i = 0; i < 8192; i++) {
286         if (packetizer->streams[i])
287           mpegts_packetizer_stream_free (packetizer->streams[i]);
288       }
289       g_free (packetizer->streams);
290     }
291 
292     gst_adapter_clear (packetizer->adapter);
293     g_object_unref (packetizer->adapter);
294     g_mutex_clear (&packetizer->group_lock);
295     packetizer->disposed = TRUE;
296     packetizer->offset = 0;
297     packetizer->empty = TRUE;
298 
299     flush_observations (packetizer);
300   }
301 
302   if (G_OBJECT_CLASS (mpegts_packetizer_parent_class)->dispose)
303     G_OBJECT_CLASS (mpegts_packetizer_parent_class)->dispose (object);
304 }
305 
306 static void
mpegts_packetizer_finalize(GObject * object)307 mpegts_packetizer_finalize (GObject * object)
308 {
309   if (G_OBJECT_CLASS (mpegts_packetizer_parent_class)->finalize)
310     G_OBJECT_CLASS (mpegts_packetizer_parent_class)->finalize (object);
311 }
312 
313 static inline guint64
mpegts_packetizer_compute_pcr(const guint8 * data)314 mpegts_packetizer_compute_pcr (const guint8 * data)
315 {
316   guint32 pcr1;
317   guint16 pcr2;
318   guint64 pcr, pcr_ext;
319 
320   pcr1 = GST_READ_UINT32_BE (data);
321   pcr2 = GST_READ_UINT16_BE (data + 4);
322   pcr = ((guint64) pcr1) << 1;
323   pcr |= (pcr2 & 0x8000) >> 15;
324   pcr_ext = (pcr2 & 0x01ff);
325   return pcr * 300 + pcr_ext % 300;
326 }
327 
328 static gboolean
mpegts_packetizer_parse_adaptation_field_control(MpegTSPacketizer2 * packetizer,MpegTSPacketizerPacket * packet)329 mpegts_packetizer_parse_adaptation_field_control (MpegTSPacketizer2 *
330     packetizer, MpegTSPacketizerPacket * packet)
331 {
332   guint8 length, afcflags;
333   guint8 *data;
334 
335   length = *packet->data++;
336 
337   /* an adaptation field with length 0 is valid and
338    * can be used to insert a single stuffing byte */
339   if (!length) {
340     packet->afc_flags = 0;
341     return TRUE;
342   }
343 
344   if ((packet->scram_afc_cc & 0x30) == 0x20) {
345     /* no payload, adaptation field of 183 bytes */
346     if (length > 183) {
347       GST_WARNING ("PID 0x%04x afc == 0x%02x and length %d > 183",
348           packet->pid, packet->scram_afc_cc & 0x30, length);
349       return FALSE;
350     }
351     if (length != 183) {
352       GST_WARNING ("PID 0x%04x afc == 0x%02x and length %d != 183",
353           packet->pid, packet->scram_afc_cc & 0x30, length);
354       GST_MEMDUMP ("Unknown payload", packet->data + length,
355           packet->data_end - packet->data - length);
356     }
357   } else if (length > 182) {
358     GST_WARNING ("PID 0x%04x afc == 0x%02x and length %d > 182",
359         packet->pid, packet->scram_afc_cc & 0x30, length);
360     return FALSE;
361   }
362 
363   if (packet->data + length > packet->data_end) {
364     GST_DEBUG
365         ("PID 0x%04x afc length %d overflows the buffer current %d max %d",
366         packet->pid, length, (gint) (packet->data - packet->data_start),
367         (gint) (packet->data_end - packet->data_start));
368     return FALSE;
369   }
370 
371   data = packet->data;
372   packet->data += length;
373 
374   afcflags = packet->afc_flags = *data++;
375 
376   GST_DEBUG ("flags: %s%s%s%s%s%s%s%s%s",
377       afcflags & 0x80 ? "discontinuity " : "",
378       afcflags & 0x40 ? "random_access " : "",
379       afcflags & 0x20 ? "elementary_stream_priority " : "",
380       afcflags & 0x10 ? "PCR " : "",
381       afcflags & 0x08 ? "OPCR " : "",
382       afcflags & 0x04 ? "splicing_point " : "",
383       afcflags & 0x02 ? "transport_private_data " : "",
384       afcflags & 0x01 ? "extension " : "", afcflags == 0x00 ? "<none>" : "");
385 
386   /* PCR */
387   if (afcflags & MPEGTS_AFC_PCR_FLAG) {
388     MpegTSPCR *pcrtable = NULL;
389     packet->pcr = mpegts_packetizer_compute_pcr (data);
390     data += 6;
391     GST_DEBUG ("pcr 0x%04x %" G_GUINT64_FORMAT " (%" GST_TIME_FORMAT
392         ") offset:%" G_GUINT64_FORMAT, packet->pid, packet->pcr,
393         GST_TIME_ARGS (PCRTIME_TO_GSTTIME (packet->pcr)), packet->offset);
394 
395     PACKETIZER_GROUP_LOCK (packetizer);
396     if (packetizer->calculate_skew
397         && GST_CLOCK_TIME_IS_VALID (packetizer->last_in_time)) {
398       pcrtable = get_pcr_table (packetizer, packet->pid);
399       calculate_skew (packetizer, pcrtable, packet->pcr,
400           packetizer->last_in_time);
401     }
402     if (packetizer->calculate_offset) {
403       if (!pcrtable)
404         pcrtable = get_pcr_table (packetizer, packet->pid);
405       record_pcr (packetizer, pcrtable, packet->pcr, packet->offset);
406     }
407     PACKETIZER_GROUP_UNLOCK (packetizer);
408   }
409 #ifndef GST_DISABLE_GST_DEBUG
410   /* OPCR */
411   if (afcflags & MPEGTS_AFC_OPCR_FLAG) {
412     /* Note: We don't use/need opcr for the time being */
413     guint64 opcr = mpegts_packetizer_compute_pcr (data);
414     data += 6;
415     GST_DEBUG ("opcr %" G_GUINT64_FORMAT " (%" GST_TIME_FORMAT ")",
416         opcr, GST_TIME_ARGS (PCRTIME_TO_GSTTIME (opcr)));
417   }
418 
419   if (afcflags & MPEGTS_AFC_SPLICING_POINT_FLAG) {
420     GST_DEBUG ("splice_countdown: %u", *data++);
421   }
422 
423   if (afcflags & MPEGTS_AFC_TRANSPORT_PRIVATE_DATA_FLAG) {
424     guint8 len = *data++;
425     GST_MEMDUMP ("private data", data, len);
426     data += len;
427   }
428 
429   if (afcflags & MPEGTS_AFC_EXTENSION_FLAG) {
430     guint8 extlen = *data++;
431     guint8 flags = *data++;
432     GST_DEBUG ("extension size:%d flags : %s%s%s", extlen,
433         flags & 0x80 ? "ltw " : "",
434         flags & 0x40 ? "piecewise_rate " : "",
435         flags & 0x20 ? "seamless_splice " : "");
436     if (flags & 0x80) {
437       GST_DEBUG ("legal time window: valid_flag:%d offset:%d", *data >> 7,
438           GST_READ_UINT16_BE (data) & 0x7fff);
439       data += 2;
440     }
441   }
442 #endif
443 
444   return TRUE;
445 }
446 
447 static MpegTSPacketizerPacketReturn
mpegts_packetizer_parse_packet(MpegTSPacketizer2 * packetizer,MpegTSPacketizerPacket * packet)448 mpegts_packetizer_parse_packet (MpegTSPacketizer2 * packetizer,
449     MpegTSPacketizerPacket * packet)
450 {
451   guint8 *data;
452   guint8 tmp;
453 
454   data = packet->data_start;
455   data += 1;
456   tmp = *data;
457 
458   /* transport_error_indicator 1 */
459   if (G_UNLIKELY (tmp & 0x80))
460     return PACKET_BAD;
461 
462   /* payload_unit_start_indicator 1 */
463   packet->payload_unit_start_indicator = tmp & 0x40;
464 
465   /* transport_priority 1 */
466   /* PID 13 */
467   packet->pid = GST_READ_UINT16_BE (data) & 0x1FFF;
468   data += 2;
469 
470   packet->scram_afc_cc = tmp = *data++;
471   /* transport_scrambling_control 2 */
472   if (G_UNLIKELY (tmp & 0xc0))
473     return PACKET_BAD;
474 
475   packet->data = data;
476 
477   packet->afc_flags = 0;
478   packet->pcr = G_MAXUINT64;
479 
480   if (FLAGS_HAS_AFC (tmp)) {
481     if (!mpegts_packetizer_parse_adaptation_field_control (packetizer, packet))
482       return FALSE;
483   }
484 
485   if (FLAGS_HAS_PAYLOAD (tmp))
486     packet->payload = packet->data;
487   else
488     packet->payload = NULL;
489 
490   return PACKET_OK;
491 }
492 
493 static GstMpegtsSection *
mpegts_packetizer_parse_section_header(MpegTSPacketizer2 * packetizer,MpegTSPacketizerStream * stream)494 mpegts_packetizer_parse_section_header (MpegTSPacketizer2 * packetizer,
495     MpegTSPacketizerStream * stream)
496 {
497   MpegTSPacketizerStreamSubtable *subtable;
498   GstMpegtsSection *res;
499 
500   subtable =
501       find_subtable (stream->subtables, stream->table_id,
502       stream->subtable_extension);
503   if (subtable) {
504     GST_DEBUG ("Found previous subtable_extension:0x%04x",
505         stream->subtable_extension);
506     if (G_UNLIKELY (stream->version_number != subtable->version_number)) {
507       /* If the version number changed, reset the subtable */
508       subtable->version_number = stream->version_number;
509       subtable->last_section_number = stream->last_section_number;
510       memset (subtable->seen_section, 0, 32);
511     }
512   } else {
513     GST_DEBUG ("Appending new subtable_extension: 0x%04x",
514         stream->subtable_extension);
515     subtable = mpegts_packetizer_stream_subtable_new (stream->table_id,
516         stream->subtable_extension, stream->last_section_number);
517     subtable->version_number = stream->version_number;
518 
519     stream->subtables = g_slist_prepend (stream->subtables, subtable);
520   }
521 
522   GST_MEMDUMP ("Full section data", stream->section_data,
523       stream->section_length);
524   /* TODO ? : Replace this by an efficient version (where we provide all
525    * pre-parsed header data) */
526   res =
527       gst_mpegts_section_new (stream->pid, stream->section_data,
528       stream->section_length);
529   stream->section_data = NULL;
530   mpegts_packetizer_clear_section (stream);
531 
532   if (res) {
533     /* NOTE : Due to the new mpegts-si system, There is a insanely low probability
534      * that we might have gotten a section that was corrupted (i.e. wrong crc)
535      * and that we consider it as seen.
536      *
537      * The reason why we consider this as acceptable is because all the previous
538      * checks were already done:
539      * * transport layer checks (DVB)
540      * * 0x47 validation
541      * * continuity counter validation
542      * * subtable validation
543      * * section_number validation
544      * * section_length validation
545      *
546      * The probability of this happening vs the overhead of doing CRC checks
547      * on all sections (including those we would not use) is just not worth it.
548      * */
549     MPEGTS_BIT_SET (subtable->seen_section, stream->section_number);
550     res->offset = stream->offset;
551   }
552 
553   return res;
554 }
555 
556 void
mpegts_packetizer_clear(MpegTSPacketizer2 * packetizer)557 mpegts_packetizer_clear (MpegTSPacketizer2 * packetizer)
558 {
559   guint i;
560   MpegTSPCR *pcrtable;
561 
562   packetizer->packet_size = 0;
563 
564   if (packetizer->streams) {
565     int i;
566     for (i = 0; i < 8192; i++) {
567       if (packetizer->streams[i]) {
568         mpegts_packetizer_stream_free (packetizer->streams[i]);
569       }
570     }
571     memset (packetizer->streams, 0, 8192 * sizeof (MpegTSPacketizerStream *));
572   }
573 
574   gst_adapter_clear (packetizer->adapter);
575   packetizer->offset = 0;
576   packetizer->empty = TRUE;
577   packetizer->need_sync = FALSE;
578   packetizer->map_data = NULL;
579   packetizer->map_size = 0;
580   packetizer->map_offset = 0;
581   packetizer->last_in_time = GST_CLOCK_TIME_NONE;
582 
583   pcrtable = packetizer->observations[packetizer->pcrtablelut[0x1fff]];
584   if (pcrtable)
585     pcrtable->base_time = GST_CLOCK_TIME_NONE;
586 
587   /* Close current PCR group */
588   PACKETIZER_GROUP_LOCK (packetizer);
589 
590   for (i = 0; i < MAX_PCR_OBS_CHANNELS; i++) {
591     if (packetizer->observations[i])
592       _close_current_group (packetizer->observations[i]);
593     else
594       break;
595   }
596   PACKETIZER_GROUP_UNLOCK (packetizer);
597 }
598 
599 void
mpegts_packetizer_flush(MpegTSPacketizer2 * packetizer,gboolean hard)600 mpegts_packetizer_flush (MpegTSPacketizer2 * packetizer, gboolean hard)
601 {
602   guint i;
603   MpegTSPCR *pcrtable;
604   GST_DEBUG ("Flushing");
605 
606   if (packetizer->streams) {
607     for (i = 0; i < 8192; i++) {
608       if (packetizer->streams[i]) {
609         mpegts_packetizer_clear_section (packetizer->streams[i]);
610       }
611     }
612   }
613   gst_adapter_clear (packetizer->adapter);
614 
615   packetizer->offset = 0;
616   packetizer->empty = TRUE;
617   packetizer->need_sync = FALSE;
618   packetizer->map_data = NULL;
619   packetizer->map_size = 0;
620   packetizer->map_offset = 0;
621   packetizer->last_in_time = GST_CLOCK_TIME_NONE;
622 
623   pcrtable = packetizer->observations[packetizer->pcrtablelut[0x1fff]];
624   if (pcrtable)
625     pcrtable->base_time = GST_CLOCK_TIME_NONE;
626 
627   /* Close current PCR group */
628   PACKETIZER_GROUP_LOCK (packetizer);
629   for (i = 0; i < MAX_PCR_OBS_CHANNELS; i++) {
630     if (packetizer->observations[i])
631       _close_current_group (packetizer->observations[i]);
632     else
633       break;
634   }
635   PACKETIZER_GROUP_UNLOCK (packetizer);
636 
637   if (hard) {
638     /* For pull mode seeks in tsdemux the observation must be preserved */
639     flush_observations (packetizer);
640   }
641 }
642 
643 void
mpegts_packetizer_remove_stream(MpegTSPacketizer2 * packetizer,gint16 pid)644 mpegts_packetizer_remove_stream (MpegTSPacketizer2 * packetizer, gint16 pid)
645 {
646   MpegTSPacketizerStream *stream = packetizer->streams[pid];
647   if (stream) {
648     GST_INFO ("Removing stream for PID 0x%04x", pid);
649     mpegts_packetizer_stream_free (stream);
650     packetizer->streams[pid] = NULL;
651   }
652 }
653 
654 MpegTSPacketizer2 *
mpegts_packetizer_new(void)655 mpegts_packetizer_new (void)
656 {
657   MpegTSPacketizer2 *packetizer;
658 
659   packetizer =
660       GST_MPEGTS_PACKETIZER (g_object_new (GST_TYPE_MPEGTS_PACKETIZER, NULL));
661 
662   return packetizer;
663 }
664 
665 void
mpegts_packetizer_push(MpegTSPacketizer2 * packetizer,GstBuffer * buffer)666 mpegts_packetizer_push (MpegTSPacketizer2 * packetizer, GstBuffer * buffer)
667 {
668   GstClockTime ts;
669   if (G_UNLIKELY (packetizer->empty)) {
670     packetizer->empty = FALSE;
671     packetizer->offset = GST_BUFFER_OFFSET (buffer);
672   }
673 
674   GST_DEBUG ("Pushing %" G_GSIZE_FORMAT " byte from offset %"
675       G_GUINT64_FORMAT, gst_buffer_get_size (buffer),
676       GST_BUFFER_OFFSET (buffer));
677   gst_adapter_push (packetizer->adapter, buffer);
678   /* If the buffer has a valid timestamp, store it - preferring DTS,
679    * which is where upstream arrival times should be stored */
680   ts = GST_BUFFER_DTS_OR_PTS (buffer);
681   if (GST_CLOCK_TIME_IS_VALID (ts))
682     packetizer->last_in_time = ts;
683 }
684 
685 static void
mpegts_packetizer_flush_bytes(MpegTSPacketizer2 * packetizer,gsize size)686 mpegts_packetizer_flush_bytes (MpegTSPacketizer2 * packetizer, gsize size)
687 {
688   if (size > 0) {
689     GST_LOG ("flushing %" G_GSIZE_FORMAT " bytes from adapter", size);
690     gst_adapter_flush (packetizer->adapter, size);
691   }
692 
693   packetizer->map_data = NULL;
694   packetizer->map_size = 0;
695   packetizer->map_offset = 0;
696 }
697 
698 static gboolean
mpegts_packetizer_map(MpegTSPacketizer2 * packetizer,gsize size)699 mpegts_packetizer_map (MpegTSPacketizer2 * packetizer, gsize size)
700 {
701   gsize available;
702 
703   if (packetizer->map_size - packetizer->map_offset >= size)
704     return TRUE;
705 
706   mpegts_packetizer_flush_bytes (packetizer, packetizer->map_offset);
707 
708   available = gst_adapter_available (packetizer->adapter);
709   if (available < size)
710     return FALSE;
711 
712   packetizer->map_data =
713       (guint8 *) gst_adapter_map (packetizer->adapter, available);
714   if (!packetizer->map_data)
715     return FALSE;
716 
717   packetizer->map_size = available;
718   packetizer->map_offset = 0;
719 
720   GST_LOG ("mapped %" G_GSIZE_FORMAT " bytes from adapter", available);
721 
722   return TRUE;
723 }
724 
725 static gboolean
mpegts_try_discover_packet_size(MpegTSPacketizer2 * packetizer)726 mpegts_try_discover_packet_size (MpegTSPacketizer2 * packetizer)
727 {
728   guint8 *data;
729   gsize size, i, j;
730 
731   static const guint psizes[] = {
732     MPEGTS_NORMAL_PACKETSIZE,
733     MPEGTS_M2TS_PACKETSIZE,
734     MPEGTS_DVB_ASI_PACKETSIZE,
735     MPEGTS_ATSC_PACKETSIZE
736   };
737 
738   if (!mpegts_packetizer_map (packetizer, 4 * MPEGTS_MAX_PACKETSIZE))
739     return FALSE;
740 
741   size = packetizer->map_size - packetizer->map_offset;
742   data = packetizer->map_data + packetizer->map_offset;
743 
744   for (i = 0; i + 3 * MPEGTS_MAX_PACKETSIZE < size; i++) {
745     /* find a sync byte */
746     if (data[i] != PACKET_SYNC_BYTE)
747       continue;
748 
749     /* check for 4 consecutive sync bytes with each possible packet size */
750     for (j = 0; j < G_N_ELEMENTS (psizes); j++) {
751       guint packet_size = psizes[j];
752 
753       if (data[i + packet_size] == PACKET_SYNC_BYTE &&
754           data[i + 2 * packet_size] == PACKET_SYNC_BYTE &&
755           data[i + 3 * packet_size] == PACKET_SYNC_BYTE) {
756         packetizer->packet_size = packet_size;
757         goto out;
758       }
759     }
760   }
761 
762 out:
763   packetizer->map_offset += i;
764 
765   if (packetizer->packet_size == 0) {
766     GST_DEBUG ("Could not determine packet size in %" G_GSIZE_FORMAT
767         " bytes buffer, flush %" G_GSIZE_FORMAT " bytes", size, i);
768     mpegts_packetizer_flush_bytes (packetizer, packetizer->map_offset);
769     return FALSE;
770   }
771 
772   GST_INFO ("have packetsize detected: %u bytes", packetizer->packet_size);
773 
774   if (packetizer->packet_size == MPEGTS_M2TS_PACKETSIZE &&
775       packetizer->map_offset >= 4)
776     packetizer->map_offset -= 4;
777 
778   return TRUE;
779 }
780 
781 static gboolean
mpegts_packetizer_sync(MpegTSPacketizer2 * packetizer)782 mpegts_packetizer_sync (MpegTSPacketizer2 * packetizer)
783 {
784   gboolean found = FALSE;
785   guint8 *data;
786   guint packet_size;
787   gsize size, sync_offset, i;
788 
789   packet_size = packetizer->packet_size;
790 
791   if (!mpegts_packetizer_map (packetizer, 3 * packet_size))
792     return FALSE;
793 
794   size = packetizer->map_size - packetizer->map_offset;
795   data = packetizer->map_data + packetizer->map_offset;
796 
797   if (packet_size == MPEGTS_M2TS_PACKETSIZE)
798     sync_offset = 4;
799   else
800     sync_offset = 0;
801 
802   for (i = sync_offset; i + 2 * packet_size < size; i++) {
803     if (data[i] == PACKET_SYNC_BYTE &&
804         data[i + packet_size] == PACKET_SYNC_BYTE &&
805         data[i + 2 * packet_size] == PACKET_SYNC_BYTE) {
806       found = TRUE;
807       break;
808     }
809   }
810 
811   packetizer->map_offset += i - sync_offset;
812 
813   if (!found)
814     mpegts_packetizer_flush_bytes (packetizer, packetizer->map_offset);
815 
816   return found;
817 }
818 
819 MpegTSPacketizerPacketReturn
mpegts_packetizer_next_packet(MpegTSPacketizer2 * packetizer,MpegTSPacketizerPacket * packet)820 mpegts_packetizer_next_packet (MpegTSPacketizer2 * packetizer,
821     MpegTSPacketizerPacket * packet)
822 {
823   guint8 *packet_data;
824   guint packet_size;
825   gsize sync_offset;
826 
827   packet_size = packetizer->packet_size;
828   if (G_UNLIKELY (!packet_size)) {
829     if (!mpegts_try_discover_packet_size (packetizer))
830       return PACKET_NEED_MORE;
831     packet_size = packetizer->packet_size;
832   }
833 
834   /* M2TS packets don't start with the sync byte, all other variants do */
835   if (packet_size == MPEGTS_M2TS_PACKETSIZE)
836     sync_offset = 4;
837   else
838     sync_offset = 0;
839 
840   while (1) {
841     if (packetizer->need_sync) {
842       if (!mpegts_packetizer_sync (packetizer))
843         return PACKET_NEED_MORE;
844       packetizer->need_sync = FALSE;
845     }
846 
847     if (!mpegts_packetizer_map (packetizer, packet_size))
848       return PACKET_NEED_MORE;
849 
850     packet_data = &packetizer->map_data[packetizer->map_offset + sync_offset];
851 
852     /* Check sync byte */
853     if (G_UNLIKELY (*packet_data != PACKET_SYNC_BYTE)) {
854       GST_DEBUG ("lost sync");
855       packetizer->need_sync = TRUE;
856     } else {
857       /* ALL mpeg-ts variants contain 188 bytes of data. Those with bigger
858        * packet sizes contain either extra data (timesync, FEC, ..) either
859        * before or after the data */
860       packet->data_start = packet_data;
861       packet->data_end = packet->data_start + 188;
862       packet->offset = packetizer->offset;
863       GST_LOG ("offset %" G_GUINT64_FORMAT, packet->offset);
864       packetizer->offset += packet_size;
865       GST_MEMDUMP ("data_start", packet->data_start, 16);
866 
867       return mpegts_packetizer_parse_packet (packetizer, packet);
868     }
869   }
870 }
871 
872 MpegTSPacketizerPacketReturn
mpegts_packetizer_process_next_packet(MpegTSPacketizer2 * packetizer)873 mpegts_packetizer_process_next_packet (MpegTSPacketizer2 * packetizer)
874 {
875   MpegTSPacketizerPacket packet;
876   MpegTSPacketizerPacketReturn ret;
877 
878   ret = mpegts_packetizer_next_packet (packetizer, &packet);
879   if (ret != PACKET_NEED_MORE)
880     mpegts_packetizer_clear_packet (packetizer, &packet);
881 
882   return ret;
883 }
884 
885 void
mpegts_packetizer_clear_packet(MpegTSPacketizer2 * packetizer,MpegTSPacketizerPacket * packet)886 mpegts_packetizer_clear_packet (MpegTSPacketizer2 * packetizer,
887     MpegTSPacketizerPacket * packet)
888 {
889   guint8 packet_size = packetizer->packet_size;
890 
891   if (packetizer->map_data) {
892     packetizer->map_offset += packet_size;
893     if (packetizer->map_size - packetizer->map_offset < packet_size)
894       mpegts_packetizer_flush_bytes (packetizer, packetizer->map_offset);
895   }
896 }
897 
898 gboolean
mpegts_packetizer_has_packets(MpegTSPacketizer2 * packetizer)899 mpegts_packetizer_has_packets (MpegTSPacketizer2 * packetizer)
900 {
901   if (G_UNLIKELY (!packetizer->packet_size)) {
902     if (!mpegts_try_discover_packet_size (packetizer))
903       return FALSE;
904   }
905   return gst_adapter_available (packetizer->adapter) >= packetizer->packet_size;
906 }
907 
908 /*
909  * Ideally it should just return a section if:
910  * * The section is complete
911  * * The section is valid (sanity checks for length for example)
912  * * The section applies now (current_next_indicator)
913  * * The section is an update or was never seen
914  *
915  * The section should be a new GstMpegtsSection:
916  * * properly initialized
917  * * With pid, table_id AND section_type set (move logic from mpegtsbase)
918  * * With data copied into it (yes, minor overhead)
919  *
920  * In all other cases it should just return NULL
921  *
922  * If more than one section is available, the 'remaining' field will
923  * be set to the beginning of a valid GList containing other sections.
924  * */
925 GstMpegtsSection *
mpegts_packetizer_push_section(MpegTSPacketizer2 * packetizer,MpegTSPacketizerPacket * packet,GList ** remaining)926 mpegts_packetizer_push_section (MpegTSPacketizer2 * packetizer,
927     MpegTSPacketizerPacket * packet, GList ** remaining)
928 {
929   GstMpegtsSection *section;
930   GstMpegtsSection *res = NULL;
931   MpegTSPacketizerStream *stream;
932   gboolean long_packet;
933   guint8 pointer = 0, table_id;
934   guint16 subtable_extension;
935   gsize to_read;
936   guint section_length;
937   /* data points to the current read location
938    * data_start points to the beginning of the data to accumulate */
939   guint8 *data, *data_start;
940   guint8 packet_cc;
941   GList *others = NULL;
942   guint8 version_number, section_number, last_section_number;
943 
944   data = packet->data;
945   packet_cc = FLAGS_CONTINUITY_COUNTER (packet->scram_afc_cc);
946 
947   /* Get our filter */
948   stream = packetizer->streams[packet->pid];
949   if (G_UNLIKELY (stream == NULL)) {
950     if (!packet->payload_unit_start_indicator) {
951       /* Early exit (we need to start with a section start) */
952       GST_DEBUG ("PID 0x%04x  waiting for section start", packet->pid);
953       goto out;
954     }
955     stream = mpegts_packetizer_stream_new (packet->pid);
956     packetizer->streams[packet->pid] = stream;
957   }
958 
959   GST_MEMDUMP ("Full packet data", packet->data,
960       packet->data_end - packet->data);
961 
962   /* This function is split into several parts:
963    *
964    * Pre checks (packet-wide). Determines where we go next
965    * accumulate_data: store data and check if section is complete
966    * section_start: handle beginning of a section, if needed loop back to
967    *                accumulate_data
968    *
969    * The trigger that makes the loop stop and return is if:
970    * 1) We do not have enough data for the current packet
971    * 2) There is remaining data after a packet which is only made
972    *    of stuffing bytes (0xff).
973    *
974    * Pre-loop checks, related to the whole incoming packet:
975    *
976    * If there is a CC-discont:
977    *  If it is a PUSI, skip the pointer and handle section_start
978    *  If not a PUSI, reset and return nothing
979    * If there is not a CC-discont:
980    *  If it is a PUSI
981    *    If pointer, accumulate that data and check for complete section
982    *    (loop)
983    *  If it is not a PUSI
984    *    Accumulate the expected data and check for complete section
985    *    (loop)
986    *
987    **/
988 
989   if (packet->payload_unit_start_indicator) {
990     pointer = *data++;
991     /* If the pointer is zero, we're guaranteed to be able to handle it */
992     if (pointer == 0) {
993       GST_LOG
994           ("PID 0x%04x PUSI and pointer == 0, skipping straight to section_start parsing",
995           packet->pid);
996       mpegts_packetizer_clear_section (stream);
997       goto section_start;
998     }
999   }
1000 
1001   if (stream->continuity_counter == CONTINUITY_UNSET ||
1002       (stream->continuity_counter + 1) % 16 != packet_cc) {
1003     if (stream->continuity_counter != CONTINUITY_UNSET)
1004       GST_WARNING ("PID 0x%04x section discontinuity (%d vs %d)", packet->pid,
1005           stream->continuity_counter, packet_cc);
1006     mpegts_packetizer_clear_section (stream);
1007     /* If not a PUSI, not much we can do */
1008     if (!packet->payload_unit_start_indicator) {
1009       GST_LOG ("PID 0x%04x continuity discont/unset and not PUSI, bailing out",
1010           packet->pid);
1011       goto out;
1012     }
1013     /* If PUSI, skip pointer data and carry on to section start */
1014     data += pointer;
1015     pointer = 0;
1016     GST_LOG ("discont, but PUSI, skipped %d bytes and doing section start",
1017         pointer);
1018     goto section_start;
1019   }
1020 
1021   GST_LOG ("Accumulating data from beginning of packet");
1022 
1023   data_start = data;
1024 
1025 accumulate_data:
1026   /* If not the beginning of a new section, accumulate what we have */
1027   stream->continuity_counter = packet_cc;
1028   to_read = MIN (stream->section_length - stream->section_offset,
1029       packet->data_end - data_start);
1030   memcpy (stream->section_data + stream->section_offset, data_start, to_read);
1031   stream->section_offset += to_read;
1032   /* Point data to after the data we accumulated */
1033   data = data_start + to_read;
1034   GST_DEBUG ("Appending data (need %d, have %d)", stream->section_length,
1035       stream->section_offset);
1036 
1037   /* Check if we have enough */
1038   if (stream->section_offset < stream->section_length) {
1039     GST_DEBUG ("PID 0x%04x, section not complete (Got %d, need %d)",
1040         stream->pid, stream->section_offset, stream->section_length);
1041     goto out;
1042   }
1043 
1044   /* Small sanity check. We should have collected *exactly* the right amount */
1045   if (G_UNLIKELY (stream->section_offset != stream->section_length))
1046     GST_WARNING ("PID 0x%04x Accumulated too much data (%d vs %d) !",
1047         stream->pid, stream->section_offset, stream->section_length);
1048   GST_DEBUG ("PID 0x%04x Section complete", stream->pid);
1049 
1050   if ((section = mpegts_packetizer_parse_section_header (packetizer, stream))) {
1051     if (res)
1052       others = g_list_append (others, section);
1053     else
1054       res = section;
1055   }
1056 
1057 section_start:
1058   subtable_extension = 0;
1059   version_number = 0;
1060   last_section_number = 0;
1061   section_number = 0;
1062   table_id = 0;
1063 
1064   /* FIXME : We need at least 3 bytes (or 8 for long packets) with current algorithm :(
1065    * We might end up losing sections that start across two packets (srsl...) */
1066   if (data > packet->data_end - 3 || *data == 0xff) {
1067     /* flush stuffing bytes and leave */
1068     mpegts_packetizer_clear_section (stream);
1069     goto out;
1070   }
1071 
1072   /* We have more data to process ... */
1073   GST_DEBUG ("PID 0x%04x, More section present in packet (remaining bytes:%"
1074       G_GSIZE_FORMAT ")", stream->pid, (gsize) (packet->data_end - data));
1075   GST_MEMDUMP ("section_start", data, packet->data_end - data);
1076   data_start = data;
1077   /* Beginning of a new section */
1078   /*
1079    * section_syntax_indicator means that the header is of the following format:
1080    * * table_id (8bit)
1081    * * section_syntax_indicator (1bit) == 0
1082    * * reserved/private fields (3bit)
1083    * * section_length (12bit)
1084    * * data (of size section_length)
1085    * * NO CRC !
1086    */
1087   long_packet = data[1] & 0x80;
1088 
1089   /* Fast path for short packets */
1090   if (!long_packet) {
1091     /* We can create the section now (function will check for size) */
1092     GST_DEBUG ("Short packet");
1093     section_length = (GST_READ_UINT16_BE (data + 1) & 0xfff) + 3;
1094     /* Only do fast-path if we have enough byte */
1095     if (section_length < packet->data_end - data) {
1096       if ((section =
1097               gst_mpegts_section_new (packet->pid, g_memdup (data,
1098                       section_length), section_length))) {
1099         GST_DEBUG ("PID 0x%04x Short section complete !", packet->pid);
1100         section->offset = packet->offset;
1101         if (res)
1102           others = g_list_append (others, section);
1103         else
1104           res = section;
1105       }
1106       /* Advance reader and potentially read another section */
1107       data += section_length;
1108       if (data < packet->data_end && *data != 0xff)
1109         goto section_start;
1110       /* If not, exit */
1111       goto out;
1112     }
1113     /* We don't have enough bytes to do short section shortcut */
1114   }
1115 
1116   /* Beginning of a new section, do as much pre-parsing as possible */
1117   /* table_id                        : 8  bit */
1118   table_id = *data++;
1119 
1120   /* section_syntax_indicator        : 1  bit
1121    * other_fields (reserved)         : 3  bit
1122    * section_length                  : 12 bit */
1123   section_length = (GST_READ_UINT16_BE (data) & 0x0FFF) + 3;
1124   data += 2;
1125 
1126   if (long_packet) {
1127     /* Do we have enough data for a long packet? */
1128     if (data > packet->data_end - 5)
1129       goto out;
1130 
1131     /* subtable_extension (always present, we are in a long section) */
1132     /* subtable extension              : 16 bit */
1133     subtable_extension = GST_READ_UINT16_BE (data);
1134     data += 2;
1135 
1136     /* reserved                      : 2  bit
1137      * version_number                : 5  bit
1138      * current_next_indicator        : 1  bit */
1139     /* Bail out now if current_next_indicator == 0 */
1140     if (G_UNLIKELY (!(*data & 0x01))) {
1141       GST_DEBUG
1142           ("PID 0x%04x table_id 0x%02x section does not apply (current_next_indicator == 0)",
1143           packet->pid, table_id);
1144       goto out;
1145     }
1146 
1147     version_number = *data++ >> 1 & 0x1f;
1148     /* section_number                : 8  bit */
1149     section_number = *data++;
1150     /* last_section_number                : 8  bit */
1151     last_section_number = *data++;
1152   } else {
1153     subtable_extension = 0;
1154     version_number = 0;
1155     section_number = 0;
1156     last_section_number = 0;
1157   }
1158   GST_DEBUG
1159       ("PID 0x%04x length:%d table_id:0x%02x subtable_extension:0x%04x version_number:%d section_number:%d(last:%d)",
1160       packet->pid, section_length, table_id, subtable_extension, version_number,
1161       section_number, last_section_number);
1162 
1163   to_read = MIN (section_length, packet->data_end - data_start);
1164 
1165   /* Check as early as possible whether we already saw this section
1166    * i.e. that we saw a subtable with:
1167    * * same subtable_extension (might be zero)
1168    * * same version_number
1169    * * same last_section_number
1170    * * same section_number was seen
1171    */
1172   if (seen_section_before (stream, table_id, subtable_extension,
1173           version_number, section_number, last_section_number)) {
1174     GST_DEBUG
1175         ("PID 0x%04x Already processed table_id:0x%02x subtable_extension:0x%04x, version_number:%d, section_number:%d",
1176         packet->pid, table_id, subtable_extension, version_number,
1177         section_number);
1178     /* skip data and see if we have more sections after */
1179     data = data_start + to_read;
1180     if (data == packet->data_end || *data == 0xff)
1181       goto out;
1182     goto section_start;
1183   }
1184   if (G_UNLIKELY (section_number > last_section_number)) {
1185     GST_WARNING
1186         ("PID 0x%04x corrupted packet (section_number:%d > last_section_number:%d)",
1187         packet->pid, section_number, last_section_number);
1188     goto out;
1189   }
1190 
1191 
1192   /* Copy over already parsed values */
1193   stream->table_id = table_id;
1194   stream->section_length = section_length;
1195   stream->version_number = version_number;
1196   stream->subtable_extension = subtable_extension;
1197   stream->section_number = section_number;
1198   stream->last_section_number = last_section_number;
1199   stream->offset = packet->offset;
1200 
1201   /* Create enough room to store chunks of sections */
1202   stream->section_data = g_malloc (stream->section_length);
1203   stream->section_offset = 0;
1204 
1205   /* Finally, accumulate and check if we parsed enough */
1206   goto accumulate_data;
1207 
1208 out:
1209   packet->data = data;
1210   *remaining = others;
1211 
1212   GST_DEBUG ("result: %p", res);
1213 
1214   return res;
1215 }
1216 
1217 static void
_init_local(void)1218 _init_local (void)
1219 {
1220   GST_DEBUG_CATEGORY_INIT (mpegts_packetizer_debug, "mpegtspacketizer", 0,
1221       "MPEG transport stream parser");
1222 }
1223 
1224 
1225 static void
mpegts_packetizer_resync(MpegTSPCR * pcr,GstClockTime time,GstClockTime gstpcrtime,gboolean reset_skew)1226 mpegts_packetizer_resync (MpegTSPCR * pcr, GstClockTime time,
1227     GstClockTime gstpcrtime, gboolean reset_skew)
1228 {
1229   pcr->base_time = time;
1230   pcr->base_pcrtime = gstpcrtime;
1231   pcr->prev_out_time = GST_CLOCK_TIME_NONE;
1232   pcr->prev_send_diff = GST_CLOCK_TIME_NONE;
1233   if (reset_skew) {
1234     pcr->window_filling = TRUE;
1235     pcr->window_pos = 0;
1236     pcr->window_min = 0;
1237     pcr->window_size = 0;
1238     pcr->skew = 0;
1239   }
1240 }
1241 
1242 
1243 /* Code mostly copied from -good/gst/rtpmanager/rtpjitterbuffer.c */
1244 
1245 /* For the clock skew we use a windowed low point averaging algorithm as can be
1246  * found in Fober, Orlarey and Letz, 2005, "Real Time Clock Skew Estimation
1247  * over Network Delays":
1248  * http://www.grame.fr/Ressources/pub/TR-050601.pdf
1249  * http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.102.1546
1250  *
1251  * The idea is that the jitter is composed of:
1252  *
1253  *  J = N + n
1254  *
1255  *   D   : a constant network delay.
1256  *   n   : random added noise. The noise is concentrated around 0
1257  *
1258  * In the receiver we can track the elapsed time at the sender with:
1259  *
1260  *  send_diff(i) = (Tsi - Ts0);
1261  *
1262  *   Tsi : The time at the sender at packet i
1263  *   Ts0 : The time at the sender at the first packet
1264  *
1265  * This is the difference between the RTP timestamp in the first received packet
1266  * and the current packet.
1267  *
1268  * At the receiver we have to deal with the jitter introduced by the network.
1269  *
1270  *  recv_diff(i) = (Tri - Tr0)
1271  *
1272  *   Tri : The time at the receiver at packet i
1273  *   Tr0 : The time at the receiver at the first packet
1274  *
1275  * Both of these values contain a jitter Ji, a jitter for packet i, so we can
1276  * write:
1277  *
1278  *  recv_diff(i) = (Cri + D + ni) - (Cr0 + D + n0))
1279  *
1280  *    Cri    : The time of the clock at the receiver for packet i
1281  *    D + ni : The jitter when receiving packet i
1282  *
1283  * We see that the network delay is irrelevant here as we can elliminate D:
1284  *
1285  *  recv_diff(i) = (Cri + ni) - (Cr0 + n0))
1286  *
1287  * The drift is now expressed as:
1288  *
1289  *  Drift(i) = recv_diff(i) - send_diff(i);
1290  *
1291  * We now keep the W latest values of Drift and find the minimum (this is the
1292  * one with the lowest network jitter and thus the one which is least affected
1293  * by it). We average this lowest value to smooth out the resulting network skew.
1294  *
1295  * Both the window and the weighting used for averaging influence the accuracy
1296  * of the drift estimation. Finding the correct parameters turns out to be a
1297  * compromise between accuracy and inertia.
1298  *
1299  * We use a 2 second window or up to 512 data points, which is statistically big
1300  * enough to catch spikes (FIXME, detect spikes).
1301  * We also use a rather large weighting factor (125) to smoothly adapt. During
1302  * startup, when filling the window, we use a parabolic weighting factor, the
1303  * more the window is filled, the faster we move to the detected possible skew.
1304  *
1305  * Returns: @time adjusted with the clock skew.
1306  */
1307 static GstClockTime
calculate_skew(MpegTSPacketizer2 * packetizer,MpegTSPCR * pcr,guint64 pcrtime,GstClockTime time)1308 calculate_skew (MpegTSPacketizer2 * packetizer,
1309     MpegTSPCR * pcr, guint64 pcrtime, GstClockTime time)
1310 {
1311   guint64 send_diff, recv_diff;
1312   gint64 delta;
1313   gint64 old;
1314   gint pos, i;
1315   GstClockTime gstpcrtime, out_time;
1316 #ifndef GST_DISABLE_GST_DEBUG
1317   guint64 slope;
1318 #endif
1319 
1320   gstpcrtime = PCRTIME_TO_GSTTIME (pcrtime) + pcr->pcroffset;
1321 
1322   /* first time, lock on to time and gstpcrtime */
1323   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (pcr->base_time))) {
1324     pcr->base_time = time;
1325     pcr->prev_out_time = GST_CLOCK_TIME_NONE;
1326     GST_DEBUG ("Taking new base time %" GST_TIME_FORMAT, GST_TIME_ARGS (time));
1327   }
1328 
1329   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (pcr->base_pcrtime))) {
1330     pcr->base_pcrtime = gstpcrtime;
1331     pcr->prev_send_diff = -1;
1332     GST_DEBUG ("Taking new base pcrtime %" GST_TIME_FORMAT,
1333         GST_TIME_ARGS (gstpcrtime));
1334   }
1335 
1336   /* Handle PCR wraparound and resets */
1337   if (GST_CLOCK_TIME_IS_VALID (pcr->last_pcrtime) &&
1338       gstpcrtime < pcr->last_pcrtime) {
1339     if (pcr->last_pcrtime - gstpcrtime > PCR_GST_MAX_VALUE / 2) {
1340       /* PCR wraparound */
1341       GST_DEBUG ("PCR wrap");
1342       pcr->pcroffset += PCR_GST_MAX_VALUE;
1343       gstpcrtime = PCRTIME_TO_GSTTIME (pcrtime) + pcr->pcroffset;
1344       send_diff = gstpcrtime - pcr->base_pcrtime;
1345     } else if (GST_CLOCK_TIME_IS_VALID (time)
1346         && pcr->last_pcrtime - gstpcrtime > 15 * GST_SECOND) {
1347       /* Time jumped backward by > 15 seconds, and we have a timestamp
1348        * to use to close the discont. Assume a reset */
1349       GST_DEBUG ("PCR reset");
1350       /* Calculate PCR we would have expected for the given input time,
1351        * essentially applying the reverse correction process
1352        *
1353        * We want to find the PCR offset to apply
1354        *   pcroffset = (corrected) gstpcrtime - (received) gstpcrtime
1355        *
1356        * send_diff = (corrected) gstpcrtime - pcr->base_pcrtime
1357        * recv_diff = time - pcr->base_time
1358        * out_time = pcr->base_time + send_diff
1359        *
1360        * We are assuming that send_diff == recv_diff
1361        *   (corrected) gstpcrtime - pcr->base_pcrtime = time - pcr->base_time
1362        * Giving us:
1363        *   (corrected) gstpcrtime = time - pcr->base_time + pcr->base_pcrtime
1364        *
1365        * And therefore:
1366        *   pcroffset = time - pcr->base_time + pcr->base_pcrtime - (received) gstpcrtime
1367        **/
1368       pcr->pcroffset += time - pcr->base_time + pcr->base_pcrtime - gstpcrtime;
1369       gstpcrtime = PCRTIME_TO_GSTTIME (pcrtime) + pcr->pcroffset;
1370       send_diff = gstpcrtime - pcr->base_pcrtime;
1371       GST_DEBUG ("Introduced offset is now %" GST_TIME_FORMAT
1372           " corrected pcr time %" GST_TIME_FORMAT,
1373           GST_TIME_ARGS (pcr->pcroffset), GST_TIME_ARGS (gstpcrtime));
1374     } else {
1375       /* Small jumps backward, assume some arrival jitter and skip it */
1376       send_diff = 0;
1377 
1378       if (pcr->last_pcrtime - gstpcrtime < GST_SECOND) {
1379         GST_WARNING
1380             ("(small) backward timestamps at server or no buffer timestamps. Ignoring.");
1381         /* This will trigger the no_skew logic before but leave other state
1382          * intact */
1383         time = GST_CLOCK_TIME_NONE;
1384       } else {
1385         /* A bigger backward step than packet out-of-order can account for. Reset base PCR time
1386          * to be resynched the next time we see a PCR */
1387         GST_WARNING
1388             ("backward timestamps at server or no buffer timestamps. Resync base PCR");
1389         pcr->base_pcrtime = GST_CLOCK_TIME_NONE;
1390       }
1391     }
1392   } else
1393     send_diff = gstpcrtime - pcr->base_pcrtime;
1394 
1395   GST_DEBUG ("gstpcr %" GST_TIME_FORMAT ", buftime %" GST_TIME_FORMAT
1396       ", base %" GST_TIME_FORMAT ", send_diff %" GST_TIME_FORMAT,
1397       GST_TIME_ARGS (gstpcrtime), GST_TIME_ARGS (time),
1398       GST_TIME_ARGS (pcr->base_pcrtime), GST_TIME_ARGS (send_diff));
1399 
1400   /* keep track of the last extended pcrtime */
1401   pcr->last_pcrtime = gstpcrtime;
1402 
1403   /* we don't have an arrival timestamp so we can't do skew detection. we
1404    * should still apply a timestamp based on RTP timestamp and base_time */
1405   if (!GST_CLOCK_TIME_IS_VALID (time)
1406       || !GST_CLOCK_TIME_IS_VALID (pcr->base_time))
1407     goto no_skew;
1408 
1409   /* elapsed time at receiver, includes the jitter */
1410   recv_diff = time - pcr->base_time;
1411 
1412   /* Ignore packets received at 100% the same time (i.e. from the same input buffer) */
1413   if (G_UNLIKELY (time == pcr->prev_in_time
1414           && GST_CLOCK_TIME_IS_VALID (pcr->prev_in_time)))
1415     goto no_skew;
1416 
1417   /* measure the diff */
1418   delta = ((gint64) recv_diff) - ((gint64) send_diff);
1419 
1420 #ifndef GST_DISABLE_GST_DEBUG
1421   /* measure the slope, this gives a rought estimate between the sender speed
1422    * and the receiver speed. This should be approximately 8, higher values
1423    * indicate a burst (especially when the connection starts) */
1424   slope = recv_diff > 0 ? (send_diff * 8) / recv_diff : 8;
1425 #endif
1426 
1427   GST_DEBUG ("time %" GST_TIME_FORMAT ", base %" GST_TIME_FORMAT
1428       ", recv_diff %" GST_TIME_FORMAT ", slope %" G_GUINT64_FORMAT,
1429       GST_TIME_ARGS (time), GST_TIME_ARGS (pcr->base_time),
1430       GST_TIME_ARGS (recv_diff), slope);
1431 
1432   /* if the difference between the sender timeline and the receiver timeline
1433    * changed too quickly we have to resync because the server likely restarted
1434    * its timestamps. */
1435   if (ABS (delta - pcr->skew) > packetizer->pcr_discont_threshold) {
1436     GST_WARNING ("delta - skew: %" GST_TIME_FORMAT " too big, reset skew",
1437         GST_TIME_ARGS (delta - pcr->skew));
1438     mpegts_packetizer_resync (pcr, time, gstpcrtime, TRUE);
1439     send_diff = 0;
1440     delta = 0;
1441   }
1442 
1443   pos = pcr->window_pos;
1444 
1445   if (G_UNLIKELY (pcr->window_filling)) {
1446     /* we are filling the window */
1447     GST_DEBUG ("filling %d, delta %" G_GINT64_FORMAT, pos, delta);
1448     pcr->window[pos++] = delta;
1449     /* calc the min delta we observed */
1450     if (G_UNLIKELY (pos == 1 || delta < pcr->window_min))
1451       pcr->window_min = delta;
1452 
1453     if (G_UNLIKELY (send_diff >= MAX_TIME || pos >= MAX_WINDOW)) {
1454       pcr->window_size = pos;
1455 
1456       /* window filled */
1457       GST_DEBUG ("min %" G_GINT64_FORMAT, pcr->window_min);
1458 
1459       /* the skew is now the min */
1460       pcr->skew = pcr->window_min;
1461       pcr->window_filling = FALSE;
1462     } else {
1463       gint perc_time, perc_window, perc;
1464 
1465       /* figure out how much we filled the window, this depends on the amount of
1466        * time we have or the max number of points we keep. */
1467       perc_time = send_diff * 100 / MAX_TIME;
1468       perc_window = pos * 100 / MAX_WINDOW;
1469       perc = MAX (perc_time, perc_window);
1470 
1471       /* make a parabolic function, the closer we get to the MAX, the more value
1472        * we give to the scaling factor of the new value */
1473       perc = perc * perc;
1474 
1475       /* quickly go to the min value when we are filling up, slowly when we are
1476        * just starting because we're not sure it's a good value yet. */
1477       pcr->skew =
1478           (perc * pcr->window_min + ((10000 - perc) * pcr->skew)) / 10000;
1479       pcr->window_size = pos + 1;
1480     }
1481   } else {
1482     /* pick old value and store new value. We keep the previous value in order
1483      * to quickly check if the min of the window changed */
1484     old = pcr->window[pos];
1485     pcr->window[pos++] = delta;
1486 
1487     if (G_UNLIKELY (delta <= pcr->window_min)) {
1488       /* if the new value we inserted is smaller or equal to the current min,
1489        * it becomes the new min */
1490       pcr->window_min = delta;
1491     } else if (G_UNLIKELY (old == pcr->window_min)) {
1492       gint64 min = G_MAXINT64;
1493 
1494       /* if we removed the old min, we have to find a new min */
1495       for (i = 0; i < pcr->window_size; i++) {
1496         /* we found another value equal to the old min, we can stop searching now */
1497         if (pcr->window[i] == old) {
1498           min = old;
1499           break;
1500         }
1501         if (pcr->window[i] < min)
1502           min = pcr->window[i];
1503       }
1504       pcr->window_min = min;
1505     }
1506     /* average the min values */
1507     pcr->skew = (pcr->window_min + (124 * pcr->skew)) / 125;
1508     GST_DEBUG ("delta %" G_GINT64_FORMAT ", new min: %" G_GINT64_FORMAT,
1509         delta, pcr->window_min);
1510   }
1511   /* wrap around in the window */
1512   if (G_UNLIKELY (pos >= pcr->window_size))
1513     pos = 0;
1514 
1515   pcr->window_pos = pos;
1516 
1517 no_skew:
1518   /* the output time is defined as the base timestamp plus the PCR time
1519    * adjusted for the clock skew .*/
1520   if (pcr->base_time != -1) {
1521     out_time = pcr->base_time + send_diff;
1522     /* skew can be negative and we don't want to make invalid timestamps */
1523     if (pcr->skew < 0 && out_time < -pcr->skew) {
1524       out_time = 0;
1525     } else {
1526       out_time += pcr->skew;
1527     }
1528     /* check if timestamps are not going backwards, we can only check this if we
1529      * have a previous out time and a previous send_diff */
1530     if (G_LIKELY (pcr->prev_out_time != -1 && pcr->prev_send_diff != -1)) {
1531       /* now check for backwards timestamps */
1532       if (G_UNLIKELY (
1533               /* if the server timestamps went up and the out_time backwards */
1534               (send_diff > pcr->prev_send_diff
1535                   && out_time < pcr->prev_out_time) ||
1536               /* if the server timestamps went backwards and the out_time forwards */
1537               (send_diff < pcr->prev_send_diff
1538                   && out_time > pcr->prev_out_time) ||
1539               /* if the server timestamps did not change */
1540               send_diff == pcr->prev_send_diff)) {
1541         GST_DEBUG ("backwards timestamps, using previous time");
1542         out_time = GSTTIME_TO_MPEGTIME (out_time);
1543       }
1544     }
1545   } else {
1546     /* We simply use the pcrtime without applying any skew compensation */
1547     out_time = time;
1548   }
1549 
1550   pcr->prev_out_time = out_time;
1551   pcr->prev_in_time = time;
1552   pcr->prev_send_diff = send_diff;
1553 
1554   GST_DEBUG ("skew %" G_GINT64_FORMAT ", out %" GST_TIME_FORMAT,
1555       pcr->skew, GST_TIME_ARGS (out_time));
1556 
1557   return out_time;
1558 }
1559 
1560 static void
_reevaluate_group_pcr_offset(MpegTSPCR * pcrtable,PCROffsetGroup * group)1561 _reevaluate_group_pcr_offset (MpegTSPCR * pcrtable, PCROffsetGroup * group)
1562 {
1563   PCROffsetGroup *prev = NULL;
1564 #ifndef GST_DISABLE_GST_DEBUG
1565   PCROffsetGroup *first = pcrtable->groups->data;
1566 #endif
1567   PCROffsetCurrent *current = pcrtable->current;
1568   GList *tmp;
1569 
1570   /* Go over all ESTIMATED groups until the target group */
1571   for (tmp = pcrtable->groups; tmp; tmp = tmp->next) {
1572     PCROffsetGroup *cur = (PCROffsetGroup *) tmp->data;
1573 
1574     /* Skip groups that don't need re-evaluation */
1575     if (!(cur->flags & PCR_GROUP_FLAG_ESTIMATED)) {
1576       GST_DEBUG ("Skipping group %p pcr_offset (currently %" GST_TIME_FORMAT
1577           ")", cur, GST_TIME_ARGS (PCRTIME_TO_GSTTIME (cur->pcr_offset)));
1578       prev = cur;
1579       continue;
1580     }
1581 
1582     /* This should not happen ! The first group is *always* correct (zero) */
1583     if (G_UNLIKELY (prev == NULL)) {
1584       GST_ERROR ("First PCR Group was not estimated (bug). Setting to zero");
1585       cur->pcr_offset = 0;
1586       cur->flags &= ~PCR_GROUP_FLAG_ESTIMATED;
1587       return;
1588     }
1589 
1590     /* Finally do the estimation of this group's PCR offset based on the
1591      * previous group information */
1592 
1593     GST_DEBUG ("Re-evaluating group %p pcr_offset (currently %" GST_TIME_FORMAT
1594         ")", group, GST_TIME_ARGS (PCRTIME_TO_GSTTIME (cur->pcr_offset)));
1595 
1596     GST_DEBUG ("cur->first_pcr:%" GST_TIME_FORMAT " prev->first_pcr:%"
1597         GST_TIME_FORMAT, GST_TIME_ARGS (PCRTIME_TO_GSTTIME (cur->first_pcr)),
1598         GST_TIME_ARGS (PCRTIME_TO_GSTTIME (prev->first_pcr)));
1599 
1600     if (G_UNLIKELY (cur->first_pcr < prev->first_pcr)) {
1601       guint64 prevbr, lastbr;
1602       guint64 prevpcr;
1603       guint64 prevoffset, lastoffset;
1604 
1605       /* Take the previous group pcr_offset and figure out how much to add
1606        * to it for the current group */
1607 
1608       /* Right now we do a dumb bitrate estimation
1609        * estimate bitrate (prev - first) : bitrate from the start
1610        * estimate bitrate (prev) : bitrate of previous group
1611        * estimate bitrate (last - first) : bitrate from previous group
1612        *
1613        * We will use raw (non-corrected/non-absolute) PCR values in a first time
1614        * to detect wraparound/resets/gaps...
1615        *
1616        * We will use the corrected/asolute PCR values to calculate
1617        * bitrate and estimate the target group pcr_offset.
1618        * */
1619 
1620       /* If the current window estimator is over the previous group, used those
1621        * values as the latest (since they are more recent) */
1622       if (current->group == prev && current->pending[current->last].offset) {
1623         prevoffset =
1624             current->pending[current->last].offset + prev->first_offset;
1625         prevpcr = current->pending[current->last].pcr + prev->first_pcr;
1626         /* prevbr: bitrate(prev) */
1627         prevbr =
1628             gst_util_uint64_scale (PCR_SECOND,
1629             current->pending[current->last].offset,
1630             current->pending[current->last].pcr);
1631         GST_DEBUG ("Previous group bitrate (%" G_GUINT64_FORMAT " / %"
1632             GST_TIME_FORMAT ") : %" G_GUINT64_FORMAT,
1633             current->pending[current->last].offset,
1634             GST_TIME_ARGS (PCRTIME_TO_GSTTIME (current->pending[current->last].
1635                     pcr)), prevbr);
1636       } else if (prev->values[prev->last_value].offset) {
1637         prevoffset = prev->values[prev->last_value].offset + prev->first_offset;
1638         prevpcr = prev->values[prev->last_value].pcr + prev->first_pcr;
1639         /* prevbr: bitrate(prev) (FIXME : Cache) */
1640         prevbr =
1641             gst_util_uint64_scale (PCR_SECOND,
1642             prev->values[prev->last_value].offset,
1643             prev->values[prev->last_value].pcr);
1644         GST_DEBUG ("Previous group bitrate (%" G_GUINT64_FORMAT " / %"
1645             GST_TIME_FORMAT ") : %" G_GUINT64_FORMAT,
1646             prev->values[prev->last_value].offset,
1647             GST_TIME_ARGS (PCRTIME_TO_GSTTIME (prev->values[prev->last_value].
1648                     pcr)), prevbr);
1649       } else {
1650         GST_DEBUG ("Using overall bitrate");
1651         prevoffset = prev->values[prev->last_value].offset + prev->first_offset;
1652         prevpcr = prev->values[prev->last_value].pcr + prev->first_pcr;
1653         prevbr = gst_util_uint64_scale (PCR_SECOND,
1654             prev->first_offset, prev->pcr_offset);
1655       }
1656       lastoffset = cur->values[cur->last_value].offset + cur->first_offset;
1657 
1658       GST_DEBUG ("Offset first:%" G_GUINT64_FORMAT " prev:%" G_GUINT64_FORMAT
1659           " cur:%" G_GUINT64_FORMAT, first->first_offset, prevoffset,
1660           lastoffset);
1661       GST_DEBUG ("PCR first:%" GST_TIME_FORMAT " prev:%" GST_TIME_FORMAT
1662           " cur:%" GST_TIME_FORMAT,
1663           GST_TIME_ARGS (PCRTIME_TO_GSTTIME (first->first_pcr)),
1664           GST_TIME_ARGS (PCRTIME_TO_GSTTIME (prevpcr)),
1665           GST_TIME_ARGS (PCRTIME_TO_GSTTIME (cur->values[cur->last_value].pcr +
1666                   cur->first_pcr)));
1667 
1668       if (prevpcr - cur->first_pcr > (PCR_MAX_VALUE * 9 / 10)) {
1669         gfloat diffprev;
1670         guint64 guess_offset;
1671 
1672         /* Let's assume there is a PCR wraparound between the previous and current
1673          * group.
1674          * [ prev ]... PCR_MAX | 0 ...[ current ]
1675          * The estimated pcr_offset would therefore be:
1676          * current.first + (PCR_MAX_VALUE - prev.first)
1677          *
1678          * 1) Check if bitrate(prev) would be consistent with bitrate (cur - prev)
1679          */
1680         guess_offset = PCR_MAX_VALUE - prev->first_pcr + cur->first_pcr;
1681         lastbr = gst_util_uint64_scale (PCR_SECOND, lastoffset - prevoffset,
1682             guess_offset + cur->values[cur->last_value].pcr - (prevpcr -
1683                 prev->first_pcr));
1684         GST_DEBUG ("Wraparound prev-cur (guess_offset:%" GST_TIME_FORMAT
1685             ") bitrate:%" G_GUINT64_FORMAT,
1686             GST_TIME_ARGS (PCRTIME_TO_GSTTIME (guess_offset)), lastbr);
1687         diffprev = (float) 100.0 *(ABSDIFF (prevbr, lastbr)) / (float) prevbr;
1688         GST_DEBUG ("Difference with previous bitrate:%f", diffprev);
1689         if (diffprev < 10.0) {
1690           GST_DEBUG ("Difference < 10.0, Setting pcr_offset to %"
1691               G_GUINT64_FORMAT, guess_offset);
1692           cur->pcr_offset = guess_offset;
1693           if (diffprev < 1.0) {
1694             GST_DEBUG ("Difference < 1.0, Removing ESTIMATED flags");
1695             cur->flags &= ~PCR_GROUP_FLAG_ESTIMATED;
1696           }
1697         }
1698         /* Indicate the the previous group is before a wrapover */
1699         prev->flags |= PCR_GROUP_FLAG_WRAPOVER;
1700       } else {
1701         guint64 resetprev;
1702         /* Let's assume there was a PCR reset between the previous and current
1703          * group
1704          * [ prev ] ... x | x - reset ... [ current ]
1705          *
1706          * The estimated pcr_offset would then be
1707          * = current.first - (x - reset) + (x - prev.first) + 100ms (for safety)
1708          * = current.first + reset - prev.first + 100ms (for safety)
1709          */
1710         /* In order to calculate the reset, we estimate what the PCR would have
1711          * been by using prevbr */
1712         /* FIXME : Which bitrate should we use ???  */
1713         GST_DEBUG ("Using prevbr:%" G_GUINT64_FORMAT " and taking offsetdiff:%"
1714             G_GUINT64_FORMAT, prevbr, cur->first_offset - prev->first_offset);
1715         resetprev =
1716             gst_util_uint64_scale (PCR_SECOND,
1717             cur->first_offset - prev->first_offset, prevbr);
1718         GST_DEBUG ("Estimated full PCR for offset %" G_GUINT64_FORMAT
1719             ", using prevbr:%"
1720             GST_TIME_FORMAT, cur->first_offset,
1721             GST_TIME_ARGS (PCRTIME_TO_GSTTIME (resetprev)));
1722         cur->pcr_offset = prev->pcr_offset + resetprev + 100 * PCR_MSECOND;
1723         GST_DEBUG ("Adjusted group PCR_offset to %" GST_TIME_FORMAT,
1724             GST_TIME_ARGS (PCRTIME_TO_GSTTIME (cur->pcr_offset)));
1725         /* Indicate the the previous group is before a reset */
1726         prev->flags |= PCR_GROUP_FLAG_RESET;
1727       }
1728     } else {
1729       /* FIXME : Detect gaps if bitrate difference is really too big ? */
1730       cur->pcr_offset = prev->pcr_offset + cur->first_pcr - prev->first_pcr;
1731       GST_DEBUG ("Assuming there is no gap, setting pcr_offset to %"
1732           GST_TIME_FORMAT,
1733           GST_TIME_ARGS (PCRTIME_TO_GSTTIME (cur->pcr_offset)));
1734       /* Remove the reset and wrapover flag (if it was previously there) */
1735       prev->flags &= ~PCR_GROUP_FLAG_RESET;
1736       prev->flags &= ~PCR_GROUP_FLAG_WRAPOVER;
1737     }
1738 
1739 
1740     /* Remember prev for the next group evaluation */
1741     prev = cur;
1742   }
1743 }
1744 
1745 static PCROffsetGroup *
_new_group(guint64 pcr,guint64 offset,guint64 pcr_offset,guint flags)1746 _new_group (guint64 pcr, guint64 offset, guint64 pcr_offset, guint flags)
1747 {
1748   PCROffsetGroup *group = g_slice_new0 (PCROffsetGroup);
1749 
1750   GST_DEBUG ("Input PCR %" GST_TIME_FORMAT " offset:%" G_GUINT64_FORMAT
1751       " pcr_offset:%" G_GUINT64_FORMAT " flags:%d",
1752       GST_TIME_ARGS (PCRTIME_TO_GSTTIME (pcr)), offset, pcr_offset, flags);
1753 
1754   group->flags = flags;
1755   group->values = g_new0 (PCROffset, DEFAULT_ALLOCATED_OFFSET);
1756   /* The first pcr/offset diff is always 0/0 */
1757   group->values[0].pcr = group->values[0].offset = 0;
1758   group->nb_allocated = DEFAULT_ALLOCATED_OFFSET;
1759 
1760   /* Store the full values */
1761   group->first_pcr = pcr;
1762   group->first_offset = offset;
1763   group->pcr_offset = pcr_offset;
1764 
1765   GST_DEBUG ("Created group starting with pcr:%" GST_TIME_FORMAT " offset:%"
1766       G_GUINT64_FORMAT " pcr_offset:%" GST_TIME_FORMAT,
1767       GST_TIME_ARGS (PCRTIME_TO_GSTTIME (group->first_pcr)),
1768       group->first_offset,
1769       GST_TIME_ARGS (PCRTIME_TO_GSTTIME (group->pcr_offset)));
1770 
1771   return group;
1772 }
1773 
1774 static void
_insert_group_after(MpegTSPCR * pcrtable,PCROffsetGroup * group,PCROffsetGroup * prev)1775 _insert_group_after (MpegTSPCR * pcrtable, PCROffsetGroup * group,
1776     PCROffsetGroup * prev)
1777 {
1778   if (prev == NULL) {
1779     /* First group */
1780     pcrtable->groups = g_list_prepend (pcrtable->groups, group);
1781   } else {
1782     GList *tmp, *toinsert, *prevlist = NULL, *nextlist = NULL;
1783     /* Insert before next and prev */
1784     for (tmp = pcrtable->groups; tmp; tmp = tmp->next) {
1785       if (tmp->data == prev) {
1786         prevlist = tmp;
1787         nextlist = tmp->next;
1788         break;
1789       }
1790     }
1791     if (!prevlist) {
1792       /* The non NULL prev given isn't in the list */
1793       GST_WARNING ("Request to insert before a group which isn't in the list");
1794       pcrtable->groups = g_list_prepend (pcrtable->groups, group);
1795     } else {
1796       toinsert = g_list_append (NULL, group);
1797       toinsert->next = nextlist;
1798       toinsert->prev = prevlist;
1799       prevlist->next = toinsert;
1800       if (nextlist)
1801         nextlist->prev = toinsert;
1802     }
1803   }
1804 }
1805 
1806 static void
_use_group(MpegTSPCR * pcrtable,PCROffsetGroup * group)1807 _use_group (MpegTSPCR * pcrtable, PCROffsetGroup * group)
1808 {
1809   PCROffsetCurrent *current = pcrtable->current;
1810 
1811   memset (current, 0, sizeof (PCROffsetCurrent));
1812   current->group = group;
1813   current->pending[0] = group->values[group->last_value];
1814   current->last_value = current->pending[0];
1815   current->write = 1;
1816   current->prev = group->values[group->last_value];
1817   current->first_pcr = group->first_pcr;
1818   current->first_offset = group->first_offset;
1819 }
1820 
1821 /* Create a new group with the specified values after prev
1822  * Set current to that new group */
1823 static void
_set_current_group(MpegTSPCR * pcrtable,PCROffsetGroup * prev,guint64 pcr,guint64 offset,gboolean contiguous)1824 _set_current_group (MpegTSPCR * pcrtable,
1825     PCROffsetGroup * prev, guint64 pcr, guint64 offset, gboolean contiguous)
1826 {
1827   PCROffsetGroup *group;
1828   guint flags = 0;
1829   guint64 pcr_offset = 0;
1830 
1831   /* Handle wraparound/gap (only if contiguous with previous group) */
1832   if (contiguous) {
1833     guint64 lastpcr = prev->first_pcr + prev->values[prev->last_value].pcr;
1834 
1835     /* Set CLOSED flag on previous group and remember pcr_offset */
1836     prev->flags |= PCR_GROUP_FLAG_CLOSED;
1837     pcr_offset = prev->pcr_offset;
1838 
1839     /* Wraparound ? */
1840     if (lastpcr > pcr) {
1841       /* In offset-mode, a PCR wraparound is only actually consistent if
1842        * we have a very high confidence (99% right now, might need to change
1843        * later) */
1844       if (lastpcr - pcr > (PCR_MAX_VALUE * 99 / 100)) {
1845         GST_WARNING ("WRAPAROUND detected. diff %" GST_TIME_FORMAT,
1846             GST_TIME_ARGS (PCRTIME_TO_GSTTIME (lastpcr - pcr)));
1847         /* The previous group closed at PCR_MAX_VALUE */
1848         pcr_offset += PCR_MAX_VALUE - prev->first_pcr + pcr;
1849       } else {
1850         GST_WARNING ("RESET detected. diff %" GST_TIME_FORMAT,
1851             GST_TIME_ARGS (PCRTIME_TO_GSTTIME (lastpcr - pcr)));
1852         /* The previous group closed at the raw last_pcr diff (+100ms for safety) */
1853         pcr_offset += prev->values[prev->last_value].pcr + 100 * PCR_MSECOND;
1854       }
1855     } else if (lastpcr < pcr - 500 * PCR_MSECOND) {
1856       GST_WARNING ("GAP detected. diff %" GST_TIME_FORMAT,
1857           GST_TIME_ARGS (PCRTIME_TO_GSTTIME (pcr - lastpcr)));
1858       /* The previous group closed at the raw last_pcr diff (+500ms for safety) */
1859       pcr_offset += prev->values[prev->last_value].pcr + 500 * PCR_MSECOND;
1860     } else
1861       /* Normal continuation (contiguous in time) */
1862       pcr_offset += pcr - prev->first_pcr;
1863 
1864   } else if (prev != NULL)
1865     /* If we are not contiguous and it's not the first group, the pcr_offset
1866      * will be estimated */
1867     flags = PCR_GROUP_FLAG_ESTIMATED;
1868 
1869   group = _new_group (pcr, offset, pcr_offset, flags);
1870   _use_group (pcrtable, group);
1871   _insert_group_after (pcrtable, group, prev);
1872   if (!contiguous)
1873     _reevaluate_group_pcr_offset (pcrtable, group);
1874 }
1875 
1876 static inline void
_append_group_values(PCROffsetGroup * group,PCROffset pcroffset)1877 _append_group_values (PCROffsetGroup * group, PCROffset pcroffset)
1878 {
1879   /* Only append if new values */
1880   if (group->values[group->last_value].offset == pcroffset.offset &&
1881       group->values[group->last_value].pcr == pcroffset.pcr) {
1882     GST_DEBUG ("Same values, ignoring");
1883   } else {
1884     group->last_value++;
1885     /* Resize values if needed */
1886     if (G_UNLIKELY (group->nb_allocated == group->last_value)) {
1887       group->nb_allocated += DEFAULT_ALLOCATED_OFFSET;
1888       group->values =
1889           g_realloc (group->values, group->nb_allocated * sizeof (PCROffset));
1890     }
1891     group->values[group->last_value] = pcroffset;
1892   }
1893 
1894   GST_DEBUG ("First PCR:%" GST_TIME_FORMAT " offset:%" G_GUINT64_FORMAT
1895       " PCR_offset:%" GST_TIME_FORMAT,
1896       GST_TIME_ARGS (PCRTIME_TO_GSTTIME (group->first_pcr)),
1897       group->first_offset,
1898       GST_TIME_ARGS (PCRTIME_TO_GSTTIME (group->pcr_offset)));
1899   GST_DEBUG ("Last PCR: +%" GST_TIME_FORMAT " offset: +%" G_GUINT64_FORMAT,
1900       GST_TIME_ARGS (PCRTIME_TO_GSTTIME (pcroffset.pcr)), pcroffset.offset);
1901 }
1902 
1903 /* Move last values from current (if any) to the current group
1904  * and reset current.
1905  * Note: This does not set the CLOSED flag (since we have no next
1906  * contiguous group) */
1907 static void
_close_current_group(MpegTSPCR * pcrtable)1908 _close_current_group (MpegTSPCR * pcrtable)
1909 {
1910   PCROffsetCurrent *current = pcrtable->current;
1911   PCROffsetGroup *group = current->group;
1912 
1913   if (group == NULL)
1914     return;
1915   GST_DEBUG ("Closing group and resetting current");
1916 
1917   /* Store last values */
1918   _append_group_values (group, current->pending[current->last]);
1919   memset (current, 0, sizeof (PCROffsetCurrent));
1920   /* And re-evaluate all groups */
1921 }
1922 
1923 static void
record_pcr(MpegTSPacketizer2 * packetizer,MpegTSPCR * pcrtable,guint64 pcr,guint64 offset)1924 record_pcr (MpegTSPacketizer2 * packetizer, MpegTSPCR * pcrtable,
1925     guint64 pcr, guint64 offset)
1926 {
1927   PCROffsetCurrent *current = pcrtable->current;
1928   gint64 corpcr, coroffset;
1929 
1930   packetizer->nb_seen_offsets += 1;
1931 
1932   pcrtable->last_pcrtime = PCRTIME_TO_GSTTIME (pcr);
1933   /* FIXME : Invert logic later (probability is higher that we have a
1934    * current estimator) */
1935 
1936   /* Check for current */
1937   if (G_UNLIKELY (current->group == NULL)) {
1938     PCROffsetGroup *prev = NULL;
1939     GList *tmp;
1940     /* No current estimator. This happens for the initial value, or after
1941      * discont and flushes. Figure out where we need to record this position.
1942      *
1943      * Possible choices:
1944      * 1) No groups at all:
1945      *    Create a new group with pcr/offset
1946      *    Initialize current to that group
1947      * 2) Entirely within an existing group
1948      *    bail out (FIXME: Make this detection faster)
1949      * 3) Not in any group
1950      *    Create a new group with pcr/offset at the right position
1951      *    Initialize current to that group
1952      */
1953     GST_DEBUG ("No current window estimator, Checking for group to use");
1954     for (tmp = pcrtable->groups; tmp; tmp = tmp->next) {
1955       PCROffsetGroup *group = (PCROffsetGroup *) tmp->data;
1956 
1957       GST_DEBUG ("First PCR:%" GST_TIME_FORMAT " offset:%" G_GUINT64_FORMAT
1958           " PCR_offset:%" GST_TIME_FORMAT,
1959           GST_TIME_ARGS (PCRTIME_TO_GSTTIME (group->first_pcr)),
1960           group->first_offset,
1961           GST_TIME_ARGS (PCRTIME_TO_GSTTIME (group->pcr_offset)));
1962       GST_DEBUG ("Last PCR: +%" GST_TIME_FORMAT " offset: +%" G_GUINT64_FORMAT,
1963           GST_TIME_ARGS (PCRTIME_TO_GSTTIME (group->values[group->last_value].
1964                   pcr)), group->values[group->last_value].offset);
1965       /* Check if before group */
1966       if (offset < group->first_offset) {
1967         GST_DEBUG ("offset is before that group");
1968         break;
1969       }
1970       /* Check if within group */
1971       if (offset <=
1972           (group->values[group->last_value].offset + group->first_offset)) {
1973         GST_DEBUG ("Already observed PCR offset %" G_GUINT64_FORMAT, offset);
1974         return;
1975       }
1976       /* Check if just after group (i.e. continuation of it) */
1977       if (!(group->flags & PCR_GROUP_FLAG_CLOSED) &&
1978           pcr - group->first_pcr - group->values[group->last_value].pcr <=
1979           100 * PCR_MSECOND) {
1980         GST_DEBUG ("Continuation of existing group");
1981         _use_group (pcrtable, group);
1982         return;
1983       }
1984       /* Else after group */
1985       prev = group;
1986     }
1987     _set_current_group (pcrtable, prev, pcr, offset, FALSE);
1988     return;
1989   }
1990 
1991   corpcr = pcr - current->first_pcr;
1992   coroffset = offset - current->first_offset;
1993 
1994   /* FIXME : Detect if we've gone into the next group !
1995    * FIXME : Close group when that happens */
1996   GST_DEBUG ("first:%d, last:%d, write:%d", current->first, current->last,
1997       current->write);
1998   GST_DEBUG ("First PCR:%" GST_TIME_FORMAT " offset:%" G_GUINT64_FORMAT,
1999       GST_TIME_ARGS (PCRTIME_TO_GSTTIME (current->first_pcr)),
2000       current->first_offset);
2001   GST_DEBUG ("Last PCR: +%" GST_TIME_FORMAT " offset: +%" G_GUINT64_FORMAT,
2002       GST_TIME_ARGS (PCRTIME_TO_GSTTIME (current->pending[current->last].pcr)),
2003       current->pending[current->last].offset);
2004   GST_DEBUG ("To add (corrected) PCR:%" GST_TIME_FORMAT " offset:%"
2005       G_GINT64_FORMAT, GST_TIME_ARGS (PCRTIME_TO_GSTTIME (corpcr)), coroffset);
2006 
2007   /* Do we need to close the current group ? */
2008   /* Check for wrapover/discont */
2009   if (G_UNLIKELY (corpcr < current->pending[current->last].pcr)) {
2010     /* FIXME : ignore very small deltas (< 500ms ?) which are most likely
2011      * stray values */
2012     GST_DEBUG
2013         ("PCR smaller than previously observed one, handling discont/wrapover");
2014     /* Take values from current and put them in the current group (closing it) */
2015     /* Create new group with new pcr/offset just after the current group
2016      * and mark it as a wrapover */
2017     /* Initialize current to that group with new values */
2018     _append_group_values (current->group, current->pending[current->last]);
2019     _set_current_group (pcrtable, current->group, pcr, offset, TRUE);
2020     return;
2021   }
2022   /* If PCR diff is greater than 500ms, create new group */
2023   if (G_UNLIKELY (corpcr - current->pending[current->last].pcr >
2024           500 * PCR_MSECOND)) {
2025     GST_DEBUG ("New PCR more than 500ms away, handling discont");
2026     /* Take values from current and put them in the current group (closing it) */
2027     /* Create new group with pcr/offset just after the current group
2028      * and mark it as a discont */
2029     /* Initialize current to that group with new values */
2030     _append_group_values (current->group, current->pending[current->last]);
2031     _set_current_group (pcrtable, current->group, pcr, offset, TRUE);
2032     return;
2033   }
2034 
2035   if (G_UNLIKELY (corpcr == current->last_value.pcr)) {
2036     GST_DEBUG ("Ignoring same PCR (stream is drunk)");
2037     return;
2038   }
2039 
2040   /* update current window */
2041   current->pending[current->write].pcr = corpcr;
2042   current->pending[current->write].offset = coroffset;
2043   current->last_value = current->pending[current->write];
2044   current->last = (current->last + 1) % PCR_BITRATE_NEEDED;
2045   current->write = (current->write + 1) % PCR_BITRATE_NEEDED;
2046 
2047   GST_DEBUG ("first:%d, last:%d, write:%d", current->first, current->last,
2048       current->write);
2049   GST_DEBUG ("First PCR:%" GST_TIME_FORMAT " offset:%" G_GUINT64_FORMAT,
2050       GST_TIME_ARGS (PCRTIME_TO_GSTTIME (current->first_pcr)),
2051       current->first_offset);
2052   GST_DEBUG ("Last PCR: +%" GST_TIME_FORMAT " offset: +%" G_GUINT64_FORMAT,
2053       GST_TIME_ARGS (PCRTIME_TO_GSTTIME (current->pending[current->last].pcr)),
2054       current->pending[current->last].offset);
2055 
2056   /* If we haven't stored enough values, bail out */
2057   if (current->write != current->first) {
2058     GST_DEBUG
2059         ("Not enough observations to calculate bitrate (first:%d, last:%d)",
2060         current->first, current->last);
2061     return;
2062   }
2063 
2064   /* If we are at least 1s away from reference value AND we have filled our
2065    * window, we can start comparing bitrates */
2066   if (current->pending[current->first].pcr - current->prev.pcr > PCR_SECOND) {
2067     /* Calculate window bitrate */
2068     current->cur_bitrate = gst_util_uint64_scale (PCR_SECOND,
2069         current->pending[current->last].offset -
2070         current->pending[current->first].offset,
2071         current->pending[current->last].pcr -
2072         current->pending[current->first].pcr);
2073     GST_DEBUG ("Current bitrate is now %" G_GUINT64_FORMAT,
2074         current->cur_bitrate);
2075 
2076     /* Calculate previous bitrate */
2077     current->prev_bitrate =
2078         gst_util_uint64_scale (PCR_SECOND,
2079         current->pending[current->first].offset - current->prev.offset,
2080         current->pending[current->first].pcr - current->prev.pcr);
2081     GST_DEBUG ("Previous group bitrate now %" G_GUINT64_FORMAT,
2082         current->prev_bitrate);
2083 
2084     /* FIXME : Better bitrate changes ? Currently 10% changes */
2085     if (ABSDIFF (current->cur_bitrate,
2086             current->prev_bitrate) * 10 > current->prev_bitrate) {
2087       GST_DEBUG ("Current bitrate changed by more than 10%% (old:%"
2088           G_GUINT64_FORMAT " new:%" G_GUINT64_FORMAT ")", current->prev_bitrate,
2089           current->cur_bitrate);
2090       /* If we detected a change in bitrate, this means that
2091        * d(first - prev) is a different bitrate than d(last - first).
2092        *
2093        * Two conclusions can be made:
2094        * 1) d(first - prev) is a complete bitrate "chain" (values between the
2095        *    reference value and first pending value have consistent bitrate).
2096        * 2) next values (from second pending value onwards) will no longer have
2097        *    the same bitrate.
2098        *
2099        * The question remains as to how long the new bitrate change is going to
2100        * last for (it might be short or longer term). For this we need to restart
2101        * bitrate estimation.
2102        *
2103        * * We move over first to the last value of group (a new chain ends and
2104        *   starts from there)
2105        * * We remember that last group value as our new window reference
2106        * * We restart our window filing from the last observed value
2107        *
2108        * Once our new window is filled we will end up in two different scenarios:
2109        * 1) Either the bitrate change was consistent, and therefore the bitrate
2110        *    will have remained constant over at least 2 window length
2111        * 2) The bitrate change was very short (1 window duration) and we will
2112        *    close that chain and restart again.
2113        * X) And of course if any discont/gaps/wrapover happen in the meantime they
2114        *    will also close the group.
2115        */
2116       _append_group_values (current->group, current->pending[current->first]);
2117       current->prev = current->pending[current->first];
2118       current->first = current->last;
2119       current->write = (current->first + 1) % PCR_BITRATE_NEEDED;
2120       return;
2121     }
2122   }
2123 
2124   /* Update read position */
2125   current->first = (current->first + 1) % PCR_BITRATE_NEEDED;
2126 }
2127 
2128 
2129 /* convert specified offset into stream time */
2130 GstClockTime
mpegts_packetizer_offset_to_ts(MpegTSPacketizer2 * packetizer,guint64 offset,guint16 pid)2131 mpegts_packetizer_offset_to_ts (MpegTSPacketizer2 * packetizer,
2132     guint64 offset, guint16 pid)
2133 {
2134   PCROffsetGroup *last;
2135   MpegTSPCR *pcrtable;
2136   GList *tmp;
2137   GstClockTime res;
2138   guint64 lastpcr, lastoffset;
2139 
2140   GST_DEBUG ("offset %" G_GUINT64_FORMAT, offset);
2141 
2142   if (G_UNLIKELY (!packetizer->calculate_offset))
2143     return GST_CLOCK_TIME_NONE;
2144 
2145   if (G_UNLIKELY (packetizer->refoffset == -1))
2146     return GST_CLOCK_TIME_NONE;
2147 
2148   if (G_UNLIKELY (offset < packetizer->refoffset))
2149     return GST_CLOCK_TIME_NONE;
2150 
2151   PACKETIZER_GROUP_LOCK (packetizer);
2152 
2153   pcrtable = get_pcr_table (packetizer, pid);
2154 
2155   if (g_list_length (pcrtable->groups) < 1) {
2156     PACKETIZER_GROUP_UNLOCK (packetizer);
2157     GST_WARNING ("Not enough observations to return a duration estimate");
2158     return GST_CLOCK_TIME_NONE;
2159   }
2160 
2161   if (g_list_length (pcrtable->groups) > 1) {
2162     GST_LOG ("Using last group");
2163 
2164     /* FIXME : Refine this later to use neighbouring groups */
2165     tmp = g_list_last (pcrtable->groups);
2166     last = tmp->data;
2167 
2168     if (G_UNLIKELY (last->flags & PCR_GROUP_FLAG_ESTIMATED))
2169       _reevaluate_group_pcr_offset (pcrtable, last);
2170 
2171     /* lastpcr is the full value in PCR from the first first chunk of data */
2172     lastpcr = last->values[last->last_value].pcr + last->pcr_offset;
2173     /* lastoffset is the full offset from the first chunk of data */
2174     lastoffset =
2175         last->values[last->last_value].offset + last->first_offset -
2176         packetizer->refoffset;
2177   } else {
2178     PCROffsetCurrent *current = pcrtable->current;
2179 
2180     if (!current->group) {
2181       PACKETIZER_GROUP_UNLOCK (packetizer);
2182       GST_LOG ("No PCR yet");
2183       return GST_CLOCK_TIME_NONE;
2184     }
2185     /* If doing progressive read, use current */
2186     GST_LOG ("Using current group");
2187     lastpcr = current->group->pcr_offset + current->pending[current->last].pcr;
2188     lastoffset = current->first_offset + current->pending[current->last].offset;
2189   }
2190   GST_DEBUG ("lastpcr:%" GST_TIME_FORMAT " lastoffset:%" G_GUINT64_FORMAT
2191       " refoffset:%" G_GUINT64_FORMAT,
2192       GST_TIME_ARGS (PCRTIME_TO_GSTTIME (lastpcr)), lastoffset,
2193       packetizer->refoffset);
2194 
2195   /* Convert byte difference into time difference (and transformed from 27MHz to 1GHz) */
2196   res =
2197       PCRTIME_TO_GSTTIME (gst_util_uint64_scale (offset - packetizer->refoffset,
2198           lastpcr, lastoffset));
2199 
2200   PACKETIZER_GROUP_UNLOCK (packetizer);
2201 
2202   GST_DEBUG ("Returning timestamp %" GST_TIME_FORMAT " for offset %"
2203       G_GUINT64_FORMAT, GST_TIME_ARGS (res), offset);
2204 
2205   return res;
2206 }
2207 
2208 /* Input  : local PTS (in GHz units)
2209  * Return : Stream time (in GHz units) */
2210 GstClockTime
mpegts_packetizer_pts_to_ts(MpegTSPacketizer2 * packetizer,GstClockTime pts,guint16 pcr_pid)2211 mpegts_packetizer_pts_to_ts (MpegTSPacketizer2 * packetizer,
2212     GstClockTime pts, guint16 pcr_pid)
2213 {
2214   GstClockTime res = GST_CLOCK_TIME_NONE;
2215   MpegTSPCR *pcrtable;
2216 
2217   PACKETIZER_GROUP_LOCK (packetizer);
2218   pcrtable = get_pcr_table (packetizer, pcr_pid);
2219 
2220   if (!GST_CLOCK_TIME_IS_VALID (pcrtable->base_time) && pcr_pid == 0x1fff &&
2221       GST_CLOCK_TIME_IS_VALID (packetizer->last_in_time)) {
2222     pcrtable->base_time = packetizer->last_in_time;
2223     pcrtable->base_pcrtime = pts;
2224   }
2225 
2226   /* Use clock skew if present */
2227   if (packetizer->calculate_skew
2228       && GST_CLOCK_TIME_IS_VALID (pcrtable->base_time)) {
2229     GST_DEBUG ("pts %" GST_TIME_FORMAT " base_pcrtime:%" GST_TIME_FORMAT
2230         " base_time:%" GST_TIME_FORMAT " pcroffset:%" GST_TIME_FORMAT,
2231         GST_TIME_ARGS (pts),
2232         GST_TIME_ARGS (pcrtable->base_pcrtime),
2233         GST_TIME_ARGS (pcrtable->base_time),
2234         GST_TIME_ARGS (pcrtable->pcroffset));
2235     res = pts + pcrtable->pcroffset;
2236 
2237     /* Don't return anything if we differ too much against last seen PCR */
2238     /* FIXME : Ideally we want to figure out whether we have a wraparound or
2239      * a reset so we can provide actual values.
2240      * That being said, this will only happen for the small interval of time
2241      * where PTS/DTS are wrapping just before we see the first reset/wrap PCR
2242      */
2243     if (G_UNLIKELY (pcr_pid != 0x1fff &&
2244             ABSDIFF (res, pcrtable->last_pcrtime) > 15 * GST_SECOND))
2245       res = GST_CLOCK_TIME_NONE;
2246     else {
2247       GstClockTime tmp = pcrtable->base_time + pcrtable->skew;
2248       if (tmp + res > pcrtable->base_pcrtime)
2249         res += tmp - pcrtable->base_pcrtime;
2250       else
2251         res = GST_CLOCK_TIME_NONE;
2252     }
2253   } else if (packetizer->calculate_offset && pcrtable->groups) {
2254     gint64 refpcr = G_MAXINT64, refpcroffset;
2255     PCROffsetGroup *group = pcrtable->current->group;
2256 
2257     /* Generic calculation:
2258      * Stream Time = PTS - first group PCR + group PCR_offset
2259      *
2260      * In case of wrapover:
2261      * Stream Time = PTS + MAX_PCR - first group PCR + group PCR_offset
2262      * (which we actually do by using first group PCR -= MAX_PCR in order
2263      *  to end up with the same calculation as for non-wrapover) */
2264 
2265     if (group) {
2266       /* If we have a current group the value is pretty much guaranteed */
2267       GST_DEBUG ("Using current First PCR:%" GST_TIME_FORMAT " offset:%"
2268           G_GUINT64_FORMAT " PCR_offset:%" GST_TIME_FORMAT,
2269           GST_TIME_ARGS (PCRTIME_TO_GSTTIME (group->first_pcr)),
2270           group->first_offset,
2271           GST_TIME_ARGS (PCRTIME_TO_GSTTIME (group->pcr_offset)));
2272       refpcr = group->first_pcr;
2273       refpcroffset = group->pcr_offset;
2274       if (pts < PCRTIME_TO_GSTTIME (refpcr)) {
2275         /* Only apply wrapover if we're certain it is, and avoid
2276          * returning bogus values if it's a PTS/DTS which is *just*
2277          * before the start of the current group
2278          */
2279         if (PCRTIME_TO_GSTTIME (refpcr) - pts > GST_SECOND) {
2280           pts += PCR_GST_MAX_VALUE;
2281         } else
2282           refpcr = G_MAXINT64;
2283       }
2284     } else {
2285       GList *tmp;
2286       /* Otherwise, find a suitable group */
2287 
2288       GST_DEBUG ("Find group for current offset %" G_GUINT64_FORMAT,
2289           packetizer->offset);
2290 
2291       for (tmp = pcrtable->groups; tmp; tmp = tmp->next) {
2292         PCROffsetGroup *tgroup = tmp->data;
2293         GST_DEBUG ("Trying First PCR:%" GST_TIME_FORMAT " offset:%"
2294             G_GUINT64_FORMAT " PCR_offset:%" GST_TIME_FORMAT,
2295             GST_TIME_ARGS (PCRTIME_TO_GSTTIME (tgroup->first_pcr)),
2296             tgroup->first_offset,
2297             GST_TIME_ARGS (PCRTIME_TO_GSTTIME (tgroup->pcr_offset)));
2298         /* Gone too far ? */
2299         if (tgroup->first_offset > packetizer->offset) {
2300           /* If there isn't a pending reset, use that value */
2301           if (group) {
2302             GST_DEBUG ("PTS is %" GST_TIME_FORMAT " into group",
2303                 GST_TIME_ARGS (pts - PCRTIME_TO_GSTTIME (group->first_pcr)));
2304           }
2305           break;
2306         }
2307         group = tgroup;
2308         /* In that group ? */
2309         if (group->first_offset + group->values[group->last_value].offset >
2310             packetizer->offset) {
2311           GST_DEBUG ("PTS is %" GST_TIME_FORMAT " into group",
2312               GST_TIME_ARGS (pts - PCRTIME_TO_GSTTIME (group->first_pcr)));
2313           break;
2314         }
2315       }
2316       if (group && !(group->flags & PCR_GROUP_FLAG_RESET)) {
2317         GST_DEBUG ("Using group !");
2318         refpcr = group->first_pcr;
2319         refpcroffset = group->pcr_offset;
2320         if (pts < PCRTIME_TO_GSTTIME (refpcr)) {
2321           if (PCRTIME_TO_GSTTIME (refpcr) - pts > GST_SECOND)
2322             pts += PCR_GST_MAX_VALUE;
2323           else
2324             refpcr = G_MAXINT64;
2325         }
2326       }
2327     }
2328     if (refpcr != G_MAXINT64)
2329       res =
2330           pts - PCRTIME_TO_GSTTIME (refpcr) + PCRTIME_TO_GSTTIME (refpcroffset);
2331     else
2332       GST_WARNING ("No groups, can't calculate timestamp");
2333   } else
2334     GST_WARNING ("Not enough information to calculate proper timestamp");
2335 
2336   PACKETIZER_GROUP_UNLOCK (packetizer);
2337 
2338   GST_DEBUG ("Returning timestamp %" GST_TIME_FORMAT " for pts %"
2339       GST_TIME_FORMAT " pcr_pid:0x%04x", GST_TIME_ARGS (res),
2340       GST_TIME_ARGS (pts), pcr_pid);
2341   return res;
2342 }
2343 
2344 /* Stream time to offset */
2345 guint64
mpegts_packetizer_ts_to_offset(MpegTSPacketizer2 * packetizer,GstClockTime ts,guint16 pcr_pid)2346 mpegts_packetizer_ts_to_offset (MpegTSPacketizer2 * packetizer,
2347     GstClockTime ts, guint16 pcr_pid)
2348 {
2349   MpegTSPCR *pcrtable;
2350   guint64 res;
2351   PCROffsetGroup *nextgroup = NULL, *prevgroup = NULL;
2352   guint64 querypcr, firstpcr, lastpcr, firstoffset, lastoffset;
2353   PCROffsetCurrent *current;
2354   GList *tmp;
2355 
2356   if (!packetizer->calculate_offset)
2357     return -1;
2358 
2359   PACKETIZER_GROUP_LOCK (packetizer);
2360   pcrtable = get_pcr_table (packetizer, pcr_pid);
2361 
2362   if (pcrtable->groups == NULL) {
2363     PACKETIZER_GROUP_UNLOCK (packetizer);
2364     return -1;
2365   }
2366 
2367   querypcr = GSTTIME_TO_PCRTIME (ts);
2368 
2369   GST_DEBUG ("Searching offset for ts %" GST_TIME_FORMAT, GST_TIME_ARGS (ts));
2370 
2371   /* First check if we're within the current pending group */
2372   current = pcrtable->current;
2373   if (current && current->group && (querypcr >= current->group->pcr_offset) &&
2374       querypcr - current->group->pcr_offset <=
2375       current->pending[current->last].pcr) {
2376     GST_DEBUG ("pcr is in current group");
2377     nextgroup = current->group;
2378     goto calculate_points;
2379   }
2380 
2381   /* Find the neighbouring groups */
2382   for (tmp = pcrtable->groups; tmp; tmp = tmp->next) {
2383     nextgroup = (PCROffsetGroup *) tmp->data;
2384 
2385     GST_DEBUG ("Trying group PCR %" GST_TIME_FORMAT " (offset %"
2386         G_GUINT64_FORMAT " pcr_offset %" GST_TIME_FORMAT,
2387         GST_TIME_ARGS (PCRTIME_TO_GSTTIME (nextgroup->first_pcr)),
2388         nextgroup->first_offset,
2389         GST_TIME_ARGS (PCRTIME_TO_GSTTIME (nextgroup->pcr_offset)));
2390 
2391     /* Check if we've gone too far */
2392     if (nextgroup->pcr_offset > querypcr) {
2393       GST_DEBUG ("pcr is before that group");
2394       break;
2395     }
2396 
2397     if (tmp->next == NULL) {
2398       GST_DEBUG ("pcr is beyond last group");
2399       break;
2400     }
2401 
2402     prevgroup = nextgroup;
2403 
2404     /* Maybe it's in this group */
2405     if (nextgroup->values[nextgroup->last_value].pcr +
2406         nextgroup->pcr_offset >= querypcr) {
2407       GST_DEBUG ("pcr is in that group");
2408       break;
2409     }
2410   }
2411 
2412 calculate_points:
2413 
2414   GST_DEBUG ("nextgroup:%p, prevgroup:%p", nextgroup, prevgroup);
2415 
2416   if (nextgroup == prevgroup || prevgroup == NULL) {
2417     /* We use the current group to calculate position:
2418      * * if the PCR is within this group
2419      * * if there is only one group to use for calculation
2420      */
2421     GST_DEBUG ("In group or after last one");
2422     lastoffset = firstoffset = nextgroup->first_offset;
2423     lastpcr = firstpcr = nextgroup->pcr_offset;
2424     if (current && nextgroup == current->group) {
2425       lastoffset += current->pending[current->last].offset;
2426       lastpcr += current->pending[current->last].pcr;
2427     } else {
2428       lastoffset += nextgroup->values[nextgroup->last_value].offset;
2429       lastpcr += nextgroup->values[nextgroup->last_value].pcr;
2430     }
2431   } else {
2432     GST_DEBUG ("Between group");
2433     lastoffset = nextgroup->first_offset;
2434     lastpcr = nextgroup->pcr_offset;
2435     firstoffset =
2436         prevgroup->values[prevgroup->last_value].offset +
2437         prevgroup->first_offset;
2438     firstpcr =
2439         prevgroup->values[prevgroup->last_value].pcr + prevgroup->pcr_offset;
2440   }
2441 
2442   PACKETIZER_GROUP_UNLOCK (packetizer);
2443 
2444   GST_DEBUG ("Using prev PCR %" G_GUINT64_FORMAT " offset %" G_GUINT64_FORMAT,
2445       firstpcr, firstoffset);
2446   GST_DEBUG ("Using last PCR %" G_GUINT64_FORMAT " offset %" G_GUINT64_FORMAT,
2447       lastpcr, lastoffset);
2448 
2449   res = firstoffset;
2450   if (lastpcr != firstpcr)
2451     res += gst_util_uint64_scale (querypcr - firstpcr,
2452         lastoffset - firstoffset, lastpcr - firstpcr);
2453 
2454   GST_DEBUG ("Returning offset %" G_GUINT64_FORMAT " for ts %"
2455       GST_TIME_FORMAT, res, GST_TIME_ARGS (ts));
2456 
2457   return res;
2458 }
2459 
2460 void
mpegts_packetizer_set_reference_offset(MpegTSPacketizer2 * packetizer,guint64 refoffset)2461 mpegts_packetizer_set_reference_offset (MpegTSPacketizer2 * packetizer,
2462     guint64 refoffset)
2463 {
2464   GST_DEBUG ("Setting reference offset to %" G_GUINT64_FORMAT, refoffset);
2465 
2466   PACKETIZER_GROUP_LOCK (packetizer);
2467   packetizer->refoffset = refoffset;
2468   PACKETIZER_GROUP_UNLOCK (packetizer);
2469 }
2470 
2471 void
mpegts_packetizer_set_pcr_discont_threshold(MpegTSPacketizer2 * packetizer,GstClockTime threshold)2472 mpegts_packetizer_set_pcr_discont_threshold (MpegTSPacketizer2 * packetizer,
2473     GstClockTime threshold)
2474 {
2475   PACKETIZER_GROUP_LOCK (packetizer);
2476   packetizer->pcr_discont_threshold = threshold;
2477   PACKETIZER_GROUP_UNLOCK (packetizer);
2478 }
2479 
2480 void
mpegts_packetizer_set_current_pcr_offset(MpegTSPacketizer2 * packetizer,GstClockTime offset,guint16 pcr_pid)2481 mpegts_packetizer_set_current_pcr_offset (MpegTSPacketizer2 * packetizer,
2482     GstClockTime offset, guint16 pcr_pid)
2483 {
2484   guint64 pcr_offset;
2485   gint64 delta;
2486   MpegTSPCR *pcrtable;
2487   PCROffsetGroup *group;
2488   GList *tmp;
2489   gboolean apply = FALSE;
2490 
2491   /* fast path */
2492   PACKETIZER_GROUP_LOCK (packetizer);
2493   pcrtable = get_pcr_table (packetizer, pcr_pid);
2494 
2495   if (pcrtable == NULL || pcrtable->current->group == NULL) {
2496     PACKETIZER_GROUP_UNLOCK (packetizer);
2497     return;
2498   }
2499 
2500   pcr_offset = GSTTIME_TO_PCRTIME (offset);
2501 
2502   /* Pick delta from *first* group */
2503   if (pcrtable->groups)
2504     group = pcrtable->groups->data;
2505   else
2506     group = pcrtable->current->group;
2507   GST_DEBUG ("Current group PCR %" GST_TIME_FORMAT " (offset %"
2508       G_GUINT64_FORMAT " pcr_offset %" GST_TIME_FORMAT,
2509       GST_TIME_ARGS (PCRTIME_TO_GSTTIME (group->first_pcr)),
2510       group->first_offset,
2511       GST_TIME_ARGS (PCRTIME_TO_GSTTIME (group->pcr_offset)));
2512 
2513   /* Remember the difference between previous initial pcr_offset and
2514    * new initial pcr_offset */
2515   delta = pcr_offset - group->pcr_offset;
2516   if (delta == 0) {
2517     GST_DEBUG ("No shift to apply");
2518     PACKETIZER_GROUP_UNLOCK (packetizer);
2519     return;
2520   }
2521   GST_DEBUG ("Shifting groups by %" GST_TIME_FORMAT
2522       " for new initial pcr_offset %" GST_TIME_FORMAT,
2523       GST_TIME_ARGS (PCRTIME_TO_GSTTIME (delta)), GST_TIME_ARGS (offset));
2524 
2525   for (tmp = pcrtable->groups; tmp; tmp = tmp->next) {
2526     PCROffsetGroup *tgroup = (tmp->data);
2527     if (tgroup == group)
2528       apply = TRUE;
2529     if (apply) {
2530       tgroup->pcr_offset += delta;
2531       GST_DEBUG ("Update group PCR %" GST_TIME_FORMAT " (offset %"
2532           G_GUINT64_FORMAT " pcr_offset %" GST_TIME_FORMAT,
2533           GST_TIME_ARGS (PCRTIME_TO_GSTTIME (tgroup->first_pcr)),
2534           tgroup->first_offset,
2535           GST_TIME_ARGS (PCRTIME_TO_GSTTIME (tgroup->pcr_offset)));
2536     } else
2537       GST_DEBUG ("Not modifying group PCR %" GST_TIME_FORMAT " (offset %"
2538           G_GUINT64_FORMAT " pcr_offset %" GST_TIME_FORMAT,
2539           GST_TIME_ARGS (PCRTIME_TO_GSTTIME (tgroup->first_pcr)),
2540           tgroup->first_offset,
2541           GST_TIME_ARGS (PCRTIME_TO_GSTTIME (tgroup->pcr_offset)));
2542   }
2543   PACKETIZER_GROUP_UNLOCK (packetizer);
2544 }
2545