1 /*
2 * Copyright (c) 2021 Calvin Rose
3 *
4 * Permission is hereby granted, free of charge, to any person obtaining a copy
5 * of this software and associated documentation files (the "Software"), to
6 * deal in the Software without restriction, including without limitation the
7 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
8 * sell copies of the Software, and to permit persons to whom the Software is
9 * furnished to do so, subject to the following conditions:
10 *
11 * The above copyright notice and this permission notice shall be included in
12 * all copies or substantial portions of the Software.
13 *
14 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
19 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
20 * IN THE SOFTWARE.
21 */
22 
23 
24 #ifndef JANET_AMALG
25 #include "features.h"
26 #include <janet.h>
27 #include "util.h"
28 #include "gc.h"
29 #include "state.h"
30 #include "fiber.h"
31 #endif
32 
33 #ifdef JANET_EV
34 
35 #include <math.h>
36 #ifdef JANET_WINDOWS
37 #include <winsock2.h>
38 #include <windows.h>
39 #else
40 #include <pthread.h>
41 #include <limits.h>
42 #include <errno.h>
43 #include <unistd.h>
44 #include <signal.h>
45 #include <sys/ioctl.h>
46 #include <sys/types.h>
47 #include <fcntl.h>
48 #include <netinet/in.h>
49 #include <netinet/tcp.h>
50 #include <netdb.h>
51 #include <sys/socket.h>
52 #include <sys/wait.h>
53 #ifdef JANET_EV_EPOLL
54 #include <sys/epoll.h>
55 #include <sys/timerfd.h>
56 #endif
57 #ifdef JANET_EV_KQUEUE
58 #include <sys/event.h>
59 #endif
60 #endif
61 
/* A fiber that is parked on a channel waiting for an operation to complete.
 * Entries are stored by value in a channel's read_pending and write_pending
 * queues. */
typedef struct {
    /* VM that the fiber was waiting in; events for the fiber are posted to it. */
    JanetVM *thread;
    /* The waiting fiber. */
    JanetFiber *fiber;
    /* Value of fiber->sched_id when the entry was queued. A later mismatch
     * with fiber->sched_id means the entry is stale. */
    uint32_t sched_id;
    /* What the fiber is waiting for. The CHOICE variants are resumed with a
     * result tuple rather than the bare value/channel. */
    enum {
        JANET_CP_MODE_READ,
        JANET_CP_MODE_WRITE,
        JANET_CP_MODE_CHOICE_READ,
        JANET_CP_MODE_CHOICE_WRITE,
        JANET_CP_MODE_CLOSE
    } mode;
} JanetChannelPending;
74 
/* State of a channel: buffered items plus the fibers waiting on it. */
typedef struct {
    /* Queue of Janet values that have been written but not yet read. */
    JanetQueue items;
    /* Queue of JanetChannelPending entries for waiting readers. */
    JanetQueue read_pending;
    /* Queue of JanetChannelPending entries for waiting writers. */
    JanetQueue write_pending;
    /* A writer is asked to block once the item count exceeds this. */
    int32_t limit;
    /* Non-zero once the channel has been closed; writes then panic. */
    int closed;
    /* Non-zero if values are packed (marshalled) and `lock` is used. */
    int is_threaded;
    /* Only taken/released when is_threaded is set. */
    JanetOSMutex lock;
} JanetChannel;
84 
/* One entry in the janet_vm.spawn queue: a fiber scheduled to be resumed. */
typedef struct {
    /* Fiber to resume. */
    JanetFiber *fiber;
    /* Value the fiber is resumed with. */
    Janet value;
    /* Signal to resume with (JANET_SIGNAL_OK, or JANET_SIGNAL_ERROR to cancel). */
    JanetSignal sig;
    uint32_t expected_sched_id; /* If the fiber has been rescheduled this loop, don't run first scheduling. */
} JanetTask;
91 
/* Wrap a return value by pairing it with the callback used to handle it
 * in the main thread. The first two members must stay in this order:
 * JanetEVThreadInit begins with the same members. */
typedef struct {
    /* The message (result) to hand to the callback. */
    JanetEVGenericMessage msg;
    /* Callback invoked with msg. */
    JanetThreadedCallback cb;
} JanetSelfPipeEvent;
98 
/* Structure used to initialize threads in the thread pool.
 * It starts with the same members as JanetSelfPipeEvent (msg, cb), so keep
 * those two first and in this order. */
typedef struct {
    /* Message passed to the subroutine. */
    JanetEVGenericMessage msg;
    /* Callback paired with the message, as in JanetSelfPipeEvent. */
    JanetThreadedCallback cb;
    /* Subroutine the thread is started with. */
    JanetThreadedSubroutine subr;
    /* Handle used for writing results back.
     * NOTE(review): presumably the write end of the self pipe - confirm. */
    JanetHandle write_pipe;
} JanetEVThreadInit;
107 
/* Upper bound on the capacity (in items) of a JanetQueue. janet_q_push
 * reports failure instead of growing a queue past this. */
#define JANET_MAX_Q_CAPACITY 0x7FFFFFF
109 
/* Put a queue into the empty state: no backing storage, zero capacity,
 * and head and tail both at index 0. */
static void janet_q_init(JanetQueue *q) {
    q->capacity = 0;
    q->tail = 0;
    q->head = 0;
    q->data = NULL;
}
116 
/* Release the queue's backing storage. The queue fields themselves are
 * left untouched. */
static void janet_q_deinit(JanetQueue *q) {
    void *storage = q->data;
    janet_free(storage);
}
120 
/* Number of items currently stored in the ring buffer. */
static int32_t janet_q_count(JanetQueue *q) {
    int32_t span = q->tail - q->head;
    if (q->head > q->tail) {
        /* Contents wrap past the end of the buffer. */
        span += q->capacity;
    }
    return span;
}
126 
/* Append one item of `itemsize` bytes (copied from `item`) at the tail of the
 * queue, growing the ring buffer when it is about to fill up.
 * Returns 0 on success, or 1 if the queue cannot grow because it has reached
 * JANET_MAX_Q_CAPACITY. Allocation failure goes through JANET_OUT_OF_MEMORY. */
static int janet_q_push(JanetQueue *q, void *item, size_t itemsize) {
    int32_t count = janet_q_count(q);
    /* Resize if needed. Growth is triggered while one slot is still free, so
     * head == tail always means "empty" (see janet_q_pop). */
    if (count + 1 >= q->capacity) {
        if (count + 1 >= JANET_MAX_Q_CAPACITY) return 1;
        int32_t newcap = (count + 2) * 2;
        if (newcap > JANET_MAX_Q_CAPACITY) newcap = JANET_MAX_Q_CAPACITY;
        q->data = janet_realloc(q->data, itemsize * newcap);
        if (NULL == q->data) {
            JANET_OUT_OF_MEMORY;
        }
        if (q->head > q->tail) {
            /* Two segments, fix 2nd seg. The items in [head, old capacity)
             * are moved so they end at the new end of the buffer; the items
             * in [0, tail) stay where they are. */
            int32_t newhead = q->head + (newcap - q->capacity);
            size_t seg1 = (size_t)(q->capacity - q->head);
            if (seg1 > 0) {
                /* Source and destination can overlap, hence memmove. */
                memmove((char *) q->data + (newhead * itemsize),
                        (char *) q->data + (q->head * itemsize),
                        seg1 * itemsize);
            }
            q->head = newhead;
        }
        q->capacity = newcap;
    }
    /* Store the item and advance tail, wrapping to 0 at the end of the buffer. */
    memcpy((char *) q->data + itemsize * q->tail, item, itemsize);
    q->tail = q->tail + 1 < q->capacity ? q->tail + 1 : 0;
    return 0;
}
155 
/* Remove the item at the head of the queue and copy its `itemsize` bytes
 * into `out`. Returns 0 on success, or 1 (leaving `out` untouched) when the
 * queue is empty. */
static int janet_q_pop(JanetQueue *q, void *out, size_t itemsize) {
    if (q->head == q->tail) {
        return 1;
    }
    const char *slot = (char *) q->data + itemsize * q->head;
    memcpy(out, slot, itemsize);
    /* Advance head, wrapping back to the start of the buffer. */
    int32_t next = q->head + 1;
    if (next >= q->capacity) {
        next = 0;
    }
    q->head = next;
    return 0;
}
162 
/* Forward declaration. Stops listening on `state`; `is_gc` is non-zero when
 * called while a stream is being garbage collected. */
static void janet_unlisten(JanetListenerState *state, int is_gc);

/* Forward declaration. Get current timestamp (millisecond precision) */
static JanetTimestamp ts_now(void);
168 
169 /* Get current timestamp + an interval (millisecond precision) */
ts_delta(JanetTimestamp ts,double delta)170 static JanetTimestamp ts_delta(JanetTimestamp ts, double delta) {
171     ts += (int64_t)round(delta * 1000);
172     return ts;
173 }
174 
175 /* Look at the next timeout value without
176  * removing it. */
peek_timeout(JanetTimeout * out)177 static int peek_timeout(JanetTimeout *out) {
178     if (janet_vm.tq_count == 0) return 0;
179     *out = janet_vm.tq[0];
180     return 1;
181 }
182 
183 /* Remove the next timeout from the priority queue */
pop_timeout(size_t index)184 static void pop_timeout(size_t index) {
185     if (janet_vm.tq_count <= index) return;
186     janet_vm.tq[index] = janet_vm.tq[--janet_vm.tq_count];
187     for (;;) {
188         size_t left = (index << 1) + 1;
189         size_t right = left + 1;
190         size_t smallest = index;
191         if (left < janet_vm.tq_count &&
192                 (janet_vm.tq[left].when < janet_vm.tq[smallest].when))
193             smallest = left;
194         if (right < janet_vm.tq_count &&
195                 (janet_vm.tq[right].when < janet_vm.tq[smallest].when))
196             smallest = right;
197         if (smallest == index) return;
198         JanetTimeout temp = janet_vm.tq[index];
199         janet_vm.tq[index] = janet_vm.tq[smallest];
200         janet_vm.tq[smallest] = temp;
201         index = smallest;
202     }
203 }
204 
205 /* Add a timeout to the timeout min heap */
add_timeout(JanetTimeout to)206 static void add_timeout(JanetTimeout to) {
207     size_t oldcount = janet_vm.tq_count;
208     size_t newcount = oldcount + 1;
209     if (newcount > janet_vm.tq_capacity) {
210         size_t newcap = 2 * newcount;
211         JanetTimeout *tq = janet_realloc(janet_vm.tq, newcap * sizeof(JanetTimeout));
212         if (NULL == tq) {
213             JANET_OUT_OF_MEMORY;
214         }
215         janet_vm.tq = tq;
216         janet_vm.tq_capacity = newcap;
217     }
218     /* Append */
219     janet_vm.tq_count = (int32_t) newcount;
220     janet_vm.tq[oldcount] = to;
221     /* Heapify */
222     size_t index = oldcount;
223     while (index > 0) {
224         size_t parent = (index - 1) >> 1;
225         if (janet_vm.tq[parent].when <= janet_vm.tq[index].when) break;
226         /* Swap */
227         JanetTimeout tmp = janet_vm.tq[index];
228         janet_vm.tq[index] = janet_vm.tq[parent];
229         janet_vm.tq[parent] = tmp;
230         /* Next */
231         index = parent;
232     }
233 }
234 
235 /* Create a new event listener */
janet_listen_impl(JanetStream * stream,JanetListener behavior,int mask,size_t size,void * user)236 static JanetListenerState *janet_listen_impl(JanetStream *stream, JanetListener behavior, int mask, size_t size, void *user) {
237     if (stream->flags & JANET_STREAM_CLOSED) {
238         janet_panic("cannot listen on closed stream");
239     }
240     if (stream->_mask & mask) {
241         janet_panic("cannot listen for duplicate event on stream");
242     }
243     if (janet_vm.root_fiber->waiting != NULL) {
244         janet_panic("current fiber is already waiting for event");
245     }
246     if (size < sizeof(JanetListenerState))
247         size = sizeof(JanetListenerState);
248     JanetListenerState *state = janet_malloc(size);
249     if (NULL == state) {
250         JANET_OUT_OF_MEMORY;
251     }
252     state->machine = behavior;
253     state->fiber = janet_vm.root_fiber;
254     janet_vm.root_fiber->waiting = state;
255     state->stream = stream;
256     state->_mask = mask;
257     stream->_mask |= mask;
258     state->_next = stream->state;
259     stream->state = state;
260 
261     /* Keep track of a listener for GC purposes */
262     int resize = janet_vm.listener_cap == janet_vm.listener_count;
263     if (resize) {
264         size_t newcap = janet_vm.listener_count ? janet_vm.listener_cap * 2 : 16;
265         janet_vm.listeners = janet_realloc(janet_vm.listeners, newcap * sizeof(JanetListenerState *));
266         if (NULL == janet_vm.listeners) {
267             JANET_OUT_OF_MEMORY;
268         }
269         janet_vm.listener_cap = newcap;
270     }
271     size_t index = janet_vm.listener_count++;
272     janet_vm.listeners[index] = state;
273     state->_index = index;
274 
275     /* Emit INIT event for convenience */
276     state->event = user;
277     state->machine(state, JANET_ASYNC_EVENT_INIT);
278     return state;
279 }
280 
281 /* Indicate we are no longer listening for an event. This
282  * frees the memory of the state machine as well. */
janet_unlisten_impl(JanetListenerState * state,int is_gc)283 static void janet_unlisten_impl(JanetListenerState *state, int is_gc) {
284     state->machine(state, JANET_ASYNC_EVENT_DEINIT);
285     /* Remove state machine from poll list */
286     JanetListenerState **iter = &(state->stream->state);
287     while (*iter && *iter != state)
288         iter = &((*iter)->_next);
289     janet_assert(*iter, "failed to remove listener");
290     *iter = state->_next;
291     /* Remove mask */
292     state->stream->_mask &= ~(state->_mask);
293     /* Ensure fiber does not reference this state */
294     if (!is_gc) {
295         JanetFiber *fiber = state->fiber;
296         if (NULL != fiber && fiber->waiting == state) {
297             fiber->waiting = NULL;
298         }
299     }
300     /* Untrack a listener for gc purposes */
301     size_t index = state->_index;
302     janet_vm.listeners[index] = janet_vm.listeners[--janet_vm.listener_count];
303     janet_vm.listeners[index]->_index = index;
304     janet_free(state);
305 }
306 
/* Method table given to streams created with methods == NULL (see
 * janet_stream). The {NULL, NULL} entry terminates the table. */
static const JanetMethod ev_default_stream_methods[] = {
    {"close", janet_cfun_stream_close},
    {"read", janet_cfun_stream_read},
    {"chunk", janet_cfun_stream_chunk},
    {"write", janet_cfun_stream_write},
    {NULL, NULL}
};
314 
315 /* Create a stream*/
janet_stream(JanetHandle handle,uint32_t flags,const JanetMethod * methods)316 JanetStream *janet_stream(JanetHandle handle, uint32_t flags, const JanetMethod *methods) {
317     JanetStream *stream = janet_abstract(&janet_stream_type, sizeof(JanetStream));
318     stream->handle = handle;
319     stream->flags = flags;
320     stream->state = NULL;
321     stream->_mask = 0;
322     if (methods == NULL) methods = ev_default_stream_methods;
323     stream->methods = methods;
324     return stream;
325 }
326 
327 /* Close a stream */
janet_stream_close_impl(JanetStream * stream,int is_gc)328 static void janet_stream_close_impl(JanetStream *stream, int is_gc) {
329     if (stream->flags & JANET_STREAM_CLOSED) return;
330     JanetListenerState *state = stream->state;
331     while (NULL != state) {
332         if (!is_gc) {
333             state->machine(state, JANET_ASYNC_EVENT_CLOSE);
334         }
335         JanetListenerState *next_state = state->_next;
336         janet_unlisten(state, is_gc);
337         state = next_state;
338     }
339     stream->state = NULL;
340     stream->flags |= JANET_STREAM_CLOSED;
341 #ifdef JANET_WINDOWS
342 #ifdef JANET_NET
343     if (stream->flags & JANET_STREAM_SOCKET) {
344         closesocket((SOCKET) stream->handle);
345     } else
346 #endif
347     {
348         CloseHandle(stream->handle);
349     }
350     stream->handle = INVALID_HANDLE_VALUE;
351 #else
352     close(stream->handle);
353     stream->handle = -1;
354 #endif
355 }
356 
/* Public entry point for closing a stream outside of garbage collection. */
void janet_stream_close(JanetStream *stream) {
    const int is_gc = 0;
    janet_stream_close_impl(stream, is_gc);
}
360 
361 
362 /* Called to clean up a stream */
janet_stream_gc(void * p,size_t s)363 static int janet_stream_gc(void *p, size_t s) {
364     (void) s;
365     JanetStream *stream = (JanetStream *)p;
366     janet_stream_close_impl(stream, 1);
367     return 0;
368 }
369 
370 /* Mark a stream for GC */
janet_stream_mark(void * p,size_t s)371 static int janet_stream_mark(void *p, size_t s) {
372     (void) s;
373     JanetStream *stream = (JanetStream *) p;
374     JanetListenerState *state = stream->state;
375     while (NULL != state) {
376         if (NULL != state->fiber) {
377             janet_mark(janet_wrap_fiber(state->fiber));
378         }
379         (state->machine)(state, JANET_ASYNC_EVENT_MARK);
380         state = state->_next;
381     }
382     return 0;
383 }
384 
/* Abstract-type getter for streams: keyword keys are looked up in the
 * stream's method table; any other key type yields 0 (not found). */
static int janet_stream_getter(void *p, Janet key, Janet *out) {
    if (janet_checktype(key, JANET_KEYWORD)) {
        JanetStream *stream = (JanetStream *) p;
        const JanetMethod *table = stream->methods;
        return janet_getmethod(janet_unwrap_keyword(key), table, out);
    }
    return 0;
}
391 
/* Marshal a stream. Only allowed with JANET_MARSHAL_UNSAFE, since the raw
 * method table pointer and an OS handle are written into the output.
 *
 * Written in order: the abstract header, the flags (int32), the method table
 * pointer (int64) and a handle for the receiver (int64 on Windows, int32
 * elsewhere). Panics if the unsafe flag is missing or if the handle cannot
 * be duplicated. */
static void janet_stream_marshal(void *p, JanetMarshalContext *ctx) {
    JanetStream *s = p;
    if (!(ctx->flags & JANET_MARSHAL_UNSAFE)) {
        janet_panic("can only marshal stream with unsafe flag");
    }
    janet_marshal_abstract(ctx, p);
    janet_marshal_int(ctx, (int32_t) s->flags);
    janet_marshal_int64(ctx, (intptr_t) s->methods);
#ifdef JANET_WINDOWS
    /* TODO - ref counting to avoid situation where a handle is closed or GCed
     * while in transit, and its value gets reused. DuplicateHandle does not work
     * for network sockets, and in general for winsock it is better to not duplicate
     * unless there is a need to. */
    HANDLE duph = INVALID_HANDLE_VALUE;
    if (s->flags & JANET_STREAM_SOCKET) {
        duph = s->handle;
    } else if (!DuplicateHandle(
                   GetCurrentProcess(),
                   s->handle,
                   GetCurrentProcess(),
                   &duph,
                   0,
                   FALSE,
                   DUPLICATE_SAME_ACCESS)) {
        /* Same policy as the POSIX branch: never marshal an invalid handle. */
        janet_panicf("failed to duplicate stream handle: %V", janet_ev_lasterr());
    }
    janet_marshal_int64(ctx, (int64_t)(duph));
#else
    /* Marshal after dup because it is easier than maintaining our own ref counting. */
    int duph = dup(s->handle);
    if (duph < 0) janet_panicf("failed to duplicate stream handle: %V", janet_ev_lasterr());
    janet_marshal_int(ctx, (int32_t)(duph));
#endif
}
426 
/* Rebuild a stream from marshalled data (the inverse of
 * janet_stream_marshal). Requires JANET_MARSHAL_UNSAFE and panics otherwise.
 * The result never has listeners attached. */
static void *janet_stream_unmarshal(JanetMarshalContext *ctx) {
    if (0 == (ctx->flags & JANET_MARSHAL_UNSAFE)) {
        janet_panic("can only unmarshal stream with unsafe flag");
    }
    JanetStream *stream = janet_unmarshal_abstract(ctx, sizeof(JanetStream));
    /* Can't share listening state and such across threads */
    stream->_mask = 0;
    stream->state = NULL;
    /* Fields are read back in the order they were written */
    stream->flags = (uint32_t) janet_unmarshal_int(ctx);
    stream->methods = (void *) janet_unmarshal_int64(ctx);
#ifdef JANET_WINDOWS
    stream->handle = (JanetHandle) janet_unmarshal_int64(ctx);
#else
    stream->handle = (JanetHandle) janet_unmarshal_int(ctx);
#endif
    return stream;
}
444 
/* Abstract-type `next` hook: iterate over the keys of the stream's method
 * table. */
static Janet janet_stream_next(void *p, Janet key) {
    const JanetMethod *table = ((JanetStream *) p)->methods;
    return janet_nextmethod(table, key);
}
449 
/* Abstract type descriptor for "core/stream" values. It hooks up the stream
 * callbacks defined above (gc, mark, getter, marshal, unmarshal, next); the
 * NULL entries leave the corresponding hooks unset. The initializer is
 * positional, so the order of entries must not change. */
const JanetAbstractType janet_stream_type = {
    "core/stream",
    janet_stream_gc,
    janet_stream_mark,
    janet_stream_getter,
    NULL,
    janet_stream_marshal,
    janet_stream_unmarshal,
    NULL,
    NULL,
    NULL,
    janet_stream_next,
    JANET_ATEND_NEXT
};
464 
465 /* Register a fiber to resume with value */
janet_schedule_signal(JanetFiber * fiber,Janet value,JanetSignal sig)466 void janet_schedule_signal(JanetFiber *fiber, Janet value, JanetSignal sig) {
467     if (fiber->flags & JANET_FIBER_FLAG_CANCELED) return;
468     JanetTask t = { fiber, value, sig, ++fiber->sched_id };
469     if (sig == JANET_SIGNAL_ERROR) fiber->flags |= JANET_FIBER_FLAG_CANCELED;
470     janet_q_push(&janet_vm.spawn, &t, sizeof(t));
471 }
472 
/* Schedule `fiber` to be resumed with `value` raised as an error. */
void janet_cancel(JanetFiber *fiber, Janet value) {
    const JanetSignal sig = JANET_SIGNAL_ERROR;
    janet_schedule_signal(fiber, value, sig);
}
476 
/* Schedule `fiber` to be resumed normally with `value`. */
void janet_schedule(JanetFiber *fiber, Janet value) {
    const JanetSignal sig = JANET_SIGNAL_OK;
    janet_schedule_signal(fiber, value, sig);
}
480 
/* Called when a fiber has been resumed: if it was still waiting on a
 * listener, that listener is sent a CANCEL event and then removed. */
void janet_fiber_did_resume(JanetFiber *fiber) {
    JanetListenerState *pending = fiber->waiting;
    if (NULL == pending) {
        return;
    }
    pending->machine(pending, JANET_ASYNC_EVENT_CANCEL);
    /* Re-read fiber->waiting rather than reusing `pending`. */
    janet_unlisten(fiber->waiting, 0);
}
488 
489 /* Mark all pending tasks */
janet_ev_mark(void)490 void janet_ev_mark(void) {
491 
492     /* Pending tasks */
493     JanetTask *tasks = janet_vm.spawn.data;
494     if (janet_vm.spawn.head <= janet_vm.spawn.tail) {
495         for (int32_t i = janet_vm.spawn.head; i < janet_vm.spawn.tail; i++) {
496             janet_mark(janet_wrap_fiber(tasks[i].fiber));
497             janet_mark(tasks[i].value);
498         }
499     } else {
500         for (int32_t i = janet_vm.spawn.head; i < janet_vm.spawn.capacity; i++) {
501             janet_mark(janet_wrap_fiber(tasks[i].fiber));
502             janet_mark(tasks[i].value);
503         }
504         for (int32_t i = 0; i < janet_vm.spawn.tail; i++) {
505             janet_mark(janet_wrap_fiber(tasks[i].fiber));
506             janet_mark(tasks[i].value);
507         }
508     }
509 
510     /* Pending timeouts */
511     for (size_t i = 0; i < janet_vm.tq_count; i++) {
512         janet_mark(janet_wrap_fiber(janet_vm.tq[i].fiber));
513         if (janet_vm.tq[i].curr_fiber != NULL) {
514             janet_mark(janet_wrap_fiber(janet_vm.tq[i].curr_fiber));
515         }
516     }
517 
518     /* Pending listeners */
519     for (size_t i = 0; i < janet_vm.listener_count; i++) {
520         JanetListenerState *state = janet_vm.listeners[i];
521         if (NULL != state->fiber) {
522             janet_mark(janet_wrap_fiber(state->fiber));
523         }
524         janet_stream_mark(state->stream, sizeof(JanetStream));
525         (state->machine)(state, JANET_ASYNC_EVENT_MARK);
526     }
527 }
528 
/* Forward declarations for the channel primitives defined further down. */
static int janet_channel_push(JanetChannel *channel, Janet x, int mode);
static int janet_channel_pop(JanetChannel *channel, Janet *item, int is_choice);
531 
make_supervisor_event(const char * name,JanetFiber * fiber,int threaded)532 static Janet make_supervisor_event(const char *name, JanetFiber *fiber, int threaded) {
533     Janet tup[2];
534     tup[0] = janet_ckeywordv(name);
535     tup[1] = threaded ? fiber->last_value : janet_wrap_fiber(fiber) ;
536     return janet_wrap_tuple(janet_tuple_n(tup, 2));
537 }
538 
539 /* Common init code */
janet_ev_init_common(void)540 void janet_ev_init_common(void) {
541     janet_q_init(&janet_vm.spawn);
542     janet_vm.listener_count = 0;
543     janet_vm.listener_cap = 0;
544     janet_vm.listeners = NULL;
545     janet_vm.tq = NULL;
546     janet_vm.tq_count = 0;
547     janet_vm.tq_capacity = 0;
548     janet_table_init_raw(&janet_vm.threaded_abstracts, 0);
549     janet_rng_seed(&janet_vm.ev_rng, 0);
550 }
551 
552 /* Common deinit code */
janet_ev_deinit_common(void)553 void janet_ev_deinit_common(void) {
554     janet_q_deinit(&janet_vm.spawn);
555     janet_free(janet_vm.tq);
556     janet_free(janet_vm.listeners);
557     janet_vm.listeners = NULL;
558     janet_table_deinit(&janet_vm.threaded_abstracts);
559 }
560 
561 /* Short hand to yield to event loop */
janet_await(void)562 void janet_await(void) {
563     janet_signalv(JANET_SIGNAL_EVENT, janet_wrap_nil());
564 }
565 
566 /* Set timeout for the current root fiber */
janet_addtimeout(double sec)567 void janet_addtimeout(double sec) {
568     JanetFiber *fiber = janet_vm.root_fiber;
569     JanetTimeout to;
570     to.when = ts_delta(ts_now(), sec);
571     to.fiber = fiber;
572     to.curr_fiber = NULL;
573     to.sched_id = fiber->sched_id;
574     to.is_error = 1;
575     add_timeout(to);
576 }
577 
janet_ev_inc_refcount(void)578 void janet_ev_inc_refcount(void) {
579     janet_vm.extra_listeners++;
580 }
581 
janet_ev_dec_refcount(void)582 void janet_ev_dec_refcount(void) {
583     janet_vm.extra_listeners--;
584 }
585 
/* Channels */

/* Largest capacity a channel may have.
 * NOTE(review): not referenced in the part of the file visible here -
 * confirm where the limit is enforced. */
#define JANET_MAX_CHANNEL_CAPACITY 0xFFFFFF
589 
/* Non-zero if the channel was initialized as a threaded channel. */
static inline int janet_chan_is_threaded(JanetChannel *chan) {
    int threaded = chan->is_threaded;
    return threaded;
}
593 
/* Prepare a value for storage in a channel. For non-threaded channels this
 * is a no-op. For threaded channels, nil, numbers, pointers, booleans and
 * cfunctions pass through unchanged; every other value is marshalled
 * (unsafe mode) into a freshly janet_malloc'ed buffer and `*x` is replaced
 * by that buffer. Always returns 0. */
static int janet_chan_pack(JanetChannel *chan, Janet *x) {
    if (!janet_chan_is_threaded(chan)) return 0;
    switch (janet_type(*x)) {
        case JANET_NIL:
        case JANET_NUMBER:
        case JANET_POINTER:
        case JANET_BOOLEAN:
        case JANET_CFUNCTION:
            /* Sent as-is */
            break;
        default: {
            JanetBuffer *packed = janet_malloc(sizeof(JanetBuffer));
            if (NULL == packed) {
                JANET_OUT_OF_MEMORY;
            }
            janet_buffer_init(packed, 10);
            janet_marshal(packed, *x, NULL, JANET_MARSHAL_UNSAFE);
            *x = janet_wrap_buffer(packed);
            break;
        }
    }
    return 0;
}
615 
/* Undo janet_chan_pack. For non-threaded channels this is a no-op. For
 * threaded channels, a buffer is unmarshalled back into `*x` and the buffer
 * is released; nil, numbers, pointers, booleans and cfunctions are left
 * alone. With `is_cleanup` set, JANET_MARSHAL_DECREF is added to the
 * unmarshal flags. Returns 0 on success and 1 for any other value type. */
static int janet_chan_unpack(JanetChannel *chan, Janet *x, int is_cleanup) {
    if (!janet_chan_is_threaded(chan)) return 0;
    int status = 0;
    switch (janet_type(*x)) {
        case JANET_NIL:
        case JANET_NUMBER:
        case JANET_POINTER:
        case JANET_BOOLEAN:
        case JANET_CFUNCTION:
            /* Was sent as-is */
            break;
        case JANET_BUFFER: {
            JanetBuffer *packed = janet_unwrap_buffer(*x);
            int flags = JANET_MARSHAL_UNSAFE;
            if (is_cleanup) {
                flags |= JANET_MARSHAL_DECREF;
            }
            *x = janet_unmarshal(packed->data, packed->count, flags, NULL, NULL);
            janet_buffer_deinit(packed);
            janet_free(packed);
            break;
        }
        default:
            status = 1;
            break;
    }
    return status;
}
637 
/* Initialize a channel as open and empty, with the given item limit and
 * threading mode. The mutex is initialized for every channel. */
static void janet_chan_init(JanetChannel *chan, int32_t limit, int threaded) {
    janet_q_init(&chan->items);
    janet_q_init(&chan->read_pending);
    janet_q_init(&chan->write_pending);
    chan->is_threaded = threaded;
    chan->closed = 0;
    chan->limit = limit;
    janet_os_mutex_init(&chan->lock);
}
647 
/* Release everything a channel owns. For threaded channels the remaining
 * items are drained and unpacked in cleanup mode first, which releases the
 * buffers they were packed into. */
static void janet_chan_deinit(JanetChannel *chan) {
    janet_q_deinit(&chan->read_pending);
    janet_q_deinit(&chan->write_pending);
    if (janet_chan_is_threaded(chan)) {
        Janet leftover;
        for (;;) {
            if (janet_q_pop(&chan->items, &leftover, sizeof(leftover))) break;
            janet_chan_unpack(chan, &leftover, 1);
        }
    }
    janet_q_deinit(&chan->items);
    janet_os_mutex_deinit(&chan->lock);
}
660 
/* Take the channel mutex. Non-threaded channels are not locked. */
static void janet_chan_lock(JanetChannel *chan) {
    if (janet_chan_is_threaded(chan)) {
        janet_os_mutex_lock(&chan->lock);
    }
}
665 
/* Release the channel mutex. Non-threaded channels are not locked. */
static void janet_chan_unlock(JanetChannel *chan) {
    if (janet_chan_is_threaded(chan)) {
        janet_os_mutex_unlock(&chan->lock);
    }
}
670 
671 /*
672  * Janet Channel abstract type
673  */
674 
janet_wrap_channel(JanetChannel * channel)675 static Janet janet_wrap_channel(JanetChannel *channel) {
676     return janet_wrap_abstract(channel);
677 }
678 
janet_chanat_gc(void * p,size_t s)679 static int janet_chanat_gc(void *p, size_t s) {
680     (void) s;
681     JanetChannel *channel = p;
682     janet_chan_deinit(channel);
683     return 0;
684 }
685 
/* Mark the fiber of every JanetChannelPending entry in a queue. The queue
 * is a ring buffer: walk [head, first_stop) and then, when the contents
 * wrap around, [0, second_stop). */
static void janet_chanat_mark_fq(JanetQueue *fq) {
    JanetChannelPending *entries = fq->data;
    int wrapped = fq->head > fq->tail;
    int32_t first_stop = wrapped ? fq->capacity : fq->tail;
    int32_t second_stop = wrapped ? fq->tail : 0;
    for (int32_t i = fq->head; i < first_stop; i++) {
        janet_mark(janet_wrap_fiber(entries[i].fiber));
    }
    for (int32_t i = 0; i < second_stop; i++) {
        janet_mark(janet_wrap_fiber(entries[i].fiber));
    }
}
698 
janet_chanat_mark(void * p,size_t s)699 static int janet_chanat_mark(void *p, size_t s) {
700     (void) s;
701     JanetChannel *chan = p;
702     janet_chanat_mark_fq(&chan->read_pending);
703     janet_chanat_mark_fq(&chan->write_pending);
704     JanetQueue *items = &chan->items;
705     Janet *data = chan->items.data;
706     if (items->head <= items->tail) {
707         for (int32_t i = items->head; i < items->tail; i++)
708             janet_mark(data[i]);
709     } else {
710         for (int32_t i = items->head; i < items->capacity; i++)
711             janet_mark(data[i]);
712         for (int32_t i = 0; i < items->tail; i++)
713             janet_mark(data[i]);
714     }
715     return 0;
716 }
717 
make_write_result(JanetChannel * channel)718 static Janet make_write_result(JanetChannel *channel) {
719     Janet *tup = janet_tuple_begin(2);
720     tup[0] = janet_ckeywordv("give");
721     tup[1] = janet_wrap_channel(channel);
722     return janet_wrap_tuple(janet_tuple_end(tup));
723 }
724 
/* Build the tuple [:take channel x] that a choice-read resumes with. */
static Janet make_read_result(JanetChannel *channel, Janet x) {
    Janet fields[3];
    fields[0] = janet_ckeywordv("take");
    fields[1] = janet_wrap_channel(channel);
    fields[2] = x;
    return janet_wrap_tuple(janet_tuple_n(fields, 3));
}
732 
make_close_result(JanetChannel * channel)733 static Janet make_close_result(JanetChannel *channel) {
734     Janet *tup = janet_tuple_begin(2);
735     tup[0] = janet_ckeywordv("close");
736     tup[1] = janet_wrap_channel(channel);
737     return janet_wrap_tuple(janet_tuple_end(tup));
738 }
739 
/* Callback to use for scheduling a fiber from another thread.
 *
 * It is posted with janet_ev_post_event and receives, packed into `msg`:
 *   msg.tag   - the JanetChannelPending mode the fiber was waiting in
 *   msg.fiber - the fiber to resume
 *   msg.argi  - the sched_id the fiber had when it started waiting
 *   msg.argp  - the channel
 *   msg.argj  - the (still packed) value for reads, unused for writes
 *
 * If the fiber's sched_id still matches, the fiber is scheduled with a
 * result that depends on the mode. Otherwise the event is passed on to the
 * next pending reader or writer of the channel, if there is one. */
static void janet_thread_chan_cb(JanetEVGenericMessage msg) {
    uint32_t sched_id = (uint32_t) msg.argi;
    JanetFiber *fiber = msg.fiber;
    int mode = msg.tag;
    JanetChannel *channel = (JanetChannel *) msg.argp;
    Janet x = msg.argj;
    /* Balances the janet_ev_inc_refcount done when the fiber started waiting. */
    janet_ev_dec_refcount();
    if (fiber->sched_id == sched_id) {
        if (mode == JANET_CP_MODE_CHOICE_READ) {
            /* Resume with [:take channel value] */
            janet_assert(!janet_chan_unpack(channel, &x, 0), "packing error");
            janet_schedule(fiber, make_read_result(channel, x));
        } else if (mode == JANET_CP_MODE_CHOICE_WRITE) {
            /* Resume with [:give channel] */
            janet_schedule(fiber, make_write_result(channel));
        } else if (mode == JANET_CP_MODE_READ) {
            /* Resume with the unpacked value */
            janet_assert(!janet_chan_unpack(channel, &x, 0), "packing error");
            janet_schedule(fiber, x);
        } else if (mode == JANET_CP_MODE_WRITE) {
            /* Resume with the channel itself */
            janet_schedule(fiber, janet_wrap_channel(channel));
        } else { /* (mode == JANET_CP_MODE_CLOSE) */
            janet_schedule(fiber, janet_wrap_nil());
        }
    } else if (mode != JANET_CP_MODE_CLOSE) {
        /* Fiber has already been cancelled or resumed. */
        /* Resend event to another waiting thread, depending on mode */
        int is_read = (mode == JANET_CP_MODE_CHOICE_READ) || (mode == JANET_CP_MODE_READ);
        if (is_read) {
            JanetChannelPending reader;
            janet_chan_lock(channel);
            /* NOTE(review): if no other reader is pending, x is not put back
             * anywhere in this function - confirm that is intended. */
            if (!janet_q_pop(&channel->read_pending, &reader, sizeof(reader))) {
                JanetVM *vm = reader.thread;
                /* This local deliberately reuses (shadows) the parameter name. */
                JanetEVGenericMessage msg;
                msg.tag = reader.mode;
                msg.fiber = reader.fiber;
                msg.argi = (int32_t) reader.sched_id;
                msg.argp = channel;
                msg.argj = x;
                janet_ev_post_event(vm, janet_thread_chan_cb, msg);
            }
            janet_chan_unlock(channel);
        } else {
            JanetChannelPending writer;
            janet_chan_lock(channel);
            if (!janet_q_pop(&channel->write_pending, &writer, sizeof(writer))) {
                JanetVM *vm = writer.thread;
                /* This local deliberately reuses (shadows) the parameter name. */
                JanetEVGenericMessage msg;
                msg.tag = writer.mode;
                msg.fiber = writer.fiber;
                msg.argi = (int32_t) writer.sched_id;
                msg.argp = channel;
                /* Writers carry no value */
                msg.argj = janet_wrap_nil();
                janet_ev_post_event(vm, janet_thread_chan_cb, msg);
            }
            janet_chan_unlock(channel);
        }
    }
}
797 
798 /* Push a value to a channel, and return 1 if channel should block, zero otherwise.
799  * If the push would block, will add to the write_pending queue in the channel.
800  * Handles both threaded and unthreaded channels. */
janet_channel_push(JanetChannel * channel,Janet x,int mode)801 static int janet_channel_push(JanetChannel *channel, Janet x, int mode) {
802     JanetChannelPending reader;
803     int is_empty;
804     if (janet_chan_pack(channel, &x)) {
805         janet_panicf("failed to pack value for channel: %v", x);
806     }
807     janet_chan_lock(channel);
808     if (channel->closed) {
809         janet_chan_unlock(channel);
810         janet_panic("cannot write to closed channel");
811     }
812     int is_threaded = janet_chan_is_threaded(channel);
813     if (is_threaded) {
814         /* don't dereference fiber from another thread */
815         is_empty = janet_q_pop(&channel->read_pending, &reader, sizeof(reader));
816     } else {
817         do {
818             is_empty = janet_q_pop(&channel->read_pending, &reader, sizeof(reader));
819         } while (!is_empty && (reader.sched_id != reader.fiber->sched_id));
820     }
821     if (is_empty) {
822         /* No pending reader */
823         if (janet_q_push(&channel->items, &x, sizeof(Janet))) {
824             janet_chan_unlock(channel);
825             janet_panicf("channel overflow: %v", x);
826         } else if (janet_q_count(&channel->items) > channel->limit) {
827             /* No root fiber, we are in completion on a root fiber. Don't block. */
828             if (mode == 2) {
829                 janet_chan_unlock(channel);
830                 return 0;
831             }
832             /* Pushed successfully, but should block. */
833             JanetChannelPending pending;
834             pending.thread = &janet_vm;
835             pending.fiber = janet_vm.root_fiber,
836             pending.sched_id = janet_vm.root_fiber->sched_id,
837             pending.mode = mode ? JANET_CP_MODE_CHOICE_WRITE : JANET_CP_MODE_WRITE;
838             janet_q_push(&channel->write_pending, &pending, sizeof(pending));
839             janet_chan_unlock(channel);
840             janet_ev_inc_refcount();
841             if (is_threaded) {
842                 janet_gcroot(janet_wrap_fiber(pending.fiber));
843             }
844             return 1;
845         }
846     } else {
847         /* Pending reader */
848         if (is_threaded) {
849             JanetVM *vm = reader.thread;
850             JanetEVGenericMessage msg;
851             msg.tag = reader.mode;
852             msg.fiber = reader.fiber;
853             msg.argi = (int32_t) reader.sched_id;
854             msg.argp = channel;
855             msg.argj = x;
856             janet_ev_post_event(vm, janet_thread_chan_cb, msg);
857         } else {
858             janet_ev_dec_refcount();
859             if (reader.mode == JANET_CP_MODE_CHOICE_READ) {
860                 janet_schedule(reader.fiber, make_read_result(channel, x));
861             } else {
862                 janet_schedule(reader.fiber, x);
863             }
864         }
865     }
866     janet_chan_unlock(channel);
867     return 0;
868 }
869 
870 /* Pop from a channel - returns 1 if item was obtained, 0 otherwise. The item
871  * is returned by reference. If the pop would block, will add to the read_pending
872  * queue in the channel. */
janet_channel_pop(JanetChannel * channel,Janet * item,int is_choice)873 static int janet_channel_pop(JanetChannel *channel, Janet *item, int is_choice) {
874     JanetChannelPending writer;
875     janet_chan_lock(channel);
876     if (channel->closed) {
877         janet_chan_unlock(channel);
878         *item = janet_wrap_nil();
879         return 1;
880     }
881     int is_threaded = janet_chan_is_threaded(channel);
882     if (janet_q_pop(&channel->items, item, sizeof(Janet))) {
883         /* Queue empty */
884         JanetChannelPending pending;
885         pending.thread = &janet_vm;
886         pending.fiber = janet_vm.root_fiber,
887         pending.sched_id = janet_vm.root_fiber->sched_id;
888         pending.mode = is_choice ? JANET_CP_MODE_CHOICE_READ : JANET_CP_MODE_READ;
889         janet_q_push(&channel->read_pending, &pending, sizeof(pending));
890         janet_chan_unlock(channel);
891         janet_ev_inc_refcount();
892         if (is_threaded) {
893             janet_gcroot(janet_wrap_fiber(pending.fiber));
894         }
895         return 0;
896     }
897     janet_assert(!janet_chan_unpack(channel, item, 0), "bad channel packing");
898     if (!janet_q_pop(&channel->write_pending, &writer, sizeof(writer))) {
899         /* Pending writer */
900         if (is_threaded) {
901             JanetVM *vm = writer.thread;
902             JanetEVGenericMessage msg;
903             msg.tag = writer.mode;
904             msg.fiber = writer.fiber;
905             msg.argi = (int32_t) writer.sched_id;
906             msg.argp = channel;
907             msg.argj = janet_wrap_nil();
908             janet_ev_post_event(vm, janet_thread_chan_cb, msg);
909         } else {
910             janet_ev_dec_refcount();
911             if (writer.mode == JANET_CP_MODE_CHOICE_WRITE) {
912                 janet_schedule(writer.fiber, make_write_result(channel));
913             } else {
914                 janet_schedule(writer.fiber, janet_wrap_abstract(channel));
915             }
916         }
917     }
918     janet_chan_unlock(channel);
919     return 1;
920 }
921 
janet_channel_unwrap(void * abstract)922 JanetChannel *janet_channel_unwrap(void *abstract) {
923     return abstract;
924 }
925 
janet_getchannel(const Janet * argv,int32_t n)926 JanetChannel *janet_getchannel(const Janet *argv, int32_t n) {
927     return janet_channel_unwrap(janet_getabstract(argv, n, &janet_channel_type));
928 }
929 
/* Optional channel argument. Returns dflt when argument n was not supplied
 * (argc <= n) or is nil; otherwise behaves like janet_getchannel(argv, n). */
JanetChannel *janet_optchannel(const Janet *argv, int32_t argc, int32_t n, JanetChannel *dflt) {
    if (argc <= n || janet_checktype(argv[n], JANET_NIL)) {
        return dflt;
    }
    return janet_getchannel(argv, n);
}
937 
938 /* Channel Methods */
939 
940 JANET_CORE_FN(cfun_channel_push,
941               "(ev/give channel value)",
942               "Write a value to a channel, suspending the current fiber if the channel is full. "
943               "Returns the channel if the write succeeded, nil otherwise.") {
944     janet_fixarity(argc, 2);
945     JanetChannel *channel = janet_getchannel(argv, 0);
946     if (janet_channel_push(channel, argv[1], 0)) {
947         janet_await();
948     }
949     return argv[0];
950 }
951 
952 JANET_CORE_FN(cfun_channel_pop,
953               "(ev/take channel)",
954               "Read from a channel, suspending the current fiber if no value is available.") {
955     janet_fixarity(argc, 1);
956     JanetChannel *channel = janet_getchannel(argv, 0);
957     Janet item;
958     if (janet_channel_pop(channel, &item, 0)) {
959         janet_schedule(janet_vm.root_fiber, item);
960     }
961     janet_await();
962 }
963 
964 JANET_CORE_FN(cfun_channel_choice,
965               "(ev/select & clauses)",
966               "Block until the first of several channel operations occur. Returns a tuple of the form [:give chan], [:take chan x], or [:close chan], where "
967               "a :give tuple is the result of a write and :take tuple is the result of a read. Each clause must be either a channel (for "
968               "a channel take operation) or a tuple [channel x] for a channel give operation. Operations are tried in order, such that the first "
969               "clauses will take precedence over later clauses. Both and give and take operations can return a [:close chan] tuple, which indicates that "
970               "the specified channel was closed while waiting, or that the channel was already closed.") {
971     janet_arity(argc, 1, -1);
972     int32_t len;
973     const Janet *data;
974 
975     /* Check channels for immediate reads and writes */
976     for (int32_t i = 0; i < argc; i++) {
977         if (janet_indexed_view(argv[i], &data, &len) && len == 2) {
978             /* Write */
979             JanetChannel *chan = janet_getchannel(data, 0);
980             janet_chan_lock(chan);
981             if (chan->closed) {
982                 return make_close_result(chan);
983             }
984             if (janet_q_count(&chan->items) < chan->limit) {
985                 janet_channel_push(chan, data[1], 1);
986                 return make_write_result(chan);
987             }
988         } else {
989             /* Read */
990             JanetChannel *chan = janet_getchannel(argv, i);
991             if (chan->closed) {
992                 return make_close_result(chan);
993             }
994             if (chan->items.head != chan->items.tail) {
995                 Janet item;
996                 janet_channel_pop(chan, &item, 1);
997                 return make_read_result(chan, item);
998             }
999         }
1000     }
1001 
1002     /* Wait for all readers or writers */
1003     for (int32_t i = 0; i < argc; i++) {
1004         if (janet_indexed_view(argv[i], &data, &len) && len == 2) {
1005             /* Write */
1006             JanetChannel *chan = janet_getchannel(data, 0);
1007             if (chan->closed) continue;
1008             janet_channel_push(chan, data[1], 1);
1009         } else {
1010             /* Read */
1011             Janet item;
1012             JanetChannel *chan = janet_getchannel(argv, i);
1013             if (chan->closed) continue;
1014             janet_channel_pop(chan, &item, 1);
1015         }
1016     }
1017 
1018     janet_await();
1019 }
1020 
1021 JANET_CORE_FN(cfun_channel_full,
1022               "(ev/full channel)",
1023               "Check if a channel is full or not.") {
1024     janet_fixarity(argc, 1);
1025     JanetChannel *channel = janet_getchannel(argv, 0);
1026     janet_chan_lock(channel);
1027     Janet ret = janet_wrap_boolean(janet_q_count(&channel->items) >= channel->limit);
1028     janet_chan_unlock(channel);
1029     return ret;
1030 }
1031 
1032 JANET_CORE_FN(cfun_channel_capacity,
1033               "(ev/capacity channel)",
1034               "Get the number of items a channel will store before blocking writers.") {
1035     janet_fixarity(argc, 1);
1036     JanetChannel *channel = janet_getchannel(argv, 0);
1037     janet_chan_lock(channel);
1038     Janet ret = janet_wrap_integer(channel->limit);
1039     janet_chan_unlock(channel);
1040     return ret;
1041 }
1042 
1043 JANET_CORE_FN(cfun_channel_count,
1044               "(ev/count channel)",
1045               "Get the number of items currently waiting in a channel.") {
1046     janet_fixarity(argc, 1);
1047     JanetChannel *channel = janet_getchannel(argv, 0);
1048     janet_chan_lock(channel);
1049     Janet ret = janet_wrap_integer(janet_q_count(&channel->items));
1050     janet_chan_unlock(channel);
1051     return ret;
1052 }
1053 
1054 /* Fisher yates shuffle of arguments to get fairness */
fisher_yates_args(int32_t argc,Janet * argv)1055 static void fisher_yates_args(int32_t argc, Janet *argv) {
1056     for (int32_t i = argc; i > 1; i--) {
1057         int32_t swap_index = janet_rng_u32(&janet_vm.ev_rng) % i;
1058         Janet temp = argv[swap_index];
1059         argv[swap_index] = argv[i - 1];
1060         argv[i - 1] = temp;
1061     }
1062 }
1063 
1064 JANET_CORE_FN(cfun_channel_rchoice,
1065               "(ev/rselect & clauses)",
1066               "Similar to ev/select, but will try clauses in a random order for fairness.") {
1067     fisher_yates_args(argc, argv);
1068     return cfun_channel_choice(argc, argv);
1069 }
1070 
1071 JANET_CORE_FN(cfun_channel_new,
1072               "(ev/chan &opt capacity)",
1073               "Create a new channel. capacity is the number of values to queue before "
1074               "blocking writers, defaults to 0 if not provided. Returns a new channel.") {
1075     janet_arity(argc, 0, 1);
1076     int32_t limit = janet_optnat(argv, argc, 0, 0);
1077     JanetChannel *channel = janet_abstract(&janet_channel_type, sizeof(JanetChannel));
1078     janet_chan_init(channel, limit, 0);
1079     return janet_wrap_abstract(channel);
1080 }
1081 
1082 JANET_CORE_FN(cfun_channel_new_threaded,
1083               "(ev/thread-chan &opt limit)",
1084               "Create a threaded channel. A threaded channel is a channel that can be shared between threads and "
1085               "used to communicate between any number of operating system threads.") {
1086     janet_arity(argc, 0, 1);
1087     int32_t limit = janet_optnat(argv, argc, 0, 0);
1088     JanetChannel *tchan = janet_abstract_threaded(&janet_channel_type, sizeof(JanetChannel));
1089     janet_chan_init(tchan, limit, 1);
1090     return janet_wrap_abstract(tchan);
1091 }
1092 
1093 JANET_CORE_FN(cfun_channel_close,
1094               "(ev/chan-close chan)",
1095               "Close a channel. A closed channel will cause all pending reads and writes to return nil. "
1096               "Returns the channel.") {
1097     janet_fixarity(argc, 1);
1098     JanetChannel *channel = janet_getchannel(argv, 0);
1099     janet_chan_lock(channel);
1100     if (!channel->closed) {
1101         channel->closed = 1;
1102         JanetChannelPending writer;
1103         while (!janet_q_pop(&channel->write_pending, &writer, sizeof(writer))) {
1104             if (writer.thread != &janet_vm) {
1105                 JanetVM *vm = writer.thread;
1106                 JanetEVGenericMessage msg;
1107                 msg.fiber = writer.fiber;
1108                 msg.argp = channel;
1109                 msg.tag = JANET_CP_MODE_CLOSE;
1110                 msg.argi = (int32_t) writer.sched_id;
1111                 msg.argj = janet_wrap_nil();
1112                 janet_ev_post_event(vm, janet_thread_chan_cb, msg);
1113             } else {
1114                 janet_ev_dec_refcount();
1115                 if (writer.mode == JANET_CP_MODE_CHOICE_WRITE) {
1116                     janet_schedule(writer.fiber, janet_wrap_nil());
1117                 } else {
1118                     janet_schedule(writer.fiber, make_close_result(channel));
1119                 }
1120             }
1121         }
1122         JanetChannelPending reader;
1123         while (!janet_q_pop(&channel->read_pending, &reader, sizeof(reader))) {
1124             if (reader.thread != &janet_vm) {
1125                 JanetVM *vm = reader.thread;
1126                 JanetEVGenericMessage msg;
1127                 msg.fiber = reader.fiber;
1128                 msg.argp = channel;
1129                 msg.tag = JANET_CP_MODE_CLOSE;
1130                 msg.argi = (int32_t) reader.sched_id;
1131                 msg.argj = janet_wrap_nil();
1132                 janet_ev_post_event(vm, janet_thread_chan_cb, msg);
1133             } else {
1134                 janet_ev_dec_refcount();
1135                 if (reader.mode == JANET_CP_MODE_CHOICE_READ) {
1136                     janet_schedule(reader.fiber, janet_wrap_nil());
1137                 } else {
1138                     janet_schedule(reader.fiber, make_close_result(channel));
1139                 }
1140             }
1141         }
1142     }
1143     janet_chan_unlock(channel);
1144     return argv[0];
1145 }
1146 
/* Method table for channel values: maps method names to the channel C
 * functions defined in this file. The table is terminated by a
 * {NULL, NULL} entry and is consulted by janet_chanat_get and
 * janet_chanat_next. */
static const JanetMethod ev_chanat_methods[] = {
    {"select", cfun_channel_choice},
    {"rselect", cfun_channel_rchoice},
    {"count", cfun_channel_count},
    {"take", cfun_channel_pop},
    {"give", cfun_channel_push},
    {"capacity", cfun_channel_capacity},
    {"full", cfun_channel_full},
    {"close", cfun_channel_close},
    {NULL, NULL}
};
1158 
/* Method lookup for channels. Only keyword keys are considered; any other
 * key yields 0. For keywords the result of janet_getmethod on
 * ev_chanat_methods is returned, with the method stored through out.
 * The channel pointer itself is unused. */
static int janet_chanat_get(void *p, Janet key, Janet *out) {
    (void) p;
    if (janet_checktype(key, JANET_KEYWORD)) {
        return janet_getmethod(janet_unwrap_keyword(key), ev_chanat_methods, out);
    }
    return 0;
}
1164 
/* Key iteration for channels: delegates to janet_nextmethod over
 * ev_chanat_methods. The channel pointer itself is unused. */
static Janet janet_chanat_next(void *p, Janet key) {
    (void) p;
    Janet following = janet_nextmethod(ev_chanat_methods, key);
    return following;
}
1169 
/* Abstract type descriptor for channels ("core/channel"). It supplies the
 * gc, mark, get and next hooks; the put, marshal, unmarshal, tostring,
 * compare and hash slots are left NULL. Initializers are positional, so
 * their order must follow the JanetAbstractType field order. */
const JanetAbstractType janet_channel_type = {
    "core/channel",
    janet_chanat_gc,
    janet_chanat_mark,
    janet_chanat_get,
    NULL, /* put */
    NULL, /* marshal */
    NULL, /* unmarshal */
    NULL, /* tostring */
    NULL, /* compare */
    NULL, /* hash */
    janet_chanat_next,
    JANET_ATEND_NEXT
};
1184 
/* Main event loop */

/* Platform polling step, defined once per backend further down in this file.
 * has_timeout tells whether timeout carries a pending timer's due time;
 * janet_loop1 passes the `when` field of the earliest pending timeout. */
void janet_loop1_impl(int has_timeout, JanetTimestamp timeout);
1188 
janet_loop_done(void)1189 int janet_loop_done(void) {
1190     return !(janet_vm.listener_count ||
1191              (janet_vm.spawn.head != janet_vm.spawn.tail) ||
1192              janet_vm.tq_count ||
1193              janet_vm.extra_listeners);
1194 }
1195 
janet_loop1(void)1196 JanetFiber *janet_loop1(void) {
1197     /* Schedule expired timers */
1198     JanetTimeout to;
1199     JanetTimestamp now = ts_now();
1200     while (peek_timeout(&to) && to.when <= now) {
1201         pop_timeout(0);
1202         if (to.curr_fiber != NULL) {
1203             /* This is a deadline (for a fiber, not a function call) */
1204             JanetFiberStatus s = janet_fiber_status(to.curr_fiber);
1205             int isFinished = (s == JANET_STATUS_DEAD ||
1206                               s == JANET_STATUS_ERROR ||
1207                               s == JANET_STATUS_USER0 ||
1208                               s == JANET_STATUS_USER1 ||
1209                               s == JANET_STATUS_USER2 ||
1210                               s == JANET_STATUS_USER3 ||
1211                               s == JANET_STATUS_USER4);
1212             if (!isFinished) {
1213                 janet_cancel(to.fiber, janet_cstringv("deadline expired"));
1214             }
1215         } else {
1216             /* This is a timeout (for a function call, not a whole fiber) */
1217             if (to.fiber->sched_id == to.sched_id) {
1218                 if (to.is_error) {
1219                     janet_cancel(to.fiber, janet_cstringv("timeout"));
1220                 } else {
1221                     janet_schedule(to.fiber, janet_wrap_nil());
1222                 }
1223             }
1224         }
1225     }
1226 
1227     /* Run scheduled fibers */
1228     while (janet_vm.spawn.head != janet_vm.spawn.tail) {
1229         JanetTask task = {NULL, janet_wrap_nil(), JANET_SIGNAL_OK, 0};
1230         janet_q_pop(&janet_vm.spawn, &task, sizeof(task));
1231         task.fiber->flags &= ~JANET_FIBER_FLAG_CANCELED;
1232         if (task.expected_sched_id != task.fiber->sched_id) continue;
1233         Janet res;
1234         JanetSignal sig = janet_continue_signal(task.fiber, task.value, &res, task.sig);
1235         void *sv = task.fiber->supervisor_channel;
1236         int is_suspended = sig == JANET_SIGNAL_EVENT || sig == JANET_SIGNAL_YIELD || sig == JANET_SIGNAL_INTERRUPT;
1237         if (NULL == sv) {
1238             if (!is_suspended) {
1239                 janet_stacktrace_ext(task.fiber, res, "");
1240             }
1241         } else if (sig == JANET_SIGNAL_OK || (task.fiber->flags & (1 << sig))) {
1242             JanetChannel *chan = janet_channel_unwrap(sv);
1243             janet_channel_push(chan, make_supervisor_event(janet_signal_names[sig],
1244                                task.fiber, chan->is_threaded), 2);
1245         } else if (!is_suspended) {
1246             janet_stacktrace_ext(task.fiber, res, "");
1247         }
1248         if (sig == JANET_SIGNAL_INTERRUPT) {
1249             /* On interrupts, return the interrupted fiber immediately */
1250             return task.fiber;
1251         }
1252     }
1253 
1254     /* Poll for events */
1255     if (janet_vm.listener_count || janet_vm.tq_count || janet_vm.extra_listeners) {
1256         JanetTimeout to;
1257         memset(&to, 0, sizeof(to));
1258         int has_timeout;
1259         /* Drop timeouts that are no longer needed */
1260         while ((has_timeout = peek_timeout(&to)) && (to.curr_fiber == NULL) && to.fiber->sched_id != to.sched_id) {
1261             pop_timeout(0);
1262         }
1263         /* Run polling implementation only if pending timeouts or pending events */
1264         if (janet_vm.tq_count || janet_vm.listener_count || janet_vm.extra_listeners) {
1265             janet_loop1_impl(has_timeout, to.when);
1266         }
1267     }
1268 
1269     /* No fiber was interrupted */
1270     return NULL;
1271 }
1272 
1273 /* Same as janet_interpreter_interrupt, but will also
1274  * break out of the event loop if waiting for an event
1275  * (say, waiting for ev/sleep to finish). Does this by pushing
1276  * an empty event to the event loop. */
janet_loop1_interrupt(JanetVM * vm)1277 void janet_loop1_interrupt(JanetVM *vm) {
1278     janet_interpreter_interrupt(vm);
1279     JanetEVGenericMessage msg = {0};
1280     JanetCallback cb = NULL;
1281     janet_ev_post_event(vm, cb, msg);
1282 }
1283 
janet_loop(void)1284 void janet_loop(void) {
1285     while (!janet_loop_done()) {
1286         JanetFiber *interrupted_fiber = janet_loop1();
1287         if (NULL != interrupted_fiber) {
1288             janet_schedule(interrupted_fiber, janet_wrap_nil());
1289         }
1290     }
1291 }
1292 
1293 /*
1294  * Self-pipe handling code.
1295  */
1296 
1297 #ifdef JANET_WINDOWS
1298 
1299 /* On windows, use PostQueuedCompletionStatus instead for
1300  * custom events */
1301 
1302 #else
1303 
janet_ev_setup_selfpipe(void)1304 static void janet_ev_setup_selfpipe(void) {
1305     if (janet_make_pipe(janet_vm.selfpipe, 0)) {
1306         JANET_EXIT("failed to initialize self pipe in event loop");
1307     }
1308 }
1309 
1310 /* Handle events from the self pipe inside the event loop */
janet_ev_handle_selfpipe(void)1311 static void janet_ev_handle_selfpipe(void) {
1312     JanetSelfPipeEvent response;
1313     while (read(janet_vm.selfpipe[0], &response, sizeof(response)) > 0) {
1314         if (NULL != response.cb) {
1315             response.cb(response.msg);
1316         }
1317     }
1318 }
1319 
janet_ev_cleanup_selfpipe(void)1320 static void janet_ev_cleanup_selfpipe(void) {
1321     close(janet_vm.selfpipe[0]);
1322     close(janet_vm.selfpipe[1]);
1323 }
1324 
1325 #endif
1326 
1327 #ifdef JANET_WINDOWS
1328 
ts_now(void)1329 static JanetTimestamp ts_now(void) {
1330     return (JanetTimestamp) GetTickCount64();
1331 }
1332 
janet_ev_init(void)1333 void janet_ev_init(void) {
1334     janet_ev_init_common();
1335     janet_vm.iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
1336     if (NULL == janet_vm.iocp) janet_panic("could not create io completion port");
1337 }
1338 
/* Tear down the event loop on Windows: run the common teardown, then close
 * the I/O completion port handle. */
void janet_ev_deinit(void) {
    janet_ev_deinit_common();
    CloseHandle(janet_vm.iocp);
}
1343 
janet_listen(JanetStream * stream,JanetListener behavior,int mask,size_t size,void * user)1344 JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, int mask, size_t size, void *user) {
1345     /* Add the handle to the io completion port if not already added */
1346     JanetListenerState *state = janet_listen_impl(stream, behavior, mask, size, user);
1347     if (!(stream->flags & JANET_STREAM_IOCP)) {
1348         if (NULL == CreateIoCompletionPort(stream->handle, janet_vm.iocp, (ULONG_PTR) stream, 0)) {
1349             janet_panicf("failed to listen for events: %V", janet_ev_lasterr());
1350         }
1351         stream->flags |= JANET_STREAM_IOCP;
1352     }
1353     return state;
1354 }
1355 
1356 
/* Stop listening (Windows): simply forwards to janet_unlisten_impl. No
 * completion-port bookkeeping is done here. */
static void janet_unlisten(JanetListenerState *state, int is_gc) {
    janet_unlisten_impl(state, is_gc);
}
1360 
/* Windows polling step: wait on the I/O completion port for at most the time
 * remaining until `to` (or indefinitely when has_timeout is zero) and
 * dispatch at most one completion.
 *
 * A completion key of 0 marks a custom event: the OVERLAPPED pointer is
 * really a heap-allocated JanetSelfPipeEvent whose callback (if any) is run
 * before the event is freed. Any other key is the JanetStream the completion
 * belongs to; the listener whose tag equals the OVERLAPPED pointer receives
 * JANET_ASYNC_EVENT_COMPLETE and is removed if it reports DONE. */
void janet_loop1_impl(int has_timeout, JanetTimestamp to) {
    ULONG_PTR completionKey = 0;
    DWORD num_bytes_transfered = 0;
    LPOVERLAPPED overlapped = NULL;

    /* Calculate how long to wait before timeout */
    DWORD waittime = INFINITE;
    if (has_timeout) {
        JanetTimestamp now = ts_now();
        if (now > to) {
            waittime = 0;
        } else {
            uint64_t remaining = (uint64_t)(to - now);
            /* The wait argument is a 32-bit DWORD and INFINITE is reserved
             * for "no timeout". Clamp instead of truncating so a very long
             * timer cannot turn into a short or an infinite wait; waking
             * early is harmless because the caller re-checks its timers. */
            if (remaining >= (uint64_t) INFINITE) {
                waittime = INFINITE - 1;
            } else {
                waittime = (DWORD) remaining;
            }
        }
    }
    BOOL result = GetQueuedCompletionStatus(janet_vm.iocp, &num_bytes_transfered, &completionKey, &overlapped, waittime);

    if (result || overlapped) {
        if (0 == completionKey) {
            /* Custom event */
            JanetSelfPipeEvent *response = (JanetSelfPipeEvent *)(overlapped);
            if (NULL != response->cb) {
                response->cb(response->msg);
            }
            janet_free(response);
        } else {
            /* Normal event */
            JanetStream *stream = (JanetStream *) completionKey;
            JanetListenerState *state = stream->state;
            while (state != NULL) {
                if (state->tag == overlapped) {
                    state->event = overlapped;
                    state->bytes = num_bytes_transfered;
                    JanetAsyncStatus status = state->machine(state, JANET_ASYNC_EVENT_COMPLETE);
                    if (status == JANET_ASYNC_STATUS_DONE) {
                        janet_unlisten(state, 0);
                    }
                    break;
                } else {
                    state = state->_next;
                }
            }
        }
    }
}
1408 
1409 #elif defined(JANET_EV_EPOLL)
1410 
ts_now(void)1411 static JanetTimestamp ts_now(void) {
1412     struct timespec now;
1413     janet_assert(-1 != clock_gettime(CLOCK_MONOTONIC, &now), "failed to get time");
1414     uint64_t res = 1000 * now.tv_sec;
1415     res += now.tv_nsec / 1000000;
1416     return res;
1417 }
1418 
make_epoll_events(int mask)1419 static int make_epoll_events(int mask) {
1420     int events = 0;
1421     if (mask & JANET_ASYNC_LISTEN_READ)
1422         events |= EPOLLIN;
1423     if (mask & JANET_ASYNC_LISTEN_WRITE)
1424         events |= EPOLLOUT;
1425     return events;
1426 }
1427 
/* Custom-event callback for listeners that could not be registered with
 * epoll (see the EPERM path in janet_listen). The listener state travels in
 * msg.argp. The state machine is stepped by hand: a WRITE event if the
 * stream is listening for writes and a READ event if it is listening for
 * reads. If either step reports DONE the listener is removed; otherwise the
 * same message is posted again so the listener is retried later. */
static void janet_epoll_sync_callback(JanetEVGenericMessage msg) {
    JanetListenerState *state = msg.argp;
    JanetAsyncStatus status1 = JANET_ASYNC_STATUS_NOT_DONE;
    JanetAsyncStatus status2 = JANET_ASYNC_STATUS_NOT_DONE;
    if (state->stream->_mask & JANET_ASYNC_LISTEN_WRITE)
        status1 = state->machine(state, JANET_ASYNC_EVENT_WRITE);
    /* Gate the READ step on the READ bit, not the WRITE bit */
    if (state->stream->_mask & JANET_ASYNC_LISTEN_READ)
        status2 = state->machine(state, JANET_ASYNC_EVENT_READ);
    if (status1 == JANET_ASYNC_STATUS_DONE ||
            status2 == JANET_ASYNC_STATUS_DONE) {
        janet_unlisten(state, 0);
    } else {
        /* Repost event */
        janet_ev_post_event(NULL, janet_epoll_sync_callback, msg);
    }
}
1444 
1445 /* Wait for the next event */
janet_listen(JanetStream * stream,JanetListener behavior,int mask,size_t size,void * user)1446 JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, int mask, size_t size, void *user) {
1447     int is_first = !(stream->state);
1448     int op = is_first ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;
1449     JanetListenerState *state = janet_listen_impl(stream, behavior, mask, size, user);
1450     struct epoll_event ev;
1451     ev.events = make_epoll_events(state->stream->_mask);
1452     ev.data.ptr = stream;
1453     int status;
1454     do {
1455         status = epoll_ctl(janet_vm.epoll, op, stream->handle, &ev);
1456     } while (status == -1 && errno == EINTR);
1457     if (status == -1) {
1458         if (errno == EPERM) {
1459             /* Couldn't add to event loop, so assume that it completes
1460              * synchronously. In that case, fire the completion
1461              * event manually, since this should be a read or write
1462              * event to a file. So we just post a custom event to do the read/write
1463              * asap. */
1464             /* Use flag to indicate state is not registered in epoll */
1465             state->_mask |= (1 << JANET_ASYNC_EVENT_COMPLETE);
1466             JanetEVGenericMessage msg = {0};
1467             msg.argp = state;
1468             janet_ev_post_event(NULL, janet_epoll_sync_callback, msg);
1469         } else {
1470             /* Unexpected error */
1471             janet_unlisten_impl(state, 0);
1472             janet_panicv(janet_ev_lasterr());
1473         }
1474     }
1475     return state;
1476 }
1477 
1478 /* Tell system we are done listening for a certain event */
janet_unlisten(JanetListenerState * state,int is_gc)1479 static void janet_unlisten(JanetListenerState *state, int is_gc) {
1480     JanetStream *stream = state->stream;
1481     if (!(stream->flags & JANET_STREAM_CLOSED)) {
1482         /* Use flag to indicate state is not registered in epoll */
1483         if (!(state->_mask & (1 << JANET_ASYNC_EVENT_COMPLETE))) {
1484             int is_last = (state->_next == NULL && stream->state == state);
1485             int op = is_last ? EPOLL_CTL_DEL : EPOLL_CTL_MOD;
1486             struct epoll_event ev;
1487             ev.events = make_epoll_events(stream->_mask & ~state->_mask);
1488             ev.data.ptr = stream;
1489             int status;
1490             do {
1491                 status = epoll_ctl(janet_vm.epoll, op, stream->handle, &ev);
1492             } while (status == -1 && errno == EINTR);
1493             if (status == -1) {
1494                 janet_panicv(janet_ev_lasterr());
1495             }
1496         }
1497     }
1498     /* Destroy state machine and free memory */
1499     janet_unlisten_impl(state, is_gc);
1500 }
1501 
1502 #define JANET_EPOLL_MAX_EVENTS 64
janet_loop1_impl(int has_timeout,JanetTimestamp timeout)1503 void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) {
1504     struct itimerspec its;
1505     if (janet_vm.timer_enabled || has_timeout) {
1506         memset(&its, 0, sizeof(its));
1507         if (has_timeout) {
1508             its.it_value.tv_sec = timeout / 1000;
1509             its.it_value.tv_nsec = (timeout % 1000) * 1000000;
1510         }
1511         timerfd_settime(janet_vm.timerfd, TFD_TIMER_ABSTIME, &its, NULL);
1512     }
1513     janet_vm.timer_enabled = has_timeout;
1514 
1515     /* Poll for events */
1516     struct epoll_event events[JANET_EPOLL_MAX_EVENTS];
1517     int ready;
1518     do {
1519         ready = epoll_wait(janet_vm.epoll, events, JANET_EPOLL_MAX_EVENTS, -1);
1520     } while (ready == -1 && errno == EINTR);
1521     if (ready == -1) {
1522         JANET_EXIT("failed to poll events");
1523     }
1524 
1525     /* Step state machines */
1526     for (int i = 0; i < ready; i++) {
1527         void *p = events[i].data.ptr;
1528         if (&janet_vm.timerfd == p) {
1529             /* Timer expired, ignore */;
1530         } else if (janet_vm.selfpipe == p) {
1531             /* Self-pipe handling */
1532             janet_ev_handle_selfpipe();
1533         } else {
1534             JanetStream *stream = p;
1535             int mask = events[i].events;
1536             JanetListenerState *state = stream->state;
1537             while (NULL != state) {
1538                 state->event = events + i;
1539                 JanetListenerState *next_state = state->_next;
1540                 JanetAsyncStatus status1 = JANET_ASYNC_STATUS_NOT_DONE;
1541                 JanetAsyncStatus status2 = JANET_ASYNC_STATUS_NOT_DONE;
1542                 JanetAsyncStatus status3 = JANET_ASYNC_STATUS_NOT_DONE;
1543                 JanetAsyncStatus status4 = JANET_ASYNC_STATUS_NOT_DONE;
1544                 if (mask & EPOLLOUT)
1545                     status1 = state->machine(state, JANET_ASYNC_EVENT_WRITE);
1546                 if (mask & EPOLLIN)
1547                     status2 = state->machine(state, JANET_ASYNC_EVENT_READ);
1548                 if (mask & EPOLLERR)
1549                     status3 = state->machine(state, JANET_ASYNC_EVENT_ERR);
1550                 if ((mask & EPOLLHUP) && !(mask & (EPOLLOUT | EPOLLIN)))
1551                     status4 = state->machine(state, JANET_ASYNC_EVENT_HUP);
1552                 if (status1 == JANET_ASYNC_STATUS_DONE ||
1553                         status2 == JANET_ASYNC_STATUS_DONE ||
1554                         status3 == JANET_ASYNC_STATUS_DONE ||
1555                         status4 == JANET_ASYNC_STATUS_DONE)
1556                     janet_unlisten(state, 0);
1557                 state = next_state;
1558             }
1559         }
1560     }
1561 }
1562 
janet_ev_init(void)1563 void janet_ev_init(void) {
1564     janet_ev_init_common();
1565     janet_ev_setup_selfpipe();
1566     janet_vm.epoll = epoll_create1(EPOLL_CLOEXEC);
1567     janet_vm.timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC | TFD_NONBLOCK);
1568     janet_vm.timer_enabled = 0;
1569     if (janet_vm.epoll == -1 || janet_vm.timerfd == -1) goto error;
1570     struct epoll_event ev;
1571     ev.events = EPOLLIN | EPOLLET;
1572     ev.data.ptr = &janet_vm.timerfd;
1573     if (-1 == epoll_ctl(janet_vm.epoll, EPOLL_CTL_ADD, janet_vm.timerfd, &ev)) goto error;
1574     ev.events = EPOLLIN | EPOLLET;
1575     ev.data.ptr = janet_vm.selfpipe;
1576     if (-1 == epoll_ctl(janet_vm.epoll, EPOLL_CTL_ADD, janet_vm.selfpipe[0], &ev)) goto error;
1577     return;
1578 error:
1579     JANET_EXIT("failed to initialize event loop");
1580 }
1581 
janet_ev_deinit(void)1582 void janet_ev_deinit(void) {
1583     janet_ev_deinit_common();
1584     close(janet_vm.epoll);
1585     close(janet_vm.timerfd);
1586     janet_ev_cleanup_selfpipe();
1587     janet_vm.epoll = 0;
1588 }
1589 
1590 /*
1591  * End epoll implementation
1592  */
1593 
1594 #elif defined(JANET_EV_KQUEUE)
/* Definition from:
 *   https://github.com/wahern/cqueues/blob/master/src/lib/kpoll.c
 * NetBSD uses intptr_t while others use void * for .udata */
/* EV_SETx wraps EV_SET and casts the user-data argument `f` to whatever type
 * the `udata` member of `*ev` has on the current platform. */
#define EV_SETx(ev, a, b, c, d, e, f) EV_SET((ev), (a), (b), (c), (d), (e), ((__typeof__((ev)->udata))(f)))
/* Combined kevent flag set: add, enable, clear-after-delivery, one-shot.
 * NOTE(review): no use of this macro is visible in this part of the file. */
#define JANET_KQUEUE_TF (EV_ADD | EV_ENABLE | EV_CLEAR | EV_ONESHOT)
/* Smallest interval handed to kqueue; to_interval() clamps to this value. */
#define JANET_KQUEUE_MIN_INTERVAL 0
1601 
/* NOTE:
 * NetBSD and OpenBSD expect timer values to always be intervals, and FreeBSD
 * doesn't like an ABSTIME in the past, so just use intervals always. This
 * introduces a calculation to determine the minimum timeout for each timeout
 * requested of kqueue. Also note that NetBSD doesn't accept timeout intervals
 * of less than 1 millisecond, so correct all intervals on that platform to be
 * at least 1 millisecond. */
to_interval(const JanetTimestamp ts)1609 JanetTimestamp to_interval(const JanetTimestamp ts) {
1610     return ts >= JANET_KQUEUE_MIN_INTERVAL ? ts : JANET_KQUEUE_MIN_INTERVAL;
1611 }
1612 #define JANET_KQUEUE_INTERVAL(timestamp) (to_interval((timestamp - ts_now())))
1613 
1614 
/* TODO: make this available whether we are using kqueue or epoll, instead of
 * redefining it for kqueue and epoll separately? */
ts_now(void)1617 static JanetTimestamp ts_now(void) {
1618     struct timespec now;
1619     janet_assert(-1 != clock_gettime(CLOCK_MONOTONIC, &now), "failed to get time");
1620     uint64_t res = 1000 * now.tv_sec;
1621     res += now.tv_nsec / 1000000;
1622     return res;
1623 }
1624 
1625 /* NOTE: Assumes Janet's timestamp precision is in milliseconds. */
timestamp2timespec(struct timespec * t,JanetTimestamp ts)1626 static void timestamp2timespec(struct timespec *t, JanetTimestamp ts) {
1627     t->tv_sec = ts == 0 ? 0 : ts / 1000;
1628     t->tv_nsec = ts == 0 ? 0 : (ts % 1000) * 1000000;
1629 }
1630 
add_kqueue_events(const struct kevent * events,int length)1631 void add_kqueue_events(const struct kevent *events, int length) {
1632     /* NOTE: Status should be equal to the amount of events added, which isn't
1633      * always known since deletions or modifications occur. Can't use the
1634      * eventlist argument for it to report to us what failed otherwise we may
1635      * poll in events to handle! This code assumes atomicity, that kqueue can
1636      * either succeed or fail, but never partially (which is seemingly how it
1637      * works in practice). When encountering an "inbetween" state we currently
1638      * just panic!
1639      *
1640      * The FreeBSD man page kqueue(2) shows a check through the change list to
1641      * check if kqueue had an error with any of the events being pushed to
1642      * change. Maybe we should do this, even tho the man page also doesn't
1643      * note that kqueue actually does this. We do not do this at this time.  */
1644     int status;
1645     status = kevent(janet_vm.kq, events, length, NULL, 0, NULL);
1646     if (status == -1 && errno != EINTR)
1647         janet_panicv(janet_ev_lasterr());
1648 }
1649 
janet_listen(JanetStream * stream,JanetListener behavior,int mask,size_t size,void * user)1650 JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, int mask, size_t size, void *user) {
1651     JanetListenerState *state = janet_listen_impl(stream, behavior, mask, size, user);
1652     struct kevent kev[2];
1653 
1654     int length = 0;
1655     if (state->stream->_mask & JANET_ASYNC_LISTEN_READ) {
1656         EV_SETx(&kev[length], stream->handle, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, stream);
1657         length++;
1658     }
1659     if (state->stream->_mask & JANET_ASYNC_LISTEN_WRITE) {
1660         EV_SETx(&kev[length], stream->handle, EVFILT_WRITE, EV_ADD | EV_ENABLE, 0, 0, stream);
1661         length++;
1662     }
1663 
1664     if (length > 0) {
1665         add_kqueue_events(kev, length);
1666     }
1667 
1668     return state;
1669 }
1670 
/* Remove a listener from its stream. Unless the stream is already closed (or
 * the listener carries the COMPLETE bit in its mask), a kqueue change list is
 * built and submitted first; the listener itself is then released through
 * janet_unlisten_impl.
 *
 * state: the listener being removed.
 * is_gc: forwarded unchanged to janet_unlisten_impl. */
static void janet_unlisten(JanetListenerState *state, int is_gc) {
    JanetStream *stream = state->stream;
    if (!(stream->flags & JANET_STREAM_CLOSED)) {
        /* Use flag to indicate state is not registered in kqueue */
        if (!(state->_mask & (1 << JANET_ASYNC_EVENT_COMPLETE))) {
            /* Delete the filters when this is the stream's only listener,
             * otherwise just disable them. */
            int is_last = (state->_next == NULL && stream->state == state);
            int op = is_last ? EV_DELETE : EV_DISABLE | EV_ADD;
            struct kevent kev[2];
            /* NOTE(review): kev[1] is filled here but is only submitted if
             * both branches below run, in which case it is overwritten
             * first - this store looks redundant; confirm. */
            EV_SETx(&kev[1], stream->handle, EVFILT_WRITE, op, 0, 0, stream);

            int length = 0;
            /* NOTE(review): the two tests below compare _mask against the
             * JANET_ASYNC_EVENT_* values, whereas the registration path in
             * janet_listen tests the same field against
             * JANET_ASYNC_LISTEN_READ / JANET_ASYNC_LISTEN_WRITE and the
             * check above uses (1 << JANET_ASYNC_EVENT_COMPLETE). This looks
             * like an event index being used as a bit mask - confirm which
             * constant is intended before relying on these branches. */
            if (stream->_mask & JANET_ASYNC_EVENT_WRITE) {
                EV_SETx(&kev[length], stream->handle, EVFILT_WRITE, op, 0, 0, stream);
                length++;
            }
            if (stream->_mask & JANET_ASYNC_EVENT_READ) {
                EV_SETx(&kev[length], stream->handle, EVFILT_READ, op, 0, 0, stream);
                length++;
            }

            /* Submitted even when length is 0. */
            add_kqueue_events(kev, length);
        }
    }
    janet_unlisten_impl(state, is_gc);
}
1696 
1697 #define JANET_KQUEUE_MAX_EVENTS 64
1698 
janet_loop1_impl(int has_timeout,JanetTimestamp timeout)1699 void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) {
1700     /* Poll for events */
1701     /* NOTE:
1702      * We calculate the timeout interval per iteration. When the interval
1703      * drops to 0 or negative, we effect a timeout of 0. Effecting a timeout
1704      * of infinity will not work and could make other fibers with timeouts
1705      * miss their timeouts if we did so.
1706      * JANET_KQUEUE_INTERVAL insures we have a timeout of no less than 0. */
1707     int status;
1708     struct timespec ts;
1709     struct kevent events[JANET_KQUEUE_MAX_EVENTS];
1710     do {
1711         if (janet_vm.timer_enabled || has_timeout) {
1712             timestamp2timespec(&ts, JANET_KQUEUE_INTERVAL(timeout));
1713             status = kevent(janet_vm.kq, NULL, 0, events,
1714                             JANET_KQUEUE_MAX_EVENTS, &ts);
1715         } else {
1716             status = kevent(janet_vm.kq, NULL, 0, events,
1717                             JANET_KQUEUE_MAX_EVENTS, NULL);
1718         }
1719     } while (status == -1 && errno == EINTR);
1720     if (status == -1)
1721         JANET_EXIT("failed to poll events");
1722 
1723     /* Make sure timer is set accordingly. */
1724     janet_vm.timer_enabled = has_timeout;
1725 
1726     /* Step state machines */
1727     for (int i = 0; i < status; i++) {
1728         void *p = (void *) events[i].udata;
1729         if (janet_vm.selfpipe == p) {
1730             /* Self-pipe handling */
1731             janet_ev_handle_selfpipe();
1732         } else {
1733             JanetStream *stream = p;
1734             JanetListenerState *state = stream->state;
1735             while (NULL != state) {
1736                 JanetListenerState *next_state = state->_next;
1737                 state->event = events + i;
1738                 JanetAsyncStatus statuses[4];
1739                 for (int i = 0; i < 4; i++)
1740                     statuses[i] = JANET_ASYNC_STATUS_NOT_DONE;
1741 
1742                 if (!(events[i].flags & EV_ERROR)) {
1743                     if (events[i].filter == EVFILT_WRITE)
1744                         statuses[0] = state->machine(state, JANET_ASYNC_EVENT_WRITE);
1745                     if (events[i].filter == EVFILT_READ)
1746                         statuses[1] = state->machine(state, JANET_ASYNC_EVENT_READ);
1747                     if ((events[i].flags & EV_EOF) && !(events[i].data > 0))
1748                         statuses[3] = state->machine(state, JANET_ASYNC_EVENT_HUP);
1749                 } else {
1750                     statuses[2] = state->machine(state, JANET_ASYNC_EVENT_ERR);
1751                 }
1752                 if (statuses[0] == JANET_ASYNC_STATUS_DONE ||
1753                         statuses[1] == JANET_ASYNC_STATUS_DONE ||
1754                         statuses[2] == JANET_ASYNC_STATUS_DONE ||
1755                         statuses[3] == JANET_ASYNC_STATUS_DONE)
1756                     janet_unlisten(state, 0);
1757 
1758                 state = next_state;
1759             }
1760         }
1761     }
1762 }
1763 
janet_ev_init(void)1764 void janet_ev_init(void) {
1765     janet_ev_init_common();
1766     janet_ev_setup_selfpipe();
1767     janet_vm.kq = kqueue();
1768     janet_vm.timer_enabled = 0;
1769     if (janet_vm.kq == -1) goto error;
1770     struct kevent event;
1771     EV_SETx(&event, janet_vm.selfpipe[0], EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, janet_vm.selfpipe);
1772     add_kqueue_events(&event, 1);
1773     return;
1774 error:
1775     JANET_EXIT("failed to initialize event loop");
1776 }
1777 
janet_ev_deinit(void)1778 void janet_ev_deinit(void) {
1779     janet_ev_deinit_common();
1780     close(janet_vm.kq);
1781     janet_ev_cleanup_selfpipe();
1782     janet_vm.kq = 0;
1783 }
1784 
1785 #else
1786 
1787 #include <poll.h>
1788 
ts_now(void)1789 static JanetTimestamp ts_now(void) {
1790     struct timespec now;
1791     janet_assert(-1 != clock_gettime(CLOCK_REALTIME, &now), "failed to get time");
1792     uint64_t res = 1000 * now.tv_sec;
1793     res += now.tv_nsec / 1000000;
1794     return res;
1795 }
1796 
make_poll_events(int mask)1797 static int make_poll_events(int mask) {
1798     int events = 0;
1799     if (mask & JANET_ASYNC_LISTEN_READ)
1800         events |= POLLIN;
1801     if (mask & JANET_ASYNC_LISTEN_WRITE)
1802         events |= POLLOUT;
1803     return events;
1804 }
1805 
1806 /* Wait for the next event */
janet_listen(JanetStream * stream,JanetListener behavior,int mask,size_t size,void * user)1807 JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, int mask, size_t size, void *user) {
1808     size_t oldsize = janet_vm.listener_cap;
1809     JanetListenerState *state = janet_listen_impl(stream, behavior, mask, size, user);
1810     size_t newsize = janet_vm.listener_cap;
1811     if (newsize > oldsize) {
1812         janet_vm.fds = janet_realloc(janet_vm.fds, (newsize + 1) * sizeof(struct pollfd));
1813         if (NULL == janet_vm.fds) {
1814             JANET_OUT_OF_MEMORY;
1815         }
1816     }
1817     struct pollfd ev;
1818     ev.fd = stream->handle;
1819     ev.events = make_poll_events(state->stream->_mask);
1820     ev.revents = 0;
1821     janet_vm.fds[state->_index + 1] = ev;
1822     return state;
1823 }
1824 
/* Remove a listener: overwrite its pollfd entry with the last entry currently
 * in use, then release the listener through janet_unlisten_impl. */
static void janet_unlisten(JanetListenerState *state, int is_gc) {
    struct pollfd *fds = janet_vm.fds;
    /* Entries are offset by one because entry 0 is the self-pipe. */
    fds[state->_index + 1] = fds[janet_vm.listener_count];
    janet_unlisten_impl(state, is_gc);
}
1829 
/* Run one iteration of the poll-based event loop: wait for events (bounded by
 * `timeout` when has_timeout is set), service the self-pipe, then step the
 * state machine of every registered listener according to its returned
 * events.
 *
 * has_timeout: non-zero when `timeout` should bound the wait.
 * timeout:     timestamp at which the wait should end. */
void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) {
    /* Poll for events, retrying only when interrupted. */
    int ready;
    for (;;) {
        int wait_ms = -1;
        if (has_timeout) {
            JanetTimestamp now = ts_now();
            wait_ms = now > timeout ? 0 : (int)(timeout - now);
        }
        ready = poll(janet_vm.fds, janet_vm.listener_count + 1, wait_ms);
        if (ready != -1 || errno != EINTR) {
            break;
        }
    }
    if (ready == -1) {
        JANET_EXIT("failed to poll events");
    }

    /* Entry 0 is the self-pipe. */
    if (janet_vm.fds[0].revents & POLLIN) {
        janet_vm.fds[0].revents = 0;
        janet_ev_handle_selfpipe();
    }

    /* Step state machines */
    for (size_t i = 0; i < janet_vm.listener_count; i++) {
        struct pollfd *pfd = janet_vm.fds + i + 1;
        JanetListenerState *state = janet_vm.listeners[i];
        /* Snapshot the returned events before any machine runs. */
        const int revents = pfd->revents;
        int finished = 0;
        state->event = pfd;
        if (revents & POLLOUT)
            finished |= (state->machine(state, JANET_ASYNC_EVENT_WRITE) == JANET_ASYNC_STATUS_DONE);
        if (revents & POLLIN)
            finished |= (state->machine(state, JANET_ASYNC_EVENT_READ) == JANET_ASYNC_STATUS_DONE);
        if (revents & POLLERR)
            finished |= (state->machine(state, JANET_ASYNC_EVENT_ERR) == JANET_ASYNC_STATUS_DONE);
        /* A hang-up is only reported on its own, without readable/writable. */
        if ((revents & POLLHUP) && !(revents & (POLLIN | POLLOUT)))
            finished |= (state->machine(state, JANET_ASYNC_EVENT_HUP) == JANET_ASYNC_STATUS_DONE);
        if (finished)
            janet_unlisten(state, 0);
    }
}
1878 
janet_ev_init(void)1879 void janet_ev_init(void) {
1880     janet_ev_init_common();
1881     janet_vm.fds = NULL;
1882     janet_ev_setup_selfpipe();
1883     janet_vm.fds = janet_malloc(sizeof(struct pollfd));
1884     if (NULL == janet_vm.fds) {
1885         JANET_OUT_OF_MEMORY;
1886     }
1887     janet_vm.fds[0].fd = janet_vm.selfpipe[0];
1888     janet_vm.fds[0].events = POLLIN;
1889     janet_vm.fds[0].revents = 0;
1890     return;
1891 }
1892 
janet_ev_deinit(void)1893 void janet_ev_deinit(void) {
1894     janet_ev_deinit_common();
1895     janet_ev_cleanup_selfpipe();
1896     janet_free(janet_vm.fds);
1897     janet_vm.fds = NULL;
1898 }
1899 
1900 #endif
1901 
1902 /*
1903  * End poll implementation
1904  */
1905 
/*
 * Generic callback system. Post a function pointer plus data to the event
 * loop; this may be done from another thread or even from a signal handler.
 */
/* Queue a callback and its message for the event loop of `vm`.
 *
 * vm:  target VM; NULL selects the current janet_vm.
 * cb:  callback stored in the posted event.
 * msg: message stored alongside the callback.
 *
 * On Windows the event is heap-allocated and posted to the VM's completion
 * port. Elsewhere it is written to the write end of the VM's self-pipe, with
 * a few retries before the final assertion fails. */
void janet_ev_post_event(JanetVM *vm, JanetCallback cb, JanetEVGenericMessage msg) {
    if (!vm) {
        vm = &janet_vm;
    }
#ifdef JANET_WINDOWS
    JanetHandle iocp = vm->iocp;
    JanetSelfPipeEvent *event = janet_malloc(sizeof(JanetSelfPipeEvent));
    if (NULL == event) {
        JANET_OUT_OF_MEMORY;
    }
    event->cb = cb;
    event->msg = msg;
    janet_assert(PostQueuedCompletionStatus(iocp,
                                            sizeof(JanetSelfPipeEvent),
                                            0,
                                            (LPOVERLAPPED) event),
                 "failed to post completion event");
#else
    JanetSelfPipeEvent event;
    memset(&event, 0, sizeof(event));
    event.cb = cb;
    event.msg = msg;
    int fd = vm->selfpipe[1];
    /* handle a bit of back pressure before giving up. */
    int tries;
    for (tries = 4; tries > 0; tries--) {
        int status;
        do {
            status = write(fd, &event, sizeof(event));
        } while (status == -1 && errno == EINTR);
        /* A successful write leaves `tries` positive for the check below. */
        if (status > 0) break;
        sleep(0);
    }
    janet_assert(tries > 0, "failed to write event to self-pipe");
#endif
}
1945 
1946 /*
1947  * Threaded calls
1948  */
1949 
1950 #ifdef JANET_WINDOWS
janet_thread_body(LPVOID ptr)1951 static DWORD WINAPI janet_thread_body(LPVOID ptr) {
1952     JanetEVThreadInit *init = (JanetEVThreadInit *)ptr;
1953     JanetEVGenericMessage msg = init->msg;
1954     JanetThreadedSubroutine subr = init->subr;
1955     JanetThreadedCallback cb = init->cb;
1956     JanetHandle iocp = init->write_pipe;
1957     /* Reuse memory from thread init for returning data */
1958     init->msg = subr(msg);
1959     init->cb = cb;
1960     janet_assert(PostQueuedCompletionStatus(iocp,
1961                                             sizeof(JanetSelfPipeEvent),
1962                                             0,
1963                                             (LPOVERLAPPED) init),
1964                  "failed to post completion event");
1965     return 0;
1966 }
1967 #else
janet_thread_body(void * ptr)1968 static void *janet_thread_body(void *ptr) {
1969     JanetEVThreadInit *init = (JanetEVThreadInit *)ptr;
1970     JanetEVGenericMessage msg = init->msg;
1971     JanetThreadedSubroutine subr = init->subr;
1972     JanetThreadedCallback cb = init->cb;
1973     int fd = init->write_pipe;
1974     janet_free(init);
1975     JanetSelfPipeEvent response;
1976     memset(&response, 0, sizeof(response));
1977     response.msg = subr(msg);
1978     response.cb = cb;
1979     /* handle a bit of back pressure before giving up. */
1980     int tries = 4;
1981     while (tries > 0) {
1982         int status;
1983         do {
1984             status = write(fd, &response, sizeof(response));
1985         } while (status == -1 && errno == EINTR);
1986         if (status > 0) break;
1987         sleep(1);
1988         tries--;
1989     }
1990     return NULL;
1991 }
1992 #endif
1993 
/* Run `fp(arguments)` on a freshly created, detached thread. The thread is
 * handed a heap-allocated init record holding the subroutine, its arguments,
 * the callback `cb` and the handle it should report back on. Panics if the
 * thread cannot be created. */
void janet_ev_threaded_call(JanetThreadedSubroutine fp, JanetEVGenericMessage arguments, JanetThreadedCallback cb) {
    JanetEVThreadInit *init = janet_malloc(sizeof(JanetEVThreadInit));
    if (NULL == init) {
        JANET_OUT_OF_MEMORY;
    }
    init->cb = cb;
    init->subr = fp;
    init->msg = arguments;

#ifdef JANET_WINDOWS
    init->write_pipe = janet_vm.iocp;
    HANDLE worker = CreateThread(NULL, 0, janet_thread_body, init, 0, NULL);
    if (NULL == worker) {
        /* The thread never started, so the record is still ours to free. */
        janet_free(init);
        janet_panic("failed to create thread");
    }
    CloseHandle(worker); /* detach from thread */
#else
    init->write_pipe = janet_vm.selfpipe[1];
    pthread_t worker;
    int rc = pthread_create(&worker, NULL, janet_thread_body, init);
    if (rc) {
        /* The thread never started, so the record is still ours to free. */
        janet_free(init);
        janet_panicf("%s", strerror(rc));
    }
    pthread_detach(worker);
#endif

    /* Increment ev refcount so we don't quit while waiting for a subprocess */
    janet_ev_inc_refcount();
}
2025 
2026 /* Default callback for janet_ev_threaded_await. */
janet_ev_default_threaded_callback(JanetEVGenericMessage return_value)2027 void janet_ev_default_threaded_callback(JanetEVGenericMessage return_value) {
2028     janet_ev_dec_refcount();
2029     if (return_value.fiber == NULL) {
2030         return;
2031     }
2032     switch (return_value.tag) {
2033         default:
2034         case JANET_EV_TCTAG_NIL:
2035             janet_schedule(return_value.fiber, janet_wrap_nil());
2036             break;
2037         case JANET_EV_TCTAG_INTEGER:
2038             janet_schedule(return_value.fiber, janet_wrap_integer(return_value.argi));
2039             break;
2040         case JANET_EV_TCTAG_STRING:
2041         case JANET_EV_TCTAG_STRINGF:
2042             janet_schedule(return_value.fiber, janet_cstringv((const char *) return_value.argp));
2043             if (return_value.tag == JANET_EV_TCTAG_STRINGF) janet_free(return_value.argp);
2044             break;
2045         case JANET_EV_TCTAG_KEYWORD:
2046             janet_schedule(return_value.fiber, janet_ckeywordv((const char *) return_value.argp));
2047             break;
2048         case JANET_EV_TCTAG_ERR_STRING:
2049         case JANET_EV_TCTAG_ERR_STRINGF:
2050             janet_cancel(return_value.fiber, janet_cstringv((const char *) return_value.argp));
2051             if (return_value.tag == JANET_EV_TCTAG_STRINGF) janet_free(return_value.argp);
2052             break;
2053         case JANET_EV_TCTAG_ERR_KEYWORD:
2054             janet_cancel(return_value.fiber, janet_ckeywordv((const char *) return_value.argp));
2055             break;
2056         case JANET_EV_TCTAG_BOOLEAN:
2057             janet_schedule(return_value.fiber, janet_wrap_boolean(return_value.argi));
2058             break;
2059     }
2060     janet_gcunroot(janet_wrap_fiber(return_value.fiber));
2061 }
2062 
2063 
2064 /* Convenience method for common case */
2065 JANET_NO_RETURN
janet_ev_threaded_await(JanetThreadedSubroutine fp,int tag,int argi,void * argp)2066 void janet_ev_threaded_await(JanetThreadedSubroutine fp, int tag, int argi, void *argp) {
2067     JanetEVGenericMessage arguments;
2068     memset(&arguments, 0, sizeof(arguments));
2069     arguments.tag = tag;
2070     arguments.argi = argi;
2071     arguments.argp = argp;
2072     arguments.fiber = janet_root_fiber();
2073     janet_gcroot(janet_wrap_fiber(arguments.fiber));
2074     janet_ev_threaded_call(fp, arguments, janet_ev_default_threaded_callback);
2075     janet_await();
2076 }
2077 
2078 /*
2079  * C API helpers for reading and writing from streams.
2080  * There is some networking code in here as well as generic
2081  * reading and writing primitives.
2082  */
2083 
/* Check that `stream` is open and has every capability bit in `flags`.
 * Panics with "stream is closed" for a closed stream, or with a message
 * listing the expected capabilities when any requested bit is missing. */
void janet_stream_flags(JanetStream *stream, uint32_t flags) {
    if (stream->flags & JANET_STREAM_CLOSED) {
        janet_panic("stream is closed");
    }
    if ((stream->flags & flags) == flags) {
        return;
    }
    /* Describe what was asked for, one word per requested capability. */
    const char *rmsg = (flags & JANET_STREAM_READABLE) ? "readable " : "";
    const char *wmsg = (flags & JANET_STREAM_WRITABLE) ? "writable " : "";
    const char *amsg = (flags & JANET_STREAM_ACCEPTABLE) ? "server " : "";
    const char *dmsg = (flags & JANET_STREAM_UDPSERVER) ? "datagram " : "";
    const char *smsg = (flags & JANET_STREAM_SOCKET) ? "socket" : "stream";
    janet_panicf("bad stream, expected %s%s%s%s%s", rmsg, wmsg, amsg, dmsg, smsg);
}
2098 
2099 /* When there is an IO error, we need to be able to convert it to a Janet
2100  * string to raise a Janet error. */
2101 #ifdef JANET_WINDOWS
2102 #define JANET_EV_CHUNKSIZE 4096
janet_ev_lasterr(void)2103 Janet janet_ev_lasterr(void) {
2104     int code = GetLastError();
2105     char msgbuf[256];
2106     msgbuf[0] = '\0';
2107     FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS,
2108                   NULL,
2109                   code,
2110                   MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
2111                   msgbuf,
2112                   sizeof(msgbuf),
2113                   NULL);
2114     if (!*msgbuf) sprintf(msgbuf, "%d", code);
2115     char *c = msgbuf;
2116     while (*c) {
2117         if (*c == '\n' || *c == '\r') {
2118             *c = '\0';
2119             break;
2120         }
2121         c++;
2122     }
2123     return janet_cstringv(msgbuf);
2124 }
2125 #else
janet_ev_lasterr(void)2126 Janet janet_ev_lasterr(void) {
2127     return janet_cstringv(strerror(errno));
2128 }
2129 #endif
2130 
2131 /* State machine for read/recv/recvfrom */
2132 
/* Selects which receive primitive the read state machine uses. */
typedef enum {
    JANET_ASYNC_READMODE_READ,     /* plain read (read / ReadFile) */
    JANET_ASYNC_READMODE_RECV,     /* socket recv */
    JANET_ASYNC_READMODE_RECVFROM  /* socket recvfrom; resumes with the sender address */
} JanetReadMode;
2138 
/* Listener state for one pending read operation, driven by ev_machine_read. */
typedef struct {
    JanetListenerState head; /* generic listener header; the machine casts to and from it */
    int32_t bytes_left;      /* bytes still wanted; decremented as data arrives */
    int32_t bytes_read;      /* total bytes received so far by this operation */
    JanetBuffer *buf;        /* destination buffer that received data is appended to */
    int is_chunk;            /* non-zero: keep reading until bytes_left reaches 0 or a read returns 0 bytes */
    JanetReadMode mode;      /* which receive primitive to use */
#ifdef JANET_WINDOWS
    OVERLAPPED overlapped;   /* overlapped record for the in-flight operation */
#ifdef JANET_NET
    WSABUF wbuf;             /* buffer descriptor passed to WSARecvFrom */
    DWORD flags;             /* flags word passed to WSARecvFrom */
    struct sockaddr from;    /* sender address filled in by WSARecvFrom */
    int fromlen;             /* length of `from` */
#endif
    uint8_t chunk_buf[JANET_EV_CHUNKSIZE]; /* staging area; completed data is copied from here into buf */
#else
    int flags;               /* flags passed to recv / recvfrom */
#endif
} StateRead;
2159 
/* State machine for a pending read / recv / recvfrom on a stream.
 *
 * s:     the listener state; it is really a StateRead.
 * event: the event being delivered to the listener.
 *
 * Returns JANET_ASYNC_STATUS_DONE once the waiting fiber has been scheduled
 * or cancelled, and JANET_ASYNC_STATUS_NOT_DONE while the operation should
 * stay registered. */
JanetAsyncStatus ev_machine_read(JanetListenerState *s, JanetAsyncEvent event) {
    StateRead *state = (StateRead *) s;
    switch (event) {
        default:
            break;
        case JANET_ASYNC_EVENT_MARK:
            /* Mark the destination buffer on behalf of this listener. */
            janet_mark(janet_wrap_buffer(state->buf));
            break;
        case JANET_ASYNC_EVENT_CLOSE:
            /* The stream was closed under us: resume the fiber with nil. */
            janet_schedule(s->fiber, janet_wrap_nil());
            return JANET_ASYNC_STATUS_DONE;
#ifdef JANET_WINDOWS
        case JANET_ASYNC_EVENT_COMPLETE: {
            /* Called when read finished */
            /* NOTE(review): s->bytes is presumably the byte count of the
             * completed operation, filled in by the caller - confirm. */
            state->bytes_read += s->bytes;
            /* Nothing received at all on a stream-style read: resume with nil. */
            if (state->bytes_read == 0 && (state->mode != JANET_ASYNC_READMODE_RECVFROM)) {
                janet_schedule(s->fiber, janet_wrap_nil());
                return JANET_ASYNC_STATUS_DONE;
            }

            /* Move the received bytes from the staging area into the buffer. */
            janet_buffer_push_bytes(state->buf, state->chunk_buf, s->bytes);
            state->bytes_left -= s->bytes;

            /* Finished when everything was read, when not in chunk mode, or
             * when the operation delivered no bytes. */
            if (state->bytes_left == 0 || !state->is_chunk || s->bytes == 0) {
                Janet resume_val;
#ifdef JANET_NET
                if (state->mode == JANET_ASYNC_READMODE_RECVFROM) {
                    /* recvfrom resumes with a copy of the sender address. */
                    void *abst = janet_abstract(&janet_address_type, state->fromlen);
                    memcpy(abst, &state->from, state->fromlen);
                    resume_val = janet_wrap_abstract(abst);
                } else
#endif
                {
                    resume_val = janet_wrap_buffer(state->buf);
                }
                janet_schedule(s->fiber, resume_val);
                return JANET_ASYNC_STATUS_DONE;
            }
        }

        /* More data is wanted: fall into the code that issues the next read. */
        /* fallthrough */
        case JANET_ASYNC_EVENT_USER: {
            /* Issue an overlapped read of at most one staging buffer. */
            int32_t chunk_size = state->bytes_left > JANET_EV_CHUNKSIZE ? JANET_EV_CHUNKSIZE : state->bytes_left;
            s->tag = &state->overlapped;
            memset(&(state->overlapped), 0, sizeof(OVERLAPPED));
            int status;
#ifdef JANET_NET
            if (state->mode == JANET_ASYNC_READMODE_RECVFROM) {
                state->wbuf.len = (ULONG) chunk_size;
                state->wbuf.buf = state->chunk_buf;
                status = WSARecvFrom((SOCKET) s->stream->handle, &state->wbuf, 1,
                                     NULL, &state->flags, &state->from, &state->fromlen, &state->overlapped, NULL);
                /* A pending operation is expected; anything else is an error. */
                if (status && (WSA_IO_PENDING != WSAGetLastError())) {
                    janet_cancel(s->fiber, janet_ev_lasterr());
                    return JANET_ASYNC_STATUS_DONE;
                }
            } else
#endif
            {
                status = ReadFile(s->stream->handle, state->chunk_buf, chunk_size, NULL, &state->overlapped);
                /* NOTE(review): the error code of ReadFile is fetched with
                 * WSAGetLastError() rather than GetLastError() - confirm
                 * this is intended. */
                if (!status && (ERROR_IO_PENDING != WSAGetLastError())) {
                    if (WSAGetLastError() == ERROR_BROKEN_PIPE) {
                        /* A broken pipe ends the read: resume with whatever
                         * was received, or nil if nothing was. */
                        if (state->bytes_read) {
                            janet_schedule(s->fiber, janet_wrap_buffer(state->buf));
                        } else {
                            janet_schedule(s->fiber, janet_wrap_nil());
                        }
                    } else {
                        janet_cancel(s->fiber, janet_ev_lasterr());
                    }
                    return JANET_ASYNC_STATUS_DONE;
                }
            }
        }
        break;
#else
        case JANET_ASYNC_EVENT_ERR: {
            /* Error on the descriptor: resume with whatever was received so
             * far, or nil if nothing was. */
            if (state->bytes_read) {
                janet_schedule(s->fiber, janet_wrap_buffer(state->buf));
            } else {
                janet_schedule(s->fiber, janet_wrap_nil());
            }
            return JANET_ASYNC_STATUS_DONE;
        }
        /* A hang-up is handled by attempting a read as well. */
        case JANET_ASYNC_EVENT_HUP:
        case JANET_ASYNC_EVENT_READ: {
            JanetBuffer *buffer = state->buf;
            int32_t bytes_left = state->bytes_left;
            /* In chunk mode a single call reads at most 4096 bytes. */
            int32_t read_limit = state->is_chunk ? (bytes_left > 4096 ? 4096 : bytes_left) : bytes_left;
            /* NOTE(review): presumably reserves room for read_limit more
             * bytes in the buffer - confirm against janet_buffer_extra. */
            janet_buffer_extra(buffer, read_limit);
            ssize_t nread;
#ifdef JANET_NET
            /* Storage for the sender address reported by recvfrom. */
            char saddr[256];
            socklen_t socklen = sizeof(saddr);
#endif
            /* Read directly into the tail of the buffer, retrying when the
             * call is interrupted. */
            do {
#ifdef JANET_NET
                if (state->mode == JANET_ASYNC_READMODE_RECVFROM) {
                    nread = recvfrom(s->stream->handle, buffer->data + buffer->count, read_limit, state->flags,
                                     (struct sockaddr *)&saddr, &socklen);
                } else if (state->mode == JANET_ASYNC_READMODE_RECV) {
                    nread = recv(s->stream->handle, buffer->data + buffer->count, read_limit, state->flags);
                } else
#endif
                {
                    nread = read(s->stream->handle, buffer->data + buffer->count, read_limit);
                }
            } while (nread == -1 && errno == EINTR);

            /* Check for errors - special case errors that can just be waited on to fix */
            if (nread == -1) {
                if (errno == EAGAIN || errno == EWOULDBLOCK) {
                    /* No data yet: stay registered and wait for the next event. */
                    return JANET_ASYNC_STATUS_NOT_DONE;
                }
                /* In stream protocols, a pipe error is end of stream */
                if (errno == EPIPE && (state->mode != JANET_ASYNC_READMODE_RECVFROM)) {
                    nread = 0;
                } else {
                    janet_cancel(s->fiber, janet_ev_lasterr());
                    return JANET_ASYNC_STATUS_DONE;
                }
            }

            /* Only allow 0-length packets in recv-from. In stream protocols, a zero length packet is EOS. */
            state->bytes_read += nread;
            if (state->bytes_read == 0 && (state->mode != JANET_ASYNC_READMODE_RECVFROM)) {
                janet_schedule(s->fiber, janet_wrap_nil());
                return JANET_ASYNC_STATUS_DONE;
            }

            /* Increment buffer counts */
            buffer->count += nread;
            bytes_left -= nread;
            state->bytes_left = bytes_left;

            /* Resume if done */
            /* Done when not in chunk mode, when everything was read, or when
             * this call returned no bytes. */
            if (!state->is_chunk || bytes_left == 0 || nread == 0) {
                Janet resume_val;
#ifdef JANET_NET
                if (state->mode == JANET_ASYNC_READMODE_RECVFROM) {
                    /* recvfrom resumes with a copy of the sender address. */
                    void *abst = janet_abstract(&janet_address_type, socklen);
                    memcpy(abst, &saddr, socklen);
                    resume_val = janet_wrap_abstract(abst);
                } else
#endif
                {
                    resume_val = janet_wrap_buffer(buffer);
                }
                janet_schedule(s->fiber, resume_val);
                return JANET_ASYNC_STATUS_DONE;
            }
        }
        break;
#endif
    }
    return JANET_ASYNC_STATUS_NOT_DONE;
}
2317 
/*
 * Register an asynchronous read on `stream`, driven by ev_machine_read.
 *
 * stream     - stream to listen on for read events.
 * buf        - buffer recorded in the listener state.
 * nbytes     - stored as the number of bytes left to read.
 * is_chunked - stored as state->is_chunk.
 * mode       - which read flavor the state machine should use.
 * flags      - stored in the listener state.
 *
 * On Windows the state machine is invoked right away with a
 * JANET_ASYNC_EVENT_USER event, so every field, including flags, has to be
 * filled in before that call. This matches the ordering used by
 * janet_ev_write_generic.
 */
static void janet_ev_read_generic(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int is_chunked, JanetReadMode mode, int flags) {
    StateRead *state = (StateRead *) janet_listen(stream, ev_machine_read,
                       JANET_ASYNC_LISTEN_READ, sizeof(StateRead), NULL);
    state->is_chunk = is_chunked;
    state->buf = buf;
    state->bytes_left = nbytes;
    state->bytes_read = 0;
    state->mode = mode;
#ifdef JANET_WINDOWS
    /* Assign flags before starting the machine so it never sees a stale value. */
    state->flags = (DWORD) flags;
    ev_machine_read((JanetListenerState *) state, JANET_ASYNC_EVENT_USER);
#else
    state->flags = flags;
#endif
}
2333 
/* Start a non-chunked read in plain read mode with no flags. */
void janet_ev_read(JanetStream *stream, JanetBuffer *buf, int32_t nbytes) {
    const int chunked = 0;
    const int no_flags = 0;
    janet_ev_read_generic(stream, buf, nbytes, chunked, JANET_ASYNC_READMODE_READ, no_flags);
}
/* Start a chunked read in plain read mode with no flags. */
void janet_ev_readchunk(JanetStream *stream, JanetBuffer *buf, int32_t nbytes) {
    const int chunked = 1;
    const int no_flags = 0;
    janet_ev_read_generic(stream, buf, nbytes, chunked, JANET_ASYNC_READMODE_READ, no_flags);
}
2340 #ifdef JANET_NET
/* Start a non-chunked read in recv mode, forwarding the caller's flags. */
void janet_ev_recv(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags) {
    const int chunked = 0;
    janet_ev_read_generic(stream, buf, nbytes, chunked, JANET_ASYNC_READMODE_RECV, flags);
}
/* Start a chunked read in recv mode, forwarding the caller's flags. */
void janet_ev_recvchunk(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags) {
    const int chunked = 1;
    janet_ev_read_generic(stream, buf, nbytes, chunked, JANET_ASYNC_READMODE_RECV, flags);
}
/* Start a non-chunked read in recv-from mode, forwarding the caller's flags. */
void janet_ev_recvfrom(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags) {
    const int chunked = 0;
    janet_ev_read_generic(stream, buf, nbytes, chunked, JANET_ASYNC_READMODE_RECVFROM, flags);
}
2350 #endif
2351 
2352 /*
2353  * State machine for write/send/send-to
2354  */
2355 
/* Selects which system call ev_machine_write uses to emit the data. */
typedef enum {
    /* Plain write: write() on POSIX, WriteFile on Windows. */
    JANET_ASYNC_WRITEMODE_WRITE,
    /* send() on POSIX when JANET_NET is enabled; on Windows it takes the same path as WRITE. */
    JANET_ASYNC_WRITEMODE_SEND,
    /* Datagram send to an explicit address: sendto() on POSIX, WSASendTo on Windows. */
    JANET_ASYNC_WRITEMODE_SENDTO
} JanetWriteMode;
2361 
/*
 * Listener state for a pending write/send/send-to operation.
 * `head` must remain the first member: the state machine receives a
 * JanetListenerState pointer and casts it to StateWrite.
 */
typedef struct {
    JanetListenerState head;
    /* Data to write; which member is valid is selected by is_buffer. */
    union {
        JanetBuffer *buf;
        const uint8_t *str;
    } src;
    /* Non-zero when src.buf is the active member, zero when src.str is. */
    int is_buffer;
    JanetWriteMode mode;
    /* Destination address abstract; only marked and used in SENDTO mode. */
    void *dest_abst;
#ifdef JANET_WINDOWS
    /* OVERLAPPED structure handed to WriteFile / WSASendTo. */
    OVERLAPPED overlapped;
#ifdef JANET_NET
    /* Buffer descriptor and flags passed to WSASendTo. */
    WSABUF wbuf;
    DWORD flags;
#endif
#else
    /* Flags passed to send()/sendto(). */
    int flags;
    /* Offset into the source data of the first byte not yet written. */
    int32_t start;
#endif
} StateWrite;
2382 
/*
 * Listener state machine for write/send/send-to operations.
 *
 * s     - listener state; actually a StateWrite set up by janet_ev_write_generic.
 * event - the event being delivered to the listener.
 *
 * Returns JANET_ASYNC_STATUS_DONE once the waiting fiber has been scheduled
 * (success) or canceled (error, close, hup), and JANET_ASYNC_STATUS_NOT_DONE
 * when the operation is still pending.
 *
 * On Windows the write is started by a JANET_ASYNC_EVENT_USER event and
 * finished by JANET_ASYNC_EVENT_COMPLETE. On other platforms each
 * JANET_ASYNC_EVENT_WRITE attempts to write the remaining bytes and records
 * progress in state->start.
 */
JanetAsyncStatus ev_machine_write(JanetListenerState *s, JanetAsyncEvent event) {
    StateWrite *state = (StateWrite *) s;
    switch (event) {
        default:
            break;
        case JANET_ASYNC_EVENT_MARK:
            /* Keep the source data (and the destination address, if any) alive across GC. */
            janet_mark(state->is_buffer
                       ? janet_wrap_buffer(state->src.buf)
                       : janet_wrap_string(state->src.str));
            if (state->mode == JANET_ASYNC_WRITEMODE_SENDTO) {
                janet_mark(janet_wrap_abstract(state->dest_abst));
            }
            break;
        case JANET_ASYNC_EVENT_CLOSE:
            janet_cancel(s->fiber, janet_cstringv("stream closed"));
            return JANET_ASYNC_STATUS_DONE;
#ifdef JANET_WINDOWS
        case JANET_ASYNC_EVENT_COMPLETE: {
            /* Called when write finished */
            /* A zero byte count is reported as a disconnect unless this was a send-to. */
            if (s->bytes == 0 && (state->mode != JANET_ASYNC_WRITEMODE_SENDTO)) {
                janet_cancel(s->fiber, janet_cstringv("disconnect"));
                return JANET_ASYNC_STATUS_DONE;
            }

            janet_schedule(s->fiber, janet_wrap_nil());
            return JANET_ASYNC_STATUS_DONE;
        }
        break;
        case JANET_ASYNC_EVENT_USER: {
            /* Begin write */
            int32_t len;
            const uint8_t *bytes;
            if (state->is_buffer) {
                /* If buffer, convert to string. */
                /* TODO - be more efficient about this */
                /* The state is switched to string mode so that the copy is what gets marked. */
                JanetBuffer *buffer = state->src.buf;
                JanetString str = janet_string(buffer->data, buffer->count);
                bytes = str;
                len = buffer->count;
                state->is_buffer = 0;
                state->src.str = str;
            } else {
                bytes = state->src.str;
                len = janet_string_length(bytes);
            }
            s->tag = &state->overlapped;
            memset(&(state->overlapped), 0, sizeof(WSAOVERLAPPED));

            int status;
#ifdef JANET_NET
            if (state->mode == JANET_ASYNC_WRITEMODE_SENDTO) {
                SOCKET sock = (SOCKET) s->stream->handle;
                state->wbuf.buf = (char *) bytes;
                state->wbuf.len = len;
                const struct sockaddr *to = state->dest_abst;
                int tolen = (int) janet_abstract_size((void *) to);
                status = WSASendTo(sock, &state->wbuf, 1, NULL, state->flags, to, tolen, &state->overlapped, NULL);
                /* A pending overlapped operation is not an error; anything else is. */
                if (status && (WSA_IO_PENDING != WSAGetLastError())) {
                    janet_cancel(s->fiber, janet_ev_lasterr());
                    return JANET_ASYNC_STATUS_DONE;
                }
            } else
#endif
            {
                /*
                 * File handles in IOCP need to specify this if they are writing to the
                 * ends of files, like how this is used here.
                 * If the underlying resource doesn't support seeking
                 * byte offsets, they will be ignored
                 * but this otherwise writes to the end of the file in question
                 * Right now, os/open streams aren't seekable, so this works.
                 * for more details see the lpOverlapped parameter in
                 * https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-writefile
                 */
                state->overlapped.Offset = (DWORD) 0xFFFFFFFF;
                state->overlapped.OffsetHigh = (DWORD) 0xFFFFFFFF;
                status = WriteFile(s->stream->handle, bytes, len, NULL, &state->overlapped);
                if (!status && (ERROR_IO_PENDING != WSAGetLastError())) {
                    janet_cancel(s->fiber, janet_ev_lasterr());
                    return JANET_ASYNC_STATUS_DONE;
                }
            }
        }
        break;
#else
        case JANET_ASYNC_EVENT_ERR:
            janet_cancel(s->fiber, janet_cstringv("stream err"));
            return JANET_ASYNC_STATUS_DONE;
        case JANET_ASYNC_EVENT_HUP:
            janet_cancel(s->fiber, janet_cstringv("stream hup"));
            return JANET_ASYNC_STATUS_DONE;
        case JANET_ASYNC_EVENT_WRITE: {
            int32_t start, len;
            const uint8_t *bytes;
            /* Resume from wherever the previous write attempt stopped. */
            start = state->start;
            if (state->is_buffer) {
                JanetBuffer *buffer = state->src.buf;
                bytes = buffer->data;
                len = buffer->count;
            } else {
                bytes = state->src.str;
                len = janet_string_length(bytes);
            }
            ssize_t nwrote = 0;
            if (start < len) {
                int32_t nbytes = len - start;
                void *dest_abst = state->dest_abst;
                /* Retry the system call for as long as it is interrupted by a signal. */
                do {
#ifdef JANET_NET
                    if (state->mode == JANET_ASYNC_WRITEMODE_SENDTO) {
                        nwrote = sendto(s->stream->handle, bytes + start, nbytes, state->flags,
                                        (struct sockaddr *) dest_abst, janet_abstract_size(dest_abst));
                    } else if (state->mode == JANET_ASYNC_WRITEMODE_SEND) {
                        nwrote = send(s->stream->handle, bytes + start, nbytes, state->flags);
                    } else
#endif
                    {
                        nwrote = write(s->stream->handle, bytes + start, nbytes);
                    }
                } while (nwrote == -1 && errno == EINTR);

                /* Handle write errors */
                if (nwrote == -1) {
                    /* Would block: leave the switch and report NOT_DONE so the write is retried later. */
                    if (errno == EAGAIN || errno  == EWOULDBLOCK) break;
                    janet_cancel(s->fiber, janet_ev_lasterr());
                    return JANET_ASYNC_STATUS_DONE;
                }

                /* Unless using datagrams, empty message is a disconnect */
                if (nwrote == 0 && !dest_abst) {
                    janet_cancel(s->fiber, janet_cstringv("disconnect"));
                    return JANET_ASYNC_STATUS_DONE;
                }

                if (nwrote > 0) {
                    start += nwrote;
                } else {
                    /* Zero bytes written with a destination address: treat the write as finished. */
                    start = len;
                }
            }
            state->start = start;
            if (start >= len) {
                janet_schedule(s->fiber, janet_wrap_nil());
                return JANET_ASYNC_STATUS_DONE;
            }
            break;
        }
        break;
#endif
    }
    return JANET_ASYNC_STATUS_NOT_DONE;
}
2535 
/*
 * Register an asynchronous write on `stream`, driven by ev_machine_write.
 *
 * stream    - stream to listen on for write events.
 * buf       - a JanetBuffer * when is_buffer is non-zero, otherwise the
 *             bytes of a Janet string.
 * dest_abst - destination address abstract for send-to, or NULL.
 * mode      - which write flavor the state machine should use.
 * is_buffer - selects how buf is interpreted (see above).
 * flags     - stored in the listener state for the socket send calls.
 *
 * On Windows the state machine is invoked right away with a
 * JANET_ASYNC_EVENT_USER event to begin the write.
 */
static void janet_ev_write_generic(JanetStream *stream, void *buf, void *dest_abst, JanetWriteMode mode, int is_buffer, int flags) {
    StateWrite *state = (StateWrite *) janet_listen(stream, ev_machine_write,
                        JANET_ASYNC_LISTEN_WRITE, sizeof(StateWrite), NULL);
    state->is_buffer = is_buffer;
    /* Store through the union member that is_buffer says will be read back. */
    if (is_buffer) {
        state->src.buf = buf;
    } else {
        state->src.str = buf;
    }
    state->dest_abst = dest_abst;
    state->mode = mode;
#ifdef JANET_WINDOWS
#ifdef JANET_NET
    state->flags = (DWORD) flags;
#else
    /* StateWrite has no flags member on Windows when JANET_NET is disabled. */
    (void) flags;
#endif
    ev_machine_write((JanetListenerState *) state, JANET_ASYNC_EVENT_USER);
#else
    state->start = 0;
    state->flags = flags;
#endif
}
2551 
2552 
/* Start a plain write of a buffer: no destination address, no flags. */
void janet_ev_write_buffer(JanetStream *stream, JanetBuffer *buf) {
    const int is_buffer = 1;
    const int no_flags = 0;
    janet_ev_write_generic(stream, buf, NULL, JANET_ASYNC_WRITEMODE_WRITE, is_buffer, no_flags);
}
2556 
/* Start a plain write of a string: no destination address, no flags. */
void janet_ev_write_string(JanetStream *stream, JanetString str) {
    const int is_buffer = 0;
    const int no_flags = 0;
    janet_ev_write_generic(stream, (void *) str, NULL, JANET_ASYNC_WRITEMODE_WRITE, is_buffer, no_flags);
}
2560 
2561 #ifdef JANET_NET
/* Start a send of a buffer with the given flags; no destination address. */
void janet_ev_send_buffer(JanetStream *stream, JanetBuffer *buf, int flags) {
    const int is_buffer = 1;
    janet_ev_write_generic(stream, buf, NULL, JANET_ASYNC_WRITEMODE_SEND, is_buffer, flags);
}
2565 
/* Start a send of a string with the given flags; no destination address. */
void janet_ev_send_string(JanetStream *stream, JanetString str, int flags) {
    const int is_buffer = 0;
    janet_ev_write_generic(stream, (void *) str, NULL, JANET_ASYNC_WRITEMODE_SEND, is_buffer, flags);
}
2569 
/* Start a send-to of a buffer to the address abstract `dest` with the given flags. */
void janet_ev_sendto_buffer(JanetStream *stream, JanetBuffer *buf, void *dest, int flags) {
    const int is_buffer = 1;
    janet_ev_write_generic(stream, buf, dest, JANET_ASYNC_WRITEMODE_SENDTO, is_buffer, flags);
}
2573 
/* Start a send-to of a string to the address abstract `dest` with the given flags. */
void janet_ev_sendto_string(JanetStream *stream, JanetString str, void *dest, int flags) {
    const int is_buffer = 0;
    janet_ev_write_generic(stream, (void *) str, dest, JANET_ASYNC_WRITEMODE_SENDTO, is_buffer, flags);
}
2577 #endif
2578 
/*
 * For a pipe ID: janet_make_pipe increments this counter with
 * InterlockedIncrement and embeds the result, together with the process id,
 * in the name of each named pipe it creates on Windows.
 */
#ifdef JANET_WINDOWS
static volatile long PipeSerialNumber;
#endif
2583 
2584 /*
2585  * mode = 0: both sides non-blocking.
2586  * mode = 1: only read side non-blocking: write side sent to subprocess
2587  * mode = 2: only write side non-blocking: read side sent to subprocess
2588  */
janet_make_pipe(JanetHandle handles[2],int mode)2589 int janet_make_pipe(JanetHandle handles[2], int mode) {
2590 #ifdef JANET_WINDOWS
2591     /*
2592      * On windows, the built in CreatePipe function doesn't support overlapped IO
2593      * so we lift from the windows source code and modify for our own version.
2594      */
2595     JanetHandle shandle, chandle;
2596     UCHAR PipeNameBuffer[MAX_PATH];
2597     SECURITY_ATTRIBUTES saAttr;
2598     memset(&saAttr, 0, sizeof(saAttr));
2599     saAttr.nLength = sizeof(saAttr);
2600     saAttr.bInheritHandle = TRUE;
2601     sprintf(PipeNameBuffer,
2602             "\\\\.\\Pipe\\JanetPipeFile.%08x.%08x",
2603             GetCurrentProcessId(),
2604             InterlockedIncrement(&PipeSerialNumber));
2605 
2606     /* server handle goes to subprocess */
2607     shandle = CreateNamedPipeA(
2608                   PipeNameBuffer,
2609                   (mode == 2 ? PIPE_ACCESS_INBOUND : PIPE_ACCESS_OUTBOUND) | FILE_FLAG_OVERLAPPED,
2610                   PIPE_TYPE_BYTE | PIPE_WAIT,
2611                   255,           /* Max number of pipes for duplication. */
2612                   4096,          /* Out buffer size */
2613                   4096,          /* In buffer size */
2614                   120 * 1000,    /* Timeout in ms */
2615                   &saAttr);
2616     if (shandle == INVALID_HANDLE_VALUE) {
2617         return -1;
2618     }
2619 
2620     /* we keep client handle */
2621     chandle = CreateFileA(
2622                   PipeNameBuffer,
2623                   (mode == 2 ? GENERIC_WRITE : GENERIC_READ),
2624                   0,
2625                   &saAttr,
2626                   OPEN_EXISTING,
2627                   FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED,
2628                   NULL);
2629 
2630     if (chandle == INVALID_HANDLE_VALUE) {
2631         CloseHandle(shandle);
2632         return -1;
2633     }
2634     if (mode == 2) {
2635         handles[0] = shandle;
2636         handles[1] = chandle;
2637     } else {
2638         handles[0] = chandle;
2639         handles[1] = shandle;
2640     }
2641     return 0;
2642 #else
2643     if (pipe(handles)) return -1;
2644     if (mode != 2 && fcntl(handles[0], F_SETFD, FD_CLOEXEC)) goto error;
2645     if (mode != 1 && fcntl(handles[1], F_SETFD, FD_CLOEXEC)) goto error;
2646     if (mode != 2 && fcntl(handles[0], F_SETFL, O_NONBLOCK)) goto error;
2647     if (mode != 1 && fcntl(handles[1], F_SETFL, O_NONBLOCK)) goto error;
2648     return 0;
2649 error:
2650     close(handles[0]);
2651     close(handles[1]);
2652     return -1;
2653 #endif
2654 }
2655 
2656 /* C functions */
2657 
2658 JANET_CORE_FN(cfun_ev_go,
2659               "(ev/go fiber &opt value supervisor)",
2660               "Put a fiber on the event loop to be resumed later. Optionally pass "
2661               "a value to resume with, otherwise resumes with nil. Returns the fiber. "
2662               "An optional `core/channel` can be provided as a supervisor. When various "
2663               "events occur in the newly scheduled fiber, an event will be pushed to the supervisor. "
2664               "If not provided, the new fiber will inherit the current supervisor.") {
2665     janet_arity(argc, 1, 3);
2666     Janet value = argc >= 2 ? argv[1] : janet_wrap_nil();
2667     void *supervisor = janet_optabstract(argv, argc, 2, &janet_channel_type, janet_vm.root_fiber->supervisor_channel);
2668     JanetFiber *fiber;
2669     if (janet_checktype(argv[0], JANET_FUNCTION)) {
2670         /* Create a fiber for the user */
2671         JanetFunction *func = janet_unwrap_function(argv[0]);
2672         if (func->def->min_arity > 1) {
2673             janet_panicf("task function must accept 0 or 1 arguments");
2674         }
2675         fiber = janet_fiber(func, 64, func->def->min_arity, &value);
2676         fiber->flags |=
2677             JANET_FIBER_MASK_ERROR |
2678             JANET_FIBER_MASK_USER0 |
2679             JANET_FIBER_MASK_USER1 |
2680             JANET_FIBER_MASK_USER2 |
2681             JANET_FIBER_MASK_USER3 |
2682             JANET_FIBER_MASK_USER4;
2683         if (!janet_vm.fiber->env) {
2684             janet_vm.fiber->env = janet_table(0);
2685         }
2686         fiber->env = janet_table(0);
2687         fiber->env->proto = janet_vm.fiber->env;
2688     } else {
2689         fiber = janet_getfiber(argv, 0);
2690     }
2691     fiber->supervisor_channel = supervisor;
2692     janet_schedule(fiber, value);
2693     return janet_wrap_fiber(fiber);
2694 }
2695 
2696 /* For ev/thread - Run an interpreter in the new thread. */
janet_go_thread_subr(JanetEVGenericMessage args)2697 static JanetEVGenericMessage janet_go_thread_subr(JanetEVGenericMessage args) {
2698     JanetBuffer *buffer = (JanetBuffer *) args.argp;
2699     const uint8_t *nextbytes = buffer->data;
2700     const uint8_t *endbytes = nextbytes + buffer->count;
2701     uint32_t flags = args.tag;
2702     args.tag = 0;
2703     janet_init();
2704     JanetTryState tstate;
2705     JanetSignal signal = janet_try(&tstate);
2706     if (!signal) {
2707 
2708         /* Set abstract registry */
2709         if (!(flags & 0x2)) {
2710             Janet aregv = janet_unmarshal(nextbytes, endbytes - nextbytes,
2711                                           JANET_MARSHAL_UNSAFE, NULL, &nextbytes);
2712             if (!janet_checktype(aregv, JANET_TABLE)) janet_panic("expected table for abstract registry");
2713             janet_vm.abstract_registry = janet_unwrap_table(aregv);
2714             janet_gcroot(janet_wrap_table(janet_vm.abstract_registry));
2715         }
2716 
2717         /* Get supervsior */
2718         if (flags & 0x8) {
2719             Janet sup =
2720                 janet_unmarshal(nextbytes, endbytes - nextbytes,
2721                                 JANET_MARSHAL_UNSAFE, NULL, &nextbytes);
2722             /* Hack - use a global variable to avoid longjmp clobber */
2723             janet_vm.user = janet_unwrap_pointer(sup);
2724         }
2725 
2726         /* Set cfunction registry */
2727         if (!(flags & 0x4)) {
2728             uint32_t count1;
2729             memcpy(&count1, nextbytes, sizeof(count1));
2730             size_t count = (size_t) count1;
2731             if (count > (endbytes - nextbytes) * sizeof(JanetCFunRegistry)) {
2732                 janet_panic("thread message invalid");
2733             }
2734             janet_vm.registry_count = count;
2735             janet_vm.registry_cap = count;
2736             janet_vm.registry = janet_malloc(count * sizeof(JanetCFunRegistry));
2737             if (janet_vm.registry == NULL) {
2738                 JANET_OUT_OF_MEMORY;
2739             }
2740             janet_vm.registry_dirty = 1;
2741             nextbytes += sizeof(uint32_t);
2742             memcpy(janet_vm.registry, nextbytes, count * sizeof(JanetCFunRegistry));
2743             nextbytes += count * sizeof(JanetCFunRegistry);
2744         }
2745 
2746         Janet fiberv = janet_unmarshal(nextbytes, endbytes - nextbytes,
2747                                        JANET_MARSHAL_UNSAFE, NULL, &nextbytes);
2748         Janet value = janet_unmarshal(nextbytes, endbytes - nextbytes,
2749                                       JANET_MARSHAL_UNSAFE, NULL, &nextbytes);
2750         JanetFiber *fiber;
2751         if (!janet_checktype(fiberv, JANET_FIBER)) {
2752             if (!janet_checktype(fiberv, JANET_FUNCTION)) {
2753                 janet_panicf("expected function|fiber, got %v", fiberv);
2754             }
2755             JanetFunction *func = janet_unwrap_function(fiberv);
2756             if (func->def->min_arity > 1) {
2757                 janet_panicf("thread function must accept 0 or 1 arguments");
2758             }
2759             fiber = janet_fiber(func, 64, func->def->min_arity, &value);
2760             fiber->flags |=
2761                 JANET_FIBER_MASK_ERROR |
2762                 JANET_FIBER_MASK_USER0 |
2763                 JANET_FIBER_MASK_USER1 |
2764                 JANET_FIBER_MASK_USER2 |
2765                 JANET_FIBER_MASK_USER3 |
2766                 JANET_FIBER_MASK_USER4;
2767         } else {
2768             fiber = janet_unwrap_fiber(fiberv);
2769         }
2770         fiber->supervisor_channel = janet_vm.user;
2771         janet_schedule(fiber, value);
2772         janet_loop();
2773         args.tag = JANET_EV_TCTAG_NIL;
2774     } else {
2775         void *supervisor = janet_vm.user;
2776         if (NULL != supervisor) {
2777             /* Got a supervisor, write error there */
2778             Janet pair[] = {
2779                 janet_ckeywordv("error"),
2780                 tstate.payload
2781             };
2782             janet_channel_push((JanetChannel *)supervisor,
2783                                janet_wrap_tuple(janet_tuple_n(pair, 2)), 2);
2784         } else if (flags & 0x1) {
2785             /* No wait, just print to stderr */
2786             janet_eprintf("thread start failure: %v\n", tstate.payload);
2787         } else {
2788             /* Make ev/thread call from parent thread error */
2789             if (janet_checktype(tstate.payload, JANET_STRING)) {
2790                 args.tag = JANET_EV_TCTAG_ERR_STRINGF;
2791                 args.argp = strdup((const char *) janet_unwrap_string(tstate.payload));
2792             } else {
2793                 args.tag = JANET_EV_TCTAG_ERR_STRING;
2794                 args.argp = "failed to start thread";
2795             }
2796         }
2797     }
2798     janet_restore(&tstate);
2799     janet_buffer_deinit(buffer);
2800     janet_free(buffer);
2801     janet_deinit();
2802     return args;
2803 }
2804 
2805 JANET_CORE_FN(cfun_ev_thread,
2806               "(ev/thread main &opt value flags supervisor)",
2807               "Run `main` in a new operating system thread, optionally passing `value` "
2808               "to resume with. The parameter `main` can either be a fiber, or a function that accepts "
2809               "0 or 1 arguments. "
2810               "Unlike `ev/go`, this function will suspend the current fiber until the thread is complete. "
2811               "If you want to run the thread without waiting for a result, pass the `:n` flag to return nil immediately. "
2812               "Otherwise, returns nil. Available flags:\n\n"
2813               "* `:n` - return immediately\n"
2814               "* `:a` - don't copy abstract registry to new thread (performance optimization)\n"
2815               "* `:c` - don't copy cfunction registry to new thread (performance optimization)") {
2816     janet_arity(argc, 1, 4);
2817     Janet value = argc >= 2 ? argv[1] : janet_wrap_nil();
2818     if (!janet_checktype(argv[0], JANET_FUNCTION)) janet_getfiber(argv, 0);
2819     uint64_t flags = 0;
2820     if (argc >= 3) {
2821         flags = janet_getflags(argv, 2, "nac");
2822     }
2823     void *supervisor = janet_optabstract(argv, argc, 3, &janet_channel_type, janet_vm.root_fiber->supervisor_channel);
2824     if (NULL != supervisor) flags |= 0x8;
2825 
2826     /* Marshal arguments for the new thread. */
2827     JanetBuffer *buffer = janet_malloc(sizeof(JanetBuffer));
2828     if (NULL == buffer) {
2829         JANET_OUT_OF_MEMORY;
2830     }
2831     janet_buffer_init(buffer, 0);
2832     if (!(flags & 0x2)) {
2833         janet_marshal(buffer, janet_wrap_table(janet_vm.abstract_registry), NULL, JANET_MARSHAL_UNSAFE);
2834     }
2835     if (flags & 0x8) {
2836         janet_marshal(buffer, janet_wrap_abstract(supervisor), NULL, JANET_MARSHAL_UNSAFE);
2837     }
2838     if (!(flags & 0x4)) {
2839         janet_assert(janet_vm.registry_count <= INT32_MAX, "assert failed size check");
2840         uint32_t temp = (uint32_t) janet_vm.registry_count;
2841         janet_buffer_push_bytes(buffer, (uint8_t *) &temp, sizeof(temp));
2842         janet_buffer_push_bytes(buffer, (uint8_t *) janet_vm.registry, (int32_t) janet_vm.registry_count * sizeof(JanetCFunRegistry));
2843     }
2844     janet_marshal(buffer, argv[0], NULL, JANET_MARSHAL_UNSAFE);
2845     janet_marshal(buffer, value, NULL, JANET_MARSHAL_UNSAFE);
2846     if (flags & 0x1) {
2847         /* Return immediately */
2848         JanetEVGenericMessage arguments;
2849         memset(&arguments, 0, sizeof(arguments));
2850         arguments.tag = (uint32_t) flags;
2851         arguments.argi = argc;
2852         arguments.argp = buffer;
2853         arguments.fiber = NULL;
2854         janet_ev_threaded_call(janet_go_thread_subr, arguments, janet_ev_default_threaded_callback);
2855         return janet_wrap_nil();
2856     } else {
2857         janet_ev_threaded_await(janet_go_thread_subr, (uint32_t) flags, argc, buffer);
2858     }
2859 }
2860 
2861 JANET_CORE_FN(cfun_ev_give_supervisor,
2862               "(ev/give-supervisor tag & payload)",
2863               "Send a message to the current supervior channel if there is one. The message will be a "
2864               "tuple of all of the arguments combined into a single message, where the first element is tag. "
2865               "By convention, tag should be a keyword indicating the type of message. Returns nil.") {
2866     janet_arity(argc, 1, -1);
2867     void *chanv = janet_vm.root_fiber->supervisor_channel;
2868     if (NULL != chanv) {
2869         JanetChannel *chan = janet_channel_unwrap(chanv);
2870         if (janet_channel_push(chan, janet_wrap_tuple(janet_tuple_n(argv, argc)), 0)) {
2871             janet_await();
2872         }
2873     }
2874     return janet_wrap_nil();
2875 }
2876 
janet_sleep_await(double sec)2877 JANET_NO_RETURN void janet_sleep_await(double sec) {
2878     JanetTimeout to;
2879     to.when = ts_delta(ts_now(), sec);
2880     to.fiber = janet_vm.root_fiber;
2881     to.is_error = 0;
2882     to.sched_id = to.fiber->sched_id;
2883     to.curr_fiber = NULL;
2884     add_timeout(to);
2885     janet_await();
2886 }
2887 
2888 JANET_CORE_FN(cfun_ev_sleep,
2889               "(ev/sleep sec)",
2890               "Suspend the current fiber for sec seconds without blocking the event loop.") {
2891     janet_fixarity(argc, 1);
2892     double sec = janet_getnumber(argv, 0);
2893     janet_sleep_await(sec);
2894 }
2895 
2896 JANET_CORE_FN(cfun_ev_deadline,
2897               "(ev/deadline sec &opt tocancel tocheck)",
2898               "Set a deadline for a fiber `tocheck`. If `tocheck` is not finished after `sec` seconds, "
2899               "`tocancel` will be canceled as with `ev/cancel`. "
2900               "If `tocancel` and `tocheck` are not given, they default to `(fiber/root)` and "
2901               "`(fiber/current)` respectively. Returns `tocancel`.") {
2902     janet_arity(argc, 1, 3);
2903     double sec = janet_getnumber(argv, 0);
2904     JanetFiber *tocancel = janet_optfiber(argv, argc, 1, janet_vm.root_fiber);
2905     JanetFiber *tocheck = janet_optfiber(argv, argc, 2, janet_vm.fiber);
2906     JanetTimeout to;
2907     to.when = ts_delta(ts_now(), sec);
2908     to.fiber = tocancel;
2909     to.curr_fiber = tocheck;
2910     to.is_error = 0;
2911     to.sched_id = to.fiber->sched_id;
2912     add_timeout(to);
2913     return janet_wrap_fiber(tocancel);
2914 }
2915 
2916 JANET_CORE_FN(cfun_ev_cancel,
2917               "(ev/cancel fiber err)",
2918               "Cancel a suspended fiber in the event loop. Differs from cancel in that it returns the canceled fiber immediately.") {
2919     janet_fixarity(argc, 2);
2920     JanetFiber *fiber = janet_getfiber(argv, 0);
2921     Janet err = argv[1];
2922     janet_cancel(fiber, err);
2923     return argv[0];
2924 }
2925 
2926 JANET_CORE_FN(janet_cfun_stream_close,
2927               "(ev/close stream)",
2928               "Close a stream. This should be the same as calling (:close stream) for all streams.") {
2929     janet_fixarity(argc, 1);
2930     JanetStream *stream = janet_getabstract(argv, 0, &janet_stream_type);
2931     janet_stream_close(stream);
2932     return argv[0];
2933 }
2934 
2935 JANET_CORE_FN(janet_cfun_stream_read,
2936               "(ev/read stream n &opt buffer timeout)",
2937               "Read up to n bytes into a buffer asynchronously from a stream. `n` can also be the keyword "
2938               "`:all` to read into the buffer until end of stream. "
2939               "Optionally provide a buffer to write into "
2940               "as well as a timeout in seconds after which to cancel the operation and raise an error. "
2941               "Returns the buffer if the read was successful or nil if end-of-stream reached. Will raise an "
2942               "error if there are problems with the IO operation.") {
2943     janet_arity(argc, 2, 4);
2944     JanetStream *stream = janet_getabstract(argv, 0, &janet_stream_type);
2945     janet_stream_flags(stream, JANET_STREAM_READABLE);
2946     JanetBuffer *buffer = janet_optbuffer(argv, argc, 2, 10);
2947     double to = janet_optnumber(argv, argc, 3, INFINITY);
2948     if (janet_keyeq(argv[1], "all")) {
2949         if (to != INFINITY) janet_addtimeout(to);
2950         janet_ev_readchunk(stream, buffer, INT32_MAX);
2951     } else {
2952         int32_t n = janet_getnat(argv, 1);
2953         if (to != INFINITY) janet_addtimeout(to);
2954         janet_ev_read(stream, buffer, n);
2955     }
2956     janet_await();
2957 }
2958 
2959 JANET_CORE_FN(janet_cfun_stream_chunk,
2960               "(ev/chunk stream n &opt buffer timeout)",
2961               "Same as ev/read, but will not return early if less than n bytes are available. If an end of "
2962               "stream is reached, will also return early with the collected bytes.") {
2963     janet_arity(argc, 2, 4);
2964     JanetStream *stream = janet_getabstract(argv, 0, &janet_stream_type);
2965     janet_stream_flags(stream, JANET_STREAM_READABLE);
2966     int32_t n = janet_getnat(argv, 1);
2967     JanetBuffer *buffer = janet_optbuffer(argv, argc, 2, 10);
2968     double to = janet_optnumber(argv, argc, 3, INFINITY);
2969     if (to != INFINITY) janet_addtimeout(to);
2970     janet_ev_readchunk(stream, buffer, n);
2971     janet_await();
2972 }
2973 
2974 JANET_CORE_FN(janet_cfun_stream_write,
2975               "(ev/write stream data &opt timeout)",
2976               "Write data to a stream, suspending the current fiber until the write "
2977               "completes. Takes an optional timeout in seconds, after which will return nil. "
2978               "Returns nil, or raises an error if the write failed.") {
2979     janet_arity(argc, 2, 3);
2980     JanetStream *stream = janet_getabstract(argv, 0, &janet_stream_type);
2981     janet_stream_flags(stream, JANET_STREAM_WRITABLE);
2982     double to = janet_optnumber(argv, argc, 2, INFINITY);
2983     if (janet_checktype(argv[1], JANET_BUFFER)) {
2984         if (to != INFINITY) janet_addtimeout(to);
2985         janet_ev_write_buffer(stream, janet_getbuffer(argv, 1));
2986     } else {
2987         JanetByteView bytes = janet_getbytes(argv, 1);
2988         if (to != INFINITY) janet_addtimeout(to);
2989         janet_ev_write_string(stream, bytes.bytes);
2990     }
2991     janet_await();
2992 }
2993 
/*
 * Install the ev/ C functions into `env` and register the stream and channel
 * abstract types.
 */
void janet_lib_ev(JanetTable *env) {
    JanetRegExt ev_cfuns_ext[] = {
        /* Channel operations */
        JANET_CORE_REG("ev/give", cfun_channel_push),
        JANET_CORE_REG("ev/take", cfun_channel_pop),
        JANET_CORE_REG("ev/full", cfun_channel_full),
        JANET_CORE_REG("ev/capacity", cfun_channel_capacity),
        JANET_CORE_REG("ev/count", cfun_channel_count),
        JANET_CORE_REG("ev/select", cfun_channel_choice),
        JANET_CORE_REG("ev/rselect", cfun_channel_rchoice),
        JANET_CORE_REG("ev/chan", cfun_channel_new),
        JANET_CORE_REG("ev/thread-chan", cfun_channel_new_threaded),
        JANET_CORE_REG("ev/chan-close", cfun_channel_close),
        /* Fiber scheduling, threads and timeouts */
        JANET_CORE_REG("ev/go", cfun_ev_go),
        JANET_CORE_REG("ev/thread", cfun_ev_thread),
        JANET_CORE_REG("ev/give-supervisor", cfun_ev_give_supervisor),
        JANET_CORE_REG("ev/sleep", cfun_ev_sleep),
        JANET_CORE_REG("ev/deadline", cfun_ev_deadline),
        JANET_CORE_REG("ev/cancel", cfun_ev_cancel),
        /* Stream operations */
        JANET_CORE_REG("ev/close", janet_cfun_stream_close),
        JANET_CORE_REG("ev/read", janet_cfun_stream_read),
        JANET_CORE_REG("ev/chunk", janet_cfun_stream_chunk),
        JANET_CORE_REG("ev/write", janet_cfun_stream_write),
        /* Terminator entry for the registration table */
        JANET_REG_END
    };

    janet_core_cfuns_ext(env, NULL, ev_cfuns_ext);
    janet_register_abstract_type(&janet_stream_type);
    janet_register_abstract_type(&janet_channel_type);
}
3023 
3024 #endif
3025