1 #include <limits.h>
2 #include <pthread.h>
3
4 #include "audio/aframe.h"
5 #include "common/common.h"
6 #include "common/msg.h"
7 #include "osdep/atomic.h"
8
9 #include "f_async_queue.h"
10 #include "filter_internal.h"
11
12 struct mp_async_queue {
13 // This is just a wrapper, so the API user can talloc_free() it, instead of
14 // having to call a special unref function.
15 struct async_queue *q;
16 };
17
18 struct async_queue {
19 mp_atomic_uint64 refcount;
20
21 pthread_mutex_t lock;
22
23 // -- protected by lock
24 struct mp_async_queue_config cfg;
25 bool active; // queue was resumed; consumer may request frames
26 bool reading; // data flow: reading => consumer has requested frames
27 int64_t samples_size; // queue size in the cfg.sample_unit
28 size_t byte_size; // queue size in bytes (using approx. frame sizes)
29 int num_frames;
30 struct mp_frame *frames;
31 int eof_count; // number of MP_FRAME_EOF in frames[], for draining
32 struct mp_filter *conn[2]; // filters: in (0), out (1)
33 };
34
reset_queue(struct async_queue * q)35 static void reset_queue(struct async_queue *q)
36 {
37 pthread_mutex_lock(&q->lock);
38 q->active = q->reading = false;
39 for (int n = 0; n < q->num_frames; n++)
40 mp_frame_unref(&q->frames[n]);
41 q->num_frames = 0;
42 q->eof_count = 0;
43 q->samples_size = 0;
44 q->byte_size = 0;
45 for (int n = 0; n < 2; n++) {
46 if (q->conn[n])
47 mp_filter_wakeup(q->conn[n]);
48 }
49 pthread_mutex_unlock(&q->lock);
50 }
51
unref_queue(struct async_queue * q)52 static void unref_queue(struct async_queue *q)
53 {
54 if (!q)
55 return;
56 int count = atomic_fetch_add(&q->refcount, -1) - 1;
57 assert(count >= 0);
58 if (count == 0) {
59 reset_queue(q);
60 pthread_mutex_destroy(&q->lock);
61 talloc_free(q);
62 }
63 }
64
on_free_queue(void * p)65 static void on_free_queue(void *p)
66 {
67 struct mp_async_queue *q = p;
68 unref_queue(q->q);
69 }
70
mp_async_queue_create(void)71 struct mp_async_queue *mp_async_queue_create(void)
72 {
73 struct mp_async_queue *r = talloc_zero(NULL, struct mp_async_queue);
74 r->q = talloc_zero(NULL, struct async_queue);
75 *r->q = (struct async_queue){
76 .refcount = ATOMIC_VAR_INIT(1),
77 };
78 pthread_mutex_init(&r->q->lock, NULL);
79 talloc_set_destructor(r, on_free_queue);
80 mp_async_queue_set_config(r, (struct mp_async_queue_config){0});
81 return r;
82 }
83
frame_get_samples(struct async_queue * q,struct mp_frame frame)84 static int64_t frame_get_samples(struct async_queue *q, struct mp_frame frame)
85 {
86 int64_t res = 1;
87 if (frame.type == MP_FRAME_AUDIO && q->cfg.sample_unit == AQUEUE_UNIT_SAMPLES) {
88 struct mp_aframe *aframe = frame.data;
89 res = mp_aframe_get_size(aframe);
90 }
91 if (mp_frame_is_signaling(frame))
92 return 0;
93 return res;
94 }
95
is_full(struct async_queue * q)96 static bool is_full(struct async_queue *q)
97 {
98 if (q->samples_size >= q->cfg.max_samples || q->byte_size >= q->cfg.max_bytes)
99 return true;
100 if (q->num_frames >= 2 && q->cfg.max_duration > 0) {
101 double pts1 = mp_frame_get_pts(q->frames[q->num_frames - 1]);
102 double pts2 = mp_frame_get_pts(q->frames[0]);
103 if (pts1 != MP_NOPTS_VALUE && pts2 != MP_NOPTS_VALUE &&
104 pts2 - pts1 >= q->cfg.max_duration)
105 return true;
106 }
107 return false;
108 }
109
110 // Add or remove a frame from the accounted queue size.
111 // dir==1: add, dir==-1: remove
account_frame(struct async_queue * q,struct mp_frame frame,int dir)112 static void account_frame(struct async_queue *q, struct mp_frame frame,
113 int dir)
114 {
115 assert(dir == 1 || dir == -1);
116
117 q->samples_size += dir * frame_get_samples(q, frame);
118 q->byte_size += dir * mp_frame_approx_size(frame);
119
120 if (frame.type == MP_FRAME_EOF)
121 q->eof_count += dir;
122 }
123
recompute_sizes(struct async_queue * q)124 static void recompute_sizes(struct async_queue *q)
125 {
126 q->eof_count = 0;
127 q->samples_size = 0;
128 q->byte_size = 0;
129 for (int n = 0; n < q->num_frames; n++)
130 account_frame(q, q->frames[n], 1);
131 }
132
mp_async_queue_set_config(struct mp_async_queue * queue,struct mp_async_queue_config cfg)133 void mp_async_queue_set_config(struct mp_async_queue *queue,
134 struct mp_async_queue_config cfg)
135 {
136 struct async_queue *q = queue->q;
137
138 cfg.max_bytes = MPCLAMP(cfg.max_bytes, 1, (size_t)-1 / 2);
139
140 assert(cfg.sample_unit == AQUEUE_UNIT_FRAME ||
141 cfg.sample_unit == AQUEUE_UNIT_SAMPLES);
142
143 cfg.max_samples = MPMAX(cfg.max_samples, 1);
144
145 pthread_mutex_lock(&q->lock);
146 bool recompute = q->cfg.sample_unit != cfg.sample_unit;
147 q->cfg = cfg;
148 if (recompute)
149 recompute_sizes(q);
150 pthread_mutex_unlock(&q->lock);
151 }
152
mp_async_queue_reset(struct mp_async_queue * queue)153 void mp_async_queue_reset(struct mp_async_queue *queue)
154 {
155 reset_queue(queue->q);
156 }
157
mp_async_queue_is_active(struct mp_async_queue * queue)158 bool mp_async_queue_is_active(struct mp_async_queue *queue)
159 {
160 struct async_queue *q = queue->q;
161 pthread_mutex_lock(&q->lock);
162 bool res = q->active;
163 pthread_mutex_unlock(&q->lock);
164 return res;
165 }
166
mp_async_queue_is_full(struct mp_async_queue * queue)167 bool mp_async_queue_is_full(struct mp_async_queue *queue)
168 {
169 struct async_queue *q = queue->q;
170 pthread_mutex_lock(&q->lock);
171 bool res = is_full(q);
172 pthread_mutex_unlock(&q->lock);
173 return res;
174 }
175
mp_async_queue_resume(struct mp_async_queue * queue)176 void mp_async_queue_resume(struct mp_async_queue *queue)
177 {
178 struct async_queue *q = queue->q;
179
180 pthread_mutex_lock(&q->lock);
181 if (!q->active) {
182 q->active = true;
183 // Possibly make the consumer request new frames.
184 if (q->conn[1])
185 mp_filter_wakeup(q->conn[1]);
186 }
187 pthread_mutex_unlock(&q->lock);
188 }
189
mp_async_queue_resume_reading(struct mp_async_queue * queue)190 void mp_async_queue_resume_reading(struct mp_async_queue *queue)
191 {
192 struct async_queue *q = queue->q;
193
194 pthread_mutex_lock(&q->lock);
195 if (!q->active || !q->reading) {
196 q->active = true;
197 q->reading = true;
198 // Possibly start producer/consumer.
199 for (int n = 0; n < 2; n++) {
200 if (q->conn[n])
201 mp_filter_wakeup(q->conn[n]);
202 }
203 }
204 pthread_mutex_unlock(&q->lock);
205 }
206
mp_async_queue_get_samples(struct mp_async_queue * queue)207 int64_t mp_async_queue_get_samples(struct mp_async_queue *queue)
208 {
209 struct async_queue *q = queue->q;
210 pthread_mutex_lock(&q->lock);
211 int64_t res = q->samples_size;
212 pthread_mutex_unlock(&q->lock);
213 return res;
214 }
215
mp_async_queue_get_frames(struct mp_async_queue * queue)216 int mp_async_queue_get_frames(struct mp_async_queue *queue)
217 {
218 struct async_queue *q = queue->q;
219 pthread_mutex_lock(&q->lock);
220 int res = q->num_frames;
221 pthread_mutex_unlock(&q->lock);
222 return res;
223 }
224
225 struct priv {
226 struct async_queue *q;
227 struct mp_filter *notify;
228 };
229
destroy(struct mp_filter * f)230 static void destroy(struct mp_filter *f)
231 {
232 struct priv *p = f->priv;
233 struct async_queue *q = p->q;
234
235 pthread_mutex_lock(&q->lock);
236 for (int n = 0; n < 2; n++) {
237 if (q->conn[n] == f)
238 q->conn[n] = NULL;
239 }
240 pthread_mutex_unlock(&q->lock);
241
242 unref_queue(q);
243 }
244
process_in(struct mp_filter * f)245 static void process_in(struct mp_filter *f)
246 {
247 struct priv *p = f->priv;
248 struct async_queue *q = p->q;
249 assert(q->conn[0] == f);
250
251 pthread_mutex_lock(&q->lock);
252 if (!q->reading) {
253 // mp_async_queue_reset()/reset_queue() is usually called asynchronously,
254 // so we might have requested a frame earlier, and now can't use it.
255 // Discard it; the expectation is that this is a benign logical race
256 // condition, and the filter graph will be reset anyway.
257 if (mp_pin_out_has_data(f->ppins[0])) {
258 struct mp_frame frame = mp_pin_out_read(f->ppins[0]);
259 mp_frame_unref(&frame);
260 MP_DBG(f, "discarding frame due to async reset\n");
261 }
262 } else if (!is_full(q) && mp_pin_out_request_data(f->ppins[0])) {
263 struct mp_frame frame = mp_pin_out_read(f->ppins[0]);
264 account_frame(q, frame, 1);
265 MP_TARRAY_INSERT_AT(q, q->frames, q->num_frames, 0, frame);
266 // Notify reader that we have new frames.
267 if (q->conn[1])
268 mp_filter_wakeup(q->conn[1]);
269 bool full = is_full(q);
270 if (!full)
271 mp_pin_out_request_data_next(f->ppins[0]);
272 if (p->notify && full)
273 mp_filter_wakeup(p->notify);
274 }
275 if (p->notify && !q->num_frames)
276 mp_filter_wakeup(p->notify);
277 pthread_mutex_unlock(&q->lock);
278 }
279
process_out(struct mp_filter * f)280 static void process_out(struct mp_filter *f)
281 {
282 struct priv *p = f->priv;
283 struct async_queue *q = p->q;
284 assert(q->conn[1] == f);
285
286 if (!mp_pin_in_needs_data(f->ppins[0]))
287 return;
288
289 pthread_mutex_lock(&q->lock);
290 if (q->active && !q->reading) {
291 q->reading = true;
292 mp_filter_wakeup(q->conn[0]);
293 }
294 if (q->active && q->num_frames) {
295 struct mp_frame frame = q->frames[q->num_frames - 1];
296 q->num_frames -= 1;
297 account_frame(q, frame, -1);
298 assert(q->samples_size >= 0);
299 mp_pin_in_write(f->ppins[0], frame);
300 // Notify writer that we need new frames.
301 if (q->conn[0])
302 mp_filter_wakeup(q->conn[0]);
303 }
304 pthread_mutex_unlock(&q->lock);
305 }
306
reset(struct mp_filter * f)307 static void reset(struct mp_filter *f)
308 {
309 struct priv *p = f->priv;
310 struct async_queue *q = p->q;
311
312 pthread_mutex_lock(&q->lock);
313 // If the queue is in reading state, it is logical that it should request
314 // input immediately.
315 if (mp_pin_get_dir(f->pins[0]) == MP_PIN_IN && q->reading)
316 mp_filter_wakeup(f);
317 pthread_mutex_unlock(&q->lock);
318 }
319
320 // producer
321 static const struct mp_filter_info info_in = {
322 .name = "async_queue_in",
323 .priv_size = sizeof(struct priv),
324 .destroy = destroy,
325 .process = process_in,
326 .reset = reset,
327 };
328
329 // consumer
330 static const struct mp_filter_info info_out = {
331 .name = "async_queue_out",
332 .priv_size = sizeof(struct priv),
333 .destroy = destroy,
334 .process = process_out,
335 };
336
mp_async_queue_set_notifier(struct mp_filter * f,struct mp_filter * notify)337 void mp_async_queue_set_notifier(struct mp_filter *f, struct mp_filter *notify)
338 {
339 assert(mp_filter_get_info(f) == &info_in);
340 struct priv *p = f->priv;
341 if (p->notify != notify) {
342 p->notify = notify;
343 if (notify)
344 mp_filter_wakeup(notify);
345 }
346 }
347
mp_async_queue_create_filter(struct mp_filter * parent,enum mp_pin_dir dir,struct mp_async_queue * queue)348 struct mp_filter *mp_async_queue_create_filter(struct mp_filter *parent,
349 enum mp_pin_dir dir,
350 struct mp_async_queue *queue)
351 {
352 bool is_in = dir == MP_PIN_IN;
353 assert(queue);
354
355 struct mp_filter *f = mp_filter_create(parent, is_in ? &info_in : &info_out);
356 if (!f)
357 return NULL;
358
359 struct priv *p = f->priv;
360
361 struct async_queue *q = queue->q;
362
363 mp_filter_add_pin(f, dir, is_in ? "in" : "out");
364
365 atomic_fetch_add(&q->refcount, 1);
366 p->q = q;
367
368 pthread_mutex_lock(&q->lock);
369 int slot = is_in ? 0 : 1;
370 assert(!q->conn[slot]); // fails if already connected on this end
371 q->conn[slot] = f;
372 pthread_mutex_unlock(&q->lock);
373
374 return f;
375 }
376