1 /*
2 * Copyright (c) 2021 Calvin Rose and contributors.
3 *
4 * Permission is hereby granted, free of charge, to any person obtaining a copy
5 * of this software and associated documentation files (the "Software"), to
6 * deal in the Software without restriction, including without limitation the
7 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
8 * sell copies of the Software, and to permit persons to whom the Software is
9 * furnished to do so, subject to the following conditions:
10 *
11 * The above copyright notice and this permission notice shall be included in
12 * all copies or substantial portions of the Software.
13 *
14 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
19 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
20 * IN THE SOFTWARE.
21 */
22 
23 #ifndef JANET_AMALG
24 #include "features.h"
25 #include <janet.h>
26 #include "util.h"
27 #endif
28 
29 #ifdef JANET_NET
30 
31 #include <math.h>
32 #ifdef JANET_WINDOWS
33 #include <winsock2.h>
34 #include <windows.h>
35 #include <ws2tcpip.h>
36 #include <mswsock.h>
37 #pragma comment (lib, "Ws2_32.lib")
38 #pragma comment (lib, "Mswsock.lib")
39 #pragma comment (lib, "Advapi32.lib")
40 #else
41 #include <arpa/inet.h>
42 #include <unistd.h>
43 #include <signal.h>
44 #include <sys/ioctl.h>
45 #include <sys/types.h>
46 #include <sys/socket.h>
47 #include <sys/un.h>
48 #include <netinet/in.h>
49 #include <netinet/tcp.h>
50 #include <netdb.h>
51 #include <fcntl.h>
52 #endif
53 
54 const JanetAbstractType janet_address_type = {
55     "core/socket-address",
56     JANET_ATEND_NAME
57 };
58 
59 #ifdef JANET_WINDOWS
60 #define JSOCKCLOSE(x) closesocket((SOCKET) x)
61 #define JSOCKDEFAULT INVALID_SOCKET
62 #define JSOCKVALID(x) ((x) != INVALID_SOCKET)
63 #define JSock SOCKET
64 #define JSOCKFLAGS 0
65 #else
66 #define JSOCKCLOSE(x) close(x)
67 #define JSOCKDEFAULT 0
68 #define JSOCKVALID(x) ((x) >= 0)
69 #define JSock int
70 #ifdef SOCK_CLOEXEC
71 #define JSOCKFLAGS SOCK_CLOEXEC
72 #else
73 #define JSOCKFLAGS 0
74 #endif
75 #endif
76 
77 /* maximum number of bytes in a socket address host (post name resolution) */
78 #ifdef JANET_WINDOWS
79 #define SA_ADDRSTRLEN (INET6_ADDRSTRLEN + 1)
80 typedef unsigned short in_port_t;
81 #else
82 #define JANET_SA_MAX(a, b) (((a) > (b))? (a) : (b))
83 #define SA_ADDRSTRLEN JANET_SA_MAX(INET6_ADDRSTRLEN + 1, (sizeof ((struct sockaddr_un *)0)->sun_path) + 1)
84 #endif
85 
86 static JanetStream *make_stream(JSock handle, uint32_t flags);
87 
88 /* We pass this flag to all send calls to prevent sigpipe */
89 #ifndef MSG_NOSIGNAL
90 #define MSG_NOSIGNAL 0
91 #endif
92 
93 /* Make sure a socket doesn't block */
janet_net_socknoblock(JSock s)94 static void janet_net_socknoblock(JSock s) {
95 #ifdef JANET_WINDOWS
96     unsigned long arg = 1;
97     ioctlsocket(s, FIONBIO, &arg);
98 #else
99 #if !defined(SOCK_CLOEXEC) && defined(O_CLOEXEC)
100     int extra = O_CLOEXEC;
101 #else
102     int extra = 0;
103 #endif
104     fcntl(s, F_SETFL, fcntl(s, F_GETFL, 0) | O_NONBLOCK | extra);
105 #ifdef SO_NOSIGPIPE
106     int enable = 1;
107     setsockopt(s, SOL_SOCKET, SO_NOSIGPIPE, &enable, sizeof(int));
108 #endif
109 #endif
110 }
111 
112 /* State machine for accepting connections. */
113 
114 #ifdef JANET_WINDOWS
115 
116 typedef struct {
117     JanetListenerState head;
118     WSAOVERLAPPED overlapped;
119     JanetFunction *function;
120     JanetStream *lstream;
121     JanetStream *astream;
122     char buf[1024];
123 } NetStateAccept;
124 
125 static int net_sched_accept_impl(NetStateAccept *state, Janet *err);
126 
net_machine_accept(JanetListenerState * s,JanetAsyncEvent event)127 JanetAsyncStatus net_machine_accept(JanetListenerState *s, JanetAsyncEvent event) {
128     NetStateAccept *state = (NetStateAccept *)s;
129     switch (event) {
130         default:
131             break;
132         case JANET_ASYNC_EVENT_MARK: {
133             if (state->lstream) janet_mark(janet_wrap_abstract(state->lstream));
134             if (state->astream) janet_mark(janet_wrap_abstract(state->astream));
135             if (state->function) janet_mark(janet_wrap_function(state->function));
136             break;
137         }
138         case JANET_ASYNC_EVENT_CLOSE:
139             janet_schedule(s->fiber, janet_wrap_nil());
140             return JANET_ASYNC_STATUS_DONE;
141         case JANET_ASYNC_EVENT_COMPLETE: {
142             if (state->astream->flags & JANET_STREAM_CLOSED) {
143                 janet_cancel(s->fiber, janet_cstringv("failed to accept connection"));
144                 return JANET_ASYNC_STATUS_DONE;
145             }
146             SOCKET lsock = (SOCKET) state->lstream->handle;
147             if (NO_ERROR != setsockopt((SOCKET) state->astream->handle, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT,
148                                        (char *) &lsock, sizeof(lsock))) {
149                 janet_cancel(s->fiber, janet_cstringv("failed to accept connection"));
150                 return JANET_ASYNC_STATUS_DONE;
151             }
152 
153             Janet streamv = janet_wrap_abstract(state->astream);
154             if (state->function) {
155                 /* Schedule worker */
156                 JanetFiber *fiber = janet_fiber(state->function, 64, 1, &streamv);
157                 fiber->supervisor_channel = s->fiber->supervisor_channel;
158                 janet_schedule(fiber, janet_wrap_nil());
159                 /* Now listen again for next connection */
160                 Janet err;
161                 if (net_sched_accept_impl(state, &err)) {
162                     janet_cancel(s->fiber, err);
163                     return JANET_ASYNC_STATUS_DONE;
164                 }
165             } else {
166                 janet_schedule(s->fiber, streamv);
167                 return JANET_ASYNC_STATUS_DONE;
168             }
169         }
170     }
171     return JANET_ASYNC_STATUS_NOT_DONE;
172 }
173 
janet_sched_accept(JanetStream * stream,JanetFunction * fun)174 JANET_NO_RETURN static void janet_sched_accept(JanetStream *stream, JanetFunction *fun) {
175     Janet err;
176     SOCKET lsock = (SOCKET) stream->handle;
177     JanetListenerState *s = janet_listen(stream, net_machine_accept, JANET_ASYNC_LISTEN_READ, sizeof(NetStateAccept), NULL);
178     NetStateAccept *state = (NetStateAccept *)s;
179     memset(&state->overlapped, 0, sizeof(WSAOVERLAPPED));
180     memset(&state->buf, 0, 1024);
181     state->function = fun;
182     state->lstream = stream;
183     s->tag = &state->overlapped;
184     if (net_sched_accept_impl(state, &err)) janet_panicv(err);
185     janet_await();
186 }
187 
net_sched_accept_impl(NetStateAccept * state,Janet * err)188 static int net_sched_accept_impl(NetStateAccept *state, Janet *err) {
189     SOCKET lsock = (SOCKET) state->lstream->handle;
190     SOCKET asock = WSASocketW(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
191     if (asock == INVALID_SOCKET) {
192         *err = janet_ev_lasterr();
193         return 1;
194     }
195     JanetStream *astream = make_stream(asock, JANET_STREAM_READABLE | JANET_STREAM_WRITABLE);
196     state->astream = astream;
197     int socksize = sizeof(SOCKADDR_STORAGE) + 16;
198     if (FALSE == AcceptEx(lsock, asock, state->buf, 0, socksize, socksize, NULL, &state->overlapped)) {
199         int code = WSAGetLastError();
200         if (code == WSA_IO_PENDING) return 0; /* indicates io is happening async */
201         *err = janet_ev_lasterr();
202         return 1;
203     }
204     return 0;
205 }
206 
207 #else
208 
209 typedef struct {
210     JanetListenerState head;
211     JanetFunction *function;
212 } NetStateAccept;
213 
net_machine_accept(JanetListenerState * s,JanetAsyncEvent event)214 JanetAsyncStatus net_machine_accept(JanetListenerState *s, JanetAsyncEvent event) {
215     NetStateAccept *state = (NetStateAccept *)s;
216     switch (event) {
217         default:
218             break;
219         case JANET_ASYNC_EVENT_MARK: {
220             if (state->function) janet_mark(janet_wrap_function(state->function));
221             break;
222         }
223         case JANET_ASYNC_EVENT_CLOSE:
224             janet_schedule(s->fiber, janet_wrap_nil());
225             return JANET_ASYNC_STATUS_DONE;
226         case JANET_ASYNC_EVENT_READ: {
227             JSock connfd = accept(s->stream->handle, NULL, NULL);
228             if (JSOCKVALID(connfd)) {
229                 janet_net_socknoblock(connfd);
230                 JanetStream *stream = make_stream(connfd, JANET_STREAM_READABLE | JANET_STREAM_WRITABLE);
231                 Janet streamv = janet_wrap_abstract(stream);
232                 if (state->function) {
233                     JanetFiber *fiber = janet_fiber(state->function, 64, 1, &streamv);
234                     fiber->supervisor_channel = s->fiber->supervisor_channel;
235                     janet_schedule(fiber, janet_wrap_nil());
236                 } else {
237                     janet_schedule(s->fiber, streamv);
238                     return JANET_ASYNC_STATUS_DONE;
239                 }
240             }
241             break;
242         }
243     }
244     return JANET_ASYNC_STATUS_NOT_DONE;
245 }
246 
janet_sched_accept(JanetStream * stream,JanetFunction * fun)247 JANET_NO_RETURN static void janet_sched_accept(JanetStream *stream, JanetFunction *fun) {
248     NetStateAccept *state = (NetStateAccept *) janet_listen(stream, net_machine_accept, JANET_ASYNC_LISTEN_READ, sizeof(NetStateAccept), NULL);
249     state->function = fun;
250     janet_await();
251 }
252 
253 
254 #endif
255 
256 /* Adress info */
257 
janet_get_sockettype(Janet * argv,int32_t argc,int32_t n)258 static int janet_get_sockettype(Janet *argv, int32_t argc, int32_t n) {
259     JanetKeyword stype = janet_optkeyword(argv, argc, n, NULL);
260     int socktype = SOCK_DGRAM;
261     if ((NULL == stype) || !janet_cstrcmp(stype, "stream")) {
262         socktype = SOCK_STREAM;
263     } else if (janet_cstrcmp(stype, "datagram")) {
264         janet_panicf("expected socket type as :stream or :datagram, got %v", argv[n]);
265     }
266     return socktype;
267 }
268 
269 /* Needs argc >= offset + 2 */
270 /* For unix paths, just rertuns a single sockaddr and sets *is_unix to 1,
271  * otherwise 0. Also, ignores is_bind when is a unix socket. */
janet_get_addrinfo(Janet * argv,int32_t offset,int socktype,int passive,int * is_unix)272 static struct addrinfo *janet_get_addrinfo(Janet *argv, int32_t offset, int socktype, int passive, int *is_unix) {
273     /* Unix socket support - not yet supported on windows. */
274 #ifndef JANET_WINDOWS
275     if (janet_keyeq(argv[offset], "unix")) {
276         const char *path = janet_getcstring(argv, offset + 1);
277         struct sockaddr_un *saddr = janet_calloc(1, sizeof(struct sockaddr_un));
278         if (saddr == NULL) {
279             JANET_OUT_OF_MEMORY;
280         }
281         saddr->sun_family = AF_UNIX;
282         size_t path_size = sizeof(saddr->sun_path);
283 #ifdef JANET_LINUX
284         if (path[0] == '@') {
285             saddr->sun_path[0] = '\0';
286             snprintf(saddr->sun_path + 1, path_size - 1, "%s", path + 1);
287         } else
288 #endif
289         {
290             snprintf(saddr->sun_path, path_size, "%s", path);
291         }
292         *is_unix = 1;
293         return (struct addrinfo *) saddr;
294     }
295 #endif
296     /* Get host and port */
297     char *host = (char *)janet_getcstring(argv, offset);
298     char *port = NULL;
299     if (janet_checkint(argv[offset + 1])) {
300         port = (char *)janet_to_string(argv[offset + 1]);
301     } else {
302         port = (char *)janet_optcstring(argv, offset + 2, offset + 1, NULL);
303     }
304     /* getaddrinfo */
305     struct addrinfo *ai = NULL;
306     struct addrinfo hints;
307     memset(&hints, 0, sizeof(hints));
308     hints.ai_family = AF_UNSPEC;
309     hints.ai_socktype = socktype;
310     hints.ai_flags = passive ? AI_PASSIVE : 0;
311     int status = getaddrinfo(host, port, &hints, &ai);
312     if (status) {
313         janet_panicf("could not get address info: %s", gai_strerror(status));
314     }
315     *is_unix = 0;
316     return ai;
317 }
318 
319 /*
320  * C Funs
321  */
322 
323 JANET_CORE_FN(cfun_net_sockaddr,
324               "(net/address host port &opt type multi)",
325               "Look up the connection information for a given hostname, port, and connection type. Returns "
326               "a handle that can be used to send datagrams over network without establishing a connection. "
327               "On Posix platforms, you can use :unix for host to connect to a unix domain socket, where the name is "
328               "given in the port argument. On Linux, abstract "
329               "unix domain sockets are specified with a leading '@' character in port. If `multi` is truthy, will "
330               "return all address that match in an array instead of just the first.") {
331     janet_arity(argc, 2, 4);
332     int socktype = janet_get_sockettype(argv, argc, 2);
333     int is_unix = 0;
334     int make_arr = (argc >= 3 && janet_truthy(argv[3]));
335     struct addrinfo *ai = janet_get_addrinfo(argv, 0, socktype, 0, &is_unix);
336 #ifndef JANET_WINDOWS
337     /* no unix domain socket support on windows yet */
338     if (is_unix) {
339         void *abst = janet_abstract(&janet_address_type, sizeof(struct sockaddr_un));
340         memcpy(abst, ai, sizeof(struct sockaddr_un));
341         Janet ret = janet_wrap_abstract(abst);
342         return make_arr ? janet_wrap_array(janet_array_n(&ret, 1)) : ret;
343     }
344 #endif
345     if (make_arr) {
346         /* Select all */
347         JanetArray *arr = janet_array(10);
348         struct addrinfo *iter = ai;
349         while (NULL != iter) {
350             void *abst = janet_abstract(&janet_address_type, iter->ai_addrlen);
351             memcpy(abst, iter->ai_addr, iter->ai_addrlen);
352             janet_array_push(arr, janet_wrap_abstract(abst));
353             iter = iter->ai_next;
354         }
355         freeaddrinfo(ai);
356         return janet_wrap_array(arr);
357     } else {
358         /* Select first */
359         if (NULL == ai) {
360             janet_panic("no data for given address");
361         }
362         void *abst = janet_abstract(&janet_address_type, ai->ai_addrlen);
363         memcpy(abst, ai->ai_addr, ai->ai_addrlen);
364         freeaddrinfo(ai);
365         return janet_wrap_abstract(abst);
366     }
367 }
368 
369 JANET_CORE_FN(cfun_net_connect,
370               "(net/connect host port &opt type bindhost bindport)",
371               "Open a connection to communicate with a server. Returns a duplex stream "
372               "that can be used to communicate with the server. Type is an optional keyword "
373               "to specify a connection type, either :stream or :datagram. The default is :stream. "
374               "Bindhost is an optional string to select from what address to make the outgoing "
375               "connection, with the default being the same as using the OS's preferred address. ") {
376     janet_arity(argc, 2, 5);
377 
378     /* Check arguments */
379     int socktype = janet_get_sockettype(argv, argc, 2);
380     int is_unix = 0;
381     char *bindhost = (char *) janet_optcstring(argv, argc, 3, NULL);
382     char *bindport = NULL;
383     if (argc >= 5 && janet_checkint(argv[4])) {
384         bindport = (char *)janet_to_string(argv[4]);
385     } else {
386         bindport = (char *)janet_optcstring(argv, argc, 4, NULL);
387     }
388 
389     /* Where we're connecting to */
390     struct addrinfo *ai = janet_get_addrinfo(argv, 0, socktype, 0, &is_unix);
391 
392     /* Check if we're binding address */
393     struct addrinfo *binding = NULL;
394     if (bindhost != NULL) {
395         if (is_unix) {
396             freeaddrinfo(ai);
397             janet_panic("bindhost not supported for unix domain sockets");
398         }
399         /* getaddrinfo */
400         struct addrinfo hints;
401         memset(&hints, 0, sizeof(hints));
402         hints.ai_family = AF_UNSPEC;
403         hints.ai_socktype = socktype;
404         hints.ai_flags = 0;
405         int status = getaddrinfo(bindhost, bindport, &hints, &binding);
406         if (status) {
407             freeaddrinfo(ai);
408             janet_panicf("could not get address info for bindhost: %s", gai_strerror(status));
409         }
410     }
411 
412 
413     /* Create socket */
414     JSock sock = JSOCKDEFAULT;
415     void *addr = NULL;
416     socklen_t addrlen = 0;
417 #ifndef JANET_WINDOWS
418     if (is_unix) {
419         sock = socket(AF_UNIX, socktype | JSOCKFLAGS, 0);
420         if (!JSOCKVALID(sock)) {
421             Janet v = janet_ev_lasterr();
422             janet_free(ai);
423             janet_panicf("could not create socket: %V", v);
424         }
425         addr = (void *) ai;
426         addrlen = sizeof(struct sockaddr_un);
427     } else
428 #endif
429     {
430         struct addrinfo *rp = NULL;
431         for (rp = ai; rp != NULL; rp = rp->ai_next) {
432 #ifdef JANET_WINDOWS
433             sock = WSASocketW(rp->ai_family, rp->ai_socktype, rp->ai_protocol, NULL, 0, WSA_FLAG_OVERLAPPED);
434 #else
435             sock = socket(rp->ai_family, rp->ai_socktype | JSOCKFLAGS, rp->ai_protocol);
436 #endif
437             if (JSOCKVALID(sock)) {
438                 addr = rp->ai_addr;
439                 addrlen = (socklen_t) rp->ai_addrlen;
440                 break;
441             }
442         }
443         if (NULL == addr) {
444             Janet v = janet_ev_lasterr();
445             if (binding) freeaddrinfo(binding);
446             freeaddrinfo(ai);
447             janet_panicf("could not create socket: %V", v);
448         }
449     }
450 
451     /* Bind to bindhost and bindport if given */
452     if (binding) {
453         struct addrinfo *rp = NULL;
454         int did_bind = 0;
455         for (rp = ai; rp != NULL; rp = rp->ai_next) {
456             if (bind(sock, rp->ai_addr, (int) rp->ai_addrlen) == 0) {
457                 did_bind = 1;
458                 break;
459             }
460         }
461         if (!did_bind) {
462             Janet v = janet_ev_lasterr();
463             freeaddrinfo(binding);
464             freeaddrinfo(ai);
465             JSOCKCLOSE(sock);
466             janet_panicf("could not bind outgoing address: %V", v);
467         } else {
468             freeaddrinfo(binding);
469         }
470     }
471 
472     /* Connect to socket */
473 #ifdef JANET_WINDOWS
474     int status = WSAConnect(sock, addr, addrlen, NULL, NULL, NULL, NULL);
475     Janet lasterr = janet_ev_lasterr();
476     freeaddrinfo(ai);
477 #else
478     int status = connect(sock, addr, addrlen);
479     Janet lasterr = janet_ev_lasterr();
480     if (is_unix) {
481         janet_free(ai);
482     } else {
483         freeaddrinfo(ai);
484     }
485 #endif
486 
487     if (status == -1) {
488         JSOCKCLOSE(sock);
489         janet_panicf("could not connect socket: %V", lasterr);
490     }
491 
492     /* Set up the socket for non-blocking IO after connect - TODO - non-blocking connect? */
493     janet_net_socknoblock(sock);
494 
495     /* Wrap socket in abstract type JanetStream */
496     JanetStream *stream = make_stream(sock, JANET_STREAM_READABLE | JANET_STREAM_WRITABLE);
497     return janet_wrap_abstract(stream);
498 }
499 
serverify_socket(JSock sfd)500 static const char *serverify_socket(JSock sfd) {
501     /* Set various socket options */
502     int enable = 1;
503     if (setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (char *) &enable, sizeof(int)) < 0) {
504         return "setsockopt(SO_REUSEADDR) failed";
505     }
506 #ifdef SO_REUSEPORT
507     if (setsockopt(sfd, SOL_SOCKET, SO_REUSEPORT, &enable, sizeof(int)) < 0) {
508         return "setsockopt(SO_REUSEPORT) failed";
509     }
510 #endif
511     janet_net_socknoblock(sfd);
512     return NULL;
513 }
514 
515 #ifdef JANET_WINDOWS
516 #define JANET_SHUTDOWN_RW SD_BOTH
517 #define JANET_SHUTDOWN_R SD_RECEIVE
518 #define JANET_SHUTDOWN_W SD_SEND
519 #else
520 #define JANET_SHUTDOWN_RW SHUT_RDWR
521 #define JANET_SHUTDOWN_R SHUT_RD
522 #define JANET_SHUTDOWN_W SHUT_WR
523 #endif
524 
525 JANET_CORE_FN(cfun_net_shutdown,
526               "(net/shutdown stream &opt mode)",
527               "Stop communication on this socket in a graceful manner, either in both directions or just "
528               "reading/writing from the stream. The `mode` parameter controls which communication to stop on the socket. "
529               "\n\n* `:wr` is the default and prevents both reading new data from the socket and writing new data to the socket.\n"
530               "* `:r` disables reading new data from the socket.\n"
531               "* `:w` disable writing data to the socket.\n\n"
532               "Returns the original socket.") {
533     janet_arity(argc, 1, 2);
534     JanetStream *stream = janet_getabstract(argv, 0, &janet_stream_type);
535     janet_stream_flags(stream, JANET_STREAM_SOCKET);
536     int shutdown_type = JANET_SHUTDOWN_RW;
537     if (argc == 2) {
538         const uint8_t *kw = janet_getkeyword(argv, 1);
539         if (0 == janet_cstrcmp(kw, "rw")) {
540             shutdown_type = JANET_SHUTDOWN_RW;
541         } else if (0 == janet_cstrcmp(kw, "r")) {
542             shutdown_type = JANET_SHUTDOWN_R;
543         } else if (0 == janet_cstrcmp(kw, "w")) {
544             shutdown_type = JANET_SHUTDOWN_W;
545         } else {
546             janet_panicf("unexpected keyword %v", argv[1]);
547         }
548     }
549     int status;
550 #ifdef JANET_WINDOWS
551     status = shutdown((SOCKET) stream->handle, shutdown_type);
552 #else
553     do {
554         status = shutdown(stream->handle, shutdown_type);
555     } while (status == -1 && errno == EINTR);
556 #endif
557     if (status) {
558         janet_panicf("could not shutdown socket: %V", janet_ev_lasterr());
559     }
560     return argv[0];
561 }
562 
563 JANET_CORE_FN(cfun_net_listen,
564               "(net/listen host port &opt type)",
565               "Creates a server. Returns a new stream that is neither readable nor "
566               "writeable. Use net/accept or net/accept-loop be to handle connections and start the server. "
567               "The type parameter specifies the type of network connection, either "
568               "a :stream (usually tcp), or :datagram (usually udp). If not specified, the default is "
569               ":stream. The host and port arguments are the same as in net/address.") {
570     janet_arity(argc, 2, 3);
571 
572     /* Get host, port, and handler*/
573     int socktype = janet_get_sockettype(argv, argc, 2);
574     int is_unix = 0;
575     struct addrinfo *ai = janet_get_addrinfo(argv, 0, socktype, 1, &is_unix);
576 
577     JSock sfd = JSOCKDEFAULT;
578 #ifndef JANET_WINDOWS
579     if (is_unix) {
580         sfd = socket(AF_UNIX, socktype | JSOCKFLAGS, 0);
581         if (!JSOCKVALID(sfd)) {
582             janet_free(ai);
583             janet_panicf("could not create socket: %V", janet_ev_lasterr());
584         }
585         const char *err = serverify_socket(sfd);
586         if (NULL != err || bind(sfd, (struct sockaddr *)ai, sizeof(struct sockaddr_un))) {
587             JSOCKCLOSE(sfd);
588             janet_free(ai);
589             if (err) {
590                 janet_panic(err);
591             } else {
592                 janet_panicf("could not bind socket: %V", janet_ev_lasterr());
593             }
594         }
595         janet_free(ai);
596     } else
597 #endif
598     {
599         /* Check all addrinfos in a loop for the first that we can bind to. */
600         struct addrinfo *rp = NULL;
601         for (rp = ai; rp != NULL; rp = rp->ai_next) {
602 #ifdef JANET_WINDOWS
603             sfd = WSASocketW(rp->ai_family, rp->ai_socktype | JSOCKFLAGS, rp->ai_protocol, NULL, 0, WSA_FLAG_OVERLAPPED);
604 #else
605             sfd = socket(rp->ai_family, rp->ai_socktype | JSOCKFLAGS, rp->ai_protocol);
606 #endif
607             if (!JSOCKVALID(sfd)) continue;
608             const char *err = serverify_socket(sfd);
609             if (NULL != err) {
610                 JSOCKCLOSE(sfd);
611                 continue;
612             }
613             /* Bind */
614             if (bind(sfd, rp->ai_addr, (int) rp->ai_addrlen) == 0) break;
615             JSOCKCLOSE(sfd);
616         }
617         freeaddrinfo(ai);
618         if (NULL == rp) {
619             janet_panic("could not bind to any sockets");
620         }
621     }
622 
623     if (socktype == SOCK_DGRAM) {
624         /* Datagram server (UDP) */
625         JanetStream *stream = make_stream(sfd, JANET_STREAM_UDPSERVER | JANET_STREAM_READABLE);
626         return janet_wrap_abstract(stream);
627     } else {
628         /* Stream server (TCP) */
629 
630         /* listen */
631         int status = listen(sfd, 1024);
632         if (status) {
633             JSOCKCLOSE(sfd);
634             janet_panicf("could not listen on file descriptor: %V", janet_ev_lasterr());
635         }
636 
637         /* Put sfd on our loop */
638         JanetStream *stream = make_stream(sfd, JANET_STREAM_ACCEPTABLE);
639         return janet_wrap_abstract(stream);
640     }
641 }
642 
643 /* Types of socket's we need to deal with - relevant type puns below.
644 struct sockaddr *sa;           // Common base structure
645 struct sockaddr_storage *ss;   // Size of largest socket address type
646 struct sockaddr_in *sin;       // IPv4 address + port
647 struct sockaddr_in6 *sin6;     // IPv6 address + port
648 struct sockaddr_un *sun;       // Unix Domain Socket Address
649 */
650 
651 /* Turn a socket address into a host, port pair.
652  * For unix domain sockets, returned tuple will have only a single element, the path string. */
janet_so_getname(const void * sa_any)653 static Janet janet_so_getname(const void *sa_any) {
654     const struct sockaddr *sa = sa_any;
655     char buffer[SA_ADDRSTRLEN];
656     switch (sa->sa_family) {
657         default:
658             janet_panic("unknown address family");
659         case AF_INET: {
660             const struct sockaddr_in *sai = sa_any;
661             if (!inet_ntop(AF_INET, &(sai->sin_addr), buffer, sizeof(buffer))) {
662                 janet_panic("unable to decode ipv4 host address");
663             }
664             Janet pair[2] = {janet_cstringv(buffer), janet_wrap_integer(ntohs(sai->sin_port))};
665             return janet_wrap_tuple(janet_tuple_n(pair, 2));
666         }
667         case AF_INET6: {
668             const struct sockaddr_in6 *sai6 = sa_any;
669             if (!inet_ntop(AF_INET6, &(sai6->sin6_addr), buffer, sizeof(buffer))) {
670                 janet_panic("unable to decode ipv4 host address");
671             }
672             Janet pair[2] = {janet_cstringv(buffer), janet_wrap_integer(ntohs(sai6->sin6_port))};
673             return janet_wrap_tuple(janet_tuple_n(pair, 2));
674         }
675 #ifndef JANET_WINDOWS
676         case AF_UNIX: {
677             const struct sockaddr_un *sun = sa_any;
678             Janet pathname;
679             if (sun->sun_path[0] == '\0') {
680                 memcpy(buffer, sun->sun_path, sizeof(sun->sun_path));
681                 buffer[0] = '@';
682                 pathname = janet_cstringv(buffer);
683             } else {
684                 pathname = janet_cstringv(sun->sun_path);
685             }
686             return janet_wrap_tuple(janet_tuple_n(&pathname, 1));
687         }
688 #endif
689     }
690 }
691 
692 JANET_CORE_FN(cfun_net_getsockname,
693               "(net/localname stream)",
694               "Gets the local address and port in a tuple in that order.") {
695     janet_fixarity(argc, 1);
696     JanetStream *js = janet_getabstract(argv, 0, &janet_stream_type);
697     if (js->flags & JANET_STREAM_CLOSED) janet_panic("stream closed");
698     struct sockaddr_storage ss;
699     socklen_t slen = sizeof(ss);
700     memset(&ss, 0, slen);
701     if (getsockname((JSock)js->handle, (struct sockaddr *) &ss, &slen)) {
702         janet_panicf("Failed to get localname on %v: %V", argv[0], janet_ev_lasterr());
703     }
704     janet_assert(slen <= sizeof(ss), "socket address truncated");
705     return janet_so_getname(&ss);
706 }
707 
708 JANET_CORE_FN(cfun_net_getpeername,
709               "(net/peername stream)",
710               "Gets the remote peer's address and port in a tuple in that order.") {
711     janet_fixarity(argc, 1);
712     JanetStream *js = janet_getabstract(argv, 0, &janet_stream_type);
713     if (js->flags & JANET_STREAM_CLOSED) janet_panic("stream closed");
714     struct sockaddr_storage ss;
715     socklen_t slen = sizeof(ss);
716     memset(&ss, 0, slen);
717     if (getpeername((JSock)js->handle, (struct sockaddr *)&ss, &slen)) {
718         janet_panicf("Failed to get peername on %v: %V", argv[0], janet_ev_lasterr());
719     }
720     janet_assert(slen <= sizeof(ss), "socket address truncated");
721     return janet_so_getname(&ss);
722 }
723 
724 JANET_CORE_FN(cfun_net_address_unpack,
725               "(net/address-unpack address)",
726               "Given an address returned by net/adress, return a host, port pair. Unix domain sockets "
727               "will have only the path in the returned tuple.") {
728     janet_fixarity(argc, 1);
729     struct sockaddr *sa = janet_getabstract(argv, 0, &janet_address_type);
730     return janet_so_getname(sa);
731 }
732 
733 JANET_CORE_FN(cfun_stream_accept_loop,
734               "(net/accept-loop stream handler)",
735               "Shorthand for running a server stream that will continuously accept new connections. "
736               "Blocks the current fiber until the stream is closed, and will return the stream.") {
737     janet_fixarity(argc, 2);
738     JanetStream *stream = janet_getabstract(argv, 0, &janet_stream_type);
739     janet_stream_flags(stream, JANET_STREAM_ACCEPTABLE | JANET_STREAM_SOCKET);
740     JanetFunction *fun = janet_getfunction(argv, 1);
741     janet_sched_accept(stream, fun);
742 }
743 
744 JANET_CORE_FN(cfun_stream_accept,
745               "(net/accept stream &opt timeout)",
746               "Get the next connection on a server stream. This would usually be called in a loop in a dedicated fiber. "
747               "Takes an optional timeout in seconds, after which will return nil. "
748               "Returns a new duplex stream which represents a connection to the client.") {
749     janet_arity(argc, 1, 2);
750     JanetStream *stream = janet_getabstract(argv, 0, &janet_stream_type);
751     janet_stream_flags(stream, JANET_STREAM_ACCEPTABLE | JANET_STREAM_SOCKET);
752     double to = janet_optnumber(argv, argc, 1, INFINITY);
753     if (to != INFINITY) janet_addtimeout(to);
754     janet_sched_accept(stream, NULL);
755 }
756 
757 JANET_CORE_FN(cfun_stream_read,
758               "(net/read stream nbytes &opt buf timeout)",
759               "Read up to n bytes from a stream, suspending the current fiber until the bytes are available. "
760               "`n` can also be the keyword `:all` to read into the buffer until end of stream. "
761               "If less than n bytes are available (and more than 0), will push those bytes and return early. "
762               "Takes an optional timeout in seconds, after which will return nil. "
763               "Returns a buffer with up to n more bytes in it, or raises an error if the read failed.") {
764     janet_arity(argc, 2, 4);
765     JanetStream *stream = janet_getabstract(argv, 0, &janet_stream_type);
766     janet_stream_flags(stream, JANET_STREAM_READABLE | JANET_STREAM_SOCKET);
767     JanetBuffer *buffer = janet_optbuffer(argv, argc, 2, 10);
768     double to = janet_optnumber(argv, argc, 3, INFINITY);
769     if (janet_keyeq(argv[1], "all")) {
770         if (to != INFINITY) janet_addtimeout(to);
771         janet_ev_recvchunk(stream, buffer, INT32_MAX, MSG_NOSIGNAL);
772     } else {
773         int32_t n = janet_getnat(argv, 1);
774         if (to != INFINITY) janet_addtimeout(to);
775         janet_ev_recv(stream, buffer, n, MSG_NOSIGNAL);
776     }
777     janet_await();
778 }
779 
780 JANET_CORE_FN(cfun_stream_chunk,
781               "(net/chunk stream nbytes &opt buf timeout)",
782               "Same a net/read, but will wait for all n bytes to arrive rather than return early. "
783               "Takes an optional timeout in seconds, after which will return nil.") {
784     janet_arity(argc, 2, 4);
785     JanetStream *stream = janet_getabstract(argv, 0, &janet_stream_type);
786     janet_stream_flags(stream, JANET_STREAM_READABLE | JANET_STREAM_SOCKET);
787     int32_t n = janet_getnat(argv, 1);
788     JanetBuffer *buffer = janet_optbuffer(argv, argc, 2, 10);
789     double to = janet_optnumber(argv, argc, 3, INFINITY);
790     if (to != INFINITY) janet_addtimeout(to);
791     janet_ev_recvchunk(stream, buffer, n, MSG_NOSIGNAL);
792     janet_await();
793 }
794 
795 JANET_CORE_FN(cfun_stream_recv_from,
796               "(net/recv-from stream nbytes buf &opt timoeut)",
797               "Receives data from a server stream and puts it into a buffer. Returns the socket-address the "
798               "packet came from. Takes an optional timeout in seconds, after which will return nil.") {
799     janet_arity(argc, 3, 4);
800     JanetStream *stream = janet_getabstract(argv, 0, &janet_stream_type);
801     janet_stream_flags(stream, JANET_STREAM_UDPSERVER | JANET_STREAM_SOCKET);
802     int32_t n = janet_getnat(argv, 1);
803     JanetBuffer *buffer = janet_getbuffer(argv, 2);
804     double to = janet_optnumber(argv, argc, 3, INFINITY);
805     if (to != INFINITY) janet_addtimeout(to);
806     janet_ev_recvfrom(stream, buffer, n, MSG_NOSIGNAL);
807     janet_await();
808 }
809 
810 JANET_CORE_FN(cfun_stream_write,
811               "(net/write stream data &opt timeout)",
812               "Write data to a stream, suspending the current fiber until the write "
813               "completes. Takes an optional timeout in seconds, after which will return nil. "
814               "Returns nil, or raises an error if the write failed.") {
815     janet_arity(argc, 2, 3);
816     JanetStream *stream = janet_getabstract(argv, 0, &janet_stream_type);
817     janet_stream_flags(stream, JANET_STREAM_WRITABLE | JANET_STREAM_SOCKET);
818     double to = janet_optnumber(argv, argc, 2, INFINITY);
819     if (janet_checktype(argv[1], JANET_BUFFER)) {
820         if (to != INFINITY) janet_addtimeout(to);
821         janet_ev_send_buffer(stream, janet_getbuffer(argv, 1), MSG_NOSIGNAL);
822     } else {
823         JanetByteView bytes = janet_getbytes(argv, 1);
824         if (to != INFINITY) janet_addtimeout(to);
825         janet_ev_send_string(stream, bytes.bytes, MSG_NOSIGNAL);
826     }
827     janet_await();
828 }
829 
830 JANET_CORE_FN(cfun_stream_send_to,
831               "(net/send-to stream dest data &opt timeout)",
832               "Writes a datagram to a server stream. dest is a the destination address of the packet. "
833               "Takes an optional timeout in seconds, after which will return nil. "
834               "Returns stream.") {
835     janet_arity(argc, 3, 4);
836     JanetStream *stream = janet_getabstract(argv, 0, &janet_stream_type);
837     janet_stream_flags(stream, JANET_STREAM_UDPSERVER | JANET_STREAM_SOCKET);
838     void *dest = janet_getabstract(argv, 1, &janet_address_type);
839     double to = janet_optnumber(argv, argc, 3, INFINITY);
840     if (janet_checktype(argv[2], JANET_BUFFER)) {
841         if (to != INFINITY) janet_addtimeout(to);
842         janet_ev_sendto_buffer(stream, janet_getbuffer(argv, 2), dest, MSG_NOSIGNAL);
843     } else {
844         JanetByteView bytes = janet_getbytes(argv, 2);
845         if (to != INFINITY) janet_addtimeout(to);
846         janet_ev_sendto_string(stream, bytes.bytes, dest, MSG_NOSIGNAL);
847     }
848     janet_await();
849 }
850 
851 JANET_CORE_FN(cfun_stream_flush,
852               "(net/flush stream)",
853               "Make sure that a stream is not buffering any data. This temporarily disables Nagle's algorithm. "
854               "Use this to make sure data is sent without delay. Returns stream.") {
855     janet_fixarity(argc, 1);
856     JanetStream *stream = janet_getabstract(argv, 0, &janet_stream_type);
857     janet_stream_flags(stream, JANET_STREAM_WRITABLE | JANET_STREAM_SOCKET);
858     /* Toggle no delay flag */
859     int flag = 1;
860     setsockopt((JSock) stream->handle, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int));
861     flag = 0;
862     setsockopt((JSock) stream->handle, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int));
863     return argv[0];
864 }
865 
866 static const JanetMethod net_stream_methods[] = {
867     {"chunk", cfun_stream_chunk},
868     {"close", janet_cfun_stream_close},
869     {"read", cfun_stream_read},
870     {"write", cfun_stream_write},
871     {"flush", cfun_stream_flush},
872     {"accept", cfun_stream_accept},
873     {"accept-loop", cfun_stream_accept_loop},
874     {"send-to", cfun_stream_send_to},
875     {"recv-from", cfun_stream_recv_from},
876     {"evread", janet_cfun_stream_read},
877     {"evchunk", janet_cfun_stream_chunk},
878     {"evwrite", janet_cfun_stream_write},
879     {"shutdown", cfun_net_shutdown},
880     {NULL, NULL}
881 };
882 
make_stream(JSock handle,uint32_t flags)883 static JanetStream *make_stream(JSock handle, uint32_t flags) {
884     return janet_stream((JanetHandle) handle, flags | JANET_STREAM_SOCKET, net_stream_methods);
885 }
886 
887 
janet_lib_net(JanetTable * env)888 void janet_lib_net(JanetTable *env) {
889     JanetRegExt net_cfuns[] = {
890         JANET_CORE_REG("net/address", cfun_net_sockaddr),
891         JANET_CORE_REG("net/listen", cfun_net_listen),
892         JANET_CORE_REG("net/accept", cfun_stream_accept),
893         JANET_CORE_REG("net/accept-loop", cfun_stream_accept_loop),
894         JANET_CORE_REG("net/read", cfun_stream_read),
895         JANET_CORE_REG("net/chunk", cfun_stream_chunk),
896         JANET_CORE_REG("net/write", cfun_stream_write),
897         JANET_CORE_REG("net/send-to", cfun_stream_send_to),
898         JANET_CORE_REG("net/recv-from", cfun_stream_recv_from),
899         JANET_CORE_REG("net/flush", cfun_stream_flush),
900         JANET_CORE_REG("net/connect", cfun_net_connect),
901         JANET_CORE_REG("net/shutdown", cfun_net_shutdown),
902         JANET_CORE_REG("net/peername", cfun_net_getpeername),
903         JANET_CORE_REG("net/localname", cfun_net_getsockname),
904         JANET_CORE_REG("net/address-unpack", cfun_net_address_unpack),
905         JANET_REG_END
906     };
907     janet_core_cfuns_ext(env, NULL, net_cfuns);
908 }
909 
janet_net_init(void)910 void janet_net_init(void) {
911 #ifdef JANET_WINDOWS
912     WSADATA wsaData;
913     janet_assert(!WSAStartup(MAKEWORD(2, 2), &wsaData), "could not start winsock");
914 #endif
915 }
916 
janet_net_deinit(void)917 void janet_net_deinit(void) {
918 #ifdef JANET_WINDOWS
919     WSACleanup();
920 #endif
921 }
922 
923 #endif
924