1 /* Copyright (C) 2018 the mpv developers
2 *
3 * Permission to use, copy, modify, and/or distribute this software for any
4 * purpose with or without fee is hereby granted, provided that the above
5 * copyright notice and this permission notice appear in all copies.
6 *
7 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
8 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
9 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
10 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
11 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
12 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
13 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
14 */
15
16 #include <assert.h>
17 #include <string.h>
18 #include <sys/types.h>
19 #include <unistd.h>
20 #include <errno.h>
21
22 #ifdef __MINGW32__
23 #include <windows.h>
24 #else
25 #include <poll.h>
26 #endif
27
28 #include "common/common.h"
29 #include "misc/linked_list.h"
30 #include "osdep/atomic.h"
31 #include "osdep/io.h"
32 #include "osdep/timer.h"
33
34 #include "thread_tools.h"
35
mp_waiter_wait(struct mp_waiter * waiter)36 uintptr_t mp_waiter_wait(struct mp_waiter *waiter)
37 {
38 pthread_mutex_lock(&waiter->lock);
39 while (!waiter->done)
40 pthread_cond_wait(&waiter->wakeup, &waiter->lock);
41 pthread_mutex_unlock(&waiter->lock);
42
43 uintptr_t ret = waiter->value;
44
45 // We document that after mp_waiter_wait() the waiter object becomes
46 // invalid. (It strictly returns only after mp_waiter_wakeup() has returned,
47 // and the object is "single-shot".) So destroy it here.
48
49 // Normally, we expect that the system uses futexes, in which case the
50 // following functions will do nearly nothing. This is true for Windows
51 // and Linux. But some lesser OSes still might allocate kernel objects
52 // when initializing mutexes, so destroy them here.
53 pthread_mutex_destroy(&waiter->lock);
54 pthread_cond_destroy(&waiter->wakeup);
55
56 memset(waiter, 0xCA, sizeof(*waiter)); // for debugging
57
58 return ret;
59 }
60
mp_waiter_wakeup(struct mp_waiter * waiter,uintptr_t value)61 void mp_waiter_wakeup(struct mp_waiter *waiter, uintptr_t value)
62 {
63 pthread_mutex_lock(&waiter->lock);
64 assert(!waiter->done);
65 waiter->done = true;
66 waiter->value = value;
67 pthread_cond_signal(&waiter->wakeup);
68 pthread_mutex_unlock(&waiter->lock);
69 }
70
mp_waiter_poll(struct mp_waiter * waiter)71 bool mp_waiter_poll(struct mp_waiter *waiter)
72 {
73 pthread_mutex_lock(&waiter->lock);
74 bool r = waiter->done;
75 pthread_mutex_unlock(&waiter->lock);
76 return r;
77 }
78
// Cancellation object. Once triggered, the state is mirrored to every
// notification mechanism that has been set up on it (condition variable,
// callback, wakeup pipe, win32 event) and to all attached slaves.
struct mp_cancel {
    pthread_mutex_t lock;   // protects everything below except "triggered"
    pthread_cond_t wakeup;  // broadcast when the object gets triggered

    // Semaphore state and "mirrors".
    atomic_bool triggered;  // set by trigger, cleared by mp_cancel_reset()
    void (*cb)(void *ctx);  // optional callback, run on trigger
    void *cb_ctx;           // opaque argument passed to cb
    int wakeup_pipe[2];     // {-1, -1} until mp_cancel_get_fd() creates it
    void *win32_event;      // actually HANDLE; NULL until requested

    // Slave list. These are automatically notified as well.
    struct {
        struct mp_cancel *head;
        struct mp_cancel *tail;
    } slaves;

    // For slaves. Synchronization is managed by parent.lock!
    struct mp_cancel *parent;
    struct {
        struct mp_cancel *next;
        struct mp_cancel *prev;
    } siblings;
};
101
cancel_destroy(void * p)102 static void cancel_destroy(void *p)
103 {
104 struct mp_cancel *c = p;
105
106 assert(!c->slaves.head); // API user error
107
108 mp_cancel_set_parent(c, NULL);
109
110 if (c->wakeup_pipe[0] >= 0) {
111 close(c->wakeup_pipe[0]);
112 close(c->wakeup_pipe[1]);
113 }
114
115 #ifdef __MINGW32__
116 if (c->win32_event)
117 CloseHandle(c->win32_event);
118 #endif
119
120 pthread_mutex_destroy(&c->lock);
121 pthread_cond_destroy(&c->wakeup);
122 }
123
mp_cancel_new(void * talloc_ctx)124 struct mp_cancel *mp_cancel_new(void *talloc_ctx)
125 {
126 struct mp_cancel *c = talloc_ptrtype(talloc_ctx, c);
127 talloc_set_destructor(c, cancel_destroy);
128 *c = (struct mp_cancel){
129 .triggered = ATOMIC_VAR_INIT(false),
130 .wakeup_pipe = {-1, -1},
131 };
132 pthread_mutex_init(&c->lock, NULL);
133 pthread_cond_init(&c->wakeup, NULL);
134 return c;
135 }
136
trigger_locked(struct mp_cancel * c)137 static void trigger_locked(struct mp_cancel *c)
138 {
139 atomic_store(&c->triggered, true);
140
141 pthread_cond_broadcast(&c->wakeup); // condition bound to c->triggered
142
143 if (c->cb)
144 c->cb(c->cb_ctx);
145
146 for (struct mp_cancel *sub = c->slaves.head; sub; sub = sub->siblings.next)
147 mp_cancel_trigger(sub);
148
149 if (c->wakeup_pipe[1] >= 0)
150 (void)write(c->wakeup_pipe[1], &(char){0}, 1);
151
152 #ifdef __MINGW32__
153 if (c->win32_event)
154 SetEvent(c->win32_event);
155 #endif
156 }
157
mp_cancel_trigger(struct mp_cancel * c)158 void mp_cancel_trigger(struct mp_cancel *c)
159 {
160 pthread_mutex_lock(&c->lock);
161 trigger_locked(c);
162 pthread_mutex_unlock(&c->lock);
163 }
164
mp_cancel_reset(struct mp_cancel * c)165 void mp_cancel_reset(struct mp_cancel *c)
166 {
167 pthread_mutex_lock(&c->lock);
168
169 atomic_store(&c->triggered, false);
170
171 if (c->wakeup_pipe[0] >= 0) {
172 // Flush it fully.
173 while (1) {
174 int r = read(c->wakeup_pipe[0], &(char[256]){0}, 256);
175 if (r <= 0 && !(r < 0 && errno == EINTR))
176 break;
177 }
178 }
179
180 #ifdef __MINGW32__
181 if (c->win32_event)
182 ResetEvent(c->win32_event);
183 #endif
184
185 pthread_mutex_unlock(&c->lock);
186 }
187
mp_cancel_test(struct mp_cancel * c)188 bool mp_cancel_test(struct mp_cancel *c)
189 {
190 return c ? atomic_load_explicit(&c->triggered, memory_order_relaxed) : false;
191 }
192
mp_cancel_wait(struct mp_cancel * c,double timeout)193 bool mp_cancel_wait(struct mp_cancel *c, double timeout)
194 {
195 struct timespec ts = mp_rel_time_to_timespec(timeout);
196 pthread_mutex_lock(&c->lock);
197 while (!mp_cancel_test(c)) {
198 if (pthread_cond_timedwait(&c->wakeup, &c->lock, &ts))
199 break;
200 }
201 pthread_mutex_unlock(&c->lock);
202
203 return mp_cancel_test(c);
204 }
205
// Called after a new notification mechanism (callback, slave, pipe, event)
// was attached: if the object is already in the triggered state, run the
// trigger sequence again so the new mechanism gets notified as well.
// Must be called with c->lock held.
static void retrigger_locked(struct mp_cancel *c)
{
    if (!mp_cancel_test(c))
        return;

    trigger_locked(c);
}
213
mp_cancel_set_cb(struct mp_cancel * c,void (* cb)(void * ctx),void * ctx)214 void mp_cancel_set_cb(struct mp_cancel *c, void (*cb)(void *ctx), void *ctx)
215 {
216 pthread_mutex_lock(&c->lock);
217 c->cb = cb;
218 c->cb_ctx = ctx;
219 retrigger_locked(c);
220 pthread_mutex_unlock(&c->lock);
221 }
222
mp_cancel_set_parent(struct mp_cancel * slave,struct mp_cancel * parent)223 void mp_cancel_set_parent(struct mp_cancel *slave, struct mp_cancel *parent)
224 {
225 // We can access c->parent without synchronization, because:
226 // - concurrent mp_cancel_set_parent() calls to slave are not allowed
227 // - slave->parent needs to stay valid as long as the slave exists
228 if (slave->parent == parent)
229 return;
230 if (slave->parent) {
231 pthread_mutex_lock(&slave->parent->lock);
232 LL_REMOVE(siblings, &slave->parent->slaves, slave);
233 pthread_mutex_unlock(&slave->parent->lock);
234 }
235 slave->parent = parent;
236 if (slave->parent) {
237 pthread_mutex_lock(&slave->parent->lock);
238 LL_APPEND(siblings, &slave->parent->slaves, slave);
239 retrigger_locked(slave->parent);
240 pthread_mutex_unlock(&slave->parent->lock);
241 }
242 }
243
mp_cancel_get_fd(struct mp_cancel * c)244 int mp_cancel_get_fd(struct mp_cancel *c)
245 {
246 pthread_mutex_lock(&c->lock);
247 if (c->wakeup_pipe[0] < 0) {
248 mp_make_wakeup_pipe(c->wakeup_pipe);
249 retrigger_locked(c);
250 }
251 pthread_mutex_unlock(&c->lock);
252
253
254 return c->wakeup_pipe[0];
255 }
256
#ifdef __MINGW32__
// Windows only: return an event HANDLE (as void *) that is signaled when the
// cancel object is triggered. The event is created lazily on the first call
// as a manual-reset, initially unsignaled event; if the object is already
// triggered, the trigger sequence is run again so the new event gets set.
//
// The handle stays owned by the cancel object (it is closed in the
// destructor). Returns NULL if CreateEventW() failed.
void *mp_cancel_get_event(struct mp_cancel *c)
{
    void *event;

    pthread_mutex_lock(&c->lock);
    if (!c->win32_event) {
        c->win32_event = CreateEventW(NULL, TRUE, FALSE, NULL);
        retrigger_locked(c);
    }
    // Read the result while still holding the lock: if event creation
    // failed, a concurrent caller may retry and write the field, so an
    // unlocked read here would race with that write.
    event = c->win32_event;
    pthread_mutex_unlock(&c->lock);

    return event;
}
#endif
270