1 /*  $Id: uv_extra.c 629837 2021-04-22 12:47:49Z ivanov $
2  * ===========================================================================
3  *
4  *                            PUBLIC DOMAIN NOTICE
5  *               National Center for Biotechnology Information
6  *
7  *  This software/database is a "United States Government Work" under the
8  *  terms of the United States Copyright Act.  It was written as part of
9  *  the author's official duties as a United States Government employee and
10  *  thus cannot be copyrighted.  This software/database is freely available
11  *  to the public for use. The National Library of Medicine and the U.S.
12  *  Government have not placed any restriction on its use or reproduction.
13  *
14  *  Although all reasonable efforts have been taken to ensure the accuracy
15  *  and reliability of the software and data, the NLM and the U.S.
16  *  Government do not and cannot warrant the performance or results that
17  *  may be obtained by using this software or data. The NLM and the U.S.
18  *  Government disclaim all warranties, express or implied, including
19  *  warranties of performance, merchantability or fitness for any particular
20  *  purpose.
21  *
22  *  Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors: Dmitri Dmitrienko
27  *
28  * File Description:
29  *
30  */
31 
32 #include <stdlib.h>
33 #include <assert.h>
34 #include <unistd.h>
35 
36 #include <uv.h>
37 
38 #include "uv_extra.h"
39 
40 #define CONTAINER_OF(ptr, type, member) ({                                                      \
41     const typeof(((type*)(0))->member) *__mptr = ((const typeof(((type*)(0))->member) *)(ptr)); \
42     (type*)((char*)(__mptr) - offsetof(type, member));                                          \
43 })
44 
45 
46 struct import_worker_t
47 {
48     unsigned int                m_Id;
49     struct uv_export_t *        m_Exp;
50     uv_sem_t                    m_Sem;
51     uv_pipe_t                   m_Ipc;
52     uv_connect_t                m_ConnectReq;
53     char                        m_StaticIpcBuf[256];
54     uv_stream_t *               m_Handle;
55     volatile int                m_Error;
56     char                        m_IpcClosed;
57     volatile char               m_ImportStarted;
58     volatile char               m_ImportFinished;
59 };
60 
61 
62 struct uv_export_t
63 {
64     unsigned int                m_IdCounter;
65     const char *                m_IpcName;
66     uv_stream_t *               m_Handle;
67     uv_pipe_t                   m_Ipc;
68     uv_handle_type              m_HandleType;
69     unsigned short              m_WorkerCount;
70     struct import_worker_t *    m_Workers;
71     volatile int                m_Error;
72     volatile int                m_ErrorCount;
73     volatile char               m_InWaiting;
74     char                        m_IpcClosed;
75 };
76 
77 /* uv_export related procedures and types */
78 
79 struct ipc_peer_t
80 {
81     uv_pipe_t                   m_PeerHandle;
82     uv_write_t                  m_WriteReq;
83     struct uv_export_t *        m_Exp;
84 };
85 
86 
s_on_peer_close(uv_handle_t * handle)87 static void s_on_peer_close(uv_handle_t *  handle)
88 {
89     struct ipc_peer_t *     pc = CONTAINER_OF(handle, struct ipc_peer_t,
90                                               m_PeerHandle);
91     free(pc);
92 }
93 
94 
s_on_ipc_write(uv_write_t * req,int status)95 static void s_on_ipc_write(uv_write_t *  req, int  status)
96 {
97     struct ipc_peer_t *     pc = CONTAINER_OF(req, struct ipc_peer_t,
98                                               m_WriteReq);
99     if (status) {
100         ++pc->m_Exp->m_ErrorCount;
101         pc->m_Exp->m_Error = status;
102     }
103     uv_close((uv_handle_t*)&pc->m_PeerHandle, s_on_peer_close);
104 }
105 
106 
s_on_ipc_connection(uv_stream_t * ipc,int status)107 static void s_on_ipc_connection(uv_stream_t *  ipc, int  status)
108 {
109     char                    msg[] = "PING";
110     struct uv_export_t *    exp = CONTAINER_OF(ipc, struct uv_export_t, m_Ipc);
111 
112     if (status) {
113         ++exp->m_ErrorCount;
114         exp->m_Error = status;
115         return;
116     }
117 
118     uv_buf_t buf = uv_buf_init(msg, sizeof(msg) - 1);
119 
120     struct ipc_peer_t *     pc = calloc(1, sizeof(*pc));
121     pc->m_Exp = exp;
122 
123     assert(ipc->type == UV_NAMED_PIPE);
124 
125     // Here the pipe must be intialized with ipc == 1 because the further
126     // call of uv_write2(...) sends a handle and thus must use ipc == 1
127     int     e = uv_pipe_init(ipc->loop, (uv_pipe_t*)&pc->m_PeerHandle, 1);
128     if (e != 0) {
129         free(pc);
130         ++exp->m_ErrorCount;
131         exp->m_Error = e;
132         return;
133     }
134 
135     do {
136         e = uv_accept(ipc, (uv_stream_t*)&pc->m_PeerHandle);
137         if (e == 0)
138             break;
139         else if (-e != EAGAIN) {
140             uv_close((uv_handle_t*)&pc->m_PeerHandle, s_on_peer_close);
141             ++exp->m_ErrorCount;
142             exp->m_Error = e;
143             return;
144         }
145     } while (1);
146 
147     /* send the handle */
148     e = uv_write2(&pc->m_WriteReq,
149                   (uv_stream_t*)&pc->m_PeerHandle,
150                   &buf, 1,
151                   exp->m_Handle, s_on_ipc_write);
152     if (e != 0) {
153         uv_close((uv_handle_t*)&pc->m_PeerHandle, s_on_peer_close);
154         ++exp->m_ErrorCount;
155         exp->m_Error = e;
156         return;
157     }
158 }
159 
160 
161 
uv_export_start(uv_loop_t * loop,uv_stream_t * handle,const char * ipc_name,unsigned short count,struct uv_export_t ** rv_exp)162 int uv_export_start(uv_loop_t *  loop, uv_stream_t *  handle,
163                     const char *  ipc_name, unsigned short  count,
164                     struct uv_export_t **  rv_exp)
165 {
166     struct uv_export_t *    exp = calloc(1, sizeof(struct uv_export_t));
167 
168     *rv_exp = exp;
169 
170     exp->m_IpcName = ipc_name;
171     exp->m_Handle = handle;
172     exp->m_WorkerCount = count;
173     exp->m_HandleType = handle->type;
174     exp->m_Workers = calloc(1, sizeof(struct import_worker_t) * count);
175     for (unsigned int i = 0; i < count; ++i) {
176         struct import_worker_t *    worker = &exp->m_Workers[i];
177         worker->m_Id = i + 1;
178         worker->m_Exp = exp;
179         uv_sem_init(&worker->m_Sem, 0);
180     }
181 
182     // Here the pipe must be initialized with ipc == 0 because otherwise
183     // listening on it is impossible
184     int     e = uv_pipe_init(loop, &exp->m_Ipc, 0);
185     if (e)
186         goto error;
187 
188     e = uv_pipe_bind(&exp->m_Ipc, exp->m_IpcName);
189     if (e)
190         goto error;
191 
192     e = uv_listen((uv_stream_t*)&exp->m_Ipc, count * 2, s_on_ipc_connection);
193     if (e)
194         goto error;
195 
196     e = exp->m_Error;
197     if (e)
198         goto error;
199 
200     return 0;
201 
202 error:
203     uv_export_close(exp);
204     *rv_exp = NULL;
205     return e;
206 }
207 
208 
uv_export_wait(struct uv_export_t * exp)209 static int uv_export_wait(struct uv_export_t *  exp)
210 {
211     if (exp->m_InWaiting)
212         return UV_EBUSY;
213 
214     int         e = exp->m_Error;
215     if (e)
216         return e;
217 
218     exp->m_InWaiting = 1;
219 
220     for (unsigned int i = 0; i < exp->m_WorkerCount; ++i) {
221         struct import_worker_t *    worker = &exp->m_Workers[i];
222         uv_sem_post(&worker->m_Sem);
223     }
224 
225     /* This loop will finish once all workers have connected
226      * The listen pipe is closed by the last worker */
227     while (1) {
228         int     e = uv_run(exp->m_Ipc.loop, UV_RUN_NOWAIT);
229         if (e == 0)
230             break;
231 
232         unsigned int    finished_count = 0;
233         for (unsigned int i = 0; i < exp->m_WorkerCount; ++i) {
234             struct import_worker_t *    worker = &exp->m_Workers[i];
235             if (worker->m_ImportFinished || worker->m_Error)
236                 finished_count++;
237         }
238         if (finished_count == exp->m_WorkerCount) {
239             uv_close((uv_handle_t*)&exp->m_Ipc, NULL);
240             uv_run(exp->m_Ipc.loop, UV_RUN_NOWAIT);
241             exp->m_IpcClosed = 1;
242             break;
243         }
244     }
245 
246     e = exp->m_Error;
247     if (!e) {
248         for (unsigned int i = 0; i < exp->m_WorkerCount; ++i) {
249             struct import_worker_t *    worker = &exp->m_Workers[i];
250             if (worker->m_Error) {
251                 e = worker->m_Error;
252                 break;
253             }
254         }
255     }
256 
257     exp->m_InWaiting = 0;
258     return e;
259 }
260 
261 
uv_export_close(struct uv_export_t * exp)262 int uv_export_close(struct uv_export_t *  exp)
263 {
264     if (exp->m_InWaiting)
265         return UV_EBUSY;
266 
267     for (unsigned int i = 0; i < exp->m_WorkerCount; ++i) {
268         struct import_worker_t *    worker = &exp->m_Workers[i];
269         if (worker->m_ImportStarted)
270             uv_sem_wait(&worker->m_Sem);
271     }
272     if (!exp->m_IpcClosed) {
273         exp->m_IpcClosed = 1;
274         uv_close((uv_handle_t*)&exp->m_Ipc, NULL);
275     }
276     if (exp->m_Workers) {
277         for (unsigned int i = 0; i < exp->m_WorkerCount; ++i) {
278             struct import_worker_t *    worker = &exp->m_Workers[i];
279             uv_sem_destroy(&worker->m_Sem);
280         }
281         free(exp->m_Workers);
282     }
283     if (exp->m_IpcName)
284         unlink(exp->m_IpcName);
285     free(exp);
286     return 0;
287 }
288 
289 
uv_export_finish(struct uv_export_t * exp)290 int uv_export_finish(struct uv_export_t *  exp)
291 {
292     int         rc = uv_export_wait(exp);
293     if (rc) {
294         uv_export_close(exp);
295         return rc;
296     }
297     rc = uv_export_close(exp);
298     return rc;
299 }
300 
301 
302 /* uv_import related procedures and types */
303 
s_on_ipc_alloc(uv_handle_t * handle,size_t suggested_size,uv_buf_t * buf)304 void s_on_ipc_alloc(uv_handle_t *  handle, size_t  suggested_size,
305                     uv_buf_t *  buf)
306 {
307     struct import_worker_t *    worker = CONTAINER_OF(handle,
308                                                       struct import_worker_t,
309                                                       m_Ipc);
310     buf->base = worker->m_StaticIpcBuf;
311     buf->len = sizeof(worker->m_StaticIpcBuf);
312 }
313 
314 
s_on_ipc_read(uv_stream_t * handle,ssize_t nread,const uv_buf_t * buf)315 void s_on_ipc_read(uv_stream_t *  handle, ssize_t  nread,
316                    const uv_buf_t *  buf)
317 {
318     assert(handle->type == UV_NAMED_PIPE);
319 
320     struct import_worker_t *    worker = CONTAINER_OF(handle,
321                                                       struct import_worker_t,
322                                                       m_Ipc);
323     uv_pipe_t *         handle_pipe = (uv_pipe_t*)handle;
324     assert(handle_pipe == &worker->m_Ipc);
325 
326     int                 pending_count = uv_pipe_pending_count(handle_pipe);
327     if (pending_count != 1) {
328         worker->m_Error = 1;
329         return;
330     }
331 
332     uv_handle_type type = uv_pipe_pending_type(handle_pipe);
333     if (type != worker->m_Exp->m_HandleType) {
334         worker->m_Error = 2;
335         return;
336     }
337 
338     int     e;
339     switch (type) {
340         case UV_TCP:
341             e = uv_tcp_init(handle->loop, (uv_tcp_t*)worker->m_Handle);
342             break;
343         case UV_UDP:
344             e = uv_udp_init(handle->loop, (uv_udp_t*)worker->m_Handle);
345             break;
346         case UV_NAMED_PIPE:
347             e = uv_pipe_init(handle->loop, (uv_pipe_t*)worker->m_Handle, 1);
348             break;
349     default:
350         worker->m_Error = 3;
351         return;
352     }
353 
354     if (e != 0) {
355         uv_close((uv_handle_t*)(&worker->m_Handle), NULL);
356         worker->m_Error = e;
357         return;
358     }
359 
360     e = uv_accept(handle, (uv_stream_t*)worker->m_Handle);
361     if (e) {
362         uv_close((uv_handle_t*)&worker->m_Handle, NULL);
363         worker->m_Error = e;
364         return;
365     }
366 
367     /* closing the pipe will allow us to exit our loop */
368     if (!worker->m_IpcClosed) {
369         worker->m_IpcClosed = 1;
370         uv_close((uv_handle_t*)&worker->m_Ipc, NULL);
371     }
372 }
373 
374 
s_on_ipc_connected(uv_connect_t * req,int status)375 void s_on_ipc_connected(uv_connect_t *  req, int  status)
376 {
377     struct import_worker_t *    worker = CONTAINER_OF(req,
378                                                       struct import_worker_t,
379                                                       m_ConnectReq);
380 
381     if (status != 0) {
382         worker->m_Error = status;
383         return;
384     }
385 
386     int     e = uv_read_start((uv_stream_t*)&worker->m_Ipc, s_on_ipc_alloc,
387                               s_on_ipc_read);
388     if (e) {
389         worker->m_Error = status;
390         return;
391     }
392 }
393 
394 
uv_import(uv_loop_t * loop,uv_stream_t * handle,struct uv_export_t * exp)395 int uv_import(uv_loop_t *  loop, uv_stream_t *  handle,
396               struct uv_export_t *  exp)
397 {
398     if (!exp) {
399         return 1;
400     }
401 
402     unsigned int    id = __sync_fetch_and_add(&exp->m_IdCounter, 1);
403     if (id < 0 || id >= exp->m_WorkerCount) {
404         return 1;
405     }
406 
407     struct import_worker_t *    worker = &exp->m_Workers[id];
408 
409     worker->m_Handle = handle;
410     worker->m_ImportStarted = 1;
411     uv_sem_wait(&worker->m_Sem);
412 
413     int         e;
414     if (!exp->m_InWaiting) {
415         worker->m_Error = 6;
416         e = 1;
417         goto done;
418     }
419 
420     e = uv_pipe_init(loop, &worker->m_Ipc, 1);
421     if (e) {
422         worker->m_Error = e;
423         goto done;
424     }
425 
426     uv_pipe_connect(&worker->m_ConnectReq,
427                     &worker->m_Ipc,
428                     worker->m_Exp->m_IpcName,
429                     s_on_ipc_connected);
430 
431     if (!worker->m_Error) {
432         e = uv_run(loop, UV_RUN_DEFAULT);
433         if (worker->m_Error)
434             e = worker->m_Error;
435     }
436 
437     if (!worker->m_IpcClosed) {
438         worker->m_IpcClosed = 1;
439         uv_close((uv_handle_t*)&worker->m_Ipc, NULL);
440     }
441     uv_run(loop, UV_RUN_DEFAULT); /* close handles */
442 
443     if (!e)
444         worker->m_ImportFinished = 1;
445 
446 done:
447     uv_sem_post(&worker->m_Sem);
448     return e;
449 }
450