/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to
 * deal in the Software without restriction, including without limitation the
 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 * sell copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
 * IN THE SOFTWARE.
 */

#include "internal.h"
#include <stdlib.h>

#define MAX_THREADPOOL_SIZE 128

static uv_once_t once = UV_ONCE_INIT;
static uv_cond_t cond;
static uv_mutex_t mutex;
static unsigned int nthreads;
static uv_thread_t* threads;
static uv_thread_t default_threads[4];
static QUEUE exit_message;
static QUEUE wq;
static volatile int initialized;


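/* Placeholder work callback for cancelled requests. uv__work_cancel() points
 * w->work at this function so that uv__work_done() can recognize the request
 * as cancelled; it is never meant to actually run, hence the abort().
 */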
static void uv__cancelled(struct uv__work* w) {
  abort();
}


/* To avoid deadlock with uv_cancel() it's crucial that the worker
 * never holds the global mutex and the loop-local mutex at the same time.
 */
static void worker(void* arg) {
  struct uv__work* w;
  QUEUE* q;

  (void) arg;

  for (;;) {
    uv_mutex_lock(&mutex);

    while (QUEUE_EMPTY(&wq))
      uv_cond_wait(&cond, &mutex);

    q = QUEUE_HEAD(&wq);

    if (q == &exit_message)
      uv_cond_signal(&cond);
    else {
      QUEUE_REMOVE(q);
      QUEUE_INIT(q);  /* Signal uv_cancel() that the work req is
                         executing. */
    }

    uv_mutex_unlock(&mutex);

    if (q == &exit_message)
      break;

    w = QUEUE_DATA(q, struct uv__work, wq);
    w->work(w);

    uv_mutex_lock(&w->loop->wq_mutex);
    w->work = NULL;  /* Signal uv_cancel() that the work req is done
                        executing. */
    QUEUE_INSERT_TAIL(&w->loop->wq, &w->wq);
    uv_async_send(&w->loop->wq_async);
    uv_mutex_unlock(&w->loop->wq_mutex);
  }
}


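/* Enqueue a work request (or the exit_message sentinel) on the global queue
 * and wake up one worker thread.
 */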
static void post(QUEUE* q) {
  uv_mutex_lock(&mutex);
  QUEUE_INSERT_TAIL(&wq, q);
  uv_cond_signal(&cond);
  uv_mutex_unlock(&mutex);
}


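/* Lazily set up the thread pool on first use: read UV_THREADPOOL_SIZE from
 * the environment, clamp it to [1, MAX_THREADPOOL_SIZE], and spawn the worker
 * threads. Falls back to the statically allocated default_threads array when
 * the requested size fits or when malloc() fails.
 */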
static void init_once(void) {
  unsigned int i;
  const char* val;

  nthreads = ARRAY_SIZE(default_threads);
  val = getenv("UV_THREADPOOL_SIZE");
  if (val != NULL)
    nthreads = atoi(val);
  if (nthreads == 0)
    nthreads = 1;
  if (nthreads > MAX_THREADPOOL_SIZE)
    nthreads = MAX_THREADPOOL_SIZE;

  threads = default_threads;
  if (nthreads > ARRAY_SIZE(default_threads)) {
    threads = malloc(nthreads * sizeof(threads[0]));
    if (threads == NULL) {
      nthreads = ARRAY_SIZE(default_threads);
      threads = default_threads;
    }
  }

  if (uv_cond_init(&cond))
    abort();

  if (uv_mutex_init(&mutex))
    abort();

  QUEUE_INIT(&wq);

  for (i = 0; i < nthreads; i++)
    if (uv_thread_create(threads + i, worker, NULL))
      abort();

  initialized = 1;
}


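/* Destructor: post the exit_message sentinel so every worker eventually wakes
 * up and terminates (each worker re-signals the condition variable for the
 * next one), then join the threads and release the pool's resources.
 */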
UV_DESTRUCTOR(static void cleanup(void)) {
  unsigned int i;

  if (initialized == 0)
    return;

  post(&exit_message);

  for (i = 0; i < nthreads; i++)
    if (uv_thread_join(threads + i))
      abort();

  if (threads != default_threads)
    free(threads);

  uv_mutex_destroy(&mutex);
  uv_cond_destroy(&cond);

  threads = NULL;
  nthreads = 0;
  initialized = 0;
}


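/* Hand a work request off to the thread pool: record the work and done
 * callbacks on the request and append it to the global queue. The pool is
 * initialized on the first submission.
 */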
void uv__work_submit(uv_loop_t* loop,
                     struct uv__work* w,
                     void (*work)(struct uv__work* w),
                     void (*done)(struct uv__work* w, int status)) {
  uv_once(&once, init_once);
  w->loop = loop;
  w->work = work;
  w->done = done;
  post(&w->wq);
}


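/* Attempt to cancel a submitted work request. Cancellation only succeeds
 * while the request is still sitting in the global queue (its queue node is
 * non-empty and w->work has not been cleared); otherwise -EBUSY is returned.
 * On success the request is rerouted to the loop's finished-work queue with
 * w->work set to uv__cancelled so its done callback fires with -ECANCELED.
 */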
static int uv__work_cancel(uv_loop_t* loop, uv_req_t* req, struct uv__work* w) {
  int cancelled;

  uv_mutex_lock(&mutex);
  uv_mutex_lock(&w->loop->wq_mutex);

  cancelled = !QUEUE_EMPTY(&w->wq) && w->work != NULL;
  if (cancelled)
    QUEUE_REMOVE(&w->wq);

  uv_mutex_unlock(&w->loop->wq_mutex);
  uv_mutex_unlock(&mutex);

  if (!cancelled)
    return -EBUSY;

  w->work = uv__cancelled;
  uv_mutex_lock(&loop->wq_mutex);
  QUEUE_INSERT_TAIL(&loop->wq, &w->wq);
  uv_async_send(&loop->wq_async);
  uv_mutex_unlock(&loop->wq_mutex);

  return 0;
}


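/* Runs on the event loop thread in response to wq_async. Detaches the list of
 * completed (or cancelled) work requests from loop->wq and invokes each done
 * callback, passing -ECANCELED for requests that were cancelled.
 */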
void uv__work_done(uv_async_t* handle, int status) {
  struct uv__work* w;
  uv_loop_t* loop;
  QUEUE* q;
  QUEUE wq;
  int err;

  loop = container_of(handle, uv_loop_t, wq_async);
  QUEUE_INIT(&wq);

  uv_mutex_lock(&loop->wq_mutex);
  if (!QUEUE_EMPTY(&loop->wq)) {
    q = QUEUE_HEAD(&loop->wq);
    QUEUE_SPLIT(&loop->wq, q, &wq);
  }
  uv_mutex_unlock(&loop->wq_mutex);

  while (!QUEUE_EMPTY(&wq)) {
    q = QUEUE_HEAD(&wq);
    QUEUE_REMOVE(q);

    w = container_of(q, struct uv__work, wq);
    err = (w->work == uv__cancelled) ? -ECANCELED : 0;
    w->done(w, err);
  }
}


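/* Adapters that bridge the public uv_work_t API onto the internal
 * struct uv__work interface used by the thread pool.
 */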
static void uv__queue_work(struct uv__work* w) {
  uv_work_t* req = container_of(w, uv_work_t, work_req);

  req->work_cb(req);
}


static void uv__queue_done(struct uv__work* w, int err) {
  uv_work_t* req;

  req = container_of(w, uv_work_t, work_req);
  uv__req_unregister(req->loop, req);

  if (req->after_work_cb == NULL)
    return;

  req->after_work_cb(req, err);
}


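/* Public API: run work_cb on a thread pool thread, then after_work_cb on the
 * event loop thread. work_cb is required; after_work_cb may be NULL.
 */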
int uv_queue_work(uv_loop_t* loop,
                  uv_work_t* req,
                  uv_work_cb work_cb,
                  uv_after_work_cb after_work_cb) {
  if (work_cb == NULL)
    return -EINVAL;

  uv__req_init(loop, req, UV_WORK);
  req->loop = loop;
  req->work_cb = work_cb;
  req->after_work_cb = after_work_cb;
  uv__work_submit(loop, &req->work_req, uv__queue_work, uv__queue_done);
  return 0;
}


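/* Public API: attempt to cancel a pending request. Only request types backed
 * by the thread pool (fs, getaddrinfo and work requests) can be cancelled,
 * and only while they are still waiting in the queue.
 */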
int uv_cancel(uv_req_t* req) {
  struct uv__work* wreq;
  uv_loop_t* loop;

  switch (req->type) {
  case UV_FS:
    loop =  ((uv_fs_t*) req)->loop;
    wreq = &((uv_fs_t*) req)->work_req;
    break;
  case UV_GETADDRINFO:
    loop =  ((uv_getaddrinfo_t*) req)->loop;
    wreq = &((uv_getaddrinfo_t*) req)->work_req;
    break;
  case UV_WORK:
    loop =  ((uv_work_t*) req)->loop;
    wreq = &((uv_work_t*) req)->work_req;
    break;
  default:
    return -EINVAL;
  }

  return uv__work_cancel(loop, req, wreq);
}