1 /* GStreamer
2  * Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org>
3  * Copyright (C) 2006 Andy Wingo <wingo@pobox.com>
4  * Copyright (C) 2008 Vincent Penquerc'h <ogg.k.ogg.k@googlemail.com>
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19  * Boston, MA 02110-1301, USA.
20  */
21 
22 /**
23  * SECTION:element-kateparse
24  * @title: kateparse
25  * @short_description: parses kate streams
26  * @see_also: katedec, vorbisparse, oggdemux, theoraparse
27  *
28  * The kateparse element will parse the header packets of the Kate
29  * stream and put them as the streamheader in the caps. This is used in the
30  * multifdsink case where you want to stream live kate streams to multiple
31  * clients, each client has to receive the streamheaders first before they can
32  * consume the kate packets.
33  *
34  * This element also makes sure that the buffers that it pushes out are properly
35  * timestamped and that their offset and offset_end are set. The buffers that
36  * kateparse outputs have all of the metadata that oggmux expects to receive,
37  * which allows you to (for example) remux an ogg/kate file.
38  *
39  * ## Example pipelines
40  *
41  * |[
42  * gst-launch-1.0 -v filesrc location=kate.ogg ! oggdemux ! kateparse ! fakesink
43  * ]|
44  * This pipeline shows that the streamheader is set in the caps, and that each
45  * buffer has the timestamp, duration, offset, and offset_end set.
46  *
47  * |[
48  * gst-launch-1.0 filesrc location=kate.ogg ! oggdemux ! kateparse \
49  *            ! oggmux ! filesink location=kate-remuxed.ogg
50  * ]|
51  * This pipeline shows remuxing. kate-remuxed.ogg might not be exactly the same
52  * as kate.ogg, but they should produce exactly the same decoded data.
53  *
54  */
55 
56 #ifdef HAVE_CONFIG_H
57 #  include "config.h"
58 #endif
59 
60 #include "gstkate.h"
61 #include "gstkateutil.h"
62 #include "gstkateparse.h"
63 
64 GST_DEBUG_CATEGORY_EXTERN (gst_kateparse_debug);
65 #define GST_CAT_DEFAULT gst_kateparse_debug
66 
67 static GstStaticPadTemplate gst_kate_parse_sink_factory =
68     GST_STATIC_PAD_TEMPLATE ("sink",
69     GST_PAD_SINK,
70     GST_PAD_ALWAYS,
71     GST_STATIC_CAPS ("subtitle/x-kate; application/x-kate")
72     );
73 
74 static GstStaticPadTemplate gst_kate_parse_src_factory =
75     GST_STATIC_PAD_TEMPLATE ("src",
76     GST_PAD_SRC,
77     GST_PAD_ALWAYS,
78     GST_STATIC_CAPS ("subtitle/x-kate; application/x-kate")
79     );
80 
81 #define gst_kate_parse_parent_class parent_class
82 G_DEFINE_TYPE (GstKateParse, gst_kate_parse, GST_TYPE_ELEMENT);
83 
84 static GstFlowReturn gst_kate_parse_chain (GstPad * pad, GstObject * parent,
85     GstBuffer * buffer);
86 static GstStateChangeReturn gst_kate_parse_change_state (GstElement * element,
87     GstStateChange transition);
88 static gboolean gst_kate_parse_sink_event (GstPad * pad, GstObject * parent,
89     GstEvent * event);
90 static gboolean gst_kate_parse_src_query (GstPad * pad, GstObject * parent,
91     GstQuery * query);
92 #if 0
93 static gboolean gst_kate_parse_convert (GstPad * pad,
94     GstFormat src_format, gint64 src_value,
95     GstFormat * dest_format, gint64 * dest_value);
96 #endif
97 static GstFlowReturn gst_kate_parse_parse_packet (GstKateParse * parse,
98     GstBuffer * buf);
99 
100 static void
gst_kate_parse_class_init(GstKateParseClass * klass)101 gst_kate_parse_class_init (GstKateParseClass * klass)
102 {
103   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
104 
105   gstelement_class->change_state = gst_kate_parse_change_state;
106 
107   gst_element_class_add_static_pad_template (gstelement_class,
108       &gst_kate_parse_src_factory);
109   gst_element_class_add_static_pad_template (gstelement_class,
110       &gst_kate_parse_sink_factory);
111 
112   gst_element_class_set_static_metadata (gstelement_class, "Kate stream parser",
113       "Codec/Parser/Subtitle",
114       "parse raw kate streams",
115       "Vincent Penquerc'h <ogg.k.ogg.k at googlemail dot com>");
116 
117   klass->parse_packet = GST_DEBUG_FUNCPTR (gst_kate_parse_parse_packet);
118 }
119 
120 static void
gst_kate_parse_init(GstKateParse * parse)121 gst_kate_parse_init (GstKateParse * parse)
122 {
123   parse->sinkpad =
124       gst_pad_new_from_static_template (&gst_kate_parse_sink_factory, "sink");
125   gst_pad_set_chain_function (parse->sinkpad,
126       GST_DEBUG_FUNCPTR (gst_kate_parse_chain));
127   gst_pad_set_event_function (parse->sinkpad,
128       GST_DEBUG_FUNCPTR (gst_kate_parse_sink_event));
129   gst_element_add_pad (GST_ELEMENT (parse), parse->sinkpad);
130 
131   parse->srcpad =
132       gst_pad_new_from_static_template (&gst_kate_parse_src_factory, "src");
133   gst_pad_set_query_function (parse->srcpad,
134       GST_DEBUG_FUNCPTR (gst_kate_parse_src_query));
135   gst_element_add_pad (GST_ELEMENT (parse), parse->srcpad);
136 }
137 
138 static void
gst_kate_parse_drain_event_queue(GstKateParse * parse)139 gst_kate_parse_drain_event_queue (GstKateParse * parse)
140 {
141   while (parse->event_queue->length) {
142     GstEvent *event;
143 
144     event = GST_EVENT_CAST (g_queue_pop_head (parse->event_queue));
145     gst_pad_event_default (parse->sinkpad, NULL, event);
146   }
147 }
148 
149 static GstFlowReturn
gst_kate_parse_push_headers(GstKateParse * parse)150 gst_kate_parse_push_headers (GstKateParse * parse)
151 {
152   /* mark and put on caps */
153   GstCaps *caps;
154   GstBuffer *outbuf;
155   kate_packet packet;
156   GList *headers, *outbuf_list = NULL;
157   int ret;
158   gboolean res;
159 
160   /* get the headers into the caps, passing them to kate as we go */
161   caps =
162       gst_kate_util_set_header_on_caps (&parse->element,
163       gst_pad_get_current_caps (parse->sinkpad), parse->streamheader);
164 
165   if (G_UNLIKELY (!caps)) {
166     GST_ELEMENT_ERROR (parse, STREAM, DECODE, (NULL),
167         ("Failed to set headers on caps"));
168     return GST_FLOW_ERROR;
169   }
170 
171   GST_DEBUG_OBJECT (parse, "here are the caps: %" GST_PTR_FORMAT, caps);
172   res = gst_pad_set_caps (parse->srcpad, caps);
173   gst_caps_unref (caps);
174   if (G_UNLIKELY (!res)) {
175     GST_WARNING_OBJECT (parse->srcpad, "Failed to set caps on source pad");
176     return GST_FLOW_NOT_NEGOTIATED;
177   }
178 
179   headers = parse->streamheader;
180   while (headers) {
181     GstMapInfo info;
182 
183     outbuf = GST_BUFFER_CAST (headers->data);
184 
185     if (!gst_buffer_map (outbuf, &info, GST_MAP_READ)) {
186       GST_WARNING_OBJECT (outbuf, "Failed to map buffer");
187       continue;
188     }
189 
190     kate_packet_wrap (&packet, info.size, info.data);
191     ret = kate_decode_headerin (&parse->ki, &parse->kc, &packet);
192     if (G_UNLIKELY (ret < 0)) {
193       GST_WARNING_OBJECT (parse, "Failed to decode header: %s",
194           gst_kate_util_get_error_message (ret));
195     }
196     gst_buffer_unmap (outbuf, &info);
197     /* takes ownership of outbuf, which was previously in parse->streamheader */
198     GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_HEADER);
199     outbuf_list = g_list_append (outbuf_list, outbuf);
200     headers = headers->next;
201   }
202 
203   /* first process queued events */
204   gst_kate_parse_drain_event_queue (parse);
205 
206   /* push out buffers, ignoring return value... */
207   headers = outbuf_list;
208   while (headers) {
209     outbuf = GST_BUFFER_CAST (headers->data);
210     gst_pad_push (parse->srcpad, outbuf);
211     headers = headers->next;
212   }
213 
214   g_list_free (outbuf_list);
215   g_list_free (parse->streamheader);
216   parse->streamheader = NULL;
217 
218   parse->streamheader_sent = TRUE;
219 
220   return GST_FLOW_OK;
221 }
222 
223 static void
gst_kate_parse_clear_queue(GstKateParse * parse)224 gst_kate_parse_clear_queue (GstKateParse * parse)
225 {
226   GST_DEBUG_OBJECT (parse, "Clearing queue");
227   while (parse->buffer_queue->length) {
228     GstBuffer *buf;
229 
230     buf = GST_BUFFER_CAST (g_queue_pop_head (parse->buffer_queue));
231     gst_buffer_unref (buf);
232   }
233   while (parse->event_queue->length) {
234     GstEvent *event;
235 
236     event = GST_EVENT_CAST (g_queue_pop_head (parse->event_queue));
237     gst_event_unref (event);
238   }
239 }
240 
241 static GstFlowReturn
gst_kate_parse_push_buffer(GstKateParse * parse,GstBuffer * buf,gint64 granulepos)242 gst_kate_parse_push_buffer (GstKateParse * parse, GstBuffer * buf,
243     gint64 granulepos)
244 {
245   GST_LOG_OBJECT (parse, "granulepos %16" G_GINT64_MODIFIER "x", granulepos);
246   if (granulepos < 0) {
247     /* packets coming not from Ogg won't have a granpos in the offset end,
248        so we have to synthesize one here - only problem is we don't know
249        the backlink - pretend there's none for now */
250     GST_INFO_OBJECT (parse, "No granulepos on buffer, synthesizing one");
251     granulepos =
252         kate_duration_granule (&parse->ki,
253         GST_BUFFER_TIMESTAMP (buf) /
254         (double) GST_SECOND) << kate_granule_shift (&parse->ki);
255   }
256   GST_BUFFER_OFFSET (buf) =
257       kate_granule_time (&parse->ki, granulepos) * GST_SECOND;
258   GST_BUFFER_OFFSET_END (buf) = granulepos;
259   GST_BUFFER_TIMESTAMP (buf) = GST_BUFFER_OFFSET (buf);
260 
261   return gst_pad_push (parse->srcpad, buf);
262 }
263 
264 static GstFlowReturn
gst_kate_parse_drain_queue_prematurely(GstKateParse * parse)265 gst_kate_parse_drain_queue_prematurely (GstKateParse * parse)
266 {
267   GstFlowReturn ret = GST_FLOW_OK;
268 
269   /* got an EOS event, make sure to push out any buffers that were in the queue
270    * -- won't normally be the case, but this catches the
271    * didn't-get-a-granulepos-on-the-last-packet case. Assuming a continuous
272    * stream. */
273 
274   /* if we got EOS before any buffers came, go ahead and push the other events
275    * first */
276   gst_kate_parse_drain_event_queue (parse);
277 
278   while (!g_queue_is_empty (parse->buffer_queue)) {
279     GstBuffer *buf;
280     gint64 granpos;
281 
282     buf = GST_BUFFER_CAST (g_queue_pop_head (parse->buffer_queue));
283 
284     granpos = GST_BUFFER_OFFSET_END (buf);
285     ret = gst_kate_parse_push_buffer (parse, buf, granpos);
286 
287     if (ret != GST_FLOW_OK)
288       goto done;
289   }
290 
291   g_assert (g_queue_is_empty (parse->buffer_queue));
292 
293 done:
294   return ret;
295 }
296 
297 static GstFlowReturn
gst_kate_parse_drain_queue(GstKateParse * parse,gint64 granulepos)298 gst_kate_parse_drain_queue (GstKateParse * parse, gint64 granulepos)
299 {
300   GstFlowReturn ret = GST_FLOW_OK;
301 
302   if (!g_queue_is_empty (parse->buffer_queue)) {
303     GstBuffer *buf;
304     buf = GST_BUFFER_CAST (g_queue_pop_head (parse->buffer_queue));
305     ret = gst_kate_parse_push_buffer (parse, buf, granulepos);
306 
307     if (ret != GST_FLOW_OK)
308       goto done;
309   }
310   g_assert (g_queue_is_empty (parse->buffer_queue));
311 
312 done:
313   return ret;
314 }
315 
316 static GstFlowReturn
gst_kate_parse_queue_buffer(GstKateParse * parse,GstBuffer * buf)317 gst_kate_parse_queue_buffer (GstKateParse * parse, GstBuffer * buf)
318 {
319   GstFlowReturn ret = GST_FLOW_OK;
320   gint64 granpos;
321 
322   buf = gst_buffer_make_writable (buf);
323 
324   /* oggdemux stores the granule pos in the offset end */
325   granpos = GST_BUFFER_OFFSET_END (buf);
326   GST_LOG_OBJECT (parse, "granpos %16" G_GINT64_MODIFIER "x", granpos);
327   g_queue_push_tail (parse->buffer_queue, buf);
328 
329 #if 1
330   /* if getting buffers from matroska, we won't have a granpos here... */
331   //if (GST_BUFFER_OFFSET_END_IS_VALID (buf)) {
332   ret = gst_kate_parse_drain_queue (parse, granpos);
333   //}
334 #else
335   if (granpos >= 0) {
336     ret = gst_kate_parse_drain_queue (parse, granpos);
337   } else {
338     GST_ELEMENT_ERROR (parse, STREAM, DECODE, (NULL),
339         ("Bad granulepos %" G_GINT64_FORMAT, granpos));
340     ret = GST_FLOW_ERROR;
341   }
342 #endif
343 
344   return ret;
345 }
346 
347 static GstFlowReturn
gst_kate_parse_parse_packet(GstKateParse * parse,GstBuffer * buf)348 gst_kate_parse_parse_packet (GstKateParse * parse, GstBuffer * buf)
349 {
350   GstFlowReturn ret = GST_FLOW_OK;
351   guint8 header[1];
352   gsize size;
353 
354   g_assert (parse);
355 
356   parse->packetno++;
357 
358   size = gst_buffer_extract (buf, 0, header, 1);
359 
360   GST_LOG_OBJECT (parse, "Got packet %02x, %" G_GSIZE_FORMAT " bytes",
361       size ? header[0] : -1, gst_buffer_get_size (buf));
362 
363   if (size > 0 && header[0] & 0x80) {
364     GST_DEBUG_OBJECT (parse, "Found header %02x", header[0]);
365     /* if 0x80 is set, it's streamheader,
366      * so put it on the streamheader list and return */
367     parse->streamheader = g_list_append (parse->streamheader, buf);
368     ret = GST_FLOW_OK;
369   } else {
370     if (!parse->streamheader_sent) {
371       GST_DEBUG_OBJECT (parse, "Found non header, pushing headers seen so far");
372       ret = gst_kate_parse_push_headers (parse);
373     }
374 
375     if (ret == GST_FLOW_OK) {
376       ret = gst_kate_parse_queue_buffer (parse, buf);
377     }
378   }
379 
380   return ret;
381 }
382 
383 static GstFlowReturn
gst_kate_parse_chain(GstPad * pad,GstObject * parent,GstBuffer * buffer)384 gst_kate_parse_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
385 {
386   GstKateParseClass *klass;
387   GstKateParse *parse;
388 
389   parse = GST_KATE_PARSE (parent);
390   klass = GST_KATE_PARSE_CLASS (G_OBJECT_GET_CLASS (parse));
391 
392   g_assert (klass->parse_packet != NULL);
393 
394   if (G_UNLIKELY (!gst_pad_has_current_caps (pad)))
395     return GST_FLOW_NOT_NEGOTIATED;
396 
397   return klass->parse_packet (parse, buffer);
398 }
399 
400 static gboolean
gst_kate_parse_queue_event(GstKateParse * parse,GstEvent * event)401 gst_kate_parse_queue_event (GstKateParse * parse, GstEvent * event)
402 {
403   GstFlowReturn ret = TRUE;
404 
405   g_queue_push_tail (parse->event_queue, event);
406 
407   return ret;
408 }
409 
410 static gboolean
gst_kate_parse_sink_event(GstPad * pad,GstObject * parent,GstEvent * event)411 gst_kate_parse_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
412 {
413   gboolean ret;
414   GstKateParse *parse;
415 
416   parse = GST_KATE_PARSE (parent);
417 
418   switch (GST_EVENT_TYPE (event)) {
419     case GST_EVENT_FLUSH_STOP:
420       gst_kate_parse_clear_queue (parse);
421       ret = gst_pad_event_default (pad, parent, event);
422       break;
423     case GST_EVENT_EOS:
424       if (!parse->streamheader_sent) {
425         GST_DEBUG_OBJECT (parse, "Got EOS, pushing headers seen so far");
426         ret = gst_kate_parse_push_headers (parse);
427         if (ret != GST_FLOW_OK)
428           break;
429       }
430       gst_kate_parse_drain_queue_prematurely (parse);
431       ret = gst_pad_event_default (pad, parent, event);
432       break;
433     default:
434       if (!parse->streamheader_sent && GST_EVENT_IS_SERIALIZED (event)
435           && GST_EVENT_TYPE (event) > GST_EVENT_CAPS)
436         ret = gst_kate_parse_queue_event (parse, event);
437       else
438         ret = gst_pad_event_default (pad, parent, event);
439       break;
440   }
441 
442   return ret;
443 }
444 
445 #if 0
446 static gboolean
447 gst_kate_parse_convert (GstPad * pad,
448     GstFormat src_format, gint64 src_value,
449     GstFormat * dest_format, gint64 * dest_value)
450 {
451   gboolean res = TRUE;
452   GstKateParse *parse;
453 
454   parse = GST_KATE_PARSE (GST_PAD_PARENT (pad));
455 
456   /* fixme: assumes atomic access to lots of instance variables modified from
457    * the streaming thread, including 64-bit variables */
458 
459   if (!parse->streamheader_sent)
460     return FALSE;
461 
462   if (src_format == *dest_format) {
463     *dest_value = src_value;
464     return TRUE;
465   }
466 
467   if (parse->sinkpad == pad &&
468       (src_format == GST_FORMAT_BYTES || *dest_format == GST_FORMAT_BYTES))
469     return FALSE;
470 
471   switch (src_format) {
472     case GST_FORMAT_TIME:
473       switch (*dest_format) {
474         default:
475           res = FALSE;
476       }
477       break;
478     case GST_FORMAT_DEFAULT:
479       switch (*dest_format) {
480         case GST_FORMAT_TIME:
481           *dest_value = kate_granule_time (&parse->ki, src_value) * GST_SECOND;
482           break;
483         default:
484           res = FALSE;
485       }
486       break;
487     default:
488       res = FALSE;
489   }
490 
491   return res;
492 }
493 #endif
494 
495 static gboolean
gst_kate_parse_src_query(GstPad * pad,GstObject * parent,GstQuery * query)496 gst_kate_parse_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
497 {
498 #if 1
499   // TODO
500   GST_WARNING ("gst_kate_parse_src_query");
501   return FALSE;
502 #else
503   gint64 granulepos;
504   GstKateParse *parse;
505   gboolean res = FALSE;
506 
507   parse = GST_KATE_PARSE (parent);
508 
509   switch (GST_QUERY_TYPE (query)) {
510     case GST_QUERY_POSITION:
511     {
512       GstFormat format;
513       gint64 value;
514 
515       granulepos = parse->prev_granulepos;
516 
517       gst_query_parse_position (query, &format, NULL);
518 
519       /* and convert to the final format */
520       if (!(res =
521               gst_kate_parse_convert (pad, GST_FORMAT_DEFAULT, granulepos,
522                   &format, &value)))
523         goto error;
524 
525       /* fixme: support segments
526          value = (value - parse->segment_start) + parse->segment_time;
527        */
528 
529       gst_query_set_position (query, format, value);
530 
531       GST_LOG_OBJECT (parse, "query %p: peer returned granulepos: %"
532           G_GUINT64_FORMAT " - we return %" G_GUINT64_FORMAT " (format %u)",
533           query, granulepos, value, format);
534 
535       break;
536     }
537     case GST_QUERY_DURATION:
538     {
539       /* fixme: not threadsafe */
540       /* query peer for total length */
541       if (!gst_pad_is_linked (parse->sinkpad)) {
542         GST_WARNING_OBJECT (parse, "sink pad %" GST_PTR_FORMAT " is not linked",
543             parse->sinkpad);
544         goto error;
545       }
546       if (!(res = gst_pad_query (GST_PAD_PEER (parse->sinkpad), query)))
547         goto error;
548       break;
549     }
550     case GST_QUERY_CONVERT:
551     {
552       GstFormat src_fmt, dest_fmt;
553       gint64 src_val, dest_val;
554 
555       gst_query_parse_convert (query, &src_fmt, &src_val, &dest_fmt, &dest_val);
556       if (!(res =
557               gst_kate_parse_convert (pad, src_fmt, src_val, &dest_fmt,
558                   &dest_val)))
559         goto error;
560       gst_query_set_convert (query, src_fmt, src_val, dest_fmt, dest_val);
561       break;
562     }
563     default:
564       res = gst_pad_query_default (pad, query);
565       break;
566   }
567   return res;
568 
569 error:
570   {
571     GST_WARNING_OBJECT (parse, "error handling query");
572     return res;
573   }
574 #endif
575 }
576 
577 static void
gst_kate_parse_free_stream_headers(GstKateParse * parse)578 gst_kate_parse_free_stream_headers (GstKateParse * parse)
579 {
580   while (parse->streamheader != NULL) {
581     gst_buffer_unref (GST_BUFFER (parse->streamheader->data));
582     parse->streamheader = g_list_delete_link (parse->streamheader,
583         parse->streamheader);
584   }
585 }
586 
587 static GstStateChangeReturn
gst_kate_parse_change_state(GstElement * element,GstStateChange transition)588 gst_kate_parse_change_state (GstElement * element, GstStateChange transition)
589 {
590   GstKateParse *parse = GST_KATE_PARSE (element);
591   GstStateChangeReturn ret;
592 
593   switch (transition) {
594     case GST_STATE_CHANGE_READY_TO_PAUSED:
595       kate_info_init (&parse->ki);
596       kate_comment_init (&parse->kc);
597       parse->packetno = 0;
598       parse->streamheader_sent = FALSE;
599       parse->streamheader = NULL;
600       parse->buffer_queue = g_queue_new ();
601       parse->event_queue = g_queue_new ();
602       break;
603     default:
604       break;
605   }
606 
607   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
608 
609   switch (transition) {
610     case GST_STATE_CHANGE_PAUSED_TO_READY:
611       kate_info_clear (&parse->ki);
612       kate_comment_clear (&parse->kc);
613 
614       gst_kate_parse_clear_queue (parse);
615       g_queue_free (parse->buffer_queue);
616       parse->buffer_queue = NULL;
617       g_queue_free (parse->event_queue);
618       parse->event_queue = NULL;
619       gst_kate_parse_free_stream_headers (parse);
620       break;
621 
622     default:
623       break;
624   }
625 
626   return ret;
627 }
628