1 // Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors
2 // Licensed under the MIT License:
3 //
4 // Permission is hereby granted, free of charge, to any person obtaining a copy
5 // of this software and associated documentation files (the "Software"), to deal
6 // in the Software without restriction, including without limitation the rights
7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8 // copies of the Software, and to permit persons to whom the Software is
9 // furnished to do so, subject to the following conditions:
10 //
11 // The above copyright notice and this permission notice shall be included in
12 // all copies or substantial portions of the Software.
13 //
14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
20 // THE SOFTWARE.
21 
22 #if !_WIN32
23 // For Win32 implementation, see async-io-win32.c++.
24 
25 #ifndef _GNU_SOURCE
26 #define _GNU_SOURCE
27 #endif
28 
29 #include "async-io.h"
30 #include "async-io-internal.h"
31 #include "async-unix.h"
32 #include "debug.h"
33 #include "thread.h"
34 #include "io.h"
35 #include "miniposix.h"
36 #include <unistd.h>
37 #include <sys/uio.h>
38 #include <errno.h>
39 #include <fcntl.h>
40 #include <sys/types.h>
41 #include <sys/socket.h>
42 #include <sys/un.h>
43 #include <netinet/in.h>
44 #include <netinet/tcp.h>
45 #include <stddef.h>
46 #include <stdlib.h>
47 #include <arpa/inet.h>
48 #include <netdb.h>
49 #include <set>
50 #include <poll.h>
51 #include <limits.h>
52 #include <sys/ioctl.h>
53 
54 namespace kj {
55 
56 namespace {
57 
void setNonblocking(int fd) {
59 #ifdef FIONBIO
60   int opt = 1;
61   KJ_SYSCALL(ioctl(fd, FIONBIO, &opt));
62 #else
63   int flags;
64   KJ_SYSCALL(flags = fcntl(fd, F_GETFL));
65   if ((flags & O_NONBLOCK) == 0) {
66     KJ_SYSCALL(fcntl(fd, F_SETFL, flags | O_NONBLOCK));
67   }
68 #endif
69 }
70 
void setCloseOnExec(int fd) {
72 #ifdef FIOCLEX
73   KJ_SYSCALL(ioctl(fd, FIOCLEX));
74 #else
75   int flags;
76   KJ_SYSCALL(flags = fcntl(fd, F_GETFD));
77   if ((flags & FD_CLOEXEC) == 0) {
78     KJ_SYSCALL(fcntl(fd, F_SETFD, flags | FD_CLOEXEC));
79   }
80 #endif
81 }
82 
83 static constexpr uint NEW_FD_FLAGS =
84 #if __linux__ && !__BIONIC__
85     LowLevelAsyncIoProvider::ALREADY_CLOEXEC | LowLevelAsyncIoProvider::ALREADY_NONBLOCK |
86 #endif
87     LowLevelAsyncIoProvider::TAKE_OWNERSHIP;
88 // We always try to open FDs with CLOEXEC and NONBLOCK already set on Linux, but on other platforms
89 // this is not possible.
90 
91 class OwnedFileDescriptor {
92 public:
  OwnedFileDescriptor(int fd, uint flags): fd(fd), flags(flags) {
94     if (flags & LowLevelAsyncIoProvider::ALREADY_NONBLOCK) {
95       KJ_DREQUIRE(fcntl(fd, F_GETFL) & O_NONBLOCK, "You claimed you set NONBLOCK, but you didn't.");
96     } else {
97       setNonblocking(fd);
98     }
99 
100     if (flags & LowLevelAsyncIoProvider::TAKE_OWNERSHIP) {
101       if (flags & LowLevelAsyncIoProvider::ALREADY_CLOEXEC) {
102         KJ_DREQUIRE(fcntl(fd, F_GETFD) & FD_CLOEXEC,
103                     "You claimed you set CLOEXEC, but you didn't.");
104       } else {
105         setCloseOnExec(fd);
106       }
107     }
108   }
109 
  ~OwnedFileDescriptor() noexcept(false) {
111     // Don't use SYSCALL() here because close() should not be repeated on EINTR.
112     if ((flags & LowLevelAsyncIoProvider::TAKE_OWNERSHIP) && close(fd) < 0) {
113       KJ_FAIL_SYSCALL("close", errno, fd) {
114         // Recoverable exceptions are safe in destructors.
115         break;
116       }
117     }
118   }
119 
120 protected:
121   const int fd;
122 
123 private:
124   uint flags;
125 };
126 
127 // =======================================================================================
128 
129 class AsyncStreamFd: public OwnedFileDescriptor, public AsyncCapabilityStream {
130 public:
  AsyncStreamFd(UnixEventPort& eventPort, int fd, uint flags)
132       : OwnedFileDescriptor(fd, flags),
133         eventPort(eventPort),
134         observer(eventPort, fd, UnixEventPort::FdObserver::OBSERVE_READ_WRITE) {}
  virtual ~AsyncStreamFd() noexcept(false) {}
136 
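  // Plain byte read: delegates to tryReadInternal() with no FD buffer. The {0,0} below is the
  // empty ReadResult accumulator that tryReadInternal() fills in as data arrives.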
  Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
138     return tryReadInternal(buffer, minBytes, maxBytes, nullptr, 0, {0,0})
139         .then([](ReadResult r) { return r.byteCount; });
140   }
141 
  Promise<ReadResult> tryReadWithFds(void* buffer, size_t minBytes, size_t maxBytes,
143                                      AutoCloseFd* fdBuffer, size_t maxFds) override {
144     return tryReadInternal(buffer, minBytes, maxBytes, fdBuffer, maxFds, {0,0});
145   }
146 
  Promise<ReadResult> tryReadWithStreams(
148       void* buffer, size_t minBytes, size_t maxBytes,
149       Own<AsyncCapabilityStream>* streamBuffer, size_t maxStreams) override {
150     auto fdBuffer = kj::heapArray<AutoCloseFd>(maxStreams);
151     auto promise = tryReadInternal(buffer, minBytes, maxBytes, fdBuffer.begin(), maxStreams, {0,0});
152 
153     return promise.then([this, fdBuffer = kj::mv(fdBuffer), streamBuffer]
154                         (ReadResult result) mutable {
155       for (auto i: kj::zeroTo(result.capCount)) {
156         streamBuffer[i] = kj::heap<AsyncStreamFd>(eventPort, fdBuffer[i].release(),
157             LowLevelAsyncIoProvider::TAKE_OWNERSHIP | LowLevelAsyncIoProvider::ALREADY_CLOEXEC);
158       }
159       return result;
160     });
161   }
162 
  Promise<void> write(const void* buffer, size_t size) override {
164     ssize_t n;
165     KJ_NONBLOCKING_SYSCALL(n = ::write(fd, buffer, size)) {
166       // Error.
167 
168       // We can't "return kj::READY_NOW;" inside this block because it causes a memory leak due to
169       // a bug that exists in both Clang and GCC:
170       //   http://gcc.gnu.org/bugzilla/show_bug.cgi?id=33799
171       //   http://llvm.org/bugs/show_bug.cgi?id=12286
172       goto error;
173     }
174     if (false) {
175     error:
176       return kj::READY_NOW;
177     }
178 
179     if (n < 0) {
180       // EAGAIN -- need to wait for writability and try again.
181       return observer.whenBecomesWritable().then([=]() {
182         return write(buffer, size);
183       });
184     } else if (n == size) {
185       // All done.
186       return READY_NOW;
187     } else {
188       // Fewer than `size` bytes were written, but we CANNOT assume we're out of buffer space, as
189       // Linux is known to return partial reads/writes when interrupted by a signal -- yes, even
190       // for non-blocking operations. So, we'll need to write() again now, even though it will
191       // almost certainly fail with EAGAIN. See comments in the read path for more info.
192       buffer = reinterpret_cast<const byte*>(buffer) + n;
193       size -= n;
194       return write(buffer, size);
195     }
196   }
197 
  Promise<void> write(ArrayPtr<const ArrayPtr<const byte>> pieces) override {
199     if (pieces.size() == 0) {
200       return writeInternal(nullptr, nullptr, nullptr);
201     } else {
202       return writeInternal(pieces[0], pieces.slice(1, pieces.size()), nullptr);
203     }
204   }
205 
  Promise<void> writeWithFds(ArrayPtr<const byte> data,
207                              ArrayPtr<const ArrayPtr<const byte>> moreData,
208                              ArrayPtr<const int> fds) override {
209     return writeInternal(data, moreData, fds);
210   }
211 
  Promise<void> writeWithStreams(ArrayPtr<const byte> data,
213                                  ArrayPtr<const ArrayPtr<const byte>> moreData,
214                                  Array<Own<AsyncCapabilityStream>> streams) override {
215     auto fds = KJ_MAP(stream, streams) {
216       return downcast<AsyncStreamFd>(*stream).fd;
217     };
218     auto promise = writeInternal(data, moreData, fds);
219     return promise.attach(kj::mv(fds), kj::mv(streams));
220   }
221 
  Promise<void> whenWriteDisconnected() override {
223     KJ_IF_MAYBE(p, writeDisconnectedPromise) {
224       return p->addBranch();
225     } else {
226       auto fork = observer.whenWriteDisconnected().fork();
227       auto result = fork.addBranch();
228       writeDisconnectedPromise = kj::mv(fork);
229       return kj::mv(result);
230     }
231   }
232 
  void shutdownWrite() override {
234     // There's no legitimate way to get an AsyncStreamFd that isn't a socket through the
235     // UnixAsyncIoProvider interface.
236     KJ_SYSCALL(shutdown(fd, SHUT_WR));
237   }
238 
  void abortRead() override {
240     // There's no legitimate way to get an AsyncStreamFd that isn't a socket through the
241     // UnixAsyncIoProvider interface.
242     KJ_SYSCALL(shutdown(fd, SHUT_RD));
243   }
244 
  void getsockopt(int level, int option, void* value, uint* length) override {
246     socklen_t socklen = *length;
247     KJ_SYSCALL(::getsockopt(fd, level, option, value, &socklen));
248     *length = socklen;
249   }
250 
  void setsockopt(int level, int option, const void* value, uint length) override {
252     KJ_SYSCALL(::setsockopt(fd, level, option, value, length));
253   }
254 
  void getsockname(struct sockaddr* addr, uint* length) override {
256     socklen_t socklen = *length;
257     KJ_SYSCALL(::getsockname(fd, addr, &socklen));
258     *length = socklen;
259   }
260 
  void getpeername(struct sockaddr* addr, uint* length) override {
262     socklen_t socklen = *length;
263     KJ_SYSCALL(::getpeername(fd, addr, &socklen));
264     *length = socklen;
265   }
266 
  Promise<void> waitConnected() {
268     // Wait until initial connection has completed. This actually just waits until it is writable.
269 
270     // Can't just go directly to writeObserver.whenBecomesWritable() because of edge triggering. We
271     // need to explicitly check if the socket is already connected.
272 
273     struct pollfd pollfd;
274     memset(&pollfd, 0, sizeof(pollfd));
275     pollfd.fd = fd;
276     pollfd.events = POLLOUT;
277 
278     int pollResult;
279     KJ_SYSCALL(pollResult = poll(&pollfd, 1, 0));
280 
281     if (pollResult == 0) {
282       // Not ready yet. We can safely use the edge-triggered observer.
283       return observer.whenBecomesWritable();
284     } else {
285       // Ready now.
286       return kj::READY_NOW;
287     }
288   }
289 
290 private:
291   UnixEventPort& eventPort;
292   UnixEventPort::FdObserver observer;
293   Maybe<ForkedPromise<void>> writeDisconnectedPromise;
294 
  Promise<ReadResult> tryReadInternal(void* buffer, size_t minBytes, size_t maxBytes,
296                                       AutoCloseFd* fdBuffer, size_t maxFds,
297                                       ReadResult alreadyRead) {
298     // `alreadyRead` is the number of bytes we have already received via previous reads -- minBytes,
299     // maxBytes, and buffer have already been adjusted to account for them, but this count must
300     // be included in the final return value.
301 
302     ssize_t n;
303     if (maxFds == 0) {
304       KJ_NONBLOCKING_SYSCALL(n = ::read(fd, buffer, maxBytes)) {
305         // Error.
306 
307         // We can't "return kj::READY_NOW;" inside this block because it causes a memory leak due to
308         // a bug that exists in both Clang and GCC:
309         //   http://gcc.gnu.org/bugzilla/show_bug.cgi?id=33799
310         //   http://llvm.org/bugs/show_bug.cgi?id=12286
311         goto error;
312       }
313     } else {
314       struct msghdr msg;
315       memset(&msg, 0, sizeof(msg));
316 
317       struct iovec iov;
318       memset(&iov, 0, sizeof(iov));
319       iov.iov_base = buffer;
320       iov.iov_len = maxBytes;
321       msg.msg_iov = &iov;
322       msg.msg_iovlen = 1;
323 
324       // Allocate space to receive a cmsg.
325 #if __APPLE__ || __FreeBSD__
326       // Until very recently (late 2018 / early 2019), FreeBSD suffered from a bug in which when
327       // an SCM_RIGHTS message was truncated on delivery, it would not close the FDs that weren't
328       // delivered -- they would simply leak: https://bugs.freebsd.org/131876
329       //
330       // My testing indicates that MacOS has this same bug as of today (April 2019). I don't know
331       // if they plan to fix it or are even aware of it.
332       //
333       // To handle both cases, we will always provide space to receive 512 FDs. Hopefully, this is
334       // greater than the maximum number of FDs that these kernels will transmit in one message
335       // PLUS enough space for any other ancillary messages that could be sent before the
336       // SCM_RIGHTS message to push it back in the buffer. I couldn't find any firm documentation
337       // on these limits, though -- I only know that Linux is limited to 253, and I saw a hint in
338       // a comment in someone else's application that suggested FreeBSD is the same. Hopefully,
339       // then, this is sufficient to prevent attacks. But if not, there's nothing more we can do;
340       // it's really up to the kernel to fix this.
341       size_t msgBytes = CMSG_SPACE(sizeof(int) * 512);
342 #else
343       size_t msgBytes = CMSG_SPACE(sizeof(int) * maxFds);
344 #endif
345       // On Linux, CMSG_SPACE will align to a word-size boundary, but on Mac it always aligns to a
346       // 32-bit boundary. I guess aligning to 32 bits helps avoid the problem where you
347       // surprisingly end up with space for two file descriptors when you only wanted one. However,
348       // cmsghdr's preferred alignment is word-size (it contains a size_t). If we stack-allocate
349       // the buffer, we need to make sure it is aligned properly (maybe not on x64, but maybe on
350       // other platforms), so we want to allocate an array of words (we use void*). So... we use
351       // CMSG_SPACE() and then additionally round up to deal with Mac.
352       size_t msgWords = (msgBytes + sizeof(void*) - 1) / sizeof(void*);
353       KJ_STACK_ARRAY(void*, cmsgSpace, msgWords, 16, 256);
354       auto cmsgBytes = cmsgSpace.asBytes();
355       memset(cmsgBytes.begin(), 0, cmsgBytes.size());
356       msg.msg_control = cmsgBytes.begin();
357       msg.msg_controllen = msgBytes;
358 
359 #ifdef MSG_CMSG_CLOEXEC
360       static constexpr int RECVMSG_FLAGS = MSG_CMSG_CLOEXEC;
361 #else
362       static constexpr int RECVMSG_FLAGS = 0;
363 #endif
364 
365       KJ_NONBLOCKING_SYSCALL(n = ::recvmsg(fd, &msg, RECVMSG_FLAGS)) {
366         // Error.
367 
368         // We can't "return kj::READY_NOW;" inside this block because it causes a memory leak due to
369         // a bug that exists in both Clang and GCC:
370         //   http://gcc.gnu.org/bugzilla/show_bug.cgi?id=33799
371         //   http://llvm.org/bugs/show_bug.cgi?id=12286
372         goto error;
373       }
374 
375       if (n >= 0) {
376         // Process all messages.
377         //
378         // WARNING DANGER: We have to be VERY careful not to miss a file descriptor here, because
379         // if we do, then that FD will never be closed, and a malicious peer could exploit this to
380         // fill up our FD table, creating a DoS attack. Some things to keep in mind:
381         // - CMSG_SPACE() could have rounded up the space for alignment purposes, and this could
382         //   mean we permitted the kernel to deliver more file descriptors than `maxFds`. We need
383         //   to close the extras.
384         // - We can receive multiple ancillary messages at once. In particular, there is also
385         //   SCM_CREDENTIALS. The sender decides what to send. They could send SCM_CREDENTIALS
386         //   first followed by SCM_RIGHTS. We need to make sure we see both.
387         size_t nfds = 0;
388         size_t spaceLeft = msg.msg_controllen;
389         for (struct cmsghdr* cmsg = CMSG_FIRSTHDR(&msg);
390             cmsg != nullptr; cmsg = CMSG_NXTHDR(&msg, cmsg)) {
391           if (spaceLeft >= CMSG_LEN(0) &&
392               cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_RIGHTS) {
            // Some operating systems (like MacOS) do not adjust cmsg_len when the message is
394             // truncated. We must do so ourselves or risk overrunning the buffer.
395             auto len = kj::min(cmsg->cmsg_len, spaceLeft);
396             auto data = arrayPtr(reinterpret_cast<int*>(CMSG_DATA(cmsg)),
397                                  (len - CMSG_LEN(0)) / sizeof(int));
398             kj::Vector<kj::AutoCloseFd> trashFds;
399             for (auto fd: data) {
400               kj::AutoCloseFd ownFd(fd);
401               if (nfds < maxFds) {
402                 fdBuffer[nfds++] = kj::mv(ownFd);
403               } else {
404                 trashFds.add(kj::mv(ownFd));
405               }
406             }
407           }
408 
409           if (spaceLeft >= CMSG_LEN(0) && spaceLeft >= cmsg->cmsg_len) {
410             spaceLeft -= cmsg->cmsg_len;
411           } else {
412             spaceLeft = 0;
413           }
414         }
415 
416 #ifndef MSG_CMSG_CLOEXEC
417         for (size_t i = 0; i < nfds; i++) {
418           setCloseOnExec(fdBuffer[i]);
419         }
420 #endif
421 
422         alreadyRead.capCount += nfds;
423         fdBuffer += nfds;
424         maxFds -= nfds;
425       }
426     }
427 
428     if (false) {
429     error:
430       return alreadyRead;
431     }
432 
433     if (n < 0) {
434       // Read would block.
435       return observer.whenBecomesReadable().then([=]() {
436         return tryReadInternal(buffer, minBytes, maxBytes, fdBuffer, maxFds, alreadyRead);
437       });
438     } else if (n == 0) {
439       // EOF -OR- maxBytes == 0.
440       return alreadyRead;
441     } else if (implicitCast<size_t>(n) >= minBytes) {
442       // We read enough to stop here.
443       alreadyRead.byteCount += n;
444       return alreadyRead;
445     } else {
446       // The kernel returned fewer bytes than we asked for (and fewer than we need).
447 
448       buffer = reinterpret_cast<byte*>(buffer) + n;
449       minBytes -= n;
450       maxBytes -= n;
451       alreadyRead.byteCount += n;
452 
453       // According to David Klempner, who works on Stubby at Google, we sadly CANNOT assume that
454       // we've consumed the whole read buffer here. If a signal is delivered in the middle of a
455       // read() -- yes, even a non-blocking read -- it can cause the kernel to return a partial
456       // result, with data still in the buffer.
457       //     https://bugzilla.kernel.org/show_bug.cgi?id=199131
458       //     https://twitter.com/CaptainSegfault/status/1112622245531144194
459       //
460       // Unfortunately, we have no choice but to issue more read()s until it either tells us EOF
461       // or EAGAIN. We used to have an optimization here using observer.atEndHint() (when it is
462       // non-null) to avoid a redundant call to read(). Alas...
463       return tryReadInternal(buffer, minBytes, maxBytes, fdBuffer, maxFds, alreadyRead);
464     }
465   }
466 
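  // Write `firstPiece` followed by `morePieces`, attaching `fds` as SCM_RIGHTS ancillary data to
  // the first sendmsg(). On EAGAIN nothing was sent, so we wait for writability and retry with
  // the same FDs; after a partial write we re-invoke ourselves without the FDs (they were already
  // transmitted) to send the remaining bytes.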
  Promise<void> writeInternal(ArrayPtr<const byte> firstPiece,
468                               ArrayPtr<const ArrayPtr<const byte>> morePieces,
469                               ArrayPtr<const int> fds) {
470     const size_t iovmax = kj::miniposix::iovMax(1 + morePieces.size());
471     // If there are more than IOV_MAX pieces, we'll only write the first IOV_MAX for now, and
472     // then we'll loop later.
473     KJ_STACK_ARRAY(struct iovec, iov, kj::min(1 + morePieces.size(), iovmax), 16, 128);
474     size_t iovTotal = 0;
475 
476     // writev() interface is not const-correct.  :(
477     iov[0].iov_base = const_cast<byte*>(firstPiece.begin());
478     iov[0].iov_len = firstPiece.size();
479     iovTotal += iov[0].iov_len;
480     for (uint i = 1; i < iov.size(); i++) {
481       iov[i].iov_base = const_cast<byte*>(morePieces[i - 1].begin());
482       iov[i].iov_len = morePieces[i - 1].size();
483       iovTotal += iov[i].iov_len;
484     }
485 
486     if (iovTotal == 0) {
487       KJ_REQUIRE(fds.size() == 0, "can't write FDs without bytes");
488       return kj::READY_NOW;
489     }
490 
491     ssize_t n;
492     if (fds.size() == 0) {
493       KJ_NONBLOCKING_SYSCALL(n = ::writev(fd, iov.begin(), iov.size())) {
494         // Error.
495 
496         // We can't "return kj::READY_NOW;" inside this block because it causes a memory leak due to
497         // a bug that exists in both Clang and GCC:
498         //   http://gcc.gnu.org/bugzilla/show_bug.cgi?id=33799
499         //   http://llvm.org/bugs/show_bug.cgi?id=12286
500         goto error;
501       }
502     } else {
503       struct msghdr msg;
504       memset(&msg, 0, sizeof(msg));
505       msg.msg_iov = iov.begin();
506       msg.msg_iovlen = iov.size();
507 
508       // Allocate space to receive a cmsg.
509       size_t msgBytes = CMSG_SPACE(sizeof(int) * fds.size());
510       // On Linux, CMSG_SPACE will align to a word-size boundary, but on Mac it always aligns to a
511       // 32-bit boundary. I guess aligning to 32 bits helps avoid the problem where you
512       // surprisingly end up with space for two file descriptors when you only wanted one. However,
513       // cmsghdr's preferred alignment is word-size (it contains a size_t). If we stack-allocate
514       // the buffer, we need to make sure it is aligned properly (maybe not on x64, but maybe on
515       // other platforms), so we want to allocate an array of words (we use void*). So... we use
516       // CMSG_SPACE() and then additionally round up to deal with Mac.
517       size_t msgWords = (msgBytes + sizeof(void*) - 1) / sizeof(void*);
518       KJ_STACK_ARRAY(void*, cmsgSpace, msgWords, 16, 256);
519       auto cmsgBytes = cmsgSpace.asBytes();
520       memset(cmsgBytes.begin(), 0, cmsgBytes.size());
521       msg.msg_control = cmsgBytes.begin();
522       msg.msg_controllen = msgBytes;
523 
524       struct cmsghdr* cmsg = CMSG_FIRSTHDR(&msg);
525       cmsg->cmsg_level = SOL_SOCKET;
526       cmsg->cmsg_type = SCM_RIGHTS;
527       cmsg->cmsg_len = CMSG_LEN(sizeof(int) * fds.size());
528       memcpy(CMSG_DATA(cmsg), fds.begin(), fds.asBytes().size());
529 
530       KJ_NONBLOCKING_SYSCALL(n = ::sendmsg(fd, &msg, 0)) {
531         // Error.
532 
533         // We can't "return kj::READY_NOW;" inside this block because it causes a memory leak due to
534         // a bug that exists in both Clang and GCC:
535         //   http://gcc.gnu.org/bugzilla/show_bug.cgi?id=33799
536         //   http://llvm.org/bugs/show_bug.cgi?id=12286
537         goto error;
538       }
539     }
540 
541     if (false) {
542     error:
543       return kj::READY_NOW;
544     }
545 
546     if (n < 0) {
547       // Got EAGAIN. Nothing was written.
548       return observer.whenBecomesWritable().then([=]() {
549         return writeInternal(firstPiece, morePieces, fds);
550       });
551     } else if (n == 0) {
552       // Why would a sendmsg() with a non-empty message ever return 0 when writing to a stream
553       // socket? If there's no room in the send buffer, it should fail with EAGAIN. If the
554       // connection is closed, it should fail with EPIPE. Various documents and forum posts around
555       // the internet claim this can happen but no one seems to know when. My guess is it can only
556       // happen if we try to send an empty message -- which we didn't. So I think this is
557       // impossible. If it is possible, we need to figure out how to correctly handle it, which
558       // depends on what caused it.
559       //
560       // Note in particular that if 0 is a valid return here, and we sent an SCM_RIGHTS message,
561       // we need to know whether the message was sent or not, in order to decide whether to retry
562       // sending it!
563       KJ_FAIL_ASSERT("non-empty sendmsg() returned 0");
564     }
565 
566     // Non-zero bytes were written. This also implies that *all* FDs were written.
567 
568     // Discard all data that was written, then issue a new write for what's left (if any).
569     for (;;) {
570       if (n < firstPiece.size()) {
571         // Only part of the first piece was consumed.  Wait for buffer space and then write again.
572         firstPiece = firstPiece.slice(n, firstPiece.size());
573         iovTotal -= n;
574 
575         if (iovTotal == 0) {
576           // Oops, what actually happened is that we hit the IOV_MAX limit. Don't wait.
577           return writeInternal(firstPiece, morePieces, nullptr);
578         }
579 
580         // As with read(), we cannot assume that a short write() really means the write buffer is
581         // full (see comments in the read path above). We have to write again.
582         return writeInternal(firstPiece, morePieces, nullptr);
583       } else if (morePieces.size() == 0) {
584         // First piece was fully-consumed and there are no more pieces, so we're done.
585         KJ_DASSERT(n == firstPiece.size(), n);
586         return READY_NOW;
587       } else {
588         // First piece was fully consumed, so move on to the next piece.
589         n -= firstPiece.size();
590         iovTotal -= firstPiece.size();
591         firstPiece = morePieces[0];
592         morePieces = morePieces.slice(1, morePieces.size());
593       }
594     }
595   }
596 };
597 
598 // =======================================================================================
599 
600 class SocketAddress {
601 public:
  SocketAddress(const void* sockaddr, uint len): addrlen(len) {
603     KJ_REQUIRE(len <= sizeof(addr), "Sorry, your sockaddr is too big for me.");
604     memcpy(&addr.generic, sockaddr, len);
605   }
606 
  bool operator<(const SocketAddress& other) const {
608     // So we can use std::set<SocketAddress>...  see DNS lookup code.
609 
610     if (wildcard < other.wildcard) return true;
611     if (wildcard > other.wildcard) return false;
612 
613     if (addrlen < other.addrlen) return true;
614     if (addrlen > other.addrlen) return false;
615 
616     return memcmp(&addr.generic, &other.addr.generic, addrlen) < 0;
617   }
618 
  const struct sockaddr* getRaw() const { return &addr.generic; }
  socklen_t getRawSize() const { return addrlen; }
621 
  int socket(int type) const {
623     bool isStream = type == SOCK_STREAM;
624 
625     int result;
626 #if __linux__ && !__BIONIC__
627     type |= SOCK_NONBLOCK | SOCK_CLOEXEC;
628 #endif
629     KJ_SYSCALL(result = ::socket(addr.generic.sa_family, type, 0));
630 
631     if (isStream && (addr.generic.sa_family == AF_INET ||
632                      addr.generic.sa_family == AF_INET6)) {
633       // TODO(perf):  As a hack for the 0.4 release we are always setting
634       //   TCP_NODELAY because Nagle's algorithm pretty much kills Cap'n Proto's
635       //   RPC protocol.  Later, we should extend the interface to provide more
636       //   control over this.  Perhaps write() should have a flag which
637       //   specifies whether to pass MSG_MORE.
638       int one = 1;
639       KJ_SYSCALL(setsockopt(
640           result, IPPROTO_TCP, TCP_NODELAY, (char*)&one, sizeof(one)));
641     }
642 
643     return result;
644   }
645 
  void bind(int sockfd) const {
647 #if !defined(__OpenBSD__)
648     if (wildcard) {
649       // Disable IPV6_V6ONLY because we want to handle both ipv4 and ipv6 on this socket.  (The
650       // default value of this option varies across platforms.)
651       int value = 0;
652       KJ_SYSCALL(setsockopt(sockfd, IPPROTO_IPV6, IPV6_V6ONLY, &value, sizeof(value)));
653     }
654 #endif
655 
656     KJ_SYSCALL(::bind(sockfd, &addr.generic, addrlen), toString());
657   }
658 
  uint getPort() const {
660     switch (addr.generic.sa_family) {
661       case AF_INET: return ntohs(addr.inet4.sin_port);
662       case AF_INET6: return ntohs(addr.inet6.sin6_port);
663       default: return 0;
664     }
665   }
666 
  String toString() const {
668     if (wildcard) {
669       return str("*:", getPort());
670     }
671 
672     switch (addr.generic.sa_family) {
673       case AF_INET: {
674         char buffer[INET6_ADDRSTRLEN];
675         if (inet_ntop(addr.inet4.sin_family, &addr.inet4.sin_addr,
676                       buffer, sizeof(buffer)) == nullptr) {
677           KJ_FAIL_SYSCALL("inet_ntop", errno) { break; }
678           return heapString("(inet_ntop error)");
679         }
680         return str(buffer, ':', ntohs(addr.inet4.sin_port));
681       }
682       case AF_INET6: {
683         char buffer[INET6_ADDRSTRLEN];
684         if (inet_ntop(addr.inet6.sin6_family, &addr.inet6.sin6_addr,
685                       buffer, sizeof(buffer)) == nullptr) {
686           KJ_FAIL_SYSCALL("inet_ntop", errno) { break; }
687           return heapString("(inet_ntop error)");
688         }
689         return str('[', buffer, "]:", ntohs(addr.inet6.sin6_port));
690       }
691       case AF_UNIX: {
692         auto path = _::safeUnixPath(&addr.unixDomain, addrlen);
693         if (path.size() > 0 && path[0] == '\0') {
694           return str("unix-abstract:", path.slice(1, path.size()));
695         } else {
696           return str("unix:", path);
697         }
698       }
699       default:
700         return str("(unknown address family ", addr.generic.sa_family, ")");
701     }
702   }
703 
704   static Promise<Array<SocketAddress>> lookupHost(
705       LowLevelAsyncIoProvider& lowLevel, kj::String host, kj::String service, uint portHint,
706       _::NetworkFilter& filter);
707   // Perform a DNS lookup.
708 
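  // Parse a text address -- e.g. "unix:PATH", "unix-abstract:NAME", "1.2.3.4:80", "[::1]:80",
  // "*:1234", or "host:service" -- into one or more SocketAddresses, falling back to a DNS
  // lookup (lookupHost) when the text is not a numeric address.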
  static Promise<Array<SocketAddress>> parse(
710       LowLevelAsyncIoProvider& lowLevel, StringPtr str, uint portHint, _::NetworkFilter& filter) {
711     // TODO(someday):  Allow commas in `str`.
712 
713     SocketAddress result;
714 
715     if (str.startsWith("unix:")) {
716       StringPtr path = str.slice(strlen("unix:"));
717       KJ_REQUIRE(path.size() < sizeof(addr.unixDomain.sun_path),
718                  "Unix domain socket address is too long.", str);
719       KJ_REQUIRE(path.size() == strlen(path.cStr()),
720                  "Unix domain socket address contains NULL. Use"
721                  " 'unix-abstract:' for the abstract namespace.");
722       result.addr.unixDomain.sun_family = AF_UNIX;
723       strcpy(result.addr.unixDomain.sun_path, path.cStr());
724       result.addrlen = offsetof(struct sockaddr_un, sun_path) + path.size() + 1;
725 
726       if (!result.parseAllowedBy(filter)) {
727         KJ_FAIL_REQUIRE("unix sockets blocked by restrictPeers()");
728         return Array<SocketAddress>();
729       }
730 
731       auto array = kj::heapArrayBuilder<SocketAddress>(1);
732       array.add(result);
733       return array.finish();
734     }
735 
736     if (str.startsWith("unix-abstract:")) {
737       StringPtr path = str.slice(strlen("unix-abstract:"));
738       KJ_REQUIRE(path.size() + 1 < sizeof(addr.unixDomain.sun_path),
739                  "Unix domain socket address is too long.", str);
740       result.addr.unixDomain.sun_family = AF_UNIX;
741       result.addr.unixDomain.sun_path[0] = '\0';
742       // although not strictly required by Linux, also copy the trailing
743       // NULL terminator so that we can safely read it back in toString
744       memcpy(result.addr.unixDomain.sun_path + 1, path.cStr(), path.size() + 1);
745       result.addrlen = offsetof(struct sockaddr_un, sun_path) + path.size() + 1;
746 
747       if (!result.parseAllowedBy(filter)) {
748         KJ_FAIL_REQUIRE("abstract unix sockets blocked by restrictPeers()");
749         return Array<SocketAddress>();
750       }
751 
752       auto array = kj::heapArrayBuilder<SocketAddress>(1);
753       array.add(result);
754       return array.finish();
755     }
756 
757     // Try to separate the address and port.
758     ArrayPtr<const char> addrPart;
759     Maybe<StringPtr> portPart;
760 
761     int af;
762 
763     if (str.startsWith("[")) {
764       // Address starts with a bracket, which is a common way to write an ip6 address with a port,
765       // since without brackets around the address part, the port looks like another segment of
766       // the address.
767       af = AF_INET6;
768       size_t closeBracket = KJ_ASSERT_NONNULL(str.findLast(']'),
769           "Unclosed '[' in address string.", str);
770 
771       addrPart = str.slice(1, closeBracket);
772       if (str.size() > closeBracket + 1) {
773         KJ_REQUIRE(str.slice(closeBracket + 1).startsWith(":"),
774                    "Expected port suffix after ']'.", str);
775         portPart = str.slice(closeBracket + 2);
776       }
777     } else {
778       KJ_IF_MAYBE(colon, str.findFirst(':')) {
779         if (str.slice(*colon + 1).findFirst(':') == nullptr) {
780           // There is exactly one colon and no brackets, so it must be an ip4 address with port.
781           af = AF_INET;
782           addrPart = str.slice(0, *colon);
783           portPart = str.slice(*colon + 1);
784         } else {
785           // There are two or more colons and no brackets, so the whole thing must be an ip6
786           // address with no port.
787           af = AF_INET6;
788           addrPart = str;
789         }
790       } else {
791         // No colons, so it must be an ip4 address without port.
792         af = AF_INET;
793         addrPart = str;
794       }
795     }
796 
797     // Parse the port.
798     unsigned long port;
799     KJ_IF_MAYBE(portText, portPart) {
800       char* endptr;
801       port = strtoul(portText->cStr(), &endptr, 0);
802       if (portText->size() == 0 || *endptr != '\0') {
803         // Not a number.  Maybe it's a service name.  Fall back to DNS.
804         return lookupHost(lowLevel, kj::heapString(addrPart), kj::heapString(*portText), portHint,
805                           filter);
806       }
807       KJ_REQUIRE(port < 65536, "Port number too large.");
808     } else {
809       port = portHint;
810     }
811 
812     // Check for wildcard.
813     if (addrPart.size() == 1 && addrPart[0] == '*') {
814       result.wildcard = true;
815 #if defined(__OpenBSD__)
816       // On OpenBSD, all sockets are either v4-only or v6-only, so use v4 as a
817       // temporary workaround for wildcards.
818       result.addrlen = sizeof(addr.inet4);
819       result.addr.inet4.sin_family = AF_INET;
820       result.addr.inet4.sin_port = htons(port);
821 #else
822       // Create an ip6 socket and set IPV6_V6ONLY to 0 later.
823       result.addrlen = sizeof(addr.inet6);
824       result.addr.inet6.sin6_family = AF_INET6;
825       result.addr.inet6.sin6_port = htons(port);
826 #endif
827 
828       auto array = kj::heapArrayBuilder<SocketAddress>(1);
829       array.add(result);
830       return array.finish();
831     }
832 
833     void* addrTarget;
834     if (af == AF_INET6) {
835       result.addrlen = sizeof(addr.inet6);
836       result.addr.inet6.sin6_family = AF_INET6;
837       result.addr.inet6.sin6_port = htons(port);
838       addrTarget = &result.addr.inet6.sin6_addr;
839     } else {
840       result.addrlen = sizeof(addr.inet4);
841       result.addr.inet4.sin_family = AF_INET;
842       result.addr.inet4.sin_port = htons(port);
843       addrTarget = &result.addr.inet4.sin_addr;
844     }
845 
846     if (addrPart.size() < INET6_ADDRSTRLEN - 1) {
847       // addrPart is not necessarily NUL-terminated so we have to make a copy.  :(
848       char buffer[INET6_ADDRSTRLEN];
849       memcpy(buffer, addrPart.begin(), addrPart.size());
850       buffer[addrPart.size()] = '\0';
851 
852       // OK, parse it!
853       switch (inet_pton(af, buffer, addrTarget)) {
854         case 1: {
855           // success.
856           if (!result.parseAllowedBy(filter)) {
857             KJ_FAIL_REQUIRE("address family blocked by restrictPeers()");
858             return Array<SocketAddress>();
859           }
860 
861           auto array = kj::heapArrayBuilder<SocketAddress>(1);
862           array.add(result);
863           return array.finish();
864         }
865         case 0:
866           // It's apparently not a simple address...  fall back to DNS.
867           break;
868         default:
869           KJ_FAIL_SYSCALL("inet_pton", errno, af, addrPart);
870       }
871     }
872 
873     return lookupHost(lowLevel, kj::heapString(addrPart), nullptr, port, filter);
874   }
875 
  static SocketAddress getLocalAddress(int sockfd) {
877     SocketAddress result;
878     result.addrlen = sizeof(addr);
879     KJ_SYSCALL(getsockname(sockfd, &result.addr.generic, &result.addrlen));
880     return result;
881   }
882 
  bool allowedBy(LowLevelAsyncIoProvider::NetworkFilter& filter) {
884     return filter.shouldAllow(&addr.generic, addrlen);
885   }
886 
  bool parseAllowedBy(_::NetworkFilter& filter) {
888     return filter.shouldAllowParse(&addr.generic, addrlen);
889   }
890 
891 private:
  SocketAddress() {
    // We need to memset the whole object to 0, otherwise Valgrind gets unhappy when we write it to a
894     // pipe, due to the padding bytes being uninitialized.
895     memset(this, 0, sizeof(*this));
896   }
897 
898   socklen_t addrlen;
899   bool wildcard = false;
900   union {
901     struct sockaddr generic;
902     struct sockaddr_in inet4;
903     struct sockaddr_in6 inet6;
904     struct sockaddr_un unixDomain;
905     struct sockaddr_storage storage;
906   } addr;
907 
908   struct LookupParams;
909   class LookupReader;
910 };
911 
912 class SocketAddress::LookupReader {
913   // Reads SocketAddresses off of a pipe coming from another thread that is performing
914   // getaddrinfo.
915 
916 public:
  LookupReader(kj::Own<Thread>&& thread, kj::Own<AsyncInputStream>&& input,
918                _::NetworkFilter& filter)
919       : thread(kj::mv(thread)), input(kj::mv(input)), filter(filter) {}
920 
  ~LookupReader() {
922     if (thread) thread->detach();
923   }
924 
  Promise<Array<SocketAddress>> read() {
926     return input->tryRead(&current, sizeof(current), sizeof(current)).then(
927         [this](size_t n) -> Promise<Array<SocketAddress>> {
928       if (n < sizeof(current)) {
929         thread = nullptr;
930         // getaddrinfo()'s docs seem to say it will never return an empty list, but let's check
931         // anyway.
932         KJ_REQUIRE(addresses.size() > 0, "DNS lookup returned no permitted addresses.") { break; }
933         return addresses.releaseAsArray();
934       } else {
935         // getaddrinfo() can return multiple copies of the same address for several reasons.
936         // A major one is that we don't give it a socket type (SOCK_STREAM vs. SOCK_DGRAM), so
937         // it may return two copies of the same address, one for each type, unless it explicitly
938         // knows that the service name given is specific to one type.  But we can't tell it a type,
939         // because we don't actually know which one the user wants, and if we specify SOCK_STREAM
940         // while the user specified a UDP service name then they'll get a resolution error which
941         // is lame.  (At least, I think that's how it works.)
942         //
943         // So we instead resort to de-duping results.
944         if (alreadySeen.insert(current).second) {
945           if (current.parseAllowedBy(filter)) {
946             addresses.add(current);
947           }
948         }
949         return read();
950       }
951     });
952   }
953 
954 private:
955   kj::Own<Thread> thread;
956   kj::Own<AsyncInputStream> input;
957   _::NetworkFilter& filter;
958   SocketAddress current;
959   kj::Vector<SocketAddress> addresses;
960   std::set<SocketAddress> alreadySeen;
961 };
962 
963 struct SocketAddress::LookupParams {
964   kj::String host;
965   kj::String service;
966 };
967 
Promise<Array<SocketAddress>> SocketAddress::lookupHost(
969     LowLevelAsyncIoProvider& lowLevel, kj::String host, kj::String service, uint portHint,
970     _::NetworkFilter& filter) {
971   // This shitty function spawns a thread to run getaddrinfo().  Unfortunately, getaddrinfo() is
972   // the only cross-platform DNS API and it is blocking.
973   //
974   // TODO(perf):  Use a thread pool?  Maybe kj::Thread should use a thread pool automatically?
975   //   Maybe use the various platform-specific asynchronous DNS libraries?  Please do not implement
976   //   a custom DNS resolver...
977 
978   int fds[2];
979 #if __linux__ && !__BIONIC__
980   KJ_SYSCALL(pipe2(fds, O_NONBLOCK | O_CLOEXEC));
981 #else
982   KJ_SYSCALL(pipe(fds));
983 #endif
984 
985   auto input = lowLevel.wrapInputFd(fds[0], NEW_FD_FLAGS);
986 
987   int outFd = fds[1];
988 
989   LookupParams params = { kj::mv(host), kj::mv(service) };
990 
991   auto thread = heap<Thread>(kj::mvCapture(params, [outFd,portHint](LookupParams&& params) {
992     FdOutputStream output((AutoCloseFd(outFd)));
993 
994     struct addrinfo* list;
995     int status = getaddrinfo(
996         params.host == "*" ? nullptr : params.host.cStr(),
997         params.service == nullptr ? nullptr : params.service.cStr(),
998         nullptr, &list);
999     if (status == 0) {
1000       KJ_DEFER(freeaddrinfo(list));
1001 
1002       struct addrinfo* cur = list;
1003       while (cur != nullptr) {
1004         if (params.service == nullptr) {
1005           switch (cur->ai_addr->sa_family) {
1006             case AF_INET:
1007               ((struct sockaddr_in*)cur->ai_addr)->sin_port = htons(portHint);
1008               break;
1009             case AF_INET6:
1010               ((struct sockaddr_in6*)cur->ai_addr)->sin6_port = htons(portHint);
1011               break;
1012             default:
1013               break;
1014           }
1015         }
1016 
1017         SocketAddress addr;
1018         if (params.host == "*") {
1019           // Set up a wildcard SocketAddress.  Only use the port number returned by getaddrinfo().
1020           addr.wildcard = true;
1021           addr.addrlen = sizeof(addr.addr.inet6);
1022           addr.addr.inet6.sin6_family = AF_INET6;
1023           switch (cur->ai_addr->sa_family) {
1024             case AF_INET:
1025               addr.addr.inet6.sin6_port = ((struct sockaddr_in*)cur->ai_addr)->sin_port;
1026               break;
1027             case AF_INET6:
1028               addr.addr.inet6.sin6_port = ((struct sockaddr_in6*)cur->ai_addr)->sin6_port;
1029               break;
1030             default:
              addr.addr.inet6.sin6_port = htons(portHint);
1032               break;
1033           }
1034         } else {
1035           addr.addrlen = cur->ai_addrlen;
1036           memcpy(&addr.addr.generic, cur->ai_addr, cur->ai_addrlen);
1037         }
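        // SocketAddress must be safe to copy byte-for-byte, since we serialize it across the
        // pipe with a raw write() below.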
1038         KJ_ASSERT_CAN_MEMCPY(SocketAddress);
1039         output.write(&addr, sizeof(addr));
1040         cur = cur->ai_next;
1041       }
1042     } else if (status == EAI_SYSTEM) {
1043       KJ_FAIL_SYSCALL("getaddrinfo", errno, params.host, params.service) {
1044         return;
1045       }
1046     } else {
1047       KJ_FAIL_REQUIRE("DNS lookup failed.",
1048                       params.host, params.service, gai_strerror(status)) {
1049         return;
1050       }
1051     }
1052   }));
1053 
1054   auto reader = heap<LookupReader>(kj::mv(thread), kj::mv(input), filter);
1055   return reader->read().attach(kj::mv(reader));
1056 }
1057 
1058 // =======================================================================================
1059 
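// Implements ConnectionReceiver by wrapping a listening socket FD and accept()ing connections as
// they become available, dropping peers rejected by the NetworkFilter.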
1060 class FdConnectionReceiver final: public ConnectionReceiver, public OwnedFileDescriptor {
1061 public:
  FdConnectionReceiver(UnixEventPort& eventPort, int fd,
1063                        LowLevelAsyncIoProvider::NetworkFilter& filter, uint flags)
1064       : OwnedFileDescriptor(fd, flags), eventPort(eventPort), filter(filter),
1065         observer(eventPort, fd, UnixEventPort::FdObserver::OBSERVE_READ) {}
1066 
  Promise<Own<AsyncIoStream>> accept() override {
1068     int newFd;
1069 
1070     struct sockaddr_storage addr;
1071     socklen_t addrlen = sizeof(addr);
1072 
1073   retry:
1074 #if __linux__ && !__BIONIC__
1075     newFd = ::accept4(fd, reinterpret_cast<struct sockaddr*>(&addr), &addrlen,
1076                       SOCK_NONBLOCK | SOCK_CLOEXEC);
1077 #else
1078     newFd = ::accept(fd, reinterpret_cast<struct sockaddr*>(&addr), &addrlen);
1079 #endif
1080 
1081     if (newFd >= 0) {
1082       if (!filter.shouldAllow(reinterpret_cast<struct sockaddr*>(&addr), addrlen)) {
1083         // Drop disallowed address.
1084         close(newFd);
1085         return accept();
1086       } else {
1087         return Own<AsyncIoStream>(heap<AsyncStreamFd>(eventPort, newFd, NEW_FD_FLAGS));
1088       }
1089     } else {
1090       int error = errno;
1091 
1092       switch (error) {
1093         case EAGAIN:
1094 #if EAGAIN != EWOULDBLOCK
1095         case EWOULDBLOCK:
1096 #endif
1097           // Not ready yet.
1098           return observer.whenBecomesReadable().then([this]() {
1099             return accept();
1100           });
1101 
1102         case EINTR:
1103         case ENETDOWN:
1104 #ifdef EPROTO
1105         // EPROTO is not defined on OpenBSD.
1106         case EPROTO:
1107 #endif
1108         case EHOSTDOWN:
1109         case EHOSTUNREACH:
1110         case ENETUNREACH:
1111         case ECONNABORTED:
1112         case ETIMEDOUT:
1113           // According to the Linux man page, accept() may report an error if the accepted
1114           // connection is already broken.  In this case, we really ought to just ignore it and
1115           // keep waiting.  But it's hard to say exactly what errors are such network errors and
1116           // which ones are permanent errors.  We've made a guess here.
1117           goto retry;
1118 
1119         default:
1120           KJ_FAIL_SYSCALL("accept", error);
1121       }
1122 
1123     }
1124   }
1125 
  uint getPort() override {
1127     return SocketAddress::getLocalAddress(fd).getPort();
1128   }
1129 
  void getsockopt(int level, int option, void* value, uint* length) override {
1131     socklen_t socklen = *length;
1132     KJ_SYSCALL(::getsockopt(fd, level, option, value, &socklen));
1133     *length = socklen;
1134   }
  void setsockopt(int level, int option, const void* value, uint length) override {
1136     KJ_SYSCALL(::setsockopt(fd, level, option, value, length));
1137   }
  void getsockname(struct sockaddr* addr, uint* length) override {
1139     socklen_t socklen = *length;
1140     KJ_SYSCALL(::getsockname(fd, addr, &socklen));
1141     *length = socklen;
1142   }
1143 
1144 public:
1145   UnixEventPort& eventPort;
1146   LowLevelAsyncIoProvider::NetworkFilter& filter;
1147   UnixEventPort::FdObserver observer;
1148 };
1149 
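// Implements DatagramPort on top of a bound datagram (e.g. UDP) socket FD.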
1150 class DatagramPortImpl final: public DatagramPort, public OwnedFileDescriptor {
1151 public:
  DatagramPortImpl(LowLevelAsyncIoProvider& lowLevel, UnixEventPort& eventPort, int fd,
1153                    LowLevelAsyncIoProvider::NetworkFilter& filter, uint flags)
1154       : OwnedFileDescriptor(fd, flags), lowLevel(lowLevel), eventPort(eventPort), filter(filter),
1155         observer(eventPort, fd, UnixEventPort::FdObserver::OBSERVE_READ |
1156                                 UnixEventPort::FdObserver::OBSERVE_WRITE) {}
1157 
1158   Promise<size_t> send(const void* buffer, size_t size, NetworkAddress& destination) override;
1159   Promise<size_t> send(
1160       ArrayPtr<const ArrayPtr<const byte>> pieces, NetworkAddress& destination) override;
1161 
1162   class ReceiverImpl;
1163 
1164   Own<DatagramReceiver> makeReceiver(DatagramReceiver::Capacity capacity) override;
1165 
  uint getPort() override {
1167     return SocketAddress::getLocalAddress(fd).getPort();
1168   }
1169 
  void getsockopt(int level, int option, void* value, uint* length) override {
1171     socklen_t socklen = *length;
1172     KJ_SYSCALL(::getsockopt(fd, level, option, value, &socklen));
1173     *length = socklen;
1174   }
  void setsockopt(int level, int option, const void* value, uint length) override {
1176     KJ_SYSCALL(::setsockopt(fd, level, option, value, length));
1177   }
1178 
1179 public:
1180   LowLevelAsyncIoProvider& lowLevel;
1181   UnixEventPort& eventPort;
1182   LowLevelAsyncIoProvider::NetworkFilter& filter;
1183   UnixEventPort::FdObserver observer;
1184 };
1185 
1186 class LowLevelAsyncIoProviderImpl final: public LowLevelAsyncIoProvider {
1187 public:
  LowLevelAsyncIoProviderImpl()
1189       : eventLoop(eventPort), waitScope(eventLoop) {}
1190 
  inline WaitScope& getWaitScope() { return waitScope; }
1192 
  Own<AsyncInputStream> wrapInputFd(int fd, uint flags = 0) override {
1194     return heap<AsyncStreamFd>(eventPort, fd, flags);
1195   }
  Own<AsyncOutputStream> wrapOutputFd(int fd, uint flags = 0) override {
1197     return heap<AsyncStreamFd>(eventPort, fd, flags);
1198   }
  Own<AsyncIoStream> wrapSocketFd(int fd, uint flags = 0) override {
1200     return heap<AsyncStreamFd>(eventPort, fd, flags);
1201   }
  Own<AsyncCapabilityStream> wrapUnixSocketFd(Fd fd, uint flags = 0) override {
1203     return heap<AsyncStreamFd>(eventPort, fd, flags);
1204   }
  Promise<Own<AsyncIoStream>> wrapConnectingSocketFd(
1206       int fd, const struct sockaddr* addr, uint addrlen, uint flags = 0) override {
1207     // It's important that we construct the AsyncStreamFd first, so that `flags` are honored,
1208     // especially setting nonblocking mode and taking ownership.
1209     auto result = heap<AsyncStreamFd>(eventPort, fd, flags);
1210 
1211     // Unfortunately connect() doesn't fit the mold of KJ_NONBLOCKING_SYSCALL, since it indicates
1212     // non-blocking using EINPROGRESS.
1213     for (;;) {
1214       if (::connect(fd, addr, addrlen) < 0) {
1215         int error = errno;
1216         if (error == EINPROGRESS) {
1217           // Fine.
1218           break;
1219         } else if (error != EINTR) {
1220           KJ_FAIL_SYSCALL("connect()", error) { break; }
1221           return Own<AsyncIoStream>();
1222         }
1223       } else {
1224         // no error
1225         break;
1226       }
1227     }
1228 
1229     auto connected = result->waitConnected();
1230     return connected.then(kj::mvCapture(result, [fd](Own<AsyncIoStream>&& stream) {
1231       int err;
1232       socklen_t errlen = sizeof(err);
1233       KJ_SYSCALL(getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &errlen));
1234       if (err != 0) {
1235         KJ_FAIL_SYSCALL("connect()", err) { break; }
1236       }
1237       return kj::mv(stream);
1238     }));
1239   }
  Own<ConnectionReceiver> wrapListenSocketFd(
1241       int fd, NetworkFilter& filter, uint flags = 0) override {
1242     return heap<FdConnectionReceiver>(eventPort, fd, filter, flags);
1243   }
  Own<DatagramPort> wrapDatagramSocketFd(
1245       int fd, NetworkFilter& filter, uint flags = 0) override {
1246     return heap<DatagramPortImpl>(*this, eventPort, fd, filter, flags);
1247   }
1248 
  Timer& getTimer() override { return eventPort.getTimer(); }
1250 
  UnixEventPort& getEventPort() { return eventPort; }
1252 
1253 private:
1254   UnixEventPort eventPort;
1255   EventLoop eventLoop;
1256   WaitScope waitScope;
1257 };
1258 
1259 // =======================================================================================
1260 
1261 class NetworkAddressImpl final: public NetworkAddress {
1262 public:
  NetworkAddressImpl(LowLevelAsyncIoProvider& lowLevel,
1264                      LowLevelAsyncIoProvider::NetworkFilter& filter,
1265                      Array<SocketAddress> addrs)
1266       : lowLevel(lowLevel), filter(filter), addrs(kj::mv(addrs)) {}
1267 
  Promise<Own<AsyncIoStream>> connect() override {
1269     auto addrsCopy = heapArray(addrs.asPtr());
1270     auto promise = connectImpl(lowLevel, filter, addrsCopy);
1271     return promise.attach(kj::mv(addrsCopy));
1272   }
1273 
  Own<ConnectionReceiver> listen() override {
1275     if (addrs.size() > 1) {
1276       KJ_LOG(WARNING, "Bind address resolved to multiple addresses.  Only the first address will "
1277           "be used.  If this is incorrect, specify the address numerically.  This may be fixed "
1278           "in the future.", addrs[0].toString());
1279     }
1280 
1281     int fd = addrs[0].socket(SOCK_STREAM);
1282 
1283     {
1284       KJ_ON_SCOPE_FAILURE(close(fd));
1285 
1286       // We always enable SO_REUSEADDR because having to take your server down for five minutes
1287       // before it can restart really sucks.
1288       int optval = 1;
1289       KJ_SYSCALL(setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)));
1290 
1291       addrs[0].bind(fd);
1292 
1293       // TODO(someday):  Let queue size be specified explicitly in string addresses.
1294       KJ_SYSCALL(::listen(fd, SOMAXCONN));
1295     }
1296 
1297     return lowLevel.wrapListenSocketFd(fd, filter, NEW_FD_FLAGS);
1298   }
1299 
  Own<DatagramPort> bindDatagramPort() override {
1301     if (addrs.size() > 1) {
1302       KJ_LOG(WARNING, "Bind address resolved to multiple addresses.  Only the first address will "
1303           "be used.  If this is incorrect, specify the address numerically.  This may be fixed "
1304           "in the future.", addrs[0].toString());
1305     }
1306 
1307     int fd = addrs[0].socket(SOCK_DGRAM);
1308 
1309     {
1310       KJ_ON_SCOPE_FAILURE(close(fd));
1311 
1312       // We always enable SO_REUSEADDR because having to take your server down for five minutes
1313       // before it can restart really sucks.
1314       int optval = 1;
1315       KJ_SYSCALL(setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)));
1316 
1317       addrs[0].bind(fd);
1318     }
1319 
1320     return lowLevel.wrapDatagramSocketFd(fd, filter, NEW_FD_FLAGS);
1321   }
1322 
clone()1323   Own<NetworkAddress> clone() override {
1324     return kj::heap<NetworkAddressImpl>(lowLevel, filter, kj::heapArray(addrs.asPtr()));
1325   }
1326 
toString()1327   String toString() override {
1328     return strArray(KJ_MAP(addr, addrs) { return addr.toString(); }, ",");
1329   }
1330 
chooseOneAddress()1331   const SocketAddress& chooseOneAddress() {
1332     KJ_REQUIRE(addrs.size() > 0, "No addresses available.");
1333     return addrs[counter++ % addrs.size()];
1334   }
1335 
1336 private:
1337   LowLevelAsyncIoProvider& lowLevel;
1338   LowLevelAsyncIoProvider::NetworkFilter& filter;
1339   Array<SocketAddress> addrs;
1340   uint counter = 0;
1341 
connectImpl(LowLevelAsyncIoProvider & lowLevel,LowLevelAsyncIoProvider::NetworkFilter & filter,ArrayPtr<SocketAddress> addrs)1342   static Promise<Own<AsyncIoStream>> connectImpl(
1343       LowLevelAsyncIoProvider& lowLevel,
1344       LowLevelAsyncIoProvider::NetworkFilter& filter,
1345       ArrayPtr<SocketAddress> addrs) {
1346     KJ_ASSERT(addrs.size() > 0);
1347 
1348     return kj::evalNow([&]() -> Promise<Own<AsyncIoStream>> {
1349       if (!addrs[0].allowedBy(filter)) {
1350         return KJ_EXCEPTION(FAILED, "connect() blocked by restrictPeers()");
1351       } else {
1352         int fd = addrs[0].socket(SOCK_STREAM);
1353         return lowLevel.wrapConnectingSocketFd(
1354             fd, addrs[0].getRaw(), addrs[0].getRawSize(), NEW_FD_FLAGS);
1355       }
1356     }).then([](Own<AsyncIoStream>&& stream) -> Promise<Own<AsyncIoStream>> {
1357       // Success, pass along.
1358       return kj::mv(stream);
1359     }, [&lowLevel,&filter,addrs](Exception&& exception) mutable -> Promise<Own<AsyncIoStream>> {
1360       // Connect failed.
1361       if (addrs.size() > 1) {
1362         // Try the next address instead.
1363         return connectImpl(lowLevel, filter, addrs.slice(1, addrs.size()));
1364       } else {
1365         // No more addresses to try, so propagate the exception.
1366         return kj::mv(exception);
1367       }
1368     });
1369   }
1370 };
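
// Illustrative sketch (not part of the implementation): when a name resolves to several
// addresses, connect() above tries them in order, falling back to the next one only if the
// previous attempt fails. Assuming a Network `network` and a WaitScope `waitScope`:
//
//   auto addr = network.parseAddress("example.com", 80).wait(waitScope);
//   auto stream = addr->connect().wait(waitScope);  // may try IPv6 first, then fall back to IPv4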

class SocketNetwork final: public Network {
public:
  explicit SocketNetwork(LowLevelAsyncIoProvider& lowLevel): lowLevel(lowLevel) {}
  explicit SocketNetwork(SocketNetwork& parent,
                         kj::ArrayPtr<const kj::StringPtr> allow,
                         kj::ArrayPtr<const kj::StringPtr> deny)
      : lowLevel(parent.lowLevel), filter(allow, deny, parent.filter) {}

  Promise<Own<NetworkAddress>> parseAddress(StringPtr addr, uint portHint = 0) override {
    return evalLater(mvCapture(heapString(addr), [this,portHint](String&& addr) {
      return SocketAddress::parse(lowLevel, addr, portHint, filter);
    })).then([this](Array<SocketAddress> addresses) -> Own<NetworkAddress> {
      return heap<NetworkAddressImpl>(lowLevel, filter, kj::mv(addresses));
    });
  }

  Own<NetworkAddress> getSockaddr(const void* sockaddr, uint len) override {
    auto array = kj::heapArrayBuilder<SocketAddress>(1);
    array.add(SocketAddress(sockaddr, len));
    KJ_REQUIRE(array[0].allowedBy(filter), "address blocked by restrictPeers()") { break; }
    return Own<NetworkAddress>(heap<NetworkAddressImpl>(lowLevel, filter, array.finish()));
  }

  Own<Network> restrictPeers(
      kj::ArrayPtr<const kj::StringPtr> allow,
      kj::ArrayPtr<const kj::StringPtr> deny = nullptr) override {
    return heap<SocketNetwork>(*this, allow, deny);
  }

private:
  LowLevelAsyncIoProvider& lowLevel;
  _::NetworkFilter filter;
};
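
// Illustrative sketch (not part of the implementation): each call to restrictPeers() layers a
// new filter on top of the parent's (see the second constructor above), so an address must be
// accepted by every layer. Assuming a Network `network` and the allow-list syntax documented in
// async-io.h (e.g. the "public" address class):
//
//   kj::StringPtr allowNames[] = {"public"};
//   auto restricted = network.restrictPeers(kj::arrayPtr(allowNames, 1));
//   // restricted->parseAddress(...) and connect() will now refuse non-public peers.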

// =======================================================================================

Promise<size_t> DatagramPortImpl::send(
    const void* buffer, size_t size, NetworkAddress& destination) {
  auto& addr = downcast<NetworkAddressImpl>(destination).chooseOneAddress();

  ssize_t n;
  KJ_NONBLOCKING_SYSCALL(n = sendto(fd, buffer, size, 0, addr.getRaw(), addr.getRawSize()));
  if (n < 0) {
    // Write buffer full.
    return observer.whenBecomesWritable().then([this, buffer, size, &destination]() {
      return send(buffer, size, destination);
    });
  } else {
    // If less than the whole message was sent, then it got truncated, and there's nothing we can
    // do about it.
    return n;
  }
}

Promise<size_t> DatagramPortImpl::send(
    ArrayPtr<const ArrayPtr<const byte>> pieces, NetworkAddress& destination) {
  struct msghdr msg;
  memset(&msg, 0, sizeof(msg));

  auto& addr = downcast<NetworkAddressImpl>(destination).chooseOneAddress();
  msg.msg_name = const_cast<void*>(implicitCast<const void*>(addr.getRaw()));
  msg.msg_namelen = addr.getRawSize();

  const size_t iovmax = kj::miniposix::iovMax(pieces.size());
  KJ_STACK_ARRAY(struct iovec, iov, kj::min(pieces.size(), iovmax), 16, 64);

  // Fill only as many iovecs as we actually allocated; if there are more pieces than that, the
  // trailing pieces are collapsed into a single buffer below.
  for (size_t i: kj::indices(iov)) {
    iov[i].iov_base = const_cast<void*>(implicitCast<const void*>(pieces[i].begin()));
    iov[i].iov_len = pieces[i].size();
  }

  Array<byte> extra;
  if (pieces.size() > iovmax) {
    // Too many pieces, but we can't use multiple syscalls because they'd send separate
    // datagrams. We'll have to copy the trailing pieces into a temporary array.
    //
    // TODO(perf): On Linux we could use multiple syscalls via MSG_MORE.
    size_t extraSize = 0;
    for (size_t i = iovmax - 1; i < pieces.size(); i++) {
      extraSize += pieces[i].size();
    }
    extra = kj::heapArray<byte>(extraSize);
    extraSize = 0;
    for (size_t i = iovmax - 1; i < pieces.size(); i++) {
      memcpy(extra.begin() + extraSize, pieces[i].begin(), pieces[i].size());
      extraSize += pieces[i].size();
    }
    iov[iovmax - 1].iov_base = extra.begin();
    iov[iovmax - 1].iov_len = extra.size();
  }
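  // Worked example (illustrative only): with IOV_MAX == 1024 and 1030 pieces, iov entries
  // 0..1022 each point at pieces 0..1022 directly, while entry 1023 points at `extra`, a heap
  // buffer holding the concatenation of pieces 1023..1029, so the whole datagram still goes out
  // in a single sendmsg() call.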

  msg.msg_iov = iov.begin();
  msg.msg_iovlen = iov.size();

  ssize_t n;
  KJ_NONBLOCKING_SYSCALL(n = sendmsg(fd, &msg, 0));
  if (n < 0) {
    // Write buffer full.
    return observer.whenBecomesWritable().then([this, pieces, &destination]() {
      return send(pieces, destination);
    });
  } else {
    // If less than the whole message was sent, then it was truncated, and there's nothing we can
    // do about that now.
    return n;
  }
}

class DatagramPortImpl::ReceiverImpl final: public DatagramReceiver {
public:
  explicit ReceiverImpl(DatagramPortImpl& port, Capacity capacity)
      : port(port),
        contentBuffer(heapArray<byte>(capacity.content)),
        ancillaryBuffer(capacity.ancillary > 0 ? heapArray<byte>(capacity.ancillary)
                                               : Array<byte>(nullptr)) {}

  Promise<void> receive() override {
    struct msghdr msg;
    memset(&msg, 0, sizeof(msg));

    struct sockaddr_storage addr;
    memset(&addr, 0, sizeof(addr));
    msg.msg_name = &addr;
    msg.msg_namelen = sizeof(addr);

    struct iovec iov;
    iov.iov_base = contentBuffer.begin();
    iov.iov_len = contentBuffer.size();
    msg.msg_iov = &iov;
    msg.msg_iovlen = 1;
    msg.msg_control = ancillaryBuffer.begin();
    msg.msg_controllen = ancillaryBuffer.size();

    ssize_t n;
    KJ_NONBLOCKING_SYSCALL(n = recvmsg(port.fd, &msg, 0));

    if (n < 0) {
      // No data available. Wait.
      return port.observer.whenBecomesReadable().then([this]() {
        return receive();
      });
    } else {
      if (!port.filter.shouldAllow(reinterpret_cast<const struct sockaddr*>(msg.msg_name),
                                   msg.msg_namelen)) {
        // Ignore message from disallowed source.
        return receive();
      }

      receivedSize = n;
      contentTruncated = msg.msg_flags & MSG_TRUNC;

      source.emplace(port.lowLevel, port.filter, msg.msg_name, msg.msg_namelen);

      ancillaryList.resize(0);
      ancillaryTruncated = msg.msg_flags & MSG_CTRUNC;

      for (struct cmsghdr* cmsg = CMSG_FIRSTHDR(&msg); cmsg != nullptr;
           cmsg = CMSG_NXTHDR(&msg, cmsg)) {
        // On some platforms (OSX), a cmsghdr's length may cross the end of the ancillary buffer
        // when truncated. On other platforms (Linux) the length in cmsghdr will itself be
        // truncated to fit within the buffer.

#if __APPLE__
// On MacOS, `CMSG_SPACE(0)` triggers a bogus warning.
#pragma GCC diagnostic ignored "-Wnull-pointer-arithmetic"
#endif
        const byte* pos = reinterpret_cast<const byte*>(cmsg);
        size_t available = ancillaryBuffer.end() - pos;
        if (available < CMSG_SPACE(0)) {
          // The buffer ends in the middle of the header. We can't use this message.
          // (On Linux, this never happens, because the message is not included if there isn't
          // space for a header. I'm not sure how other systems behave, though, so let's be safe.)
          break;
        }

        // OK, we know the cmsghdr is valid, at least.

        // Find the start of the message payload.
        const byte* begin = (const byte *)CMSG_DATA(cmsg);

        // Cap the message length to the available space.
        const byte* end = pos + kj::min(available, cmsg->cmsg_len);

        ancillaryList.add(AncillaryMessage(
            cmsg->cmsg_level, cmsg->cmsg_type, arrayPtr(begin, end)));
      }

      return READY_NOW;
    }
  }

  MaybeTruncated<ArrayPtr<const byte>> getContent() override {
    return { contentBuffer.slice(0, receivedSize), contentTruncated };
  }

  MaybeTruncated<ArrayPtr<const AncillaryMessage>> getAncillary() override {
    return { ancillaryList.asPtr(), ancillaryTruncated };
  }

  NetworkAddress& getSource() override {
    return KJ_REQUIRE_NONNULL(source, "Haven't received a message yet.").abstract;
  }

private:
  DatagramPortImpl& port;
  Array<byte> contentBuffer;
  Array<byte> ancillaryBuffer;
  Vector<AncillaryMessage> ancillaryList;
  size_t receivedSize = 0;
  bool contentTruncated = false;
  bool ancillaryTruncated = false;

  struct StoredAddress {
    StoredAddress(LowLevelAsyncIoProvider& lowLevel, LowLevelAsyncIoProvider::NetworkFilter& filter,
                  const void* sockaddr, uint length)
        : raw(sockaddr, length),
          abstract(lowLevel, filter, Array<SocketAddress>(&raw, 1, NullArrayDisposer::instance)) {}

    SocketAddress raw;
    NetworkAddressImpl abstract;
  };

  kj::Maybe<StoredAddress> source;
};

Own<DatagramReceiver> DatagramPortImpl::makeReceiver(DatagramReceiver::Capacity capacity) {
  return kj::heap<ReceiverImpl>(*this, capacity);
}
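
// Illustrative sketch (not part of the implementation): a typical receive loop over this API,
// assuming a DatagramPort `port`, a WaitScope `waitScope`, the default Capacity, and the
// MaybeTruncated field names (value, isTruncated) as declared in async-io.h; process() is a
// hypothetical application function.
//
//   auto receiver = port->makeReceiver();
//   receiver->receive().wait(waitScope);
//   auto content = receiver->getContent();
//   if (content.isTruncated) {
//     // The datagram was larger than the receiver's content capacity.
//   } else {
//     process(content.value);
//   }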

// =======================================================================================

class AsyncIoProviderImpl final: public AsyncIoProvider {
public:
  AsyncIoProviderImpl(LowLevelAsyncIoProvider& lowLevel)
      : lowLevel(lowLevel), network(lowLevel) {}

  OneWayPipe newOneWayPipe() override {
    int fds[2];
#if __linux__ && !__BIONIC__
    KJ_SYSCALL(pipe2(fds, O_NONBLOCK | O_CLOEXEC));
#else
    KJ_SYSCALL(pipe(fds));
#endif
    return OneWayPipe {
      lowLevel.wrapInputFd(fds[0], NEW_FD_FLAGS),
      lowLevel.wrapOutputFd(fds[1], NEW_FD_FLAGS)
    };
  }

  TwoWayPipe newTwoWayPipe() override {
    int fds[2];
    int type = SOCK_STREAM;
#if __linux__ && !__BIONIC__
    type |= SOCK_NONBLOCK | SOCK_CLOEXEC;
#endif
    KJ_SYSCALL(socketpair(AF_UNIX, type, 0, fds));
    return TwoWayPipe { {
      lowLevel.wrapSocketFd(fds[0], NEW_FD_FLAGS),
      lowLevel.wrapSocketFd(fds[1], NEW_FD_FLAGS)
    } };
  }

  CapabilityPipe newCapabilityPipe() override {
    int fds[2];
    int type = SOCK_STREAM;
#if __linux__ && !__BIONIC__
    type |= SOCK_NONBLOCK | SOCK_CLOEXEC;
#endif
    KJ_SYSCALL(socketpair(AF_UNIX, type, 0, fds));
    return CapabilityPipe { {
      lowLevel.wrapUnixSocketFd(fds[0], NEW_FD_FLAGS),
      lowLevel.wrapUnixSocketFd(fds[1], NEW_FD_FLAGS)
    } };
  }

  Network& getNetwork() override {
    return network;
  }

  PipeThread newPipeThread(
      Function<void(AsyncIoProvider&, AsyncIoStream&, WaitScope&)> startFunc) override {
    int fds[2];
    int type = SOCK_STREAM;
#if __linux__ && !__BIONIC__
    type |= SOCK_NONBLOCK | SOCK_CLOEXEC;
#endif
    KJ_SYSCALL(socketpair(AF_UNIX, type, 0, fds));

    int threadFd = fds[1];
    KJ_ON_SCOPE_FAILURE(close(threadFd));

    auto pipe = lowLevel.wrapSocketFd(fds[0], NEW_FD_FLAGS);

    auto thread = heap<Thread>(kj::mvCapture(startFunc,
        [threadFd](Function<void(AsyncIoProvider&, AsyncIoStream&, WaitScope&)>&& startFunc) {
      LowLevelAsyncIoProviderImpl lowLevel;
      auto stream = lowLevel.wrapSocketFd(threadFd, NEW_FD_FLAGS);
      AsyncIoProviderImpl ioProvider(lowLevel);
      startFunc(ioProvider, *stream, lowLevel.getWaitScope());
    }));

    return { kj::mv(thread), kj::mv(pipe) };
  }
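  // Illustrative sketch (not part of the implementation): newPipeThread() spawns a thread with
  // its own event loop and hands back the parent's end of the socketpair. Assuming an
  // AsyncIoProvider `ioProvider` and WaitScope `waitScope` in the calling thread, and the
  // PipeThread field names (thread, pipe) as declared in async-io.h:
  //
  //   auto pipeThread = ioProvider.newPipeThread(
  //       [](AsyncIoProvider& io, AsyncIoStream& stream, WaitScope& ws) {
  //     stream.write("hi", 2).wait(ws);  // runs in the child thread
  //   });
  //   char buffer[2];
  //   pipeThread.pipe->read(buffer, 2).wait(waitScope);  // runs in the calling thread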

  Timer& getTimer() override { return lowLevel.getTimer(); }

private:
  LowLevelAsyncIoProvider& lowLevel;
  SocketNetwork network;
};

}  // namespace

Own<AsyncIoProvider> newAsyncIoProvider(LowLevelAsyncIoProvider& lowLevel) {
  return kj::heap<AsyncIoProviderImpl>(lowLevel);
}

AsyncIoContext setupAsyncIo() {
  auto lowLevel = heap<LowLevelAsyncIoProviderImpl>();
  auto ioProvider = kj::heap<AsyncIoProviderImpl>(*lowLevel);
  auto& waitScope = lowLevel->getWaitScope();
  auto& eventPort = lowLevel->getEventPort();
  return { kj::mv(lowLevel), kj::mv(ioProvider), waitScope, eventPort };
}
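
// Illustrative sketch (not part of the implementation): typical use from a synchronous main(),
// with field names as declared for AsyncIoContext in async-io.h:
//
//   auto io = kj::setupAsyncIo();
//   auto addr = io.provider->getNetwork().parseAddress("localhost", 1234).wait(io.waitScope);
//   auto stream = addr->connect().wait(io.waitScope);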

}  // namespace kj

#endif  // !_WIN32
