1 /* GStreamer
2 * Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org>
3 * Copyright (C) 2006 Andy Wingo <wingo@pobox.com>
4 * Copyright (C) 2008 Vincent Penquerc'h <ogg.k.ogg.k@googlemail.com>
5 *
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Library General Public
8 * License as published by the Free Software Foundation; either
9 * version 2 of the License, or (at your option) any later version.
10 *
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Library General Public License for more details.
15 *
16 * You should have received a copy of the GNU Library General Public
17 * License along with this library; if not, write to the
18 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19 * Boston, MA 02110-1301, USA.
20 */
21
22 /**
23 * SECTION:element-kateparse
24 * @title: kateparse
25 * @short_description: parses kate streams
26 * @see_also: katedec, vorbisparse, oggdemux, theoraparse
27 *
28 * The kateparse element will parse the header packets of the Kate
29 * stream and put them as the streamheader in the caps. This is used in the
 * multifdsink case where you want to stream live kate streams to multiple
 * clients: each client has to receive the streamheaders first, before it can
 * consume the kate packets.
33 *
34 * This element also makes sure that the buffers that it pushes out are properly
35 * timestamped and that their offset and offset_end are set. The buffers that
36 * kateparse outputs have all of the metadata that oggmux expects to receive,
37 * which allows you to (for example) remux an ogg/kate file.
38 *
39 * ## Example pipelines
40 *
41 * |[
42 * gst-launch-1.0 -v filesrc location=kate.ogg ! oggdemux ! kateparse ! fakesink
43 * ]|
44 * This pipeline shows that the streamheader is set in the caps, and that each
45 * buffer has the timestamp, duration, offset, and offset_end set.
46 *
47 * |[
48 * gst-launch-1.0 filesrc location=kate.ogg ! oggdemux ! kateparse \
49 * ! oggmux ! filesink location=kate-remuxed.ogg
50 * ]|
51 * This pipeline shows remuxing. kate-remuxed.ogg might not be exactly the same
52 * as kate.ogg, but they should produce exactly the same decoded data.
53 *
54 */
55
56 #ifdef HAVE_CONFIG_H
57 # include "config.h"
58 #endif
59
60 #include "gstkate.h"
61 #include "gstkateutil.h"
62 #include "gstkateparse.h"
63
/* The debug category is only declared here (EXTERN); it is defined in another
 * translation unit. All GST_* logging macros in this file use it. */
GST_DEBUG_CATEGORY_EXTERN (gst_kateparse_debug);
#define GST_CAT_DEFAULT gst_kateparse_debug

/* Always-present sink pad template: accepts both Kate media types. */
static GstStaticPadTemplate gst_kate_parse_sink_factory =
GST_STATIC_PAD_TEMPLATE ("sink",
    GST_PAD_SINK,
    GST_PAD_ALWAYS,
    GST_STATIC_CAPS ("subtitle/x-kate; application/x-kate")
    );

/* Always-present source pad template: same media types as the sink pad. */
static GstStaticPadTemplate gst_kate_parse_src_factory =
GST_STATIC_PAD_TEMPLATE ("src",
    GST_PAD_SRC,
    GST_PAD_ALWAYS,
    GST_STATIC_CAPS ("subtitle/x-kate; application/x-kate")
    );

/* GstKateParse derives directly from GstElement; parent_class is used by
 * the state change handler to chain up. */
#define gst_kate_parse_parent_class parent_class
G_DEFINE_TYPE (GstKateParse, gst_kate_parse, GST_TYPE_ELEMENT);

/* Forward declarations for the pad and element callbacks installed in
 * class_init/init below. */
static GstFlowReturn gst_kate_parse_chain (GstPad * pad, GstObject * parent,
    GstBuffer * buffer);
static GstStateChangeReturn gst_kate_parse_change_state (GstElement * element,
    GstStateChange transition);
static gboolean gst_kate_parse_sink_event (GstPad * pad, GstObject * parent,
    GstEvent * event);
static gboolean gst_kate_parse_src_query (GstPad * pad, GstObject * parent,
    GstQuery * query);
/* Disabled together with the definition of gst_kate_parse_convert. */
#if 0
static gboolean gst_kate_parse_convert (GstPad * pad,
    GstFormat src_format, gint64 src_value,
    GstFormat * dest_format, gint64 * dest_value);
#endif
/* Default implementation of the parse_packet class vfunc. */
static GstFlowReturn gst_kate_parse_parse_packet (GstKateParse * parse,
    GstBuffer * buf);
99
100 static void
gst_kate_parse_class_init(GstKateParseClass * klass)101 gst_kate_parse_class_init (GstKateParseClass * klass)
102 {
103 GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
104
105 gstelement_class->change_state = gst_kate_parse_change_state;
106
107 gst_element_class_add_static_pad_template (gstelement_class,
108 &gst_kate_parse_src_factory);
109 gst_element_class_add_static_pad_template (gstelement_class,
110 &gst_kate_parse_sink_factory);
111
112 gst_element_class_set_static_metadata (gstelement_class, "Kate stream parser",
113 "Codec/Parser/Subtitle",
114 "parse raw kate streams",
115 "Vincent Penquerc'h <ogg.k.ogg.k at googlemail dot com>");
116
117 klass->parse_packet = GST_DEBUG_FUNCPTR (gst_kate_parse_parse_packet);
118 }
119
120 static void
gst_kate_parse_init(GstKateParse * parse)121 gst_kate_parse_init (GstKateParse * parse)
122 {
123 parse->sinkpad =
124 gst_pad_new_from_static_template (&gst_kate_parse_sink_factory, "sink");
125 gst_pad_set_chain_function (parse->sinkpad,
126 GST_DEBUG_FUNCPTR (gst_kate_parse_chain));
127 gst_pad_set_event_function (parse->sinkpad,
128 GST_DEBUG_FUNCPTR (gst_kate_parse_sink_event));
129 gst_element_add_pad (GST_ELEMENT (parse), parse->sinkpad);
130
131 parse->srcpad =
132 gst_pad_new_from_static_template (&gst_kate_parse_src_factory, "src");
133 gst_pad_set_query_function (parse->srcpad,
134 GST_DEBUG_FUNCPTR (gst_kate_parse_src_query));
135 gst_element_add_pad (GST_ELEMENT (parse), parse->srcpad);
136 }
137
138 static void
gst_kate_parse_drain_event_queue(GstKateParse * parse)139 gst_kate_parse_drain_event_queue (GstKateParse * parse)
140 {
141 while (parse->event_queue->length) {
142 GstEvent *event;
143
144 event = GST_EVENT_CAST (g_queue_pop_head (parse->event_queue));
145 gst_pad_event_default (parse->sinkpad, NULL, event);
146 }
147 }
148
149 static GstFlowReturn
gst_kate_parse_push_headers(GstKateParse * parse)150 gst_kate_parse_push_headers (GstKateParse * parse)
151 {
152 /* mark and put on caps */
153 GstCaps *caps;
154 GstBuffer *outbuf;
155 kate_packet packet;
156 GList *headers, *outbuf_list = NULL;
157 int ret;
158 gboolean res;
159
160 /* get the headers into the caps, passing them to kate as we go */
161 caps =
162 gst_kate_util_set_header_on_caps (&parse->element,
163 gst_pad_get_current_caps (parse->sinkpad), parse->streamheader);
164
165 if (G_UNLIKELY (!caps)) {
166 GST_ELEMENT_ERROR (parse, STREAM, DECODE, (NULL),
167 ("Failed to set headers on caps"));
168 return GST_FLOW_ERROR;
169 }
170
171 GST_DEBUG_OBJECT (parse, "here are the caps: %" GST_PTR_FORMAT, caps);
172 res = gst_pad_set_caps (parse->srcpad, caps);
173 gst_caps_unref (caps);
174 if (G_UNLIKELY (!res)) {
175 GST_WARNING_OBJECT (parse->srcpad, "Failed to set caps on source pad");
176 return GST_FLOW_NOT_NEGOTIATED;
177 }
178
179 headers = parse->streamheader;
180 while (headers) {
181 GstMapInfo info;
182
183 outbuf = GST_BUFFER_CAST (headers->data);
184
185 if (!gst_buffer_map (outbuf, &info, GST_MAP_READ)) {
186 GST_WARNING_OBJECT (outbuf, "Failed to map buffer");
187 continue;
188 }
189
190 kate_packet_wrap (&packet, info.size, info.data);
191 ret = kate_decode_headerin (&parse->ki, &parse->kc, &packet);
192 if (G_UNLIKELY (ret < 0)) {
193 GST_WARNING_OBJECT (parse, "Failed to decode header: %s",
194 gst_kate_util_get_error_message (ret));
195 }
196 gst_buffer_unmap (outbuf, &info);
197 /* takes ownership of outbuf, which was previously in parse->streamheader */
198 GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_HEADER);
199 outbuf_list = g_list_append (outbuf_list, outbuf);
200 headers = headers->next;
201 }
202
203 /* first process queued events */
204 gst_kate_parse_drain_event_queue (parse);
205
206 /* push out buffers, ignoring return value... */
207 headers = outbuf_list;
208 while (headers) {
209 outbuf = GST_BUFFER_CAST (headers->data);
210 gst_pad_push (parse->srcpad, outbuf);
211 headers = headers->next;
212 }
213
214 g_list_free (outbuf_list);
215 g_list_free (parse->streamheader);
216 parse->streamheader = NULL;
217
218 parse->streamheader_sent = TRUE;
219
220 return GST_FLOW_OK;
221 }
222
223 static void
gst_kate_parse_clear_queue(GstKateParse * parse)224 gst_kate_parse_clear_queue (GstKateParse * parse)
225 {
226 GST_DEBUG_OBJECT (parse, "Clearing queue");
227 while (parse->buffer_queue->length) {
228 GstBuffer *buf;
229
230 buf = GST_BUFFER_CAST (g_queue_pop_head (parse->buffer_queue));
231 gst_buffer_unref (buf);
232 }
233 while (parse->event_queue->length) {
234 GstEvent *event;
235
236 event = GST_EVENT_CAST (g_queue_pop_head (parse->event_queue));
237 gst_event_unref (event);
238 }
239 }
240
241 static GstFlowReturn
gst_kate_parse_push_buffer(GstKateParse * parse,GstBuffer * buf,gint64 granulepos)242 gst_kate_parse_push_buffer (GstKateParse * parse, GstBuffer * buf,
243 gint64 granulepos)
244 {
245 GST_LOG_OBJECT (parse, "granulepos %16" G_GINT64_MODIFIER "x", granulepos);
246 if (granulepos < 0) {
247 /* packets coming not from Ogg won't have a granpos in the offset end,
248 so we have to synthesize one here - only problem is we don't know
249 the backlink - pretend there's none for now */
250 GST_INFO_OBJECT (parse, "No granulepos on buffer, synthesizing one");
251 granulepos =
252 kate_duration_granule (&parse->ki,
253 GST_BUFFER_TIMESTAMP (buf) /
254 (double) GST_SECOND) << kate_granule_shift (&parse->ki);
255 }
256 GST_BUFFER_OFFSET (buf) =
257 kate_granule_time (&parse->ki, granulepos) * GST_SECOND;
258 GST_BUFFER_OFFSET_END (buf) = granulepos;
259 GST_BUFFER_TIMESTAMP (buf) = GST_BUFFER_OFFSET (buf);
260
261 return gst_pad_push (parse->srcpad, buf);
262 }
263
264 static GstFlowReturn
gst_kate_parse_drain_queue_prematurely(GstKateParse * parse)265 gst_kate_parse_drain_queue_prematurely (GstKateParse * parse)
266 {
267 GstFlowReturn ret = GST_FLOW_OK;
268
269 /* got an EOS event, make sure to push out any buffers that were in the queue
270 * -- won't normally be the case, but this catches the
271 * didn't-get-a-granulepos-on-the-last-packet case. Assuming a continuous
272 * stream. */
273
274 /* if we got EOS before any buffers came, go ahead and push the other events
275 * first */
276 gst_kate_parse_drain_event_queue (parse);
277
278 while (!g_queue_is_empty (parse->buffer_queue)) {
279 GstBuffer *buf;
280 gint64 granpos;
281
282 buf = GST_BUFFER_CAST (g_queue_pop_head (parse->buffer_queue));
283
284 granpos = GST_BUFFER_OFFSET_END (buf);
285 ret = gst_kate_parse_push_buffer (parse, buf, granpos);
286
287 if (ret != GST_FLOW_OK)
288 goto done;
289 }
290
291 g_assert (g_queue_is_empty (parse->buffer_queue));
292
293 done:
294 return ret;
295 }
296
297 static GstFlowReturn
gst_kate_parse_drain_queue(GstKateParse * parse,gint64 granulepos)298 gst_kate_parse_drain_queue (GstKateParse * parse, gint64 granulepos)
299 {
300 GstFlowReturn ret = GST_FLOW_OK;
301
302 if (!g_queue_is_empty (parse->buffer_queue)) {
303 GstBuffer *buf;
304 buf = GST_BUFFER_CAST (g_queue_pop_head (parse->buffer_queue));
305 ret = gst_kate_parse_push_buffer (parse, buf, granulepos);
306
307 if (ret != GST_FLOW_OK)
308 goto done;
309 }
310 g_assert (g_queue_is_empty (parse->buffer_queue));
311
312 done:
313 return ret;
314 }
315
316 static GstFlowReturn
gst_kate_parse_queue_buffer(GstKateParse * parse,GstBuffer * buf)317 gst_kate_parse_queue_buffer (GstKateParse * parse, GstBuffer * buf)
318 {
319 GstFlowReturn ret = GST_FLOW_OK;
320 gint64 granpos;
321
322 buf = gst_buffer_make_writable (buf);
323
324 /* oggdemux stores the granule pos in the offset end */
325 granpos = GST_BUFFER_OFFSET_END (buf);
326 GST_LOG_OBJECT (parse, "granpos %16" G_GINT64_MODIFIER "x", granpos);
327 g_queue_push_tail (parse->buffer_queue, buf);
328
329 #if 1
330 /* if getting buffers from matroska, we won't have a granpos here... */
331 //if (GST_BUFFER_OFFSET_END_IS_VALID (buf)) {
332 ret = gst_kate_parse_drain_queue (parse, granpos);
333 //}
334 #else
335 if (granpos >= 0) {
336 ret = gst_kate_parse_drain_queue (parse, granpos);
337 } else {
338 GST_ELEMENT_ERROR (parse, STREAM, DECODE, (NULL),
339 ("Bad granulepos %" G_GINT64_FORMAT, granpos));
340 ret = GST_FLOW_ERROR;
341 }
342 #endif
343
344 return ret;
345 }
346
347 static GstFlowReturn
gst_kate_parse_parse_packet(GstKateParse * parse,GstBuffer * buf)348 gst_kate_parse_parse_packet (GstKateParse * parse, GstBuffer * buf)
349 {
350 GstFlowReturn ret = GST_FLOW_OK;
351 guint8 header[1];
352 gsize size;
353
354 g_assert (parse);
355
356 parse->packetno++;
357
358 size = gst_buffer_extract (buf, 0, header, 1);
359
360 GST_LOG_OBJECT (parse, "Got packet %02x, %" G_GSIZE_FORMAT " bytes",
361 size ? header[0] : -1, gst_buffer_get_size (buf));
362
363 if (size > 0 && header[0] & 0x80) {
364 GST_DEBUG_OBJECT (parse, "Found header %02x", header[0]);
365 /* if 0x80 is set, it's streamheader,
366 * so put it on the streamheader list and return */
367 parse->streamheader = g_list_append (parse->streamheader, buf);
368 ret = GST_FLOW_OK;
369 } else {
370 if (!parse->streamheader_sent) {
371 GST_DEBUG_OBJECT (parse, "Found non header, pushing headers seen so far");
372 ret = gst_kate_parse_push_headers (parse);
373 }
374
375 if (ret == GST_FLOW_OK) {
376 ret = gst_kate_parse_queue_buffer (parse, buf);
377 }
378 }
379
380 return ret;
381 }
382
383 static GstFlowReturn
gst_kate_parse_chain(GstPad * pad,GstObject * parent,GstBuffer * buffer)384 gst_kate_parse_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
385 {
386 GstKateParseClass *klass;
387 GstKateParse *parse;
388
389 parse = GST_KATE_PARSE (parent);
390 klass = GST_KATE_PARSE_CLASS (G_OBJECT_GET_CLASS (parse));
391
392 g_assert (klass->parse_packet != NULL);
393
394 if (G_UNLIKELY (!gst_pad_has_current_caps (pad)))
395 return GST_FLOW_NOT_NEGOTIATED;
396
397 return klass->parse_packet (parse, buffer);
398 }
399
400 static gboolean
gst_kate_parse_queue_event(GstKateParse * parse,GstEvent * event)401 gst_kate_parse_queue_event (GstKateParse * parse, GstEvent * event)
402 {
403 GstFlowReturn ret = TRUE;
404
405 g_queue_push_tail (parse->event_queue, event);
406
407 return ret;
408 }
409
410 static gboolean
gst_kate_parse_sink_event(GstPad * pad,GstObject * parent,GstEvent * event)411 gst_kate_parse_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
412 {
413 gboolean ret;
414 GstKateParse *parse;
415
416 parse = GST_KATE_PARSE (parent);
417
418 switch (GST_EVENT_TYPE (event)) {
419 case GST_EVENT_FLUSH_STOP:
420 gst_kate_parse_clear_queue (parse);
421 ret = gst_pad_event_default (pad, parent, event);
422 break;
423 case GST_EVENT_EOS:
424 if (!parse->streamheader_sent) {
425 GST_DEBUG_OBJECT (parse, "Got EOS, pushing headers seen so far");
426 ret = gst_kate_parse_push_headers (parse);
427 if (ret != GST_FLOW_OK)
428 break;
429 }
430 gst_kate_parse_drain_queue_prematurely (parse);
431 ret = gst_pad_event_default (pad, parent, event);
432 break;
433 default:
434 if (!parse->streamheader_sent && GST_EVENT_IS_SERIALIZED (event)
435 && GST_EVENT_TYPE (event) > GST_EVENT_CAPS)
436 ret = gst_kate_parse_queue_event (parse, event);
437 else
438 ret = gst_pad_event_default (pad, parent, event);
439 break;
440 }
441
442 return ret;
443 }
444
/* Disabled: format conversion helper. Its only callers in this file are in
 * the disabled (#else) branch of gst_kate_parse_src_query.
 *
 * As written it converts @src_value from @src_format to *@dest_format and
 * stores the result in *@dest_value. Only identical formats and
 * DEFAULT (granulepos) -> TIME are supported; it returns FALSE for anything
 * else, and before the stream headers have been sent.
 */
#if 0
static gboolean
gst_kate_parse_convert (GstPad * pad,
    GstFormat src_format, gint64 src_value,
    GstFormat * dest_format, gint64 * dest_value)
{
  gboolean res = TRUE;
  GstKateParse *parse;

  parse = GST_KATE_PARSE (GST_PAD_PARENT (pad));

  /* fixme: assumes atomic access to lots of instance variables modified from
   * the streaming thread, including 64-bit variables */

  /* parse->ki is only filled in once the headers have been decoded */
  if (!parse->streamheader_sent)
    return FALSE;

  if (src_format == *dest_format) {
    *dest_value = src_value;
    return TRUE;
  }

  /* no byte-based conversions on the sink pad */
  if (parse->sinkpad == pad &&
      (src_format == GST_FORMAT_BYTES || *dest_format == GST_FORMAT_BYTES))
    return FALSE;

  switch (src_format) {
    case GST_FORMAT_TIME:
      /* nothing can be converted from TIME yet */
      switch (*dest_format) {
        default:
          res = FALSE;
      }
      break;
    case GST_FORMAT_DEFAULT:
      switch (*dest_format) {
        case GST_FORMAT_TIME:
          /* granulepos -> nanoseconds */
          *dest_value = kate_granule_time (&parse->ki, src_value) * GST_SECOND;
          break;
        default:
          res = FALSE;
      }
      break;
    default:
      res = FALSE;
  }

  return res;
}
#endif
494
/* Source pad query handler.
 *
 * Currently a stub: the active (#if 1) branch logs a warning and returns
 * FALSE for every query. The #else branch is a disabled draft covering
 * POSITION, DURATION and CONVERT queries.
 *
 * NOTE(review): the draft relies on gst_kate_parse_convert, which is itself
 * under #if 0 in this file, and on parse->prev_granulepos, which nothing in
 * this file sets - confirm both before enabling it.
 */
static gboolean
gst_kate_parse_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
{
#if 1
  // TODO
  GST_WARNING ("gst_kate_parse_src_query");
  return FALSE;
#else
  gint64 granulepos;
  GstKateParse *parse;
  gboolean res = FALSE;

  parse = GST_KATE_PARSE (parent);

  switch (GST_QUERY_TYPE (query)) {
    case GST_QUERY_POSITION:
    {
      GstFormat format;
      gint64 value;

      granulepos = parse->prev_granulepos;

      gst_query_parse_position (query, &format, NULL);

      /* and convert to the final format */
      if (!(res =
              gst_kate_parse_convert (pad, GST_FORMAT_DEFAULT, granulepos,
                  &format, &value)))
        goto error;

      /* fixme: support segments
         value = (value - parse->segment_start) + parse->segment_time;
       */

      gst_query_set_position (query, format, value);

      GST_LOG_OBJECT (parse, "query %p: peer returned granulepos: %"
          G_GUINT64_FORMAT " - we return %" G_GUINT64_FORMAT " (format %u)",
          query, granulepos, value, format);

      break;
    }
    case GST_QUERY_DURATION:
    {
      /* fixme: not threadsafe */
      /* query peer for total length */
      if (!gst_pad_is_linked (parse->sinkpad)) {
        GST_WARNING_OBJECT (parse, "sink pad %" GST_PTR_FORMAT " is not linked",
            parse->sinkpad);
        goto error;
      }
      if (!(res = gst_pad_query (GST_PAD_PEER (parse->sinkpad), query)))
        goto error;
      break;
    }
    case GST_QUERY_CONVERT:
    {
      GstFormat src_fmt, dest_fmt;
      gint64 src_val, dest_val;

      gst_query_parse_convert (query, &src_fmt, &src_val, &dest_fmt, &dest_val);
      if (!(res =
              gst_kate_parse_convert (pad, src_fmt, src_val, &dest_fmt,
                  &dest_val)))
        goto error;
      gst_query_set_convert (query, src_fmt, src_val, dest_fmt, dest_val);
      break;
    }
    default:
      res = gst_pad_query_default (pad, query);
      break;
  }
  return res;

error:
  {
    GST_WARNING_OBJECT (parse, "error handling query");
    return res;
  }
#endif
}
576
577 static void
gst_kate_parse_free_stream_headers(GstKateParse * parse)578 gst_kate_parse_free_stream_headers (GstKateParse * parse)
579 {
580 while (parse->streamheader != NULL) {
581 gst_buffer_unref (GST_BUFFER (parse->streamheader->data));
582 parse->streamheader = g_list_delete_link (parse->streamheader,
583 parse->streamheader);
584 }
585 }
586
587 static GstStateChangeReturn
gst_kate_parse_change_state(GstElement * element,GstStateChange transition)588 gst_kate_parse_change_state (GstElement * element, GstStateChange transition)
589 {
590 GstKateParse *parse = GST_KATE_PARSE (element);
591 GstStateChangeReturn ret;
592
593 switch (transition) {
594 case GST_STATE_CHANGE_READY_TO_PAUSED:
595 kate_info_init (&parse->ki);
596 kate_comment_init (&parse->kc);
597 parse->packetno = 0;
598 parse->streamheader_sent = FALSE;
599 parse->streamheader = NULL;
600 parse->buffer_queue = g_queue_new ();
601 parse->event_queue = g_queue_new ();
602 break;
603 default:
604 break;
605 }
606
607 ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
608
609 switch (transition) {
610 case GST_STATE_CHANGE_PAUSED_TO_READY:
611 kate_info_clear (&parse->ki);
612 kate_comment_clear (&parse->kc);
613
614 gst_kate_parse_clear_queue (parse);
615 g_queue_free (parse->buffer_queue);
616 parse->buffer_queue = NULL;
617 g_queue_free (parse->event_queue);
618 parse->event_queue = NULL;
619 gst_kate_parse_free_stream_headers (parse);
620 break;
621
622 default:
623 break;
624 }
625
626 return ret;
627 }
628