1 /* GStreamer plugin for forward error correction
2 * Copyright (C) 2017 Pexip
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) any later version.
8 *
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Lesser General Public License for more details.
13 *
14 * You should have received a copy of the GNU Lesser General Public
15 * License along with this library; if not, write to the Free Software
16 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 *
18 * Author: Mikhail Fludkov <misha@pexip.com>
19 */
20
21 #include <gst/rtp/gstrtpbuffer.h>
22
23 #include "rtpstorage.h"
24 #include "rtpstoragestream.h"
25
26 #define GST_CAT_DEFAULT (gst_rtp_storage_debug)
27
/* Indices into rtp_storage_signals[]. LAST_SIGNAL is not a signal itself; it
 * is the number of signals and is used to size that array. */
enum
{
SIGNAL_PACKET_RECOVERED,
LAST_SIGNAL,
};

/* Signal ids, filled in by rtp_storage_class_init() */
static guint rtp_storage_signals[LAST_SIGNAL] = { 0 };

/* Registers RtpStorage as a GObject subclass and provides
 * rtp_storage_parent_class, which the dispose handler chains up to */
G_DEFINE_TYPE (RtpStorage, rtp_storage, G_TYPE_OBJECT);

/* Take / release the mutex that protects the per-SSRC streams hash table */
#define STORAGE_LOCK(s) g_mutex_lock (&(s)->streams_lock)
#define STORAGE_UNLOCK(s) g_mutex_unlock (&(s)->streams_lock)
/* Initial value of size_time. The functions in this file treat a size of 0
 * as "storage disabled": nothing is stored and lookups return NULL. */
#define DEFAULT_SIZE_TIME (0)
41
42 static void
rtp_storage_init(RtpStorage * self)43 rtp_storage_init (RtpStorage * self)
44 {
45 self->size_time = DEFAULT_SIZE_TIME;
46 self->streams = g_hash_table_new_full (NULL, NULL, NULL,
47 (GDestroyNotify) rtp_storage_stream_free);
48 g_mutex_init (&self->streams_lock);
49 }
50
51 static void
rtp_storage_dispose(GObject * obj)52 rtp_storage_dispose (GObject * obj)
53 {
54 RtpStorage *self = RTP_STORAGE (obj);
55 STORAGE_LOCK (self);
56 g_hash_table_unref (self->streams);
57 self->streams = NULL;
58 STORAGE_UNLOCK (self);
59 g_mutex_clear (&self->streams_lock);
60 G_OBJECT_CLASS (rtp_storage_parent_class)->dispose (obj);
61 }
62
63 static void
rtp_storage_class_init(RtpStorageClass * klass)64 rtp_storage_class_init (RtpStorageClass * klass)
65 {
66 GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
67
68 rtp_storage_signals[SIGNAL_PACKET_RECOVERED] =
69 g_signal_new ("packet-recovered", G_TYPE_FROM_CLASS (klass),
70 G_SIGNAL_RUN_LAST, 0, NULL, NULL,
71 g_cclosure_marshal_generic, G_TYPE_NONE, 1, GST_TYPE_BUFFER);
72
73 gobject_class->dispose = rtp_storage_dispose;
74 }
75
76 GstBufferList *
rtp_storage_get_packets_for_recovery(RtpStorage * self,gint fec_pt,guint32 ssrc,guint16 lost_seq)77 rtp_storage_get_packets_for_recovery (RtpStorage * self, gint fec_pt,
78 guint32 ssrc, guint16 lost_seq)
79 {
80 GstBufferList *ret = NULL;
81 RtpStorageStream *stream;
82
83 if (0 == self->size_time) {
84 GST_WARNING_OBJECT (self, "Received request for recovery RTP packets"
85 " around lost_seqnum=%u fec_pt=%u for ssrc=%08x, but size is 0",
86 lost_seq, fec_pt, ssrc);
87 return NULL;
88 }
89
90 STORAGE_LOCK (self);
91 stream = g_hash_table_lookup (self->streams, GUINT_TO_POINTER (ssrc));
92 STORAGE_UNLOCK (self);
93
94 if (NULL == stream) {
95 GST_ERROR_OBJECT (self, "Cant find ssrc = 0x08%x", ssrc);
96 } else {
97 STREAM_LOCK (stream);
98 if (stream->queue.length > 0) {
99 GST_LOG_OBJECT (self, "Looking for recovery packets for fec_pt=%u around"
100 " lost_seq=%u for ssrc=%08x", fec_pt, lost_seq, ssrc);
101 ret =
102 rtp_storage_stream_get_packets_for_recovery (stream, fec_pt,
103 lost_seq);
104 } else {
105 GST_DEBUG_OBJECT (self, "Empty RTP storage for ssrc=%08x", ssrc);
106 }
107 STREAM_UNLOCK (stream);
108 }
109
110 return ret;
111 }
112
113 GstBuffer *
rtp_storage_get_redundant_packet(RtpStorage * self,guint32 ssrc,guint16 lost_seq)114 rtp_storage_get_redundant_packet (RtpStorage * self, guint32 ssrc,
115 guint16 lost_seq)
116 {
117 GstBuffer *ret = NULL;
118 RtpStorageStream *stream;
119
120 if (0 == self->size_time) {
121 GST_WARNING_OBJECT (self, "Received request for redundant RTP packet with"
122 " seq=%u for ssrc=%08x, but size is 0", lost_seq, ssrc);
123 return NULL;
124 }
125
126 STORAGE_LOCK (self);
127 stream = g_hash_table_lookup (self->streams, GUINT_TO_POINTER (ssrc));
128 STORAGE_UNLOCK (self);
129
130 if (NULL == stream) {
131 GST_ERROR_OBJECT (self, "Cant find ssrc = 0x%x", ssrc);
132 } else {
133 STREAM_LOCK (stream);
134 if (stream->queue.length > 0) {
135 ret = rtp_storage_stream_get_redundant_packet (stream, lost_seq);
136 } else {
137 GST_DEBUG_OBJECT (self, "Empty RTP storage for ssrc=%08x", ssrc);
138 }
139 STREAM_UNLOCK (stream);
140 }
141
142 return ret;
143 }
144
145 static void
rtp_storage_do_put_recovered_packet(RtpStorage * self,GstBuffer * buffer,guint8 pt,guint32 ssrc,guint16 seq)146 rtp_storage_do_put_recovered_packet (RtpStorage * self,
147 GstBuffer * buffer, guint8 pt, guint32 ssrc, guint16 seq)
148 {
149 RtpStorageStream *stream;
150
151 STORAGE_LOCK (self);
152 stream = g_hash_table_lookup (self->streams, GUINT_TO_POINTER (ssrc));
153 STORAGE_UNLOCK (self);
154
155 g_assert (stream);
156
157 GST_LOG_OBJECT (self,
158 "Storing recovered RTP packet with ssrc=%08x pt=%u seq=%u %"
159 GST_PTR_FORMAT, ssrc, pt, seq, buffer);
160
161 STREAM_LOCK (stream);
162 rtp_storage_stream_add_item (stream, buffer, pt, seq);
163 STREAM_UNLOCK (stream);
164 }
165
166 void
rtp_storage_put_recovered_packet(RtpStorage * self,GstBuffer * buffer,guint8 pt,guint32 ssrc,guint16 seq)167 rtp_storage_put_recovered_packet (RtpStorage * self,
168 GstBuffer * buffer, guint8 pt, guint32 ssrc, guint16 seq)
169 {
170 rtp_storage_do_put_recovered_packet (self, buffer, pt, ssrc, seq);
171 g_signal_emit (self, rtp_storage_signals[SIGNAL_PACKET_RECOVERED], 0, buffer);
172 }
173
174 gboolean
rtp_storage_append_buffer(RtpStorage * self,GstBuffer * buf)175 rtp_storage_append_buffer (RtpStorage * self, GstBuffer * buf)
176 {
177 GstRTPBuffer rtpbuf = GST_RTP_BUFFER_INIT;
178 RtpStorageStream *stream;
179 guint32 ssrc;
180 guint8 pt;
181 guint16 seq;
182
183 if (0 == self->size_time)
184 return TRUE;
185
186 /* We are about to save it in the queue, it so it is better take a ref before
187 * mapping the buffer */
188 gst_buffer_ref (buf);
189
190 if (!gst_rtp_buffer_map (buf, GST_MAP_READ |
191 GST_RTP_BUFFER_MAP_FLAG_SKIP_PADDING, &rtpbuf)) {
192 gst_buffer_unref (buf);
193 return TRUE;
194 }
195
196 ssrc = gst_rtp_buffer_get_ssrc (&rtpbuf);
197 pt = gst_rtp_buffer_get_payload_type (&rtpbuf);
198 seq = gst_rtp_buffer_get_seq (&rtpbuf);
199
200 STORAGE_LOCK (self);
201
202 stream = g_hash_table_lookup (self->streams, GUINT_TO_POINTER (ssrc));
203 if (NULL == stream) {
204 GST_DEBUG_OBJECT (self,
205 "New media stream (ssrc=0x%08x, pt=%u) detected", ssrc, pt);
206 stream = rtp_storage_stream_new (ssrc);
207 g_hash_table_insert (self->streams, GUINT_TO_POINTER (ssrc), stream);
208 }
209
210 STORAGE_UNLOCK (self);
211
212 GST_LOG_OBJECT (self,
213 "Storing RTP packet with ssrc=%08x pt=%u seq=%u %" GST_PTR_FORMAT,
214 ssrc, pt, seq, buf);
215
216 STREAM_LOCK (stream);
217
218 /* Saving the buffer, now the storage owns it */
219 rtp_storage_stream_resize_and_add_item (stream, self->size_time, buf, pt,
220 seq);
221
222 STREAM_UNLOCK (stream);
223
224 gst_rtp_buffer_unmap (&rtpbuf);
225
226 if (GST_BUFFER_FLAG_IS_SET (buf, GST_RTP_BUFFER_FLAG_REDUNDANT)) {
227 gst_buffer_unref (buf);
228 return FALSE;
229 }
230
231 return TRUE;
232 }
233
234 void
rtp_storage_clear(RtpStorage * self)235 rtp_storage_clear (RtpStorage * self)
236 {
237 STORAGE_LOCK (self);
238 g_hash_table_remove_all (self->streams);
239 STORAGE_UNLOCK (self);
240 }
241
242 void
rtp_storage_set_size(RtpStorage * self,GstClockTime size)243 rtp_storage_set_size (RtpStorage * self, GstClockTime size)
244 {
245 self->size_time = size;
246 if (0 == self->size_time)
247 rtp_storage_clear (self);
248 }
249
250 GstClockTime
rtp_storage_get_size(RtpStorage * self)251 rtp_storage_get_size (RtpStorage * self)
252 {
253 return self->size_time;
254 }
255
256 RtpStorage *
rtp_storage_new(void)257 rtp_storage_new (void)
258 {
259 return g_object_new (RTP_TYPE_STORAGE, NULL);
260 }
261