#include <limits.h>
#include <pthread.h>

#include "audio/aframe.h"
#include "common/common.h"
#include "common/msg.h"
#include "osdep/atomic.h"

#include "f_async_queue.h"
#include "filter_internal.h"

struct mp_async_queue {
    // This is just a wrapper, so the API user can talloc_free() it, instead of
    // having to call a special unref function.
    struct async_queue *q;
};

struct async_queue {
    mp_atomic_uint64 refcount;

    pthread_mutex_t lock;

    // -- protected by lock
    struct mp_async_queue_config cfg;
    bool active; // queue was resumed; consumer may request frames
    bool reading; // data flow: reading => consumer has requested frames
    int64_t samples_size; // queue size in the cfg.sample_unit
    size_t byte_size; // queue size in bytes (using approx. frame sizes)
    int num_frames;
    struct mp_frame *frames; // newest frame at index 0; consumed from the end
    int eof_count; // number of MP_FRAME_EOF in frames[], for draining
    struct mp_filter *conn[2]; // filters: in (0), out (1)
};

static void reset_queue(struct async_queue *q)
{
    pthread_mutex_lock(&q->lock);
    q->active = q->reading = false;
    for (int n = 0; n < q->num_frames; n++)
        mp_frame_unref(&q->frames[n]);
    q->num_frames = 0;
    q->eof_count = 0;
    q->samples_size = 0;
    q->byte_size = 0;
    for (int n = 0; n < 2; n++) {
        if (q->conn[n])
            mp_filter_wakeup(q->conn[n]);
    }
    pthread_mutex_unlock(&q->lock);
}

static void unref_queue(struct async_queue *q)
{
    if (!q)
        return;
    int count = atomic_fetch_add(&q->refcount, -1) - 1;
    assert(count >= 0);
    if (count == 0) {
        reset_queue(q);
        pthread_mutex_destroy(&q->lock);
        talloc_free(q);
    }
}

static void on_free_queue(void *p)
{
    struct mp_async_queue *q = p;
    unref_queue(q->q);
}

struct mp_async_queue *mp_async_queue_create(void)
{
    struct mp_async_queue *r = talloc_zero(NULL, struct mp_async_queue);
    r->q = talloc_zero(NULL, struct async_queue);
    *r->q = (struct async_queue){
        .refcount = ATOMIC_VAR_INIT(1),
    };
    pthread_mutex_init(&r->q->lock, NULL);
    talloc_set_destructor(r, on_free_queue);
    mp_async_queue_set_config(r, (struct mp_async_queue_config){0});
    return r;
}
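
/*
 * Note on lifetime: the returned wrapper holds one reference on the shared
 * async_queue; each filter end created with mp_async_queue_create_filter()
 * adds its own. talloc_free() on the wrapper thus drops only that single
 * reference, and the shared state stays alive until all filter ends are
 * destroyed as well.
 */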

// Return the frame's size in units of cfg.sample_unit (0 for signaling frames).
static int64_t frame_get_samples(struct async_queue *q, struct mp_frame frame)
{
    int64_t res = 1;
    if (frame.type == MP_FRAME_AUDIO && q->cfg.sample_unit == AQUEUE_UNIT_SAMPLES) {
        struct mp_aframe *aframe = frame.data;
        res = mp_aframe_get_size(aframe);
    }
    if (mp_frame_is_signaling(frame))
        return 0;
    return res;
}

static bool is_full(struct async_queue *q)
{
    if (q->samples_size >= q->cfg.max_samples || q->byte_size >= q->cfg.max_bytes)
        return true;
    if (q->num_frames >= 2 && q->cfg.max_duration > 0) {
        double pts1 = mp_frame_get_pts(q->frames[q->num_frames - 1]);
        double pts2 = mp_frame_get_pts(q->frames[0]);
        if (pts1 != MP_NOPTS_VALUE && pts2 != MP_NOPTS_VALUE &&
            pts2 - pts1 >= q->cfg.max_duration)
            return true;
    }
    return false;
}
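
/*
 * Worked example for the duration limit: with cfg.max_duration == 0.5, an
 * oldest frame (frames[num_frames - 1]) at pts 10.0 and a newest frame
 * (frames[0]) at pts 10.6 give pts2 - pts1 == 0.6 >= 0.5, so the queue
 * counts as full. Frames without a valid pts are ignored by this check.
 */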

// Add or remove a frame from the accounted queue size.
//  dir==1: add, dir==-1: remove
static void account_frame(struct async_queue *q, struct mp_frame frame,
                          int dir)
{
    assert(dir == 1 || dir == -1);

    q->samples_size += dir * frame_get_samples(q, frame);
    q->byte_size += dir * mp_frame_approx_size(frame);

    if (frame.type == MP_FRAME_EOF)
        q->eof_count += dir;
}

static void recompute_sizes(struct async_queue *q)
{
    q->eof_count = 0;
    q->samples_size = 0;
    q->byte_size = 0;
    for (int n = 0; n < q->num_frames; n++)
        account_frame(q, q->frames[n], 1);
}

void mp_async_queue_set_config(struct mp_async_queue *queue,
                               struct mp_async_queue_config cfg)
{
    struct async_queue *q = queue->q;

    cfg.max_bytes = MPCLAMP(cfg.max_bytes, 1, (size_t)-1 / 2);

    assert(cfg.sample_unit == AQUEUE_UNIT_FRAME ||
           cfg.sample_unit == AQUEUE_UNIT_SAMPLES);

    cfg.max_samples = MPMAX(cfg.max_samples, 1);

    pthread_mutex_lock(&q->lock);
    bool recompute = q->cfg.sample_unit != cfg.sample_unit;
    q->cfg = cfg;
    if (recompute)
        recompute_sizes(q);
    pthread_mutex_unlock(&q->lock);
}
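
/*
 * Example (sketch): configure a queue to hold at most 1 second of 48 kHz
 * audio, capped at 4 MiB, whichever limit is hit first. The field names are
 * the mp_async_queue_config fields used above.
 *
 *   struct mp_async_queue_config cfg = {
 *       .sample_unit = AQUEUE_UNIT_SAMPLES,
 *       .max_samples = 48000,
 *       .max_bytes = 4 * 1024 * 1024,
 *       .max_duration = 1.0,
 *   };
 *   mp_async_queue_set_config(queue, cfg);
 */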

void mp_async_queue_reset(struct mp_async_queue *queue)
{
    reset_queue(queue->q);
}

bool mp_async_queue_is_active(struct mp_async_queue *queue)
{
    struct async_queue *q = queue->q;
    pthread_mutex_lock(&q->lock);
    bool res = q->active;
    pthread_mutex_unlock(&q->lock);
    return res;
}

bool mp_async_queue_is_full(struct mp_async_queue *queue)
{
    struct async_queue *q = queue->q;
    pthread_mutex_lock(&q->lock);
    bool res = is_full(q);
    pthread_mutex_unlock(&q->lock);
    return res;
}

void mp_async_queue_resume(struct mp_async_queue *queue)
{
    struct async_queue *q = queue->q;

    pthread_mutex_lock(&q->lock);
    if (!q->active) {
        q->active = true;
        // Possibly make the consumer request new frames.
        if (q->conn[1])
            mp_filter_wakeup(q->conn[1]);
    }
    pthread_mutex_unlock(&q->lock);
}
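
// Unlike mp_async_queue_resume(), this also enters the reading state right
// away, so the producer starts filling the queue without waiting for the
// consumer to request its first frame.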
void mp_async_queue_resume_reading(struct mp_async_queue *queue)
{
    struct async_queue *q = queue->q;

    pthread_mutex_lock(&q->lock);
    if (!q->active || !q->reading) {
        q->active = true;
        q->reading = true;
        // Possibly start producer/consumer.
        for (int n = 0; n < 2; n++) {
            if (q->conn[n])
                mp_filter_wakeup(q->conn[n]);
        }
    }
    pthread_mutex_unlock(&q->lock);
}

int64_t mp_async_queue_get_samples(struct mp_async_queue *queue)
{
    struct async_queue *q = queue->q;
    pthread_mutex_lock(&q->lock);
    int64_t res = q->samples_size;
    pthread_mutex_unlock(&q->lock);
    return res;
}

int mp_async_queue_get_frames(struct mp_async_queue *queue)
{
    struct async_queue *q = queue->q;
    pthread_mutex_lock(&q->lock);
    int res = q->num_frames;
    pthread_mutex_unlock(&q->lock);
    return res;
}

struct priv {
    struct async_queue *q;
    struct mp_filter *notify;
};

static void destroy(struct mp_filter *f)
{
    struct priv *p = f->priv;
    struct async_queue *q = p->q;

    pthread_mutex_lock(&q->lock);
    for (int n = 0; n < 2; n++) {
        if (q->conn[n] == f)
            q->conn[n] = NULL;
    }
    pthread_mutex_unlock(&q->lock);

    unref_queue(q);
}

static void process_in(struct mp_filter *f)
{
    struct priv *p = f->priv;
    struct async_queue *q = p->q;
    assert(q->conn[0] == f);

    pthread_mutex_lock(&q->lock);
    if (!q->reading) {
        // mp_async_queue_reset()/reset_queue() is usually called asynchronously,
        // so we might have requested a frame earlier, and now can't use it.
        // Discard it; the expectation is that this is a benign logical race
        // condition, and the filter graph will be reset anyway.
        if (mp_pin_out_has_data(f->ppins[0])) {
            struct mp_frame frame = mp_pin_out_read(f->ppins[0]);
            mp_frame_unref(&frame);
            MP_DBG(f, "discarding frame due to async reset\n");
        }
    } else if (!is_full(q) && mp_pin_out_request_data(f->ppins[0])) {
        struct mp_frame frame = mp_pin_out_read(f->ppins[0]);
        account_frame(q, frame, 1);
        MP_TARRAY_INSERT_AT(q, q->frames, q->num_frames, 0, frame);
        // Notify reader that we have new frames.
        if (q->conn[1])
            mp_filter_wakeup(q->conn[1]);
        bool full = is_full(q);
        if (!full)
            mp_pin_out_request_data_next(f->ppins[0]);
        if (p->notify && full)
            mp_filter_wakeup(p->notify);
    }
    if (p->notify && !q->num_frames)
        mp_filter_wakeup(p->notify);
    pthread_mutex_unlock(&q->lock);
}

static void process_out(struct mp_filter *f)
{
    struct priv *p = f->priv;
    struct async_queue *q = p->q;
    assert(q->conn[1] == f);

    if (!mp_pin_in_needs_data(f->ppins[0]))
        return;

    pthread_mutex_lock(&q->lock);
    if (q->active && !q->reading) {
        q->reading = true;
        mp_filter_wakeup(q->conn[0]);
    }
    if (q->active && q->num_frames) {
        struct mp_frame frame = q->frames[q->num_frames - 1];
        q->num_frames -= 1;
        account_frame(q, frame, -1);
        assert(q->samples_size >= 0);
        mp_pin_in_write(f->ppins[0], frame);
        // Notify writer that we need new frames.
        if (q->conn[0])
            mp_filter_wakeup(q->conn[0]);
    }
    pthread_mutex_unlock(&q->lock);
}
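
/*
 * Dataflow summary: process_out() flips q->reading on the first downstream
 * request and wakes the producer; process_in() then pulls frames from its
 * input pin until is_full() triggers, waking the consumer for each new
 * frame; process_out() pops frames from the tail of q->frames and wakes the
 * producer again so it can refill the freed space.
 */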

static void reset(struct mp_filter *f)
{
    struct priv *p = f->priv;
    struct async_queue *q = p->q;

    pthread_mutex_lock(&q->lock);
    // If the queue is in reading state, it is logical that it should request
    // input immediately.
    if (mp_pin_get_dir(f->pins[0]) == MP_PIN_IN && q->reading)
        mp_filter_wakeup(f);
    pthread_mutex_unlock(&q->lock);
}

// producer
static const struct mp_filter_info info_in = {
    .name = "async_queue_in",
    .priv_size = sizeof(struct priv),
    .destroy = destroy,
    .process = process_in,
    .reset = reset,
};

// consumer
static const struct mp_filter_info info_out = {
    .name = "async_queue_out",
    .priv_size = sizeof(struct priv),
    .destroy = destroy,
    .process = process_out,
};
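
// The producer side wakes the notifier when the queue becomes full (stop
// feeding) and when it runs empty (start feeding again); see process_in().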
void mp_async_queue_set_notifier(struct mp_filter *f, struct mp_filter *notify)
{
    assert(mp_filter_get_info(f) == &info_in);
    struct priv *p = f->priv;
    if (p->notify != notify) {
        p->notify = notify;
        if (notify)
            mp_filter_wakeup(notify);
    }
}

struct mp_filter *mp_async_queue_create_filter(struct mp_filter *parent,
                                               enum mp_pin_dir dir,
                                               struct mp_async_queue *queue)
{
    bool is_in = dir == MP_PIN_IN;
    assert(queue);

    struct mp_filter *f = mp_filter_create(parent, is_in ? &info_in : &info_out);
    if (!f)
        return NULL;

    struct priv *p = f->priv;

    struct async_queue *q = queue->q;

    mp_filter_add_pin(f, dir, is_in ? "in" : "out");

    atomic_fetch_add(&q->refcount, 1);
    p->q = q;

    pthread_mutex_lock(&q->lock);
    int slot = is_in ? 0 : 1;
    assert(!q->conn[slot]); // fails if already connected on this end
    q->conn[slot] = f;
    pthread_mutex_unlock(&q->lock);

    return f;
}
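
/*
 * Usage sketch (assumptions: "decoder" and "renderer" stand for arbitrary
 * user filters with a compatible output/input pin; mp_pin_connect() is the
 * generic pin-connect helper from filter.h):
 *
 *   struct mp_async_queue *queue = mp_async_queue_create();
 *   // Producer end, added to the graph that generates the data:
 *   struct mp_filter *in =
 *       mp_async_queue_create_filter(producer_graph, MP_PIN_IN, queue);
 *   // Consumer end, possibly in a graph run on another thread:
 *   struct mp_filter *out =
 *       mp_async_queue_create_filter(consumer_graph, MP_PIN_OUT, queue);
 *   mp_pin_connect(in->pins[0], decoder->pins[0]);
 *   mp_pin_connect(renderer->pins[0], out->pins[0]);
 *   mp_async_queue_resume(queue);
 *   ...
 *   talloc_free(queue); // drops only the API user's reference
 */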