1 #include <math.h>
2 #include <pthread.h>
3 
4 #include "common/common.h"
5 #include "common/global.h"
6 #include "common/msg.h"
7 #include "osdep/atomic.h"
8 #include "osdep/timer.h"
9 #include "video/hwdec.h"
10 
11 #include "filter.h"
12 #include "filter_internal.h"
13 
14 // Note about connections:
15 // They can be confusing, because pins come in pairs, and multiple pins can be
16 // transitively connected via mp_pin_connect(). To avoid dealing with this,
17 // mp_pin.conn is used to skip redundant connected pins.
18 // Consider <1a|1b> a symbol for mp_pin pair #1 and f1 as filter #1. Then:
19 //      f1 <-> <1a|1b> <-> <2a|2b> <-> <3a|3b> <-> f2
20 // would be a connection from 1a to 3b. 1a could be a private pin of f1 (e.g.
21 // mp_filter.ppin[0]), and 1b would be the public pin (e.g. mp_filter.pin[0]).
// A user could have called mp_pin_connect(2a, 1b) and mp_pin_connect(3a, 2b)
// (assuming 1b has dir==MP_PIN_OUT). The end result is the following values:
24 //  pin  user_conn  conn   manual_connection within_conn (uses mp_pin.data)
25 //   1a   NULL       3b     f1                false        no
26 //   1b   2a         NULL   NULL              true         no
27 //   2a   1b         NULL   NULL              true         no
28 //   2b   3a         NULL   NULL              true         no
29 //   3a   2b         NULL   NULL              true         no
30 //   3b   NULL       1a     f2                false        yes
31 // The minimal case of f1 <-> <1a|1b> <-> f2 (1b dir=out) would be:
32 //   1a   NULL       1b     f1                false        no
33 //   1b   NULL       1a     f2                false        yes
34 // In both cases, only the final output pin uses mp_pin.data/data_requested.
35 struct mp_pin {
36     const char *name;
37     enum mp_pin_dir dir;
38     struct mp_pin *other;           // paired mp_pin representing other end
39     struct mp_filter *owner;
40 
41     struct mp_pin *user_conn;       // as set by mp_pin_connect()
42     struct mp_pin *conn;            // transitive, actual end of the connection
43 
44     // Set if the pin is considered connected, but has no user_conn. pin
45     // state changes are handled by the given filter. (Defaults to the root
46     // filter if the pin is for the user of a filter graph.)
47     // As an invariant, conn and manual_connection are both either set or unset.
48     struct mp_filter *manual_connection;
49 
50     // Set if the pin is indirect part of a connection chain, but not one of
51     // the end pins. Basically it's a redundant in-between pin. You never access
52     // these with the pin data flow functions, because only the end pins matter.
53     // This flag is for checking and enforcing this.
54     bool within_conn;
55 
56     // This is used for the final output mp_pin in connections only.
57     bool data_requested;            // true if out wants new data
58     struct mp_frame data;           // possibly buffered frame (MP_FRAME_NONE if
59                                     // empty, usually only temporary)
60 };
61 
// Shared scheduling state of one filter graph. It is allocated when a root
// filter is created; every other filter of the graph points to the same one.
struct filter_runner {
    struct mpv_global *global;

    // User callback (and its argument) invoked to signal a wakeup.
    void (*wakeup_cb)(void *ctx);
    void *wakeup_ctx;

    struct mp_filter *root_filter;

    // Time budget for mp_filter_graph_run(); see
    // mp_filter_graph_set_max_run_time(). Non-finite means no limit.
    double max_run_time;
    // Set by mp_filter_graph_interrupt(), consumed by mp_filter_graph_run().
    atomic_bool interrupt_flag;

    // True while the filter graph is being run (used to avoid recursion).
    bool filtering;

    // If non-NULL, the pin through which recursive filtering was started.
    struct mp_pin *recursive;

    // Filters which still need their process() called. A filter is a member
    // of this array iff mp_filter_internal.pending==true.
    struct mp_filter **pending;
    int num_pending;

    // Some outside pin has changed state.
    bool external_pending;

    // Lock for async notifications. A single lock is shared by all filters;
    // making this more fine grained was not deemed worth it.
    pthread_mutex_t async_lock;

    // A wakeup is already pending. Protected by async_lock.
    bool async_wakeup_sent;

    // Like pending[], but keyed on mp_filter_internal.async_pending.
    // Protected by async_lock.
    struct mp_filter **async_pending;
    int num_async_pending;
};
100 
// Private per-filter state, reachable through mp_filter.in.
struct mp_filter_internal {
    const struct mp_filter_info *info;  // as passed in mp_filter_params.info

    struct mp_filter *parent;           // NULL for the root filter
    struct filter_runner *runner;       // shared by all filters of the graph

    // Filters which were created with this filter as parent.
    struct mp_filter **children;
    int num_children;

    // If set, this filter is scheduled when mp_filter_internal_mark_failed()
    // reaches the filter owning this struct.
    struct mp_filter *error_handler;

    char *name;                         // see mp_filter_set_name()
    bool high_priority;                 // scheduled at the front of pending[]

    bool pending;                       // is in filter_runner.pending[]
    bool async_pending;                 // is in filter_runner.async_pending[]
    bool failed;                        // see mp_filter_has_failed()
};
119 
120 // Called when new work needs to be done on a pin belonging to the filter:
121 //  - new data was requested
122 //  - new data has been queued
123 //  - or just an connect/disconnect/async notification happened
124 // This means the process function for this filter has to be called at some
125 // point in the future to continue filtering.
add_pending(struct mp_filter * f)126 static void add_pending(struct mp_filter *f)
127 {
128     struct filter_runner *r = f->in->runner;
129 
130     if (f->in->pending)
131         return;
132 
133     // This should probably really be some sort of priority queue, but for now
134     // something naive and dumb does the job too.
135     f->in->pending = true;
136     if (f->in->high_priority) {
137         MP_TARRAY_INSERT_AT(r, r->pending, r->num_pending, 0, f);
138     } else {
139         MP_TARRAY_APPEND(r, r->pending, r->num_pending, f);
140     }
141 }
142 
add_pending_pin(struct mp_pin * p)143 static void add_pending_pin(struct mp_pin *p)
144 {
145     struct mp_filter *f = p->manual_connection;
146     assert(f);
147 
148     if (f->in->pending)
149         return;
150 
151     add_pending(f);
152 
153     // Need to tell user that something changed.
154     if (f == f->in->runner->root_filter && p != f->in->runner->recursive)
155         f->in->runner->external_pending = true;
156 }
157 
158 // Possibly enter recursive filtering. This is done as convenience for
159 // "external" filter users only. (Normal filtering does this iteratively via
160 // mp_filter_graph_run() to avoid filter reentrancy issues and deep call
161 // stacks.) If the API users uses an external manually connected pin, do
162 // recursive filtering as a not strictly necessary feature which makes outside
163 // I/O with filters easier.
filter_recursive(struct mp_pin * p)164 static void filter_recursive(struct mp_pin *p)
165 {
166     struct mp_filter *f = p->conn->manual_connection;
167     assert(f);
168     struct filter_runner *r = f->in->runner;
169 
170     // Never do internal filtering recursively.
171     if (r->filtering)
172         return;
173 
174     assert(!r->recursive);
175     r->recursive = p;
176 
177     // Also don't lose the pending state, which the user may or may not
178     // care about.
179     r->external_pending |= mp_filter_graph_run(r->root_filter);
180 
181     assert(r->recursive == p);
182     r->recursive = NULL;
183 }
184 
mp_filter_internal_mark_progress(struct mp_filter * f)185 void mp_filter_internal_mark_progress(struct mp_filter *f)
186 {
187     struct filter_runner *r = f->in->runner;
188     assert(r->filtering); // only call from f's process()
189     add_pending(f);
190 }
191 
192 // Basically copy the async notifications to the sync ones. Done so that the
193 // sync notifications don't need any locking.
flush_async_notifications(struct filter_runner * r)194 static void flush_async_notifications(struct filter_runner *r)
195 {
196     pthread_mutex_lock(&r->async_lock);
197     for (int n = 0; n < r->num_async_pending; n++) {
198         struct mp_filter *f = r->async_pending[n];
199         add_pending(f);
200         f->in->async_pending = false;
201     }
202     r->num_async_pending = 0;
203     r->async_wakeup_sent = false;
204     pthread_mutex_unlock(&r->async_lock);
205 }
206 
mp_filter_graph_run(struct mp_filter * filter)207 bool mp_filter_graph_run(struct mp_filter *filter)
208 {
209     struct filter_runner *r = filter->in->runner;
210     assert(filter == r->root_filter); // user is supposed to call this on root only
211 
212     int64_t end_time = 0;
213     if (isfinite(r->max_run_time))
214         end_time = mp_add_timeout(mp_time_us(), MPMAX(r->max_run_time, 0));
215 
216     // (could happen with separate filter graphs calling each other, for now
217     // ignore this issue as we don't use such a setup anywhere)
218     assert(!r->filtering);
219 
220     r->filtering = true;
221 
222     flush_async_notifications(r);
223 
224     bool exit_req = false;
225 
226     while (1) {
227         if (atomic_exchange_explicit(&r->interrupt_flag, false,
228                                      memory_order_acq_rel))
229         {
230             pthread_mutex_lock(&r->async_lock);
231             if (!r->async_wakeup_sent && r->wakeup_cb)
232                 r->wakeup_cb(r->wakeup_ctx);
233             r->async_wakeup_sent = true;
234             pthread_mutex_unlock(&r->async_lock);
235             exit_req = true;
236         }
237 
238         if (!r->num_pending) {
239             flush_async_notifications(r);
240             if (!r->num_pending)
241                 break;
242         }
243 
244         struct mp_filter *next = NULL;
245 
246         if (r->pending[0]->in->high_priority) {
247             next = r->pending[0];
248             MP_TARRAY_REMOVE_AT(r->pending, r->num_pending, 0);
249         } else if (!exit_req) {
250             next = r->pending[r->num_pending - 1];
251             r->num_pending -= 1;
252         }
253 
254         if (!next)
255             break;
256 
257         next->in->pending = false;
258         if (next->in->info->process)
259             next->in->info->process(next);
260 
261         if (end_time && mp_time_us() >= end_time)
262             mp_filter_graph_interrupt(r->root_filter);
263     }
264 
265     r->filtering = false;
266 
267     bool externals = r->external_pending;
268     r->external_pending = false;
269     return externals;
270 }
271 
// Return whether a frame can be moved from src to dst right now: dst must
// need data, and src must have data available (src is asked for data only if
// dst needs it).
bool mp_pin_can_transfer_data(struct mp_pin *dst, struct mp_pin *src)
{
    if (!mp_pin_in_needs_data(dst))
        return false;
    return mp_pin_out_request_data(src);
}
276 
mp_pin_transfer_data(struct mp_pin * dst,struct mp_pin * src)277 bool mp_pin_transfer_data(struct mp_pin *dst, struct mp_pin *src)
278 {
279     if (!mp_pin_can_transfer_data(dst, src))
280         return false;
281     mp_pin_in_write(dst, mp_pin_out_read(src));
282     return true;
283 }
284 
mp_pin_in_needs_data(struct mp_pin * p)285 bool mp_pin_in_needs_data(struct mp_pin *p)
286 {
287     assert(p->dir == MP_PIN_IN);
288     assert(!p->within_conn);
289     return p->conn && p->conn->manual_connection && p->conn->data_requested;
290 }
291 
mp_pin_in_write(struct mp_pin * p,struct mp_frame frame)292 bool mp_pin_in_write(struct mp_pin *p, struct mp_frame frame)
293 {
294     if (!mp_pin_in_needs_data(p) || frame.type == MP_FRAME_NONE) {
295         if (frame.type)
296             MP_ERR(p->owner, "losing frame on %s\n", p->name);
297         mp_frame_unref(&frame);
298         return false;
299     }
300     assert(p->conn->data.type == MP_FRAME_NONE);
301     p->conn->data = frame;
302     p->conn->data_requested = false;
303     add_pending_pin(p->conn);
304     filter_recursive(p);
305     return true;
306 }
307 
mp_pin_out_has_data(struct mp_pin * p)308 bool mp_pin_out_has_data(struct mp_pin *p)
309 {
310     assert(p->dir == MP_PIN_OUT);
311     assert(!p->within_conn);
312     return p->conn && p->conn->manual_connection && p->data.type != MP_FRAME_NONE;
313 }
314 
mp_pin_out_request_data(struct mp_pin * p)315 bool mp_pin_out_request_data(struct mp_pin *p)
316 {
317     if (mp_pin_out_has_data(p))
318         return true;
319     if (p->conn && p->conn->manual_connection) {
320         if (!p->data_requested) {
321             p->data_requested = true;
322             add_pending_pin(p->conn);
323         }
324         filter_recursive(p);
325     }
326     return mp_pin_out_has_data(p);
327 }
328 
mp_pin_out_request_data_next(struct mp_pin * p)329 void mp_pin_out_request_data_next(struct mp_pin *p)
330 {
331     if (mp_pin_out_request_data(p))
332         add_pending_pin(p->conn);
333 }
334 
mp_pin_out_read(struct mp_pin * p)335 struct mp_frame mp_pin_out_read(struct mp_pin *p)
336 {
337     if (!mp_pin_out_request_data(p))
338         return MP_NO_FRAME;
339     struct mp_frame res = p->data;
340     p->data = MP_NO_FRAME;
341     return res;
342 }
343 
mp_pin_out_unread(struct mp_pin * p,struct mp_frame frame)344 void mp_pin_out_unread(struct mp_pin *p, struct mp_frame frame)
345 {
346     assert(p->dir == MP_PIN_OUT);
347     assert(!p->within_conn);
348     assert(p->conn && p->conn->manual_connection);
349     // Unread is allowed strictly only if you didn't do anything else with
350     // the pin since the time you read it.
351     assert(!mp_pin_out_has_data(p));
352     assert(!p->data_requested);
353     p->data = frame;
354 }
355 
mp_pin_out_repeat_eof(struct mp_pin * p)356 void mp_pin_out_repeat_eof(struct mp_pin *p)
357 {
358     mp_pin_out_unread(p, MP_EOF_FRAME);
359 }
360 
361 // Follow mp_pin pairs/connection into the "other" direction of the pin, until
362 // the last pin is found. (In the simplest case, this is just p->other.) E.g.:
363 //      <1a|1b> <-> <2a|2b> <-> <3a|3b>
364 //          find_connected_end(2b)==1a
365 //          find_connected_end(1b)==1a
366 //          find_connected_end(1a)==3b
find_connected_end(struct mp_pin * p)367 static struct mp_pin *find_connected_end(struct mp_pin *p)
368 {
369     while (1) {
370         struct mp_pin *other = p->other;
371         if (!other->user_conn)
372             return other;
373         p = other->user_conn;
374     }
375     assert(0);
376 }
377 
378 // With p being part of a connection, create the pin_connection and set all
379 // state flags.
init_connection(struct mp_pin * p)380 static void init_connection(struct mp_pin *p)
381 {
382     struct filter_runner *runner = p->owner->in->runner;
383 
384     if (p->dir == MP_PIN_IN)
385         p = p->other;
386 
387     struct mp_pin *in = find_connected_end(p);
388     struct mp_pin *out = find_connected_end(p->other);
389 
390     // These are the "outer" pins by definition, they have no user connections.
391     assert(!in->user_conn);
392     assert(!out->user_conn);
393 
394     // This and similar checks enforce the same root filter requirement.
395     if (in->manual_connection)
396         assert(in->manual_connection->in->runner == runner);
397     if (out->manual_connection)
398         assert(out->manual_connection->in->runner == runner);
399 
400     // Logicaly, the ends are always manual connections. A pin chain without
401     // manual connections at the ends is still disconnected (or if this
402     // attempted to extend an existing connection, becomes dangling and gets
403     // disconnected).
404     if (!in->manual_connection || !out->manual_connection)
405         return;
406 
407     assert(in->dir == MP_PIN_IN);
408     assert(out->dir == MP_PIN_OUT);
409 
410     struct mp_pin *cur = in;
411     while (cur) {
412         assert(!cur->within_conn && !cur->other->within_conn);
413         assert(!cur->conn && !cur->other->conn);
414         assert(!cur->data_requested); // unused for in pins
415         assert(!cur->data.type); // unused for in pins
416         assert(!cur->other->data_requested); // unset for unconnected out pins
417         assert(!cur->other->data.type); // unset for unconnected out pins
418         assert(cur->owner->in->runner == runner);
419         cur->within_conn = cur->other->within_conn = true;
420         cur = cur->other->user_conn;
421     }
422 
423     in->conn = out;
424     in->within_conn = false;
425     out->conn = in;
426     out->within_conn = false;
427 
428     // Scheduling so far will be messed up.
429     add_pending(in->manual_connection);
430     add_pending(out->manual_connection);
431 }
432 
mp_pin_connect(struct mp_pin * dst,struct mp_pin * src)433 void mp_pin_connect(struct mp_pin *dst, struct mp_pin *src)
434 {
435     assert(src->dir == MP_PIN_OUT);
436     assert(dst->dir == MP_PIN_IN);
437 
438     if (dst->user_conn == src) {
439         assert(src->user_conn == dst);
440         return;
441     }
442 
443     mp_pin_disconnect(src);
444     mp_pin_disconnect(dst);
445 
446     src->user_conn = dst;
447     dst->user_conn = src;
448 
449     init_connection(src);
450 }
451 
mp_pin_set_manual_connection(struct mp_pin * p,bool connected)452 void mp_pin_set_manual_connection(struct mp_pin *p, bool connected)
453 {
454     mp_pin_set_manual_connection_for(p, connected ? p->owner->in->parent : NULL);
455 }
456 
mp_pin_set_manual_connection_for(struct mp_pin * p,struct mp_filter * f)457 void mp_pin_set_manual_connection_for(struct mp_pin *p, struct mp_filter *f)
458 {
459     if (p->manual_connection == f)
460         return;
461     if (p->within_conn)
462         mp_pin_disconnect(p);
463     p->manual_connection = f;
464     init_connection(p);
465 }
466 
mp_pin_get_manual_connection(struct mp_pin * p)467 struct mp_filter *mp_pin_get_manual_connection(struct mp_pin *p)
468 {
469     return p->manual_connection;
470 }
471 
deinit_connection(struct mp_pin * p)472 static void deinit_connection(struct mp_pin *p)
473 {
474     if (p->dir == MP_PIN_OUT)
475         p = p->other;
476 
477     p = find_connected_end(p);
478 
479     while (p) {
480         p->conn = p->other->conn = NULL;
481         p->within_conn = p->other->within_conn = false;
482         assert(!p->other->data_requested); // unused for in pins
483         assert(!p->other->data.type); // unused for in pins
484         p->data_requested = false;
485         if (p->data.type)
486             MP_VERBOSE(p->owner, "dropping frame due to pin disconnect\n");
487         if (p->data_requested)
488             MP_VERBOSE(p->owner, "dropping request due to pin disconnect\n");
489         mp_frame_unref(&p->data);
490         p = p->other->user_conn;
491     }
492 }
493 
mp_pin_disconnect(struct mp_pin * p)494 void mp_pin_disconnect(struct mp_pin *p)
495 {
496     if (!mp_pin_is_connected(p))
497         return;
498 
499     p->manual_connection = NULL;
500 
501     struct mp_pin *conn = p->user_conn;
502     if (conn) {
503         p->user_conn = NULL;
504         conn->user_conn = NULL;
505         deinit_connection(conn);
506     }
507 
508     deinit_connection(p);
509 }
510 
mp_pin_is_connected(struct mp_pin * p)511 bool mp_pin_is_connected(struct mp_pin *p)
512 {
513     return p->user_conn || p->manual_connection;
514 }
515 
mp_pin_get_name(struct mp_pin * p)516 const char *mp_pin_get_name(struct mp_pin *p)
517 {
518     return p->name;
519 }
520 
mp_pin_get_dir(struct mp_pin * p)521 enum mp_pin_dir mp_pin_get_dir(struct mp_pin *p)
522 {
523     return p->dir;
524 }
525 
mp_filter_get_name(struct mp_filter * f)526 const char *mp_filter_get_name(struct mp_filter *f)
527 {
528     return f->in->name;
529 }
530 
mp_filter_get_info(struct mp_filter * f)531 const struct mp_filter_info *mp_filter_get_info(struct mp_filter *f)
532 {
533     return f->in->info;
534 }
535 
mp_filter_set_high_priority(struct mp_filter * f,bool pri)536 void mp_filter_set_high_priority(struct mp_filter *f, bool pri)
537 {
538     f->in->high_priority = pri;
539 }
540 
mp_filter_set_name(struct mp_filter * f,const char * name)541 void mp_filter_set_name(struct mp_filter *f, const char *name)
542 {
543     talloc_free(f->in->name);
544     f->in->name = talloc_strdup(f, name);
545 }
546 
mp_filter_get_named_pin(struct mp_filter * f,const char * name)547 struct mp_pin *mp_filter_get_named_pin(struct mp_filter *f, const char *name)
548 {
549     for (int n = 0; n < f->num_pins; n++) {
550         if (name && strcmp(f->pins[n]->name, name) == 0)
551             return f->pins[n];
552     }
553     return NULL;
554 }
555 
mp_filter_set_error_handler(struct mp_filter * f,struct mp_filter * handler)556 void mp_filter_set_error_handler(struct mp_filter *f, struct mp_filter *handler)
557 {
558     f->in->error_handler = handler;
559 }
560 
mp_filter_internal_mark_failed(struct mp_filter * f)561 void mp_filter_internal_mark_failed(struct mp_filter *f)
562 {
563     while (f) {
564         f->in->failed = true;
565         if (f->in->error_handler) {
566             add_pending(f->in->error_handler);
567             break;
568         }
569         f = f->in->parent;
570     }
571 }
572 
mp_filter_has_failed(struct mp_filter * filter)573 bool mp_filter_has_failed(struct mp_filter *filter)
574 {
575     bool failed = filter->in->failed;
576     filter->in->failed = false;
577     return failed;
578 }
579 
reset_pin(struct mp_pin * p)580 static void reset_pin(struct mp_pin *p)
581 {
582     if (!p->conn || p->dir != MP_PIN_OUT) {
583         assert(!p->data.type);
584         assert(!p->data_requested);
585     }
586     mp_frame_unref(&p->data);
587     p->data_requested = false;
588 }
589 
mp_filter_reset(struct mp_filter * filter)590 void mp_filter_reset(struct mp_filter *filter)
591 {
592     for (int n = 0; n < filter->in->num_children; n++)
593         mp_filter_reset(filter->in->children[n]);
594 
595     for (int n = 0; n < filter->num_pins; n++) {
596         struct mp_pin *p = filter->ppins[n];
597         reset_pin(p);
598         reset_pin(p->other);
599     }
600 
601     if (filter->in->info->reset)
602         filter->in->info->reset(filter);
603 }
604 
mp_filter_add_pin(struct mp_filter * f,enum mp_pin_dir dir,const char * name)605 struct mp_pin *mp_filter_add_pin(struct mp_filter *f, enum mp_pin_dir dir,
606                                  const char *name)
607 {
608     assert(dir == MP_PIN_IN || dir == MP_PIN_OUT);
609     assert(name && name[0]);
610     assert(!mp_filter_get_named_pin(f, name));
611 
612     // "Public" pin
613     struct mp_pin *p = talloc_ptrtype(NULL, p);
614     *p = (struct mp_pin){
615         .name = talloc_strdup(p, name),
616         .dir = dir,
617         .owner = f,
618         .manual_connection = f->in->parent,
619     };
620 
621     // "Private" paired pin
622     p->other = talloc_ptrtype(NULL, p);
623     *p->other = (struct mp_pin){
624         .name = p->name,
625         .dir = p->dir == MP_PIN_IN ? MP_PIN_OUT : MP_PIN_IN,
626         .owner = f,
627         .other = p,
628         .manual_connection = f,
629     };
630 
631     MP_TARRAY_GROW(f, f->pins, f->num_pins);
632     MP_TARRAY_GROW(f, f->ppins, f->num_pins);
633     f->pins[f->num_pins] = p;
634     f->ppins[f->num_pins] = p->other;
635     f->num_pins += 1;
636 
637     init_connection(p);
638 
639     return p->other;
640 }
641 
mp_filter_remove_pin(struct mp_filter * f,struct mp_pin * p)642 void mp_filter_remove_pin(struct mp_filter *f, struct mp_pin *p)
643 {
644     if (!p)
645         return;
646 
647     assert(p->owner == f);
648     mp_pin_disconnect(p);
649     mp_pin_disconnect(p->other);
650 
651     int index = -1;
652     for (int n = 0; n < f->num_pins; n++) {
653         if (f->ppins[n] == p) {
654             index = n;
655             break;
656         }
657     }
658     assert(index >= 0);
659 
660     talloc_free(f->pins[index]);
661     talloc_free(f->ppins[index]);
662 
663     int count = f->num_pins;
664     MP_TARRAY_REMOVE_AT(f->pins, count, index);
665     count = f->num_pins;
666     MP_TARRAY_REMOVE_AT(f->ppins, count, index);
667     f->num_pins -= 1;
668 }
669 
mp_filter_command(struct mp_filter * f,struct mp_filter_command * cmd)670 bool mp_filter_command(struct mp_filter *f, struct mp_filter_command *cmd)
671 {
672     return f->in->info->command ? f->in->info->command(f, cmd) : false;
673 }
674 
mp_filter_find_stream_info(struct mp_filter * f)675 struct mp_stream_info *mp_filter_find_stream_info(struct mp_filter *f)
676 {
677     while (f) {
678         if (f->stream_info)
679             return f->stream_info;
680         f = f->in->parent;
681     }
682     return NULL;
683 }
684 
mp_filter_load_hwdec_device(struct mp_filter * f,int avtype)685 struct AVBufferRef *mp_filter_load_hwdec_device(struct mp_filter *f, int avtype)
686 {
687     struct mp_stream_info *info = mp_filter_find_stream_info(f);
688     if (!info || !info->hwdec_devs)
689         return NULL;
690 
691     hwdec_devices_request_all(info->hwdec_devs);
692 
693     return hwdec_devices_get_lavc(info->hwdec_devs, avtype);
694 }
695 
filter_wakeup(struct mp_filter * f,bool mark_only)696 static void filter_wakeup(struct mp_filter *f, bool mark_only)
697 {
698     struct filter_runner *r = f->in->runner;
699     pthread_mutex_lock(&r->async_lock);
700     if (!f->in->async_pending) {
701         f->in->async_pending = true;
702         // (not using a talloc parent for thread safety reasons)
703         MP_TARRAY_APPEND(NULL, r->async_pending, r->num_async_pending, f);
704     }
705     if (!mark_only && !r->async_wakeup_sent) {
706         if (r->wakeup_cb)
707             r->wakeup_cb(r->wakeup_ctx);
708         r->async_wakeup_sent = true;
709     }
710     pthread_mutex_unlock(&r->async_lock);
711 }
712 
// Mark f as having an async notification, and invoke the wakeup callback
// (unless a wakeup is pending already).
void mp_filter_wakeup(struct mp_filter *f)
{
    const bool mark_only = false;
    filter_wakeup(f, mark_only);
}
717 
// Mark f as having an async notification, without invoking the wakeup
// callback.
void mp_filter_mark_async_progress(struct mp_filter *f)
{
    const bool mark_only = true;
    filter_wakeup(f, mark_only);
}
722 
mp_filter_graph_set_max_run_time(struct mp_filter * f,double seconds)723 void mp_filter_graph_set_max_run_time(struct mp_filter *f, double seconds)
724 {
725     struct filter_runner *r = f->in->runner;
726     assert(f == r->root_filter); // user is supposed to call this on root only
727     r->max_run_time = seconds;
728 }
729 
mp_filter_graph_interrupt(struct mp_filter * f)730 void mp_filter_graph_interrupt(struct mp_filter *f)
731 {
732     struct filter_runner *r = f->in->runner;
733     assert(f == r->root_filter); // user is supposed to call this on root only
734     atomic_store(&r->interrupt_flag, true);
735 }
736 
mp_filter_free_children(struct mp_filter * f)737 void mp_filter_free_children(struct mp_filter *f)
738 {
739     while(f->in->num_children)
740         talloc_free(f->in->children[0]);
741 }
742 
filter_destructor(void * p)743 static void filter_destructor(void *p)
744 {
745     struct mp_filter *f = p;
746     struct filter_runner *r = f->in->runner;
747 
748     if (f->in->info->destroy)
749         f->in->info->destroy(f);
750 
751     // For convenience, free child filters.
752     mp_filter_free_children(f);
753 
754     while (f->num_pins)
755         mp_filter_remove_pin(f, f->ppins[0]);
756 
757     // Just make sure the filter is not still in the async notifications set.
758     // There will be no more new notifications at this point (due to destroy()).
759     flush_async_notifications(r);
760 
761     for (int n = 0; n < r->num_pending; n++) {
762         if (r->pending[n] == f) {
763             MP_TARRAY_REMOVE_AT(r->pending, r->num_pending, n);
764             break;
765         }
766     }
767 
768     if (f->in->parent) {
769         struct mp_filter_internal *p_in = f->in->parent->in;
770         for (int n = 0; n < p_in->num_children; n++) {
771             if (p_in->children[n] == f) {
772                 MP_TARRAY_REMOVE_AT(p_in->children, p_in->num_children, n);
773                 break;
774             }
775         }
776     }
777 
778     if (r->root_filter == f) {
779         assert(!f->in->parent);
780         pthread_mutex_destroy(&r->async_lock);
781         talloc_free(r->async_pending);
782         talloc_free(r);
783     }
784 }
785 
786 
mp_filter_create_with_params(struct mp_filter_params * params)787 struct mp_filter *mp_filter_create_with_params(struct mp_filter_params *params)
788 {
789     struct mp_filter *f = talloc(NULL, struct mp_filter);
790     talloc_set_destructor(f, filter_destructor);
791     *f = (struct mp_filter){
792         .priv = params->info->priv_size ?
793                     talloc_zero_size(f, params->info->priv_size) : NULL,
794         .global = params->global,
795         .in = talloc(f, struct mp_filter_internal),
796     };
797     *f->in = (struct mp_filter_internal){
798         .info = params->info,
799         .parent = params->parent,
800         .runner = params->parent ? params->parent->in->runner : NULL,
801     };
802 
803     if (!f->in->runner) {
804         assert(params->global);
805 
806         f->in->runner = talloc(NULL, struct filter_runner);
807         *f->in->runner = (struct filter_runner){
808             .global = params->global,
809             .root_filter = f,
810             .max_run_time = INFINITY,
811         };
812         pthread_mutex_init(&f->in->runner->async_lock, NULL);
813     }
814 
815     if (!f->global)
816         f->global = f->in->runner->global;
817 
818     if (f->in->parent) {
819         struct mp_filter_internal *parent = f->in->parent->in;
820         MP_TARRAY_APPEND(parent, parent->children, parent->num_children, f);
821         f->log = mp_log_new(f, f->global->log, params->info->name);
822     } else {
823         f->log = mp_log_new(f, f->global->log, "!root");
824     }
825 
826     if (f->in->info->init) {
827         if (!f->in->info->init(f, params)) {
828             talloc_free(f);
829             return NULL;
830         }
831     }
832 
833     return f;
834 }
835 
mp_filter_create(struct mp_filter * parent,const struct mp_filter_info * info)836 struct mp_filter *mp_filter_create(struct mp_filter *parent,
837                                    const struct mp_filter_info *info)
838 {
839     assert(parent);
840     assert(info);
841     struct mp_filter_params params = {
842         .info = info,
843         .parent = parent,
844     };
845     return mp_filter_create_with_params(&params);
846 }
847 
848 // (the root filter is just a dummy filter - nothing special about it, except
849 // that it has no parent, and serves as manual connection for "external" pins)
850 static const struct mp_filter_info filter_root = {
851     .name = "root",
852 };
853 
mp_filter_create_root(struct mpv_global * global)854 struct mp_filter *mp_filter_create_root(struct mpv_global *global)
855 {
856     struct mp_filter_params params = {
857         .info = &filter_root,
858         .global = global,
859     };
860     return mp_filter_create_with_params(&params);
861 }
862 
mp_filter_graph_set_wakeup_cb(struct mp_filter * root,void (* wakeup_cb)(void * ctx),void * ctx)863 void mp_filter_graph_set_wakeup_cb(struct mp_filter *root,
864                                    void (*wakeup_cb)(void *ctx), void *ctx)
865 {
866     struct filter_runner *r = root->in->runner;
867     assert(root == r->root_filter); // user is supposed to call this on root only
868     pthread_mutex_lock(&r->async_lock);
869     r->wakeup_cb = wakeup_cb;
870     r->wakeup_ctx = ctx;
871     pthread_mutex_unlock(&r->async_lock);
872 }
873 
filt_name(struct mp_filter * f)874 static const char *filt_name(struct mp_filter *f)
875 {
876     return f ? f->in->info->name : "-";
877 }
878 
dump_pin_state(struct mp_filter * f,struct mp_pin * pin)879 static void dump_pin_state(struct mp_filter *f, struct mp_pin *pin)
880 {
881     MP_WARN(f, "  [%p] %s %s c=%s[%p] f=%s[%p] m=%s[%p] %s %s %s\n",
882         pin, pin->name, pin->dir == MP_PIN_IN ? "->" : "<-",
883         pin->user_conn ? filt_name(pin->user_conn->owner) : "-", pin->user_conn,
884         pin->conn ? filt_name(pin->conn->owner) : "-", pin->conn,
885         filt_name(pin->manual_connection), pin->manual_connection,
886         pin->within_conn ? "(within)" : "",
887         pin->data_requested ? "(request)" : "",
888         mp_frame_type_str(pin->data.type));
889 }
890 
mp_filter_dump_states(struct mp_filter * f)891 void mp_filter_dump_states(struct mp_filter *f)
892 {
893     MP_WARN(f, "%s[%p] (%s[%p])\n", filt_name(f), f,
894             filt_name(f->in->parent), f->in->parent);
895     for (int n = 0; n < f->num_pins; n++) {
896         dump_pin_state(f, f->pins[n]);
897         dump_pin_state(f, f->ppins[n]);
898     }
899 
900     for (int n = 0; n < f->in->num_children; n++)
901         mp_filter_dump_states(f->in->children[n]);
902 }
903