/* Copyright (C) 2018 the mpv developers
 *
 * Permission to use, copy, modify, and/or distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 */

#include <assert.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>
#include <errno.h>

#ifdef __MINGW32__
#include <windows.h>
#else
#include <poll.h>
#endif

#include "common/common.h"
#include "misc/linked_list.h"
#include "osdep/atomic.h"
#include "osdep/io.h"
#include "osdep/timer.h"

#include "thread_tools.h"

uintptr_t mp_waiter_wait(struct mp_waiter *waiter)
{
    pthread_mutex_lock(&waiter->lock);
    while (!waiter->done)
        pthread_cond_wait(&waiter->wakeup, &waiter->lock);
    pthread_mutex_unlock(&waiter->lock);

    uintptr_t ret = waiter->value;

    // We document that after mp_waiter_wait() the waiter object becomes
    // invalid. (It strictly returns only after mp_waiter_wakeup() has returned,
    // and the object is "single-shot".) So destroy it here.

    // Normally, we expect that the system uses futexes, in which case the
    // following functions will do nearly nothing. This is true for Windows
    // and Linux. But some lesser OSes still might allocate kernel objects
    // when initializing mutexes, so destroy them here.
    pthread_mutex_destroy(&waiter->lock);
    pthread_cond_destroy(&waiter->wakeup);

    memset(waiter, 0xCA, sizeof(*waiter)); // for debugging

    return ret;
}
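
// Usage sketch (illustrative comment, not part of this file's code): a
// single-shot result handoff between two threads. spawn_worker() is
// hypothetical, and the initializer macro is an assumption about what
// thread_tools.h provides for struct mp_waiter.
//
//   struct mp_waiter waiter = MP_WAITER_INITIALIZER;
//   spawn_worker(&waiter);   // hypothetical; the worker eventually calls
//                            // mp_waiter_wakeup(&waiter, result)
//   uintptr_t result = mp_waiter_wait(&waiter);
//   // waiter is invalid from here on; it must not be reused or waited on.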

void mp_waiter_wakeup(struct mp_waiter *waiter, uintptr_t value)
{
    pthread_mutex_lock(&waiter->lock);
    assert(!waiter->done);
    waiter->done = true;
    waiter->value = value;
    pthread_cond_signal(&waiter->wakeup);
    pthread_mutex_unlock(&waiter->lock);
}

bool mp_waiter_poll(struct mp_waiter *waiter)
{
    pthread_mutex_lock(&waiter->lock);
    bool r = waiter->done;
    pthread_mutex_unlock(&waiter->lock);
    return r;
}

struct mp_cancel {
    pthread_mutex_t lock;
    pthread_cond_t wakeup;

    // Semaphore state and "mirrors".
    atomic_bool triggered;
    void (*cb)(void *ctx);
    void *cb_ctx;
    int wakeup_pipe[2];
    void *win32_event; // actually HANDLE

    // Slave list. These are automatically notified as well.
    struct {
        struct mp_cancel *head, *tail;
    } slaves;

    // For slaves. Synchronization is managed by parent.lock!
    struct mp_cancel *parent;
    struct {
        struct mp_cancel *next, *prev;
    } siblings;
};

static void cancel_destroy(void *p)
{
    struct mp_cancel *c = p;

    assert(!c->slaves.head); // API user error

    mp_cancel_set_parent(c, NULL);

    if (c->wakeup_pipe[0] >= 0) {
        close(c->wakeup_pipe[0]);
        close(c->wakeup_pipe[1]);
    }

#ifdef __MINGW32__
    if (c->win32_event)
        CloseHandle(c->win32_event);
#endif

    pthread_mutex_destroy(&c->lock);
    pthread_cond_destroy(&c->wakeup);
}

struct mp_cancel *mp_cancel_new(void *talloc_ctx)
{
    struct mp_cancel *c = talloc_ptrtype(talloc_ctx, c);
    talloc_set_destructor(c, cancel_destroy);
    *c = (struct mp_cancel){
        .triggered = ATOMIC_VAR_INIT(false),
        .wakeup_pipe = {-1, -1},
    };
    pthread_mutex_init(&c->lock, NULL);
    pthread_cond_init(&c->wakeup, NULL);
    return c;
}
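
// Usage sketch (illustrative comment): typical use of mp_cancel. A worker
// polls mp_cancel_test() at its cancellation points while another thread
// requests abort via mp_cancel_trigger(). Freeing the object with
// talloc_free() runs cancel_destroy() above.
//
//   struct mp_cancel *c = mp_cancel_new(NULL);
//   // worker thread:
//   //   while (!mp_cancel_test(c)) { /* do a bounded chunk of work */ }
//   // controlling thread:
//   //   mp_cancel_trigger(c);  // worker observes it on its next test
//   talloc_free(c);             // only after the worker is known to be done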

static void trigger_locked(struct mp_cancel *c)
{
    atomic_store(&c->triggered, true);

    pthread_cond_broadcast(&c->wakeup); // condition bound to c->triggered

    if (c->cb)
        c->cb(c->cb_ctx);

    for (struct mp_cancel *sub = c->slaves.head; sub; sub = sub->siblings.next)
        mp_cancel_trigger(sub);

    if (c->wakeup_pipe[1] >= 0)
        (void)write(c->wakeup_pipe[1], &(char){0}, 1);

#ifdef __MINGW32__
    if (c->win32_event)
        SetEvent(c->win32_event);
#endif
}

void mp_cancel_trigger(struct mp_cancel *c)
{
    pthread_mutex_lock(&c->lock);
    trigger_locked(c);
    pthread_mutex_unlock(&c->lock);
}

void mp_cancel_reset(struct mp_cancel *c)
{
    pthread_mutex_lock(&c->lock);

    atomic_store(&c->triggered, false);

    if (c->wakeup_pipe[0] >= 0) {
        // Flush it fully.
        while (1) {
            int r = read(c->wakeup_pipe[0], &(char[256]){0}, 256);
            if (r <= 0 && !(r < 0 && errno == EINTR))
                break;
        }
    }

#ifdef __MINGW32__
    if (c->win32_event)
        ResetEvent(c->win32_event);
#endif

    pthread_mutex_unlock(&c->lock);
}

bool mp_cancel_test(struct mp_cancel *c)
{
    return c ? atomic_load_explicit(&c->triggered, memory_order_relaxed) : false;
}

bool mp_cancel_wait(struct mp_cancel *c, double timeout)
{
    struct timespec ts = mp_rel_time_to_timespec(timeout);
    pthread_mutex_lock(&c->lock);
    while (!mp_cancel_test(c)) {
        if (pthread_cond_timedwait(&c->wakeup, &c->lock, &ts))
            break;
    }
    pthread_mutex_unlock(&c->lock);

    return mp_cancel_test(c);
}
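
// Usage sketch (illustrative comment): block for at most a relative timeout,
// e.g. to make a retry loop abortable; the 0.5 second value is just an
// example. The return value is true iff cancellation was triggered.
//
//   if (mp_cancel_wait(c, 0.5)) {
//       // cancelled within 0.5 seconds -> abort the operation
//   } else {
//       // timed out without cancellation -> retry
//   }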

// If a new notification mechanism was added, and the mp_cancel state was
// already triggered, make sure the newly added mechanism is also triggered.
static void retrigger_locked(struct mp_cancel *c)
{
    if (mp_cancel_test(c))
        trigger_locked(c);
}

void mp_cancel_set_cb(struct mp_cancel *c, void (*cb)(void *ctx), void *ctx)
{
    pthread_mutex_lock(&c->lock);
    c->cb = cb;
    c->cb_ctx = ctx;
    retrigger_locked(c);
    pthread_mutex_unlock(&c->lock);
}

void mp_cancel_set_parent(struct mp_cancel *slave, struct mp_cancel *parent)
{
    // We can access c->parent without synchronization, because:
    //  - concurrent mp_cancel_set_parent() calls to slave are not allowed
    //  - slave->parent needs to stay valid as long as the slave exists
    if (slave->parent == parent)
        return;
    if (slave->parent) {
        pthread_mutex_lock(&slave->parent->lock);
        LL_REMOVE(siblings, &slave->parent->slaves, slave);
        pthread_mutex_unlock(&slave->parent->lock);
    }
    slave->parent = parent;
    if (slave->parent) {
        pthread_mutex_lock(&slave->parent->lock);
        LL_APPEND(siblings, &slave->parent->slaves, slave);
        retrigger_locked(slave->parent);
        pthread_mutex_unlock(&slave->parent->lock);
    }
}
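
// Usage sketch (illustrative comment): chain a per-operation mp_cancel to a
// longer-lived one, so triggering the parent also triggers the slave. A
// slave must be unlinked (explicitly, or by freeing it, which unlinks via
// cancel_destroy()) before its parent is destroyed, or the assert in
// cancel_destroy() fires.
//
//   struct mp_cancel *global = mp_cancel_new(NULL);
//   struct mp_cancel *op = mp_cancel_new(NULL);
//   mp_cancel_set_parent(op, global);
//   mp_cancel_trigger(global);       // 'op' is now triggered too
//   talloc_free(op);                 // unlinks itself from 'global'
//   talloc_free(global);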

int mp_cancel_get_fd(struct mp_cancel *c)
{
    pthread_mutex_lock(&c->lock);
    if (c->wakeup_pipe[0] < 0) {
        mp_make_wakeup_pipe(c->wakeup_pipe);
        retrigger_locked(c);
    }
    pthread_mutex_unlock(&c->lock);

    return c->wakeup_pipe[0];
}
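
// Usage sketch (illustrative comment): include the cancel fd in a poll() set
// so a blocking wait on another fd can be interrupted by mp_cancel_trigger().
// The fd becomes readable once triggered; 'sock' is a hypothetical socket.
//
//   struct pollfd fds[2] = {
//       {.fd = sock,                .events = POLLIN},
//       {.fd = mp_cancel_get_fd(c), .events = POLLIN},
//   };
//   poll(fds, 2, -1);
//   if (fds[1].revents & POLLIN) {
//       // cancelled -> abort the operation
//   }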

#ifdef __MINGW32__
void *mp_cancel_get_event(struct mp_cancel *c)
{
    pthread_mutex_lock(&c->lock);
    if (!c->win32_event) {
        c->win32_event = CreateEventW(NULL, TRUE, FALSE, NULL);
        retrigger_locked(c);
    }
    pthread_mutex_unlock(&c->lock);

    return c->win32_event;
}
#endif
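
// Usage sketch (illustrative comment, MinGW builds only): wait on the event
// with the Win32 API; WAIT_OBJECT_0 means the mp_cancel was triggered. The
// 500 ms timeout is just an example value.
//
//   HANDLE ev = mp_cancel_get_event(c);
//   if (WaitForSingleObject(ev, 500) == WAIT_OBJECT_0) {
//       // cancelled -> abort the operation
//   }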