1 /* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
2 *
3 * Permission is hereby granted, free of charge, to any person obtaining a copy
4 * of this software and associated documentation files (the "Software"), to
5 * deal in the Software without restriction, including without limitation the
6 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
7 * sell copies of the Software, and to permit persons to whom the Software is
8 * furnished to do so, subject to the following conditions:
9 *
10 * The above copyright notice and this permission notice shall be included in
11 * all copies or substantial portions of the Software.
12 *
13 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
19 * IN THE SOFTWARE.
20 */
21
22 #include "internal.h"
23 #include <stdlib.h>
24
25 #define MAX_THREADPOOL_SIZE 128
26
27 static uv_once_t once = UV_ONCE_INIT;
28 static uv_cond_t cond;
29 static uv_mutex_t mutex;
30 static unsigned int nthreads;
31 static uv_thread_t* threads;
32 static uv_thread_t default_threads[4];
33 static QUEUE exit_message;
34 static QUEUE wq;
35 static volatile int initialized;
36
37
/* Marker work function.  uv__work_cancel() stores its address in w->work to
 * flag a request as cancelled and uv__work_done() compares against it to
 * report -ECANCELED.  Calling it aborts the process.
 */
static void uv__cancelled(struct uv__work* w) {
  (void) w;
  abort();
}
41
42
43 /* To avoid deadlock with uv_cancel() it's crucial that the worker
44 * never holds the global mutex and the loop-local mutex at the same time.
45 */
worker(void * arg)46 static void worker(void* arg) {
47 struct uv__work* w;
48 QUEUE* q;
49
50 (void) arg;
51
52 for (;;) {
53 uv_mutex_lock(&mutex);
54
55 while (QUEUE_EMPTY(&wq))
56 uv_cond_wait(&cond, &mutex);
57
58 q = QUEUE_HEAD(&wq);
59
60 if (q == &exit_message)
61 uv_cond_signal(&cond);
62 else {
63 QUEUE_REMOVE(q);
64 QUEUE_INIT(q); /* Signal uv_cancel() that the work req is
65 executing. */
66 }
67
68 uv_mutex_unlock(&mutex);
69
70 if (q == &exit_message)
71 break;
72
73 w = QUEUE_DATA(q, struct uv__work, wq);
74 w->work(w);
75
76 uv_mutex_lock(&w->loop->wq_mutex);
77 w->work = NULL; /* Signal uv_cancel() that the work req is done
78 executing. */
79 QUEUE_INSERT_TAIL(&w->loop->wq, &w->wq);
80 uv_async_send(&w->loop->wq_async);
81 uv_mutex_unlock(&w->loop->wq_mutex);
82 }
83 }
84
85
/* Append `q` to the global work queue and signal the condition variable the
 * workers wait on.  The global mutex is held across both steps.
 */
static void post(QUEUE* q) {
  uv_mutex_lock(&mutex);

  QUEUE_INSERT_TAIL(&wq, q);
  uv_cond_signal(&cond);

  uv_mutex_unlock(&mutex);
}
92
93
init_once(void)94 static void init_once(void) {
95 unsigned int i;
96 const char* val;
97
98 nthreads = ARRAY_SIZE(default_threads);
99 val = getenv("UV_THREADPOOL_SIZE");
100 if (val != NULL)
101 nthreads = atoi(val);
102 if (nthreads == 0)
103 nthreads = 1;
104 if (nthreads > MAX_THREADPOOL_SIZE)
105 nthreads = MAX_THREADPOOL_SIZE;
106
107 threads = default_threads;
108 if (nthreads > ARRAY_SIZE(default_threads)) {
109 threads = malloc(nthreads * sizeof(threads[0]));
110 if (threads == NULL) {
111 nthreads = ARRAY_SIZE(default_threads);
112 threads = default_threads;
113 }
114 }
115
116 if (uv_cond_init(&cond))
117 abort();
118
119 if (uv_mutex_init(&mutex))
120 abort();
121
122 QUEUE_INIT(&wq);
123
124 for (i = 0; i < nthreads; i++)
125 if (uv_thread_create(threads + i, worker, NULL))
126 abort();
127
128 initialized = 1;
129 }
130
131
UV_DESTRUCTOR(static void cleanup (void))132 UV_DESTRUCTOR(static void cleanup(void)) {
133 unsigned int i;
134
135 if (initialized == 0)
136 return;
137
138 post(&exit_message);
139
140 for (i = 0; i < nthreads; i++)
141 if (uv_thread_join(threads + i))
142 abort();
143
144 if (threads != default_threads)
145 free(threads);
146
147 uv_mutex_destroy(&mutex);
148 uv_cond_destroy(&cond);
149
150 threads = NULL;
151 nthreads = 0;
152 initialized = 0;
153 }
154
155
/* Queue a work request on the thread pool.
 *
 * loop: loop whose queue receives the request after `work` has run.
 * w:    request to fill in and enqueue.
 * work: function a worker thread calls with `w`.
 * done: completion callback stored on `w`; uv__work_done() invokes it.
 *
 * The pool is created on first use via uv_once().
 */
void uv__work_submit(uv_loop_t* loop,
                     struct uv__work* w,
                     void (*work)(struct uv__work* w),
                     void (*done)(struct uv__work* w, int status)) {
  uv_once(&once, init_once);

  w->loop = loop;
  w->work = work;
  w->done = done;

  post(&w->wq);
}
166
167
/* Try to cancel a work request that has not started running yet.
 *
 * A request can be cancelled only while it is still linked into a queue and
 * its work function has not been cleared by the worker.  Both the global
 * mutex and the loop's queue mutex are held while that is checked and the
 * request is unlinked.
 *
 * Returns 0 when the request was cancelled; it is then re-queued on the
 * loop's queue with uv__cancelled as its work function and the loop is
 * notified.  Returns -EBUSY when the request is executing or already done.
 */
static int uv__work_cancel(uv_loop_t* loop, uv_req_t* req, struct uv__work* w) {
  int removed;

  (void) req;

  uv_mutex_lock(&mutex);
  uv_mutex_lock(&w->loop->wq_mutex);

  removed = 0;
  if (!QUEUE_EMPTY(&w->wq) && w->work != NULL) {
    QUEUE_REMOVE(&w->wq);
    removed = 1;
  }

  uv_mutex_unlock(&w->loop->wq_mutex);
  uv_mutex_unlock(&mutex);

  if (removed == 0)
    return -EBUSY;

  /* Mark as cancelled, then hand the request to the loop for completion. */
  w->work = uv__cancelled;

  uv_mutex_lock(&loop->wq_mutex);
  QUEUE_INSERT_TAIL(&loop->wq, &w->wq);
  uv_async_send(&loop->wq_async);
  uv_mutex_unlock(&loop->wq_mutex);

  return 0;
}
192
193
/* Async callback for a loop's wq_async handle: runs the `done` callback of
 * every request currently on the loop's queue.
 *
 * The queued entries are split off onto a private list while the loop's queue
 * mutex is held; the callbacks are then invoked without that mutex.  A
 * request whose work function is uv__cancelled is reported with -ECANCELED,
 * any other request with 0.
 */
void uv__work_done(uv_async_t* handle, int status) {
  struct uv__work* req;
  uv_loop_t* loop;
  QUEUE* node;
  QUEUE pending;

  (void) status;

  loop = container_of(handle, uv_loop_t, wq_async);
  QUEUE_INIT(&pending);

  uv_mutex_lock(&loop->wq_mutex);
  if (!QUEUE_EMPTY(&loop->wq)) {
    node = QUEUE_HEAD(&loop->wq);
    QUEUE_SPLIT(&loop->wq, node, &pending);
  }
  uv_mutex_unlock(&loop->wq_mutex);

  while (!QUEUE_EMPTY(&pending)) {
    node = QUEUE_HEAD(&pending);
    QUEUE_REMOVE(node);

    req = container_of(node, struct uv__work, wq);
    if (req->work == uv__cancelled)
      req->done(req, -ECANCELED);
    else
      req->done(req, 0);
  }
}
220
221
uv__queue_work(struct uv__work * w)222 static void uv__queue_work(struct uv__work* w) {
223 uv_work_t* req = container_of(w, uv_work_t, work_req);
224
225 req->work_cb(req);
226 }
227
228
uv__queue_done(struct uv__work * w,int err)229 static void uv__queue_done(struct uv__work* w, int err) {
230 uv_work_t* req;
231
232 req = container_of(w, uv_work_t, work_req);
233 uv__req_unregister(req->loop, req);
234
235 if (req->after_work_cb == NULL)
236 return;
237
238 req->after_work_cb(req, err);
239 }
240
241
/* Schedule `work_cb` to run on the thread pool.
 *
 * loop:          loop the request belongs to.
 * req:           work request to initialize and submit.
 * work_cb:       callback run by a worker thread; must not be NULL.
 * after_work_cb: optional callback invoked through uv__queue_done() once the
 *                request completes; may be NULL.
 *
 * Returns 0 on success, or -EINVAL when `work_cb` is NULL (nothing is
 * initialized or submitted in that case).
 */
int uv_queue_work(uv_loop_t* loop,
                  uv_work_t* req,
                  uv_work_cb work_cb,
                  uv_after_work_cb after_work_cb) {
  if (work_cb == NULL)
    return -EINVAL;

  uv__req_init(loop, req, UV_WORK);

  req->loop = loop;
  req->work_cb = work_cb;
  req->after_work_cb = after_work_cb;

  uv__work_submit(loop, &req->work_req, uv__queue_work, uv__queue_done);

  return 0;
}
256
257
/* Cancel a pending thread pool request.
 *
 * Only UV_FS, UV_GETADDRINFO and UV_WORK requests are accepted; any other
 * request type yields -EINVAL.  For an accepted type the result is whatever
 * uv__work_cancel() returns: 0 on success, -EBUSY when the request can no
 * longer be cancelled.
 */
int uv_cancel(uv_req_t* req) {
  struct uv__work* wreq;
  uv_loop_t* loop;

  if (req->type == UV_FS) {
    uv_fs_t* fs_req = (uv_fs_t*) req;

    loop = fs_req->loop;
    wreq = &fs_req->work_req;
  } else if (req->type == UV_GETADDRINFO) {
    uv_getaddrinfo_t* gai_req = (uv_getaddrinfo_t*) req;

    loop = gai_req->loop;
    wreq = &gai_req->work_req;
  } else if (req->type == UV_WORK) {
    uv_work_t* work_req = (uv_work_t*) req;

    loop = work_req->loop;
    wreq = &work_req->work_req;
  } else {
    return -EINVAL;
  }

  return uv__work_cancel(loop, req, wreq);
}
281