1 /*
 * gstrtponviftimestamp.c
3  *
4  * Copyright (C) 2014 Axis Communications AB
5  *  Author: Guillaume Desmottes <guillaume.desmottes@collabora.com>
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Lesser General Public
9  * License as published by the Free Software Foundation; either
10  * version 2.1 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public
18  * License along with this library; if not, see <http://www.gnu.org/licenses/>.
19  */
20 
21 #ifdef HAVE_CONFIG_H
22 #include "config.h"
23 #endif
24 
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <string.h>
28 
29 #include <gst/rtp/gstrtpbuffer.h>
30 
31 #include "gstrtponviftimestamp.h"
32 
33 #define GST_NTP_OFFSET_EVENT_NAME "GstNtpOffset"
34 
35 #define DEFAULT_NTP_OFFSET GST_CLOCK_TIME_NONE
36 #define DEFAULT_CSEQ 0
37 #define DEFAULT_SET_E_BIT FALSE
38 #define DEFAULT_SET_T_BIT FALSE
39 
40 GST_DEBUG_CATEGORY_STATIC (rtponviftimestamp_debug);
41 #define GST_CAT_DEFAULT (rtponviftimestamp_debug)
42 
43 static GstFlowReturn gst_rtp_onvif_timestamp_chain (GstPad * pad,
44     GstObject * parent, GstBuffer * buf);
45 static GstFlowReturn gst_rtp_onvif_timestamp_chain_list (GstPad * pad,
46     GstObject * parent, GstBufferList * list);
47 
48 static GstFlowReturn handle_and_push_buffer (GstRtpOnvifTimestamp * self,
49     GstBuffer * buf);
50 static GstFlowReturn handle_and_push_buffer_list (GstRtpOnvifTimestamp * self,
51     GstBufferList * list);
52 
53 static GstStaticPadTemplate sink_template_factory =
54 GST_STATIC_PAD_TEMPLATE ("sink",
55     GST_PAD_SINK,
56     GST_PAD_ALWAYS,
57     GST_STATIC_CAPS ("application/x-rtp")
58     );
59 
60 static GstStaticPadTemplate src_template_factory =
61 GST_STATIC_PAD_TEMPLATE ("src",
62     GST_PAD_SRC,
63     GST_PAD_ALWAYS,
64     GST_STATIC_CAPS ("application/x-rtp")
65     );
66 
67 enum
68 {
69   PROP_0,
70   PROP_NTP_OFFSET,
71   PROP_CSEQ,
72   PROP_SET_E_BIT,
73   PROP_SET_T_BIT,
74 };
75 
76 /*static guint gst_rtp_onvif_timestamp_signals[LAST_SIGNAL] = { 0 }; */
77 
78 G_DEFINE_TYPE (GstRtpOnvifTimestamp, gst_rtp_onvif_timestamp, GST_TYPE_ELEMENT);
79 
80 static void
gst_rtp_onvif_timestamp_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)81 gst_rtp_onvif_timestamp_get_property (GObject * object,
82     guint prop_id, GValue * value, GParamSpec * pspec)
83 {
84   GstRtpOnvifTimestamp *self = GST_RTP_ONVIF_TIMESTAMP (object);
85 
86   switch (prop_id) {
87     case PROP_NTP_OFFSET:
88       g_value_set_uint64 (value, self->prop_ntp_offset);
89       break;
90     case PROP_CSEQ:
91       g_value_set_uint (value, self->prop_cseq);
92       break;
93     case PROP_SET_E_BIT:
94       g_value_set_boolean (value, self->prop_set_e_bit);
95       break;
96     case PROP_SET_T_BIT:
97       g_value_set_boolean (value, self->prop_set_t_bit);
98       break;
99     default:
100       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
101       break;
102   }
103 }
104 
105 static void
gst_rtp_onvif_timestamp_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)106 gst_rtp_onvif_timestamp_set_property (GObject * object,
107     guint prop_id, const GValue * value, GParamSpec * pspec)
108 {
109   GstRtpOnvifTimestamp *self = GST_RTP_ONVIF_TIMESTAMP (object);
110 
111   switch (prop_id) {
112     case PROP_NTP_OFFSET:
113       self->prop_ntp_offset = g_value_get_uint64 (value);
114       break;
115     case PROP_CSEQ:
116       self->prop_cseq = g_value_get_uint (value);
117       break;
118     case PROP_SET_E_BIT:
119       self->prop_set_e_bit = g_value_get_boolean (value);
120       break;
121     case PROP_SET_T_BIT:
122       self->prop_set_t_bit = g_value_get_boolean (value);
123       break;
124     default:
125       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
126       break;
127   }
128 }
129 
130 /* send cached buffer or list, and events, if present */
131 static GstFlowReturn
send_cached_buffer_and_events(GstRtpOnvifTimestamp * self)132 send_cached_buffer_and_events (GstRtpOnvifTimestamp * self)
133 {
134   GstFlowReturn ret = GST_FLOW_OK;
135 
136   g_assert (!(self->buffer && self->list));
137 
138   if (self->buffer) {
139     GST_DEBUG_OBJECT (self, "pushing %" GST_PTR_FORMAT, self->buffer);
140     ret = handle_and_push_buffer (self, self->buffer);
141     self->buffer = NULL;
142   }
143   if (self->list) {
144     GST_DEBUG_OBJECT (self, "pushing %" GST_PTR_FORMAT, self->list);
145     ret = handle_and_push_buffer_list (self, self->list);
146     self->list = NULL;
147   }
148 
149   if (ret != GST_FLOW_OK)
150     goto out;
151 
152   while (!g_queue_is_empty (self->event_queue)) {
153     GstEvent *event;
154 
155     event = GST_EVENT_CAST (g_queue_pop_head (self->event_queue));
156     GST_LOG_OBJECT (self->sinkpad, "sending %" GST_PTR_FORMAT, event);
157     (void) gst_pad_send_event (self->sinkpad, event);
158   }
159 
160 out:
161   return ret;
162 }
163 
164 static void
purge_cached_buffer_and_events(GstRtpOnvifTimestamp * self)165 purge_cached_buffer_and_events (GstRtpOnvifTimestamp * self)
166 {
167   g_assert (!(self->buffer && self->list));
168 
169   if (self->buffer) {
170     GST_DEBUG_OBJECT (self, "purging %" GST_PTR_FORMAT, self->buffer);
171     gst_buffer_unref (self->buffer);
172     self->buffer = NULL;
173   }
174   if (self->list) {
175     GST_DEBUG_OBJECT (self, "purging %" GST_PTR_FORMAT, self->list);
176     gst_buffer_list_unref (self->list);
177     self->list = NULL;
178   }
179 
180   while (!g_queue_is_empty (self->event_queue)) {
181     GstEvent *event;
182 
183     event = GST_EVENT_CAST (g_queue_pop_head (self->event_queue));
184     gst_event_unref (event);
185   }
186 }
187 
188 static GstStateChangeReturn
gst_rtp_onvif_timestamp_change_state(GstElement * element,GstStateChange transition)189 gst_rtp_onvif_timestamp_change_state (GstElement * element,
190     GstStateChange transition)
191 {
192   GstRtpOnvifTimestamp *self = GST_RTP_ONVIF_TIMESTAMP (element);
193   GstStateChangeReturn ret;
194 
195   switch (transition) {
196     case GST_STATE_CHANGE_READY_TO_PAUSED:
197       self->ntp_offset = self->prop_ntp_offset;
198       GST_DEBUG_OBJECT (self, "ntp-offset: %" GST_TIME_FORMAT,
199           GST_TIME_ARGS (self->ntp_offset));
200       self->set_d_bit = TRUE;
201       self->set_e_bit = FALSE;
202       self->set_t_bit = FALSE;
203       break;
204     default:
205       break;
206   }
207 
208   ret = GST_ELEMENT_CLASS (gst_rtp_onvif_timestamp_parent_class)->change_state
209       (element, transition);
210 
211   if (ret == GST_STATE_CHANGE_FAILURE)
212     return ret;
213 
214   switch (transition) {
215     case GST_STATE_CHANGE_PAUSED_TO_READY:
216       purge_cached_buffer_and_events (self);
217       gst_segment_init (&self->segment, GST_FORMAT_UNDEFINED);
218       break;
219     default:
220       break;
221   }
222 
223   return ret;
224 }
225 
226 static void
gst_rtp_onvif_timestamp_finalize(GObject * object)227 gst_rtp_onvif_timestamp_finalize (GObject * object)
228 {
229   GstRtpOnvifTimestamp *self = GST_RTP_ONVIF_TIMESTAMP (object);
230 
231   g_queue_free (self->event_queue);
232 
233   G_OBJECT_CLASS (gst_rtp_onvif_timestamp_parent_class)->finalize (object);
234 }
235 
236 static void
gst_rtp_onvif_timestamp_class_init(GstRtpOnvifTimestampClass * klass)237 gst_rtp_onvif_timestamp_class_init (GstRtpOnvifTimestampClass * klass)
238 {
239   GObjectClass *gobject_class;
240   GstElementClass *gstelement_class;
241 
242   gobject_class = G_OBJECT_CLASS (klass);
243   gstelement_class = GST_ELEMENT_CLASS (klass);
244 
245   gobject_class->get_property = gst_rtp_onvif_timestamp_get_property;
246   gobject_class->set_property = gst_rtp_onvif_timestamp_set_property;
247   gobject_class->finalize = gst_rtp_onvif_timestamp_finalize;
248 
249   g_object_class_install_property (gobject_class, PROP_NTP_OFFSET,
250       g_param_spec_uint64 ("ntp-offset", "NTP offset",
251           "Offset between the pipeline running time and the absolute UTC time, "
252           "in nano-seconds since 1900 (-1 for automatic computation)",
253           0, G_MAXUINT64,
254           DEFAULT_NTP_OFFSET, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
255 
256   g_object_class_install_property (gobject_class, PROP_CSEQ,
257       g_param_spec_uint ("cseq", "CSeq",
258           "The RTSP CSeq which initiated the playback",
259           0, G_MAXUINT32,
260           DEFAULT_CSEQ, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
261 
262   g_object_class_install_property (gobject_class, PROP_SET_E_BIT,
263       g_param_spec_boolean ("set-e-bit", "Set 'E' bit",
264           "If the element should set the 'E' bit as defined in the ONVIF RTP "
265           "extension. This increases latency by one packet",
266           DEFAULT_SET_E_BIT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
267 
268   g_object_class_install_property (gobject_class, PROP_SET_T_BIT,
269       g_param_spec_boolean ("set-t-bit", "Set 'T' bit",
270           "If the element should set the 'T' bit as defined in the ONVIF RTP "
271           "extension. This increases latency by one packet",
272           DEFAULT_SET_T_BIT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
273 
274   /* register pads */
275   gst_element_class_add_static_pad_template (gstelement_class,
276       &sink_template_factory);
277   gst_element_class_add_static_pad_template (gstelement_class,
278       &src_template_factory);
279 
280   gst_element_class_set_static_metadata (gstelement_class,
281       "ONVIF NTP timestamps RTP extension", "Effect/RTP",
282       "Add absolute timestamps and flags of recorded data in a playback "
283       "session", "Guillaume Desmottes <guillaume.desmottes@collabora.com>");
284 
285   gstelement_class->change_state =
286       GST_DEBUG_FUNCPTR (gst_rtp_onvif_timestamp_change_state);
287 
288   GST_DEBUG_CATEGORY_INIT (rtponviftimestamp_debug, "rtponviftimestamp",
289       0, "ONVIF NTP timestamps RTP extension");
290 }
291 
292 static gboolean
parse_event_ntp_offset(GstRtpOnvifTimestamp * self,GstEvent * event,GstClockTime * offset,gboolean * discont)293 parse_event_ntp_offset (GstRtpOnvifTimestamp * self, GstEvent * event,
294     GstClockTime * offset, gboolean * discont)
295 {
296   const GstStructure *structure = gst_event_get_structure (event);
297   GstClockTime event_offset;
298   gboolean event_discont;
299 
300   if (!gst_structure_get_clock_time (structure, "ntp-offset", &event_offset)) {
301     GST_ERROR_OBJECT (self, "no ntp-offset in %" GST_PTR_FORMAT, event);
302     return FALSE;
303   }
304   if (!gst_structure_get_boolean (structure, "discont", &event_discont)) {
305     GST_ERROR_OBJECT (self, "no discontinue in %" GST_PTR_FORMAT, event);
306     return FALSE;
307   }
308 
309   if (offset)
310     *offset = event_offset;
311 
312   if (discont)
313     *discont = event_discont;
314 
315   return TRUE;
316 }
317 
318 static gboolean
gst_rtp_onvif_timestamp_sink_event(GstPad * pad,GstObject * parent,GstEvent * event)319 gst_rtp_onvif_timestamp_sink_event (GstPad * pad, GstObject * parent,
320     GstEvent * event)
321 {
322   GstRtpOnvifTimestamp *self = GST_RTP_ONVIF_TIMESTAMP (parent);
323   gboolean drop = FALSE;
324   gboolean ret = TRUE;
325 
326   GST_DEBUG_OBJECT (pad, "handling event %s", GST_EVENT_TYPE_NAME (event));
327 
328   /* handle serialized events, which, should not be enqueued */
329   switch (GST_EVENT_TYPE (event)) {
330     case GST_EVENT_CUSTOM_DOWNSTREAM:
331       /* if the "set-e-bit" property is set, an offset event might mark the
332        * stream as discontinued. We need to check if the currently cached buffer
333        * needs the e-bit before it's pushed */
334       if (self->buffer != NULL && self->prop_set_e_bit &&
335           gst_event_has_name (event, GST_NTP_OFFSET_EVENT_NAME)) {
336         gboolean discont;
337         if (parse_event_ntp_offset (self, event, NULL, &discont)) {
338           GST_DEBUG_OBJECT (self, "stream %s discontinued",
339               (discont ? "is" : "is not"));
340           self->set_e_bit = discont;
341         } else {
342           drop = TRUE;
343           ret = FALSE;
344           goto out;
345         }
346       }
347       break;
348     case GST_EVENT_EOS:
349     {
350       GstFlowReturn res;
351 
352       /* Push pending buffers, if any */
353       self->set_e_bit = TRUE;
354       if (self->prop_set_t_bit)
355         self->set_t_bit = TRUE;
356       res = send_cached_buffer_and_events (self);
357       if (res != GST_FLOW_OK) {
358         drop = TRUE;
359         ret = FALSE;
360         goto out;
361       }
362       break;
363     }
364     case GST_EVENT_FLUSH_STOP:
365       purge_cached_buffer_and_events (self);
366       self->set_d_bit = TRUE;
367       self->set_e_bit = FALSE;
368       self->set_t_bit = FALSE;
369       gst_segment_init (&self->segment, GST_FORMAT_UNDEFINED);
370       break;
371     default:
372       break;
373   }
374 
375   /* enqueue serialized events if there is a cached buffer */
376   if (GST_EVENT_IS_SERIALIZED (event) && (self->buffer || self->list)) {
377     GST_DEBUG ("enqueueing serialized event");
378     g_queue_push_tail (self->event_queue, event);
379     event = NULL;
380     goto out;
381   }
382 
383   /* handle rest of the events */
384   switch (GST_EVENT_TYPE (event)) {
385     case GST_EVENT_CUSTOM_DOWNSTREAM:
386       /* update the ntp-offset after any cached buffer/buffer list has been
387        * pushed. the d-bit of the next buffer/buffer list should be set if
388        * the stream is discontinued */
389       if (gst_event_has_name (event, GST_NTP_OFFSET_EVENT_NAME)) {
390         GstClockTime offset;
391         gboolean discont;
392         if (parse_event_ntp_offset (self, event, &offset, &discont)) {
393           GST_DEBUG_OBJECT (self, "new ntp-offset: %" GST_TIME_FORMAT
394               ", stream %s discontinued", GST_TIME_ARGS (offset),
395               (discont ? "is" : "is not"));
396           self->ntp_offset = offset;
397           self->set_d_bit = discont;
398         } else {
399           ret = FALSE;
400         }
401         drop = TRUE;
402       }
403       break;
404     case GST_EVENT_SEGMENT:
405       gst_event_copy_segment (event, &self->segment);
406       break;
407     default:
408       break;
409   }
410 
411 out:
412   if (drop)
413     gst_event_unref (event);
414   else if (event)
415     ret = gst_pad_event_default (pad, parent, event);
416 
417   return ret;
418 }
419 
420 static void
gst_rtp_onvif_timestamp_init(GstRtpOnvifTimestamp * self)421 gst_rtp_onvif_timestamp_init (GstRtpOnvifTimestamp * self)
422 {
423   self->sinkpad =
424       gst_pad_new_from_static_template (&sink_template_factory, "sink");
425   gst_pad_set_chain_function (self->sinkpad, gst_rtp_onvif_timestamp_chain);
426   gst_pad_set_chain_list_function (self->sinkpad,
427       gst_rtp_onvif_timestamp_chain_list);
428   gst_pad_set_event_function (self->sinkpad,
429       gst_rtp_onvif_timestamp_sink_event);
430   gst_element_add_pad (GST_ELEMENT (self), self->sinkpad);
431   GST_PAD_SET_PROXY_CAPS (self->sinkpad);
432   GST_PAD_SET_PROXY_ALLOCATION (self->sinkpad);
433 
434   self->srcpad =
435       gst_pad_new_from_static_template (&src_template_factory, "src");
436   gst_element_add_pad (GST_ELEMENT (self), self->srcpad);
437 
438   self->prop_ntp_offset = DEFAULT_NTP_OFFSET;
439   self->prop_set_e_bit = DEFAULT_SET_E_BIT;
440   self->prop_set_t_bit = DEFAULT_SET_T_BIT;
441 
442   gst_segment_init (&self->segment, GST_FORMAT_UNDEFINED);
443 
444   self->event_queue = g_queue_new ();
445   self->buffer = NULL;
446   self->list = NULL;
447 }
448 
449 #define EXTENSION_ID 0xABAC
450 #define EXTENSION_SIZE 3
451 
452 static gboolean
handle_buffer(GstRtpOnvifTimestamp * self,GstBuffer * buf)453 handle_buffer (GstRtpOnvifTimestamp * self, GstBuffer * buf)
454 {
455   GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
456   guint8 *data;
457   guint16 bits;
458   guint wordlen;
459   guint64 time;
460   guint8 field = 0;
461 
462   if (!GST_CLOCK_TIME_IS_VALID (self->ntp_offset)) {
463     GstClock *clock = gst_element_get_clock (GST_ELEMENT (self));
464 
465     if (clock) {
466       GstClockTime clock_time = gst_clock_get_time (clock);
467       guint64 real_time = g_get_real_time ();
468       GstClockTime running_time = clock_time -
469           gst_element_get_base_time (GST_ELEMENT (self));
470 
471       /* convert microseconds to nanoseconds */
472       real_time *= 1000;
473 
474       /* add constant to convert from 1970 based time to 1900 based time */
475       real_time += (G_GUINT64_CONSTANT (2208988800) * GST_SECOND);
476 
477       self->ntp_offset = real_time - running_time;
478 
479       GST_DEBUG_OBJECT (self, "new ntp-offset: %" GST_TIME_FORMAT,
480           GST_TIME_ARGS (self->ntp_offset));
481 
482       gst_object_unref (clock);
483     } else {
484       GST_ELEMENT_ERROR (self, STREAM, FAILED, ("No ntp-offset present"),
485           ("Can not guess ntp-offset with no clock."));
486       /* Received a buffer in PAUSED, so we can't guess the match
487        * between the running time and the NTP clock yet.
488        */
489       return FALSE;
490     }
491   }
492 
493   if (self->segment.format != GST_FORMAT_TIME) {
494     GST_ELEMENT_ERROR (self, STREAM, FAILED,
495         ("did not receive a time segment yet"), (NULL));
496     return FALSE;
497   }
498 
499   if (!gst_rtp_buffer_map (buf, GST_MAP_READWRITE, &rtp)) {
500     GST_ELEMENT_ERROR (self, STREAM, FAILED,
501         ("Failed to map RTP buffer"), (NULL));
502     return FALSE;
503   }
504 
505   if (!gst_rtp_buffer_set_extension_data (&rtp, EXTENSION_ID, EXTENSION_SIZE)) {
506     GST_ELEMENT_ERROR (self, STREAM, FAILED, ("Failed to set extension data"),
507         (NULL));
508     gst_rtp_buffer_unmap (&rtp);
509     return FALSE;
510   }
511 
512   if (!gst_rtp_buffer_get_extension_data (&rtp, &bits, (gpointer) & data,
513           &wordlen)) {
514     GST_ELEMENT_ERROR (self, STREAM, FAILED, ("Failed to get extension data"),
515         (NULL));
516     gst_rtp_buffer_unmap (&rtp);
517     return FALSE;
518   }
519 
520   /* NTP timestamp */
521   if (GST_BUFFER_PTS_IS_VALID (buf)) {
522     time = gst_segment_to_stream_time (&self->segment, GST_FORMAT_TIME,
523         GST_BUFFER_PTS (buf));
524   } else if (GST_BUFFER_DTS_IS_VALID (buf)) {
525     time = gst_segment_to_stream_time (&self->segment, GST_FORMAT_TIME,
526         GST_BUFFER_DTS (buf));
527   } else {
528     GST_INFO_OBJECT (self,
529         "Buffer doesn't contain any valid DTS or PTS timestamp");
530     goto done;
531   }
532 
533   if (time == GST_CLOCK_TIME_NONE) {
534     GST_ERROR_OBJECT (self, "Failed to get stream time");
535     goto done;
536   }
537 
538   /* add the offset (in seconds) */
539   time += self->ntp_offset;
540 
541   /* convert to NTP time. upper 32 bits should contain the seconds
542    * and the lower 32 bits, the fractions of a second. */
543   time = gst_util_uint64_scale (time, (G_GINT64_CONSTANT (1) << 32),
544       GST_SECOND);
545 
546   GST_DEBUG_OBJECT (self, "timestamp: %" G_GUINT64_FORMAT, time);
547 
548   GST_WRITE_UINT64_BE (data, time);
549 
550   /* The next byte is composed of: C E D T mbz (4 bits) */
551 
552   /* Set C if the buffer does *not* have the DELTA_UNIT flag as it means
553    * that's a key frame (or 'clean point'). */
554   if (!GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT)) {
555     GST_DEBUG_OBJECT (self, "set C flag");
556     field |= (1 << 7);
557   }
558 
559   /* Set E if the next buffer has DISCONT */
560   if (self->set_e_bit) {
561     GST_DEBUG_OBJECT (self, "set E flag");
562     field |= (1 << 6);
563     self->set_e_bit = FALSE;
564   }
565 
566   /* Set D if the buffer has the DISCONT flag */
567   if (self->set_d_bit) {
568     GST_DEBUG_OBJECT (self, "set D flag");
569     field |= (1 << 5);
570     self->set_d_bit = FALSE;
571   }
572 
573   /* Set T if we have received EOS */
574   if (self->set_t_bit) {
575     GST_DEBUG_OBJECT (self, "set T flag");
576     field |= (1 << 4);
577     self->set_t_bit = FALSE;
578   }
579 
580   GST_WRITE_UINT8 (data + 8, field);
581 
582   /* CSeq (low-order byte) */
583   GST_WRITE_UINT8 (data + 9, (guchar) self->prop_cseq);
584 
585   memset (data + 10, 0, 3);
586 
587 done:
588   gst_rtp_buffer_unmap (&rtp);
589   return TRUE;
590 }
591 
592 /* @buf: (transfer full) */
593 static GstFlowReturn
handle_and_push_buffer(GstRtpOnvifTimestamp * self,GstBuffer * buf)594 handle_and_push_buffer (GstRtpOnvifTimestamp * self, GstBuffer * buf)
595 {
596   if (!handle_buffer (self, buf)) {
597     gst_buffer_unref (buf);
598     return GST_FLOW_ERROR;
599   }
600 
601   return gst_pad_push (self->srcpad, buf);
602 }
603 
604 static GstFlowReturn
gst_rtp_onvif_timestamp_chain(GstPad * pad,GstObject * parent,GstBuffer * buf)605 gst_rtp_onvif_timestamp_chain (GstPad * pad, GstObject * parent,
606     GstBuffer * buf)
607 {
608   GstRtpOnvifTimestamp *self = GST_RTP_ONVIF_TIMESTAMP (parent);
609   GstFlowReturn result = GST_FLOW_OK;
610 
611   if (!self->prop_set_e_bit && !self->prop_set_t_bit) {
612     /* Modify and push this buffer right away */
613     return handle_and_push_buffer (self, buf);
614   }
615 
616   /* send any previously cached item(s), this leaves an empty queue */
617   result = send_cached_buffer_and_events (self);
618 
619   /* enqueue the new item, as the only item in the queue */
620   self->buffer = buf;
621   return result;
622 }
623 
624 /* @buf: (transfer full) */
625 static GstFlowReturn
handle_and_push_buffer_list(GstRtpOnvifTimestamp * self,GstBufferList * list)626 handle_and_push_buffer_list (GstRtpOnvifTimestamp * self, GstBufferList * list)
627 {
628   GstBuffer *buf;
629 
630   /* Set the extension on the *first* buffer */
631   buf = gst_buffer_list_get (list, 0);
632   if (!handle_buffer (self, buf)) {
633     gst_buffer_list_unref (list);
634     return GST_FLOW_ERROR;
635   }
636 
637   return gst_pad_push_list (self->srcpad, list);
638 }
639 
640 /* gst_pad_chain_list_default() refs the buffer when passing it to the chain
641  * function, making it not writable. We implement our own chain_list function
642  * to avoid having to copy each buffer. */
643 static GstFlowReturn
gst_rtp_onvif_timestamp_chain_list(GstPad * pad,GstObject * parent,GstBufferList * list)644 gst_rtp_onvif_timestamp_chain_list (GstPad * pad, GstObject * parent,
645     GstBufferList * list)
646 {
647   GstRtpOnvifTimestamp *self = GST_RTP_ONVIF_TIMESTAMP (parent);
648   GstFlowReturn result = GST_FLOW_OK;
649 
650   if (!self->prop_set_e_bit && !self->prop_set_t_bit) {
651     return handle_and_push_buffer_list (self, list);
652   }
653 
654   /* send any previously cached item(s), this leaves an empty queue */
655   result = send_cached_buffer_and_events (self);
656 
657   /* enqueue the new item, as the only item in the queue */
658   self->list = list;
659   return result;
660 }
661