1 /* $Id: uv_extra.c 629837 2021-04-22 12:47:49Z ivanov $
2 * ===========================================================================
3 *
4 * PUBLIC DOMAIN NOTICE
5 * National Center for Biotechnology Information
6 *
7 * This software/database is a "United States Government Work" under the
8 * terms of the United States Copyright Act. It was written as part of
9 * the author's official duties as a United States Government employee and
10 * thus cannot be copyrighted. This software/database is freely available
11 * to the public for use. The National Library of Medicine and the U.S.
12 * Government have not placed any restriction on its use or reproduction.
13 *
14 * Although all reasonable efforts have been taken to ensure the accuracy
15 * and reliability of the software and data, the NLM and the U.S.
16 * Government do not and cannot warrant the performance or results that
17 * may be obtained by using this software or data. The NLM and the U.S.
18 * Government disclaim all warranties, express or implied, including
19 * warranties of performance, merchantability or fitness for any particular
20 * purpose.
21 *
22 * Please cite the author in any work or product based on this material.
23 *
24 * ===========================================================================
25 *
26 * Authors: Dmitri Dmitrienko
27 *
28 * File Description:
29 *
30 */
31
32 #include <stdlib.h>
33 #include <assert.h>
34 #include <unistd.h>
35
36 #include <uv.h>
37
38 #include "uv_extra.h"
39
40 #define CONTAINER_OF(ptr, type, member) ({ \
41 const typeof(((type*)(0))->member) *__mptr = ((const typeof(((type*)(0))->member) *)(ptr)); \
42 (type*)((char*)(__mptr) - offsetof(type, member)); \
43 })
44
45
46 struct import_worker_t
47 {
48 unsigned int m_Id;
49 struct uv_export_t * m_Exp;
50 uv_sem_t m_Sem;
51 uv_pipe_t m_Ipc;
52 uv_connect_t m_ConnectReq;
53 char m_StaticIpcBuf[256];
54 uv_stream_t * m_Handle;
55 volatile int m_Error;
56 char m_IpcClosed;
57 volatile char m_ImportStarted;
58 volatile char m_ImportFinished;
59 };
60
61
62 struct uv_export_t
63 {
64 unsigned int m_IdCounter;
65 const char * m_IpcName;
66 uv_stream_t * m_Handle;
67 uv_pipe_t m_Ipc;
68 uv_handle_type m_HandleType;
69 unsigned short m_WorkerCount;
70 struct import_worker_t * m_Workers;
71 volatile int m_Error;
72 volatile int m_ErrorCount;
73 volatile char m_InWaiting;
74 char m_IpcClosed;
75 };
76
77 /* uv_export related procedures and types */
78
79 struct ipc_peer_t
80 {
81 uv_pipe_t m_PeerHandle;
82 uv_write_t m_WriteReq;
83 struct uv_export_t * m_Exp;
84 };
85
86
s_on_peer_close(uv_handle_t * handle)87 static void s_on_peer_close(uv_handle_t * handle)
88 {
89 struct ipc_peer_t * pc = CONTAINER_OF(handle, struct ipc_peer_t,
90 m_PeerHandle);
91 free(pc);
92 }
93
94
s_on_ipc_write(uv_write_t * req,int status)95 static void s_on_ipc_write(uv_write_t * req, int status)
96 {
97 struct ipc_peer_t * pc = CONTAINER_OF(req, struct ipc_peer_t,
98 m_WriteReq);
99 if (status) {
100 ++pc->m_Exp->m_ErrorCount;
101 pc->m_Exp->m_Error = status;
102 }
103 uv_close((uv_handle_t*)&pc->m_PeerHandle, s_on_peer_close);
104 }
105
106
s_on_ipc_connection(uv_stream_t * ipc,int status)107 static void s_on_ipc_connection(uv_stream_t * ipc, int status)
108 {
109 char msg[] = "PING";
110 struct uv_export_t * exp = CONTAINER_OF(ipc, struct uv_export_t, m_Ipc);
111
112 if (status) {
113 ++exp->m_ErrorCount;
114 exp->m_Error = status;
115 return;
116 }
117
118 uv_buf_t buf = uv_buf_init(msg, sizeof(msg) - 1);
119
120 struct ipc_peer_t * pc = calloc(1, sizeof(*pc));
121 pc->m_Exp = exp;
122
123 assert(ipc->type == UV_NAMED_PIPE);
124
125 // Here the pipe must be intialized with ipc == 1 because the further
126 // call of uv_write2(...) sends a handle and thus must use ipc == 1
127 int e = uv_pipe_init(ipc->loop, (uv_pipe_t*)&pc->m_PeerHandle, 1);
128 if (e != 0) {
129 free(pc);
130 ++exp->m_ErrorCount;
131 exp->m_Error = e;
132 return;
133 }
134
135 do {
136 e = uv_accept(ipc, (uv_stream_t*)&pc->m_PeerHandle);
137 if (e == 0)
138 break;
139 else if (-e != EAGAIN) {
140 uv_close((uv_handle_t*)&pc->m_PeerHandle, s_on_peer_close);
141 ++exp->m_ErrorCount;
142 exp->m_Error = e;
143 return;
144 }
145 } while (1);
146
147 /* send the handle */
148 e = uv_write2(&pc->m_WriteReq,
149 (uv_stream_t*)&pc->m_PeerHandle,
150 &buf, 1,
151 exp->m_Handle, s_on_ipc_write);
152 if (e != 0) {
153 uv_close((uv_handle_t*)&pc->m_PeerHandle, s_on_peer_close);
154 ++exp->m_ErrorCount;
155 exp->m_Error = e;
156 return;
157 }
158 }
159
160
161
uv_export_start(uv_loop_t * loop,uv_stream_t * handle,const char * ipc_name,unsigned short count,struct uv_export_t ** rv_exp)162 int uv_export_start(uv_loop_t * loop, uv_stream_t * handle,
163 const char * ipc_name, unsigned short count,
164 struct uv_export_t ** rv_exp)
165 {
166 struct uv_export_t * exp = calloc(1, sizeof(struct uv_export_t));
167
168 *rv_exp = exp;
169
170 exp->m_IpcName = ipc_name;
171 exp->m_Handle = handle;
172 exp->m_WorkerCount = count;
173 exp->m_HandleType = handle->type;
174 exp->m_Workers = calloc(1, sizeof(struct import_worker_t) * count);
175 for (unsigned int i = 0; i < count; ++i) {
176 struct import_worker_t * worker = &exp->m_Workers[i];
177 worker->m_Id = i + 1;
178 worker->m_Exp = exp;
179 uv_sem_init(&worker->m_Sem, 0);
180 }
181
182 // Here the pipe must be initialized with ipc == 0 because otherwise
183 // listening on it is impossible
184 int e = uv_pipe_init(loop, &exp->m_Ipc, 0);
185 if (e)
186 goto error;
187
188 e = uv_pipe_bind(&exp->m_Ipc, exp->m_IpcName);
189 if (e)
190 goto error;
191
192 e = uv_listen((uv_stream_t*)&exp->m_Ipc, count * 2, s_on_ipc_connection);
193 if (e)
194 goto error;
195
196 e = exp->m_Error;
197 if (e)
198 goto error;
199
200 return 0;
201
202 error:
203 uv_export_close(exp);
204 *rv_exp = NULL;
205 return e;
206 }
207
208
uv_export_wait(struct uv_export_t * exp)209 static int uv_export_wait(struct uv_export_t * exp)
210 {
211 if (exp->m_InWaiting)
212 return UV_EBUSY;
213
214 int e = exp->m_Error;
215 if (e)
216 return e;
217
218 exp->m_InWaiting = 1;
219
220 for (unsigned int i = 0; i < exp->m_WorkerCount; ++i) {
221 struct import_worker_t * worker = &exp->m_Workers[i];
222 uv_sem_post(&worker->m_Sem);
223 }
224
225 /* This loop will finish once all workers have connected
226 * The listen pipe is closed by the last worker */
227 while (1) {
228 int e = uv_run(exp->m_Ipc.loop, UV_RUN_NOWAIT);
229 if (e == 0)
230 break;
231
232 unsigned int finished_count = 0;
233 for (unsigned int i = 0; i < exp->m_WorkerCount; ++i) {
234 struct import_worker_t * worker = &exp->m_Workers[i];
235 if (worker->m_ImportFinished || worker->m_Error)
236 finished_count++;
237 }
238 if (finished_count == exp->m_WorkerCount) {
239 uv_close((uv_handle_t*)&exp->m_Ipc, NULL);
240 uv_run(exp->m_Ipc.loop, UV_RUN_NOWAIT);
241 exp->m_IpcClosed = 1;
242 break;
243 }
244 }
245
246 e = exp->m_Error;
247 if (!e) {
248 for (unsigned int i = 0; i < exp->m_WorkerCount; ++i) {
249 struct import_worker_t * worker = &exp->m_Workers[i];
250 if (worker->m_Error) {
251 e = worker->m_Error;
252 break;
253 }
254 }
255 }
256
257 exp->m_InWaiting = 0;
258 return e;
259 }
260
261
uv_export_close(struct uv_export_t * exp)262 int uv_export_close(struct uv_export_t * exp)
263 {
264 if (exp->m_InWaiting)
265 return UV_EBUSY;
266
267 for (unsigned int i = 0; i < exp->m_WorkerCount; ++i) {
268 struct import_worker_t * worker = &exp->m_Workers[i];
269 if (worker->m_ImportStarted)
270 uv_sem_wait(&worker->m_Sem);
271 }
272 if (!exp->m_IpcClosed) {
273 exp->m_IpcClosed = 1;
274 uv_close((uv_handle_t*)&exp->m_Ipc, NULL);
275 }
276 if (exp->m_Workers) {
277 for (unsigned int i = 0; i < exp->m_WorkerCount; ++i) {
278 struct import_worker_t * worker = &exp->m_Workers[i];
279 uv_sem_destroy(&worker->m_Sem);
280 }
281 free(exp->m_Workers);
282 }
283 if (exp->m_IpcName)
284 unlink(exp->m_IpcName);
285 free(exp);
286 return 0;
287 }
288
289
uv_export_finish(struct uv_export_t * exp)290 int uv_export_finish(struct uv_export_t * exp)
291 {
292 int rc = uv_export_wait(exp);
293 if (rc) {
294 uv_export_close(exp);
295 return rc;
296 }
297 rc = uv_export_close(exp);
298 return rc;
299 }
300
301
302 /* uv_import related procedures and types */
303
s_on_ipc_alloc(uv_handle_t * handle,size_t suggested_size,uv_buf_t * buf)304 void s_on_ipc_alloc(uv_handle_t * handle, size_t suggested_size,
305 uv_buf_t * buf)
306 {
307 struct import_worker_t * worker = CONTAINER_OF(handle,
308 struct import_worker_t,
309 m_Ipc);
310 buf->base = worker->m_StaticIpcBuf;
311 buf->len = sizeof(worker->m_StaticIpcBuf);
312 }
313
314
s_on_ipc_read(uv_stream_t * handle,ssize_t nread,const uv_buf_t * buf)315 void s_on_ipc_read(uv_stream_t * handle, ssize_t nread,
316 const uv_buf_t * buf)
317 {
318 assert(handle->type == UV_NAMED_PIPE);
319
320 struct import_worker_t * worker = CONTAINER_OF(handle,
321 struct import_worker_t,
322 m_Ipc);
323 uv_pipe_t * handle_pipe = (uv_pipe_t*)handle;
324 assert(handle_pipe == &worker->m_Ipc);
325
326 int pending_count = uv_pipe_pending_count(handle_pipe);
327 if (pending_count != 1) {
328 worker->m_Error = 1;
329 return;
330 }
331
332 uv_handle_type type = uv_pipe_pending_type(handle_pipe);
333 if (type != worker->m_Exp->m_HandleType) {
334 worker->m_Error = 2;
335 return;
336 }
337
338 int e;
339 switch (type) {
340 case UV_TCP:
341 e = uv_tcp_init(handle->loop, (uv_tcp_t*)worker->m_Handle);
342 break;
343 case UV_UDP:
344 e = uv_udp_init(handle->loop, (uv_udp_t*)worker->m_Handle);
345 break;
346 case UV_NAMED_PIPE:
347 e = uv_pipe_init(handle->loop, (uv_pipe_t*)worker->m_Handle, 1);
348 break;
349 default:
350 worker->m_Error = 3;
351 return;
352 }
353
354 if (e != 0) {
355 uv_close((uv_handle_t*)(&worker->m_Handle), NULL);
356 worker->m_Error = e;
357 return;
358 }
359
360 e = uv_accept(handle, (uv_stream_t*)worker->m_Handle);
361 if (e) {
362 uv_close((uv_handle_t*)&worker->m_Handle, NULL);
363 worker->m_Error = e;
364 return;
365 }
366
367 /* closing the pipe will allow us to exit our loop */
368 if (!worker->m_IpcClosed) {
369 worker->m_IpcClosed = 1;
370 uv_close((uv_handle_t*)&worker->m_Ipc, NULL);
371 }
372 }
373
374
s_on_ipc_connected(uv_connect_t * req,int status)375 void s_on_ipc_connected(uv_connect_t * req, int status)
376 {
377 struct import_worker_t * worker = CONTAINER_OF(req,
378 struct import_worker_t,
379 m_ConnectReq);
380
381 if (status != 0) {
382 worker->m_Error = status;
383 return;
384 }
385
386 int e = uv_read_start((uv_stream_t*)&worker->m_Ipc, s_on_ipc_alloc,
387 s_on_ipc_read);
388 if (e) {
389 worker->m_Error = status;
390 return;
391 }
392 }
393
394
uv_import(uv_loop_t * loop,uv_stream_t * handle,struct uv_export_t * exp)395 int uv_import(uv_loop_t * loop, uv_stream_t * handle,
396 struct uv_export_t * exp)
397 {
398 if (!exp) {
399 return 1;
400 }
401
402 unsigned int id = __sync_fetch_and_add(&exp->m_IdCounter, 1);
403 if (id < 0 || id >= exp->m_WorkerCount) {
404 return 1;
405 }
406
407 struct import_worker_t * worker = &exp->m_Workers[id];
408
409 worker->m_Handle = handle;
410 worker->m_ImportStarted = 1;
411 uv_sem_wait(&worker->m_Sem);
412
413 int e;
414 if (!exp->m_InWaiting) {
415 worker->m_Error = 6;
416 e = 1;
417 goto done;
418 }
419
420 e = uv_pipe_init(loop, &worker->m_Ipc, 1);
421 if (e) {
422 worker->m_Error = e;
423 goto done;
424 }
425
426 uv_pipe_connect(&worker->m_ConnectReq,
427 &worker->m_Ipc,
428 worker->m_Exp->m_IpcName,
429 s_on_ipc_connected);
430
431 if (!worker->m_Error) {
432 e = uv_run(loop, UV_RUN_DEFAULT);
433 if (worker->m_Error)
434 e = worker->m_Error;
435 }
436
437 if (!worker->m_IpcClosed) {
438 worker->m_IpcClosed = 1;
439 uv_close((uv_handle_t*)&worker->m_Ipc, NULL);
440 }
441 uv_run(loop, UV_RUN_DEFAULT); /* close handles */
442
443 if (!e)
444 worker->m_ImportFinished = 1;
445
446 done:
447 uv_sem_post(&worker->m_Sem);
448 return e;
449 }
450