1 /*
2  * This file is part of mpv.
3  *
4  * mpv is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Lesser General Public
6  * License as published by the Free Software Foundation; either
7  * version 2.1 of the License, or (at your option) any later version.
8  *
9  * mpv is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  * GNU Lesser General Public License for more details.
13  *
14  * You should have received a copy of the GNU Lesser General Public
15  * License along with mpv.  If not, see <http://www.gnu.org/licenses/>.
16  */
17 
18 #include <stdbool.h>
19 #include <assert.h>
20 
21 #include "common/common.h"
22 #include "osdep/threads.h"
23 #include "osdep/timer.h"
24 
25 #include "dispatch.h"
26 
// State of a dispatch queue. Apart from creation and destruction, the
// functions in this file read and modify the bookkeeping fields below with
// "lock" held; the callback pairs are stored by their setters without locking.
struct mp_dispatch_queue {
    // Singly linked list of pending items. New items are linked in at tail,
    // mp_dispatch_queue_process() takes them off at head. Both are NULL while
    // the list is empty.
    struct mp_dispatch_item *head;
    struct mp_dispatch_item *tail;
    // Mutex guarding the queue state, and the condition variable that gets
    // broadcast to wake up threads waiting for a change of that state.
    pthread_mutex_t lock;
    pthread_cond_t cond;
    // Optional callback (mp_dispatch_set_wakeup_fn()). Called after an item
    // was appended, and by mp_dispatch_lock() while the target thread is not
    // inside mp_dispatch_queue_process().
    void (*wakeup_fn)(void *wakeup_ctx);
    void *wakeup_ctx;
    // Optional callback (mp_dispatch_set_onlock_fn()). Called by
    // mp_dispatch_lock() after it registered its lock request.
    void (*onlock_fn)(void *onlock_ctx);
    void *onlock_ctx;
    // Time at which mp_dispatch_queue_process() should stop idling and
    // return. A value <= 0 means it does not wait at all.
    int64_t wait;
    // Makes mp_dispatch_queue_process() return instead of idling. Reset when
    // mp_dispatch_queue_process() returns.
    bool interrupted;
    // The target thread is inside mp_dispatch_queue_process() (idling, locked,
    // or running a dispatch callback); in_process_thread is that thread.
    bool in_process;
    pthread_t in_process_thread;
    // Something currently has exclusive access to the target thread's state:
    // either a dispatch callback is running, or another thread acquired it
    // with mp_dispatch_lock().
    bool locked;
    // Number of mp_dispatch_lock() calls that are waiting for, or currently
    // holding, the exclusive lock.
    size_t lock_requests;
    // locked==true was caused by mp_dispatch_lock() (as opposed to a running
    // callback); locked_explicit_thread is the thread holding it. Only used
    // for sanity checks.
    bool locked_explicit;
    pthread_t locked_explicit_thread;
};
53 
54 struct mp_dispatch_item {
55     mp_dispatch_fn fn;
56     void *fn_data;
57     bool asynchronous;
58     bool mergeable;
59     bool completed;
60     struct mp_dispatch_item *next;
61 };
62 
queue_dtor(void * p)63 static void queue_dtor(void *p)
64 {
65     struct mp_dispatch_queue *queue = p;
66     assert(!queue->head);
67     assert(!queue->in_process);
68     assert(!queue->lock_requests);
69     assert(!queue->locked);
70     pthread_cond_destroy(&queue->cond);
71     pthread_mutex_destroy(&queue->lock);
72 }
73 
74 // A dispatch queue lets other threads run callbacks in a target thread.
75 // The target thread is the thread which calls mp_dispatch_queue_process().
76 // Free the dispatch queue with talloc_free(). At the time of destruction,
77 // the queue must be empty. The easiest way to guarantee this is to
78 // terminate all potential senders, then call mp_dispatch_run() with a
79 // function that e.g. makes the target thread exit, then pthread_join() the
80 // target thread, and finally destroy the queue. Another way is calling
81 // mp_dispatch_queue_process() after terminating all potential senders, and
82 // then destroying the queue.
mp_dispatch_create(void * ta_parent)83 struct mp_dispatch_queue *mp_dispatch_create(void *ta_parent)
84 {
85     struct mp_dispatch_queue *queue = talloc_ptrtype(ta_parent, queue);
86     *queue = (struct mp_dispatch_queue){0};
87     talloc_set_destructor(queue, queue_dtor);
88     pthread_mutex_init(&queue->lock, NULL);
89     pthread_cond_init(&queue->cond, NULL);
90     return queue;
91 }
92 
93 // Set a custom function that should be called to guarantee that the target
94 // thread wakes up. This is intended for use with code that needs to block
95 // on non-pthread primitives, such as e.g. select(). In the case of select(),
96 // the wakeup_fn could for example write a byte into a "wakeup" pipe in order
97 // to unblock the select(). The wakeup_fn is called from the dispatch queue
98 // when there are new dispatch items, and the target thread should then enter
99 // mp_dispatch_queue_process() as soon as possible.
100 // Note that this setter does not do internal synchronization, so you must set
101 // it before other threads see it.
mp_dispatch_set_wakeup_fn(struct mp_dispatch_queue * queue,void (* wakeup_fn)(void * wakeup_ctx),void * wakeup_ctx)102 void mp_dispatch_set_wakeup_fn(struct mp_dispatch_queue *queue,
103                                void (*wakeup_fn)(void *wakeup_ctx),
104                                void *wakeup_ctx)
105 {
106     queue->wakeup_fn = wakeup_fn;
107     queue->wakeup_ctx = wakeup_ctx;
108 }
109 
110 // Set a function that will be called by mp_dispatch_lock() if the target thread
111 // is not calling mp_dispatch_queue_process() right now. This is an obscure,
112 // optional mechanism to make a worker thread react to external events more
113 // quickly. The idea is that the callback will make the worker thread to stop
114 // doing whatever (e.g. by setting a flag), and call mp_dispatch_queue_process()
115 // in order to let mp_dispatch_lock() calls continue sooner.
116 // Like wakeup_fn, this setter does no internal synchronization, and you must
117 // not access the dispatch queue itself from the callback.
mp_dispatch_set_onlock_fn(struct mp_dispatch_queue * queue,void (* onlock_fn)(void * onlock_ctx),void * onlock_ctx)118 void mp_dispatch_set_onlock_fn(struct mp_dispatch_queue *queue,
119                                void (*onlock_fn)(void *onlock_ctx),
120                                void *onlock_ctx)
121 {
122     queue->onlock_fn = onlock_fn;
123     queue->onlock_ctx = onlock_ctx;
124 }
125 
mp_dispatch_append(struct mp_dispatch_queue * queue,struct mp_dispatch_item * item)126 static void mp_dispatch_append(struct mp_dispatch_queue *queue,
127                                struct mp_dispatch_item *item)
128 {
129     pthread_mutex_lock(&queue->lock);
130     if (item->mergeable) {
131         for (struct mp_dispatch_item *cur = queue->head; cur; cur = cur->next) {
132             if (cur->mergeable && cur->fn == item->fn &&
133                 cur->fn_data == item->fn_data)
134             {
135                 talloc_free(item);
136                 pthread_mutex_unlock(&queue->lock);
137                 return;
138             }
139         }
140     }
141 
142     if (queue->tail) {
143         queue->tail->next = item;
144     } else {
145         queue->head = item;
146     }
147     queue->tail = item;
148 
149     // Wake up the main thread; note that other threads might wait on this
150     // condition for reasons, so broadcast the condition.
151     pthread_cond_broadcast(&queue->cond);
152     // No wakeup callback -> assume mp_dispatch_queue_process() needs to be
153     // interrupted instead.
154     if (!queue->wakeup_fn)
155         queue->interrupted = true;
156     pthread_mutex_unlock(&queue->lock);
157 
158     if (queue->wakeup_fn)
159         queue->wakeup_fn(queue->wakeup_ctx);
160 }
161 
162 // Enqueue a callback to run it on the target thread asynchronously. The target
163 // thread will run fn(fn_data) as soon as it enter mp_dispatch_queue_process.
164 // Note that mp_dispatch_enqueue() will usually return long before that happens.
165 // It's up to the user to signal completion of the callback. It's also up to
166 // the user to guarantee that the context fn_data has correct lifetime, i.e.
167 // lives until the callback is run, and is freed after that.
mp_dispatch_enqueue(struct mp_dispatch_queue * queue,mp_dispatch_fn fn,void * fn_data)168 void mp_dispatch_enqueue(struct mp_dispatch_queue *queue,
169                          mp_dispatch_fn fn, void *fn_data)
170 {
171     struct mp_dispatch_item *item = talloc_ptrtype(NULL, item);
172     *item = (struct mp_dispatch_item){
173         .fn = fn,
174         .fn_data = fn_data,
175         .asynchronous = true,
176     };
177     mp_dispatch_append(queue, item);
178 }
179 
180 // Like mp_dispatch_enqueue(), but the queue code will call talloc_free(fn_data)
181 // after the fn callback has been run. (The callback could trivially do that
182 // itself, but it makes it easier to implement synchronous and asynchronous
183 // requests with the same callback implementation.)
mp_dispatch_enqueue_autofree(struct mp_dispatch_queue * queue,mp_dispatch_fn fn,void * fn_data)184 void mp_dispatch_enqueue_autofree(struct mp_dispatch_queue *queue,
185                                   mp_dispatch_fn fn, void *fn_data)
186 {
187     struct mp_dispatch_item *item = talloc_ptrtype(NULL, item);
188     *item = (struct mp_dispatch_item){
189         .fn = fn,
190         .fn_data = talloc_steal(item, fn_data),
191         .asynchronous = true,
192     };
193     mp_dispatch_append(queue, item);
194 }
195 
196 // Like mp_dispatch_enqueue(), but
mp_dispatch_enqueue_notify(struct mp_dispatch_queue * queue,mp_dispatch_fn fn,void * fn_data)197 void mp_dispatch_enqueue_notify(struct mp_dispatch_queue *queue,
198                                 mp_dispatch_fn fn, void *fn_data)
199 {
200     struct mp_dispatch_item *item = talloc_ptrtype(NULL, item);
201     *item = (struct mp_dispatch_item){
202         .fn = fn,
203         .fn_data = fn_data,
204         .mergeable = true,
205         .asynchronous = true,
206     };
207     mp_dispatch_append(queue, item);
208 }
209 
210 // Remove already queued item. Only items enqueued with the following functions
211 // can be canceled:
212 //  - mp_dispatch_enqueue()
213 //  - mp_dispatch_enqueue_notify()
214 // Items which were enqueued, and which are currently executing, can not be
215 // canceled anymore. This function is mostly for being called from the same
216 // context as mp_dispatch_queue_process(), where the "currently executing" case
217 // can be excluded.
mp_dispatch_cancel_fn(struct mp_dispatch_queue * queue,mp_dispatch_fn fn,void * fn_data)218 void mp_dispatch_cancel_fn(struct mp_dispatch_queue *queue,
219                            mp_dispatch_fn fn, void *fn_data)
220 {
221     pthread_mutex_lock(&queue->lock);
222     struct mp_dispatch_item **pcur = &queue->head;
223     queue->tail = NULL;
224     while (*pcur) {
225         struct mp_dispatch_item *cur = *pcur;
226         if (cur->fn == fn && cur->fn_data == fn_data) {
227             *pcur = cur->next;
228             talloc_free(cur);
229         } else {
230             queue->tail = cur;
231             pcur = &cur->next;
232         }
233     }
234     pthread_mutex_unlock(&queue->lock);
235 }
236 
237 // Run fn(fn_data) on the target thread synchronously. This function enqueues
238 // the callback and waits until the target thread is done doing this.
239 // This is redundant to calling the function inside mp_dispatch_[un]lock(),
240 // but can be helpful with code that relies on TLS (such as OpenGL).
mp_dispatch_run(struct mp_dispatch_queue * queue,mp_dispatch_fn fn,void * fn_data)241 void mp_dispatch_run(struct mp_dispatch_queue *queue,
242                      mp_dispatch_fn fn, void *fn_data)
243 {
244     struct mp_dispatch_item item = {
245         .fn = fn,
246         .fn_data = fn_data,
247     };
248     mp_dispatch_append(queue, &item);
249 
250     pthread_mutex_lock(&queue->lock);
251     while (!item.completed)
252         pthread_cond_wait(&queue->cond, &queue->lock);
253     pthread_mutex_unlock(&queue->lock);
254 }
255 
256 // Process any outstanding dispatch items in the queue. This also handles
257 // suspending or locking the this thread from another thread via
258 // mp_dispatch_lock().
259 // The timeout specifies the minimum wait time. The actual time spent in this
260 // function can be much higher if the suspending/locking functions are used, or
261 // if executing the dispatch items takes time. On the other hand, this function
262 // can return much earlier than the timeout due to sporadic wakeups.
263 // Note that this will strictly return only after:
264 //      - timeout has passed,
265 //      - all queue items were processed,
266 //      - the possibly acquired lock has been released
267 // It's possible to cancel the timeout by calling mp_dispatch_interrupt().
268 // Reentrant calls are not allowed. There can be only 1 thread calling
269 // mp_dispatch_queue_process() at a time. In addition, mp_dispatch_lock() can
270 // not be called from a thread that is calling mp_dispatch_queue_process() (i.e.
271 // no enqueued callback can call the lock/unlock functions).
mp_dispatch_queue_process(struct mp_dispatch_queue * queue,double timeout)272 void mp_dispatch_queue_process(struct mp_dispatch_queue *queue, double timeout)
273 {
274     pthread_mutex_lock(&queue->lock);
275     queue->wait = timeout > 0 ? mp_add_timeout(mp_time_us(), timeout) : 0;
276     assert(!queue->in_process); // recursion not allowed
277     queue->in_process = true;
278     queue->in_process_thread = pthread_self();
279     // Wake up thread which called mp_dispatch_lock().
280     if (queue->lock_requests)
281         pthread_cond_broadcast(&queue->cond);
282     while (1) {
283         if (queue->lock_requests) {
284             // Block due to something having called mp_dispatch_lock().
285             pthread_cond_wait(&queue->cond, &queue->lock);
286         } else if (queue->head) {
287             struct mp_dispatch_item *item = queue->head;
288             queue->head = item->next;
289             if (!queue->head)
290                 queue->tail = NULL;
291             item->next = NULL;
292             // Unlock, because we want to allow other threads to queue items
293             // while the dispatch item is processed.
294             // At the same time, we must prevent other threads from returning
295             // from mp_dispatch_lock(), which is done by locked=true.
296             assert(!queue->locked);
297             queue->locked = true;
298             pthread_mutex_unlock(&queue->lock);
299 
300             item->fn(item->fn_data);
301 
302             pthread_mutex_lock(&queue->lock);
303             assert(queue->locked);
304             queue->locked = false;
305             // Wakeup mp_dispatch_run(), also mp_dispatch_lock().
306             pthread_cond_broadcast(&queue->cond);
307             if (item->asynchronous) {
308                 talloc_free(item);
309             } else {
310                 item->completed = true;
311             }
312         } else if (queue->wait > 0 && !queue->interrupted) {
313             struct timespec ts = mp_time_us_to_timespec(queue->wait);
314             if (pthread_cond_timedwait(&queue->cond, &queue->lock, &ts))
315                 queue->wait = 0;
316         } else {
317             break;
318         }
319     }
320     assert(!queue->locked);
321     queue->in_process = false;
322     queue->interrupted = false;
323     pthread_mutex_unlock(&queue->lock);
324 }
325 
326 // If the queue is inside of mp_dispatch_queue_process(), make it return as
327 // soon as all work items have been run, without waiting for the timeout. This
328 // does not make it return early if it's blocked by a mp_dispatch_lock().
329 // If the queue is _not_ inside of mp_dispatch_queue_process(), make the next
330 // call of it use a timeout of 0 (this is useful behavior if you need to
331 // wakeup the main thread from another thread in a race free way).
mp_dispatch_interrupt(struct mp_dispatch_queue * queue)332 void mp_dispatch_interrupt(struct mp_dispatch_queue *queue)
333 {
334     pthread_mutex_lock(&queue->lock);
335     queue->interrupted = true;
336     pthread_cond_broadcast(&queue->cond);
337     pthread_mutex_unlock(&queue->lock);
338 }
339 
340 // If a mp_dispatch_queue_process() call is in progress, then adjust the maximum
341 // time it blocks due to its timeout argument. Otherwise does nothing. (It
342 // makes sense to call this in code that uses both mp_dispatch_[un]lock() and
343 // a normal event loop.)
344 // Does not work correctly with queues that have mp_dispatch_set_wakeup_fn()
345 // called on them, because this implies you actually do waiting via
346 // mp_dispatch_queue_process(), while wakeup callbacks are used when you need
347 // to wait in external APIs.
mp_dispatch_adjust_timeout(struct mp_dispatch_queue * queue,int64_t until)348 void mp_dispatch_adjust_timeout(struct mp_dispatch_queue *queue, int64_t until)
349 {
350     pthread_mutex_lock(&queue->lock);
351     if (queue->in_process && queue->wait > until) {
352         queue->wait = until;
353         pthread_cond_broadcast(&queue->cond);
354     }
355     pthread_mutex_unlock(&queue->lock);
356 }
357 
358 // Grant exclusive access to the target thread's state. While this is active,
359 // no other thread can return from mp_dispatch_lock() (i.e. it behaves like
360 // a pthread mutex), and no other thread can get dispatch items completed.
361 // Other threads can still queue asynchronous dispatch items without waiting,
362 // and the mutex behavior applies to this function and dispatch callbacks only.
363 // The lock is non-recursive, and dispatch callback functions can be thought of
364 // already holding the dispatch lock.
mp_dispatch_lock(struct mp_dispatch_queue * queue)365 void mp_dispatch_lock(struct mp_dispatch_queue *queue)
366 {
367     pthread_mutex_lock(&queue->lock);
368     // Must not be called recursively from dispatched callbacks.
369     if (queue->in_process)
370         assert(!pthread_equal(queue->in_process_thread, pthread_self()));
371     // Must not be called recursively at all.
372     if (queue->locked_explicit)
373         assert(!pthread_equal(queue->locked_explicit_thread, pthread_self()));
374     queue->lock_requests += 1;
375     // And now wait until the target thread gets "trapped" within the
376     // mp_dispatch_queue_process() call, which will mean we get exclusive
377     // access to the target's thread state.
378     if (queue->onlock_fn)
379         queue->onlock_fn(queue->onlock_ctx);
380     while (!queue->in_process) {
381         pthread_mutex_unlock(&queue->lock);
382         if (queue->wakeup_fn)
383             queue->wakeup_fn(queue->wakeup_ctx);
384         pthread_mutex_lock(&queue->lock);
385         if (queue->in_process)
386             break;
387         pthread_cond_wait(&queue->cond, &queue->lock);
388     }
389     // Wait until we can get the lock.
390     while (!queue->in_process || queue->locked)
391         pthread_cond_wait(&queue->cond, &queue->lock);
392     // "Lock".
393     assert(queue->lock_requests);
394     assert(!queue->locked);
395     assert(!queue->locked_explicit);
396     queue->locked = true;
397     queue->locked_explicit = true;
398     queue->locked_explicit_thread = pthread_self();
399     pthread_mutex_unlock(&queue->lock);
400 }
401 
402 // Undo mp_dispatch_lock().
mp_dispatch_unlock(struct mp_dispatch_queue * queue)403 void mp_dispatch_unlock(struct mp_dispatch_queue *queue)
404 {
405     pthread_mutex_lock(&queue->lock);
406     assert(queue->locked);
407     // Must be called after a mp_dispatch_lock(), from the same thread.
408     assert(queue->locked_explicit);
409     assert(pthread_equal(queue->locked_explicit_thread, pthread_self()));
410     // "Unlock".
411     queue->locked = false;
412     queue->locked_explicit = false;
413     queue->lock_requests -= 1;
414     // Wakeup mp_dispatch_queue_process(), and maybe other mp_dispatch_lock()s.
415     // (Would be nice to wake up only 1 other locker if lock_requests>0.)
416     pthread_cond_broadcast(&queue->cond);
417     pthread_mutex_unlock(&queue->lock);
418 }
419