1 // Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors
2 // Licensed under the MIT License:
3 //
4 // Permission is hereby granted, free of charge, to any person obtaining a copy
5 // of this software and associated documentation files (the "Software"), to deal
6 // in the Software without restriction, including without limitation the rights
7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8 // copies of the Software, and to permit persons to whom the Software is
9 // furnished to do so, subject to the following conditions:
10 //
11 // The above copyright notice and this permission notice shall be included in
12 // all copies or substantial portions of the Software.
13 //
14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
20 // THE SOFTWARE.
21
22 #if !_WIN32
23 // For Win32 implementation, see async-io-win32.c++.
24
25 #ifndef _GNU_SOURCE
26 #define _GNU_SOURCE
27 #endif
28
29 #include "async-io.h"
30 #include "async-io-internal.h"
31 #include "async-unix.h"
32 #include "debug.h"
33 #include "thread.h"
34 #include "io.h"
35 #include "miniposix.h"
36 #include <unistd.h>
37 #include <sys/uio.h>
38 #include <errno.h>
39 #include <fcntl.h>
40 #include <sys/types.h>
41 #include <sys/socket.h>
42 #include <sys/un.h>
43 #include <netinet/in.h>
44 #include <netinet/tcp.h>
45 #include <stddef.h>
46 #include <stdlib.h>
47 #include <arpa/inet.h>
48 #include <netdb.h>
49 #include <set>
50 #include <poll.h>
51 #include <limits.h>
52 #include <sys/ioctl.h>
53
54 namespace kj {
55
56 namespace {
57
setNonblocking(int fd)58 void setNonblocking(int fd) {
59 #ifdef FIONBIO
60 int opt = 1;
61 KJ_SYSCALL(ioctl(fd, FIONBIO, &opt));
62 #else
63 int flags;
64 KJ_SYSCALL(flags = fcntl(fd, F_GETFL));
65 if ((flags & O_NONBLOCK) == 0) {
66 KJ_SYSCALL(fcntl(fd, F_SETFL, flags | O_NONBLOCK));
67 }
68 #endif
69 }
70
setCloseOnExec(int fd)71 void setCloseOnExec(int fd) {
72 #ifdef FIOCLEX
73 KJ_SYSCALL(ioctl(fd, FIOCLEX));
74 #else
75 int flags;
76 KJ_SYSCALL(flags = fcntl(fd, F_GETFD));
77 if ((flags & FD_CLOEXEC) == 0) {
78 KJ_SYSCALL(fcntl(fd, F_SETFD, flags | FD_CLOEXEC));
79 }
80 #endif
81 }
82
static constexpr uint NEW_FD_FLAGS =
#if __linux__ && !__BIONIC__
    LowLevelAsyncIoProvider::ALREADY_CLOEXEC | LowLevelAsyncIoProvider::ALREADY_NONBLOCK |
#endif
    LowLevelAsyncIoProvider::TAKE_OWNERSHIP;
// Flags applied to every FD this file creates itself. We always try to open FDs with CLOEXEC and
// NONBLOCK already set on Linux (via SOCK_CLOEXEC/SOCK_NONBLOCK and friends), but on other
// platforms this is not possible, so there the flags are set after the fact by
// OwnedFileDescriptor's constructor.
90
class OwnedFileDescriptor {
  // RAII base class wrapping a Unix file descriptor. On construction it guarantees the fd is
  // non-blocking (and, if ownership is taken, close-on-exec); on destruction it closes the fd
  // iff TAKE_OWNERSHIP was given. `flags` is a bitmask of LowLevelAsyncIoProvider flags.
public:
  OwnedFileDescriptor(int fd, uint flags): fd(fd), flags(flags) {
    if (flags & LowLevelAsyncIoProvider::ALREADY_NONBLOCK) {
      // Debug-only sanity check that the caller's claim is true.
      KJ_DREQUIRE(fcntl(fd, F_GETFL) & O_NONBLOCK, "You claimed you set NONBLOCK, but you didn't.");
    } else {
      setNonblocking(fd);
    }

    if (flags & LowLevelAsyncIoProvider::TAKE_OWNERSHIP) {
      if (flags & LowLevelAsyncIoProvider::ALREADY_CLOEXEC) {
        KJ_DREQUIRE(fcntl(fd, F_GETFD) & FD_CLOEXEC,
                    "You claimed you set CLOEXEC, but you didn't.");
      } else {
        setCloseOnExec(fd);
      }
    }
  }

  ~OwnedFileDescriptor() noexcept(false) {
    // Don't use SYSCALL() here because close() should not be repeated on EINTR.
    if ((flags & LowLevelAsyncIoProvider::TAKE_OWNERSHIP) && close(fd) < 0) {
      KJ_FAIL_SYSCALL("close", errno, fd) {
        // Recoverable exceptions are safe in destructors.
        break;
      }
    }
  }

protected:
  const int fd;
  // The wrapped descriptor, visible to subclasses; never changes after construction.

private:
  uint flags;
};
126
127 // =======================================================================================
128
class AsyncStreamFd: public OwnedFileDescriptor, public AsyncCapabilityStream {
  // AsyncCapabilityStream implementation over a Unix socket fd: byte I/O via read()/writev(),
  // FD passing via SCM_RIGHTS ancillary messages with recvmsg()/sendmsg(). Readiness is driven
  // by a UnixEventPort::FdObserver watching both read and write directions.
public:
  AsyncStreamFd(UnixEventPort& eventPort, int fd, uint flags)
      : OwnedFileDescriptor(fd, flags),
        eventPort(eventPort),
        observer(eventPort, fd, UnixEventPort::FdObserver::OBSERVE_READ_WRITE) {}
  virtual ~AsyncStreamFd() noexcept(false) {}

  Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
    // Plain byte read: delegate to the common path with no FD buffer, keeping only byteCount.
    return tryReadInternal(buffer, minBytes, maxBytes, nullptr, 0, {0,0})
        .then([](ReadResult r) { return r.byteCount; });
  }

  Promise<ReadResult> tryReadWithFds(void* buffer, size_t minBytes, size_t maxBytes,
                                     AutoCloseFd* fdBuffer, size_t maxFds) override {
    return tryReadInternal(buffer, minBytes, maxBytes, fdBuffer, maxFds, {0,0});
  }

  Promise<ReadResult> tryReadWithStreams(
      void* buffer, size_t minBytes, size_t maxBytes,
      Own<AsyncCapabilityStream>* streamBuffer, size_t maxStreams) override {
    // Receive raw FDs, then wrap each received FD in a new AsyncStreamFd.
    auto fdBuffer = kj::heapArray<AutoCloseFd>(maxStreams);
    auto promise = tryReadInternal(buffer, minBytes, maxBytes, fdBuffer.begin(), maxStreams, {0,0});

    return promise.then([this, fdBuffer = kj::mv(fdBuffer), streamBuffer]
                        (ReadResult result) mutable {
      for (auto i: kj::zeroTo(result.capCount)) {
        // ALREADY_CLOEXEC: tryReadInternal ensures received FDs are close-on-exec (either via
        // MSG_CMSG_CLOEXEC or an explicit setCloseOnExec() fallback).
        streamBuffer[i] = kj::heap<AsyncStreamFd>(eventPort, fdBuffer[i].release(),
            LowLevelAsyncIoProvider::TAKE_OWNERSHIP | LowLevelAsyncIoProvider::ALREADY_CLOEXEC);
      }
      return result;
    });
  }

  Promise<void> write(const void* buffer, size_t size) override {
    // Try a non-blocking write immediately; only suspend (wait for writability) on EAGAIN.
    ssize_t n;
    KJ_NONBLOCKING_SYSCALL(n = ::write(fd, buffer, size)) {
      // Error.

      // We can't "return kj::READY_NOW;" inside this block because it causes a memory leak due to
      // a bug that exists in both Clang and GCC:
      //   http://gcc.gnu.org/bugzilla/show_bug.cgi?id=33799
      //   http://llvm.org/bugs/show_bug.cgi?id=12286
      goto error;
    }
    if (false) {
    error:
      return kj::READY_NOW;
    }

    if (n < 0) {
      // EAGAIN -- need to wait for writability and try again.
      return observer.whenBecomesWritable().then([=]() {
        return write(buffer, size);
      });
    } else if (n == size) {
      // All done.
      return READY_NOW;
    } else {
      // Fewer than `size` bytes were written, but we CANNOT assume we're out of buffer space, as
      // Linux is known to return partial reads/writes when interrupted by a signal -- yes, even
      // for non-blocking operations. So, we'll need to write() again now, even though it will
      // almost certainly fail with EAGAIN. See comments in the read path for more info.
      buffer = reinterpret_cast<const byte*>(buffer) + n;
      size -= n;
      return write(buffer, size);
    }
  }

  Promise<void> write(ArrayPtr<const ArrayPtr<const byte>> pieces) override {
    // Gathering write: first piece is passed separately from the rest (see writeInternal).
    if (pieces.size() == 0) {
      return writeInternal(nullptr, nullptr, nullptr);
    } else {
      return writeInternal(pieces[0], pieces.slice(1, pieces.size()), nullptr);
    }
  }

  Promise<void> writeWithFds(ArrayPtr<const byte> data,
                             ArrayPtr<const ArrayPtr<const byte>> moreData,
                             ArrayPtr<const int> fds) override {
    return writeInternal(data, moreData, fds);
  }

  Promise<void> writeWithStreams(ArrayPtr<const byte> data,
                                 ArrayPtr<const ArrayPtr<const byte>> moreData,
                                 Array<Own<AsyncCapabilityStream>> streams) override {
    // Extract the raw fd from each stream; the streams (and the fd array) must stay alive
    // until the write completes, hence the attach().
    auto fds = KJ_MAP(stream, streams) {
      return downcast<AsyncStreamFd>(*stream).fd;
    };
    auto promise = writeInternal(data, moreData, fds);
    return promise.attach(kj::mv(fds), kj::mv(streams));
  }

  Promise<void> whenWriteDisconnected() override {
    // Lazily create and cache a forked promise so multiple callers can all wait on it.
    KJ_IF_MAYBE(p, writeDisconnectedPromise) {
      return p->addBranch();
    } else {
      auto fork = observer.whenWriteDisconnected().fork();
      auto result = fork.addBranch();
      writeDisconnectedPromise = kj::mv(fork);
      return kj::mv(result);
    }
  }

  void shutdownWrite() override {
    // There's no legitimate way to get an AsyncStreamFd that isn't a socket through the
    // UnixAsyncIoProvider interface.
    KJ_SYSCALL(shutdown(fd, SHUT_WR));
  }

  void abortRead() override {
    // There's no legitimate way to get an AsyncStreamFd that isn't a socket through the
    // UnixAsyncIoProvider interface.
    KJ_SYSCALL(shutdown(fd, SHUT_RD));
  }

  void getsockopt(int level, int option, void* value, uint* length) override {
    // Thin wrapper converting between uint and socklen_t for the in/out length.
    socklen_t socklen = *length;
    KJ_SYSCALL(::getsockopt(fd, level, option, value, &socklen));
    *length = socklen;
  }

  void setsockopt(int level, int option, const void* value, uint length) override {
    KJ_SYSCALL(::setsockopt(fd, level, option, value, length));
  }

  void getsockname(struct sockaddr* addr, uint* length) override {
    socklen_t socklen = *length;
    KJ_SYSCALL(::getsockname(fd, addr, &socklen));
    *length = socklen;
  }

  void getpeername(struct sockaddr* addr, uint* length) override {
    socklen_t socklen = *length;
    KJ_SYSCALL(::getpeername(fd, addr, &socklen));
    *length = socklen;
  }

  Promise<void> waitConnected() {
    // Wait until initial connection has completed. This actually just waits until it is writable.

    // Can't just go directly to writeObserver.whenBecomesWritable() because of edge triggering. We
    // need to explicitly check if the socket is already connected.

    struct pollfd pollfd;
    memset(&pollfd, 0, sizeof(pollfd));
    pollfd.fd = fd;
    pollfd.events = POLLOUT;

    int pollResult;
    // Zero timeout: poll() here is a non-blocking readiness probe, not a wait.
    KJ_SYSCALL(pollResult = poll(&pollfd, 1, 0));

    if (pollResult == 0) {
      // Not ready yet. We can safely use the edge-triggered observer.
      return observer.whenBecomesWritable();
    } else {
      // Ready now.
      return kj::READY_NOW;
    }
  }

private:
  UnixEventPort& eventPort;
  UnixEventPort::FdObserver observer;
  Maybe<ForkedPromise<void>> writeDisconnectedPromise;
  // Cached fork of observer.whenWriteDisconnected(); see whenWriteDisconnected().

  Promise<ReadResult> tryReadInternal(void* buffer, size_t minBytes, size_t maxBytes,
                                      AutoCloseFd* fdBuffer, size_t maxFds,
                                      ReadResult alreadyRead) {
    // `alreadyRead` is the number of bytes we have already received via previous reads -- minBytes,
    // maxBytes, and buffer have already been adjusted to account for them, but this count must
    // be included in the final return value.

    ssize_t n;
    if (maxFds == 0) {
      // No FDs expected: plain read() suffices.
      KJ_NONBLOCKING_SYSCALL(n = ::read(fd, buffer, maxBytes)) {
        // Error.

        // We can't "return kj::READY_NOW;" inside this block because it causes a memory leak due to
        // a bug that exists in both Clang and GCC:
        //   http://gcc.gnu.org/bugzilla/show_bug.cgi?id=33799
        //   http://llvm.org/bugs/show_bug.cgi?id=12286
        goto error;
      }
    } else {
      // FDs expected: use recvmsg() with control-message (cmsg) space for SCM_RIGHTS.
      struct msghdr msg;
      memset(&msg, 0, sizeof(msg));

      struct iovec iov;
      memset(&iov, 0, sizeof(iov));
      iov.iov_base = buffer;
      iov.iov_len = maxBytes;
      msg.msg_iov = &iov;
      msg.msg_iovlen = 1;

      // Allocate space to receive a cmsg.
#if __APPLE__ || __FreeBSD__
      // Until very recently (late 2018 / early 2019), FreeBSD suffered from a bug in which when
      // an SCM_RIGHTS message was truncated on delivery, it would not close the FDs that weren't
      // delivered -- they would simply leak: https://bugs.freebsd.org/131876
      //
      // My testing indicates that MacOS has this same bug as of today (April 2019). I don't know
      // if they plan to fix it or are even aware of it.
      //
      // To handle both cases, we will always provide space to receive 512 FDs. Hopefully, this is
      // greater than the maximum number of FDs that these kernels will transmit in one message
      // PLUS enough space for any other ancillary messages that could be sent before the
      // SCM_RIGHTS message to push it back in the buffer. I couldn't find any firm documentation
      // on these limits, though -- I only know that Linux is limited to 253, and I saw a hint in
      // a comment in someone else's application that suggested FreeBSD is the same. Hopefully,
      // then, this is sufficient to prevent attacks. But if not, there's nothing more we can do;
      // it's really up to the kernel to fix this.
      size_t msgBytes = CMSG_SPACE(sizeof(int) * 512);
#else
      size_t msgBytes = CMSG_SPACE(sizeof(int) * maxFds);
#endif
      // On Linux, CMSG_SPACE will align to a word-size boundary, but on Mac it always aligns to a
      // 32-bit boundary. I guess aligning to 32 bits helps avoid the problem where you
      // surprisingly end up with space for two file descriptors when you only wanted one. However,
      // cmsghdr's preferred alignment is word-size (it contains a size_t). If we stack-allocate
      // the buffer, we need to make sure it is aligned properly (maybe not on x64, but maybe on
      // other platforms), so we want to allocate an array of words (we use void*). So... we use
      // CMSG_SPACE() and then additionally round up to deal with Mac.
      size_t msgWords = (msgBytes + sizeof(void*) - 1) / sizeof(void*);
      KJ_STACK_ARRAY(void*, cmsgSpace, msgWords, 16, 256);
      auto cmsgBytes = cmsgSpace.asBytes();
      memset(cmsgBytes.begin(), 0, cmsgBytes.size());
      msg.msg_control = cmsgBytes.begin();
      msg.msg_controllen = msgBytes;

#ifdef MSG_CMSG_CLOEXEC
      static constexpr int RECVMSG_FLAGS = MSG_CMSG_CLOEXEC;
#else
      static constexpr int RECVMSG_FLAGS = 0;
#endif

      KJ_NONBLOCKING_SYSCALL(n = ::recvmsg(fd, &msg, RECVMSG_FLAGS)) {
        // Error.

        // We can't "return kj::READY_NOW;" inside this block because it causes a memory leak due to
        // a bug that exists in both Clang and GCC:
        //   http://gcc.gnu.org/bugzilla/show_bug.cgi?id=33799
        //   http://llvm.org/bugs/show_bug.cgi?id=12286
        goto error;
      }

      if (n >= 0) {
        // Process all messages.
        //
        // WARNING DANGER: We have to be VERY careful not to miss a file descriptor here, because
        // if we do, then that FD will never be closed, and a malicious peer could exploit this to
        // fill up our FD table, creating a DoS attack. Some things to keep in mind:
        // - CMSG_SPACE() could have rounded up the space for alignment purposes, and this could
        //   mean we permitted the kernel to deliver more file descriptors than `maxFds`. We need
        //   to close the extras.
        // - We can receive multiple ancillary messages at once. In particular, there is also
        //   SCM_CREDENTIALS. The sender decides what to send. They could send SCM_CREDENTIALS
        //   first followed by SCM_RIGHTS. We need to make sure we see both.
        size_t nfds = 0;
        size_t spaceLeft = msg.msg_controllen;
        for (struct cmsghdr* cmsg = CMSG_FIRSTHDR(&msg);
             cmsg != nullptr; cmsg = CMSG_NXTHDR(&msg, cmsg)) {
          if (spaceLeft >= CMSG_LEN(0) &&
              cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_RIGHTS) {
            // Some operating systems (like MacOS) do not adjust csmg_len when the message is
            // truncated. We must do so ourselves or risk overrunning the buffer.
            auto len = kj::min(cmsg->cmsg_len, spaceLeft);
            auto data = arrayPtr(reinterpret_cast<int*>(CMSG_DATA(cmsg)),
                                 (len - CMSG_LEN(0)) / sizeof(int));
            kj::Vector<kj::AutoCloseFd> trashFds;
            for (auto fd: data) {
              // Wrap every received FD immediately so none can leak; extras beyond maxFds are
              // collected in trashFds and closed when it goes out of scope.
              kj::AutoCloseFd ownFd(fd);
              if (nfds < maxFds) {
                fdBuffer[nfds++] = kj::mv(ownFd);
              } else {
                trashFds.add(kj::mv(ownFd));
              }
            }
          }

          if (spaceLeft >= CMSG_LEN(0) && spaceLeft >= cmsg->cmsg_len) {
            spaceLeft -= cmsg->cmsg_len;
          } else {
            spaceLeft = 0;
          }
        }

#ifndef MSG_CMSG_CLOEXEC
        // Platform couldn't set CLOEXEC atomically at receive time; do it now.
        for (size_t i = 0; i < nfds; i++) {
          setCloseOnExec(fdBuffer[i]);
        }
#endif

        alreadyRead.capCount += nfds;
        fdBuffer += nfds;
        maxFds -= nfds;
      }
    }

    if (false) {
    error:
      return alreadyRead;
    }

    if (n < 0) {
      // Read would block.
      return observer.whenBecomesReadable().then([=]() {
        return tryReadInternal(buffer, minBytes, maxBytes, fdBuffer, maxFds, alreadyRead);
      });
    } else if (n == 0) {
      // EOF -OR- maxBytes == 0.
      return alreadyRead;
    } else if (implicitCast<size_t>(n) >= minBytes) {
      // We read enough to stop here.
      alreadyRead.byteCount += n;
      return alreadyRead;
    } else {
      // The kernel returned fewer bytes than we asked for (and fewer than we need).

      buffer = reinterpret_cast<byte*>(buffer) + n;
      minBytes -= n;
      maxBytes -= n;
      alreadyRead.byteCount += n;

      // According to David Klempner, who works on Stubby at Google, we sadly CANNOT assume that
      // we've consumed the whole read buffer here. If a signal is delivered in the middle of a
      // read() -- yes, even a non-blocking read -- it can cause the kernel to return a partial
      // result, with data still in the buffer.
      //   https://bugzilla.kernel.org/show_bug.cgi?id=199131
      //   https://twitter.com/CaptainSegfault/status/1112622245531144194
      //
      // Unfortunately, we have no choice but to issue more read()s until it either tells us EOF
      // or EAGAIN. We used to have an optimization here using observer.atEndHint() (when it is
      // non-null) to avoid a redundant call to read(). Alas...
      return tryReadInternal(buffer, minBytes, maxBytes, fdBuffer, maxFds, alreadyRead);
    }
  }

  Promise<void> writeInternal(ArrayPtr<const byte> firstPiece,
                              ArrayPtr<const ArrayPtr<const byte>> morePieces,
                              ArrayPtr<const int> fds) {
    // Common gathering-write path. `fds`, if non-empty, are sent as a single SCM_RIGHTS
    // message attached to the first sendmsg() that transmits any bytes.
    const size_t iovmax = kj::miniposix::iovMax(1 + morePieces.size());
    // If there are more than IOV_MAX pieces, we'll only write the first IOV_MAX for now, and
    // then we'll loop later.
    KJ_STACK_ARRAY(struct iovec, iov, kj::min(1 + morePieces.size(), iovmax), 16, 128);
    size_t iovTotal = 0;

    // writev() interface is not const-correct.  :(
    iov[0].iov_base = const_cast<byte*>(firstPiece.begin());
    iov[0].iov_len = firstPiece.size();
    iovTotal += iov[0].iov_len;
    for (uint i = 1; i < iov.size(); i++) {
      iov[i].iov_base = const_cast<byte*>(morePieces[i - 1].begin());
      iov[i].iov_len = morePieces[i - 1].size();
      iovTotal += iov[i].iov_len;
    }

    if (iovTotal == 0) {
      KJ_REQUIRE(fds.size() == 0, "can't write FDs without bytes");
      return kj::READY_NOW;
    }

    ssize_t n;
    if (fds.size() == 0) {
      KJ_NONBLOCKING_SYSCALL(n = ::writev(fd, iov.begin(), iov.size())) {
        // Error.

        // We can't "return kj::READY_NOW;" inside this block because it causes a memory leak due to
        // a bug that exists in both Clang and GCC:
        //   http://gcc.gnu.org/bugzilla/show_bug.cgi?id=33799
        //   http://llvm.org/bugs/show_bug.cgi?id=12286
        goto error;
      }
    } else {
      struct msghdr msg;
      memset(&msg, 0, sizeof(msg));
      msg.msg_iov = iov.begin();
      msg.msg_iovlen = iov.size();

      // Allocate space to receive a cmsg.
      size_t msgBytes = CMSG_SPACE(sizeof(int) * fds.size());
      // On Linux, CMSG_SPACE will align to a word-size boundary, but on Mac it always aligns to a
      // 32-bit boundary. I guess aligning to 32 bits helps avoid the problem where you
      // surprisingly end up with space for two file descriptors when you only wanted one. However,
      // cmsghdr's preferred alignment is word-size (it contains a size_t). If we stack-allocate
      // the buffer, we need to make sure it is aligned properly (maybe not on x64, but maybe on
      // other platforms), so we want to allocate an array of words (we use void*). So... we use
      // CMSG_SPACE() and then additionally round up to deal with Mac.
      size_t msgWords = (msgBytes + sizeof(void*) - 1) / sizeof(void*);
      KJ_STACK_ARRAY(void*, cmsgSpace, msgWords, 16, 256);
      auto cmsgBytes = cmsgSpace.asBytes();
      memset(cmsgBytes.begin(), 0, cmsgBytes.size());
      msg.msg_control = cmsgBytes.begin();
      msg.msg_controllen = msgBytes;

      struct cmsghdr* cmsg = CMSG_FIRSTHDR(&msg);
      cmsg->cmsg_level = SOL_SOCKET;
      cmsg->cmsg_type = SCM_RIGHTS;
      cmsg->cmsg_len = CMSG_LEN(sizeof(int) * fds.size());
      memcpy(CMSG_DATA(cmsg), fds.begin(), fds.asBytes().size());

      KJ_NONBLOCKING_SYSCALL(n = ::sendmsg(fd, &msg, 0)) {
        // Error.

        // We can't "return kj::READY_NOW;" inside this block because it causes a memory leak due to
        // a bug that exists in both Clang and GCC:
        //   http://gcc.gnu.org/bugzilla/show_bug.cgi?id=33799
        //   http://llvm.org/bugs/show_bug.cgi?id=12286
        goto error;
      }
    }

    if (false) {
    error:
      return kj::READY_NOW;
    }

    if (n < 0) {
      // Got EAGAIN. Nothing was written.
      return observer.whenBecomesWritable().then([=]() {
        return writeInternal(firstPiece, morePieces, fds);
      });
    } else if (n == 0) {
      // Why would a sendmsg() with a non-empty message ever return 0 when writing to a stream
      // socket? If there's no room in the send buffer, it should fail with EAGAIN. If the
      // connection is closed, it should fail with EPIPE. Various documents and forum posts around
      // the internet claim this can happen but no one seems to know when. My guess is it can only
      // happen if we try to send an empty message -- which we didn't. So I think this is
      // impossible. If it is possible, we need to figure out how to correctly handle it, which
      // depends on what caused it.
      //
      // Note in particular that if 0 is a valid return here, and we sent an SCM_RIGHTS message,
      // we need to know whether the message was sent or not, in order to decide whether to retry
      // sending it!
      KJ_FAIL_ASSERT("non-empty sendmsg() returned 0");
    }

    // Non-zero bytes were written. This also implies that *all* FDs were written.

    // Discard all data that was written, then issue a new write for what's left (if any).
    for (;;) {
      if (n < firstPiece.size()) {
        // Only part of the first piece was consumed. Wait for buffer space and then write again.
        firstPiece = firstPiece.slice(n, firstPiece.size());
        iovTotal -= n;

        if (iovTotal == 0) {
          // Oops, what actually happened is that we hit the IOV_MAX limit. Don't wait.
          return writeInternal(firstPiece, morePieces, nullptr);
        }

        // As with read(), we cannot assume that a short write() really means the write buffer is
        // full (see comments in the read path above). We have to write again.
        return writeInternal(firstPiece, morePieces, nullptr);
      } else if (morePieces.size() == 0) {
        // First piece was fully-consumed and there are no more pieces, so we're done.
        KJ_DASSERT(n == firstPiece.size(), n);
        return READY_NOW;
      } else {
        // First piece was fully consumed, so move on to the next piece.
        n -= firstPiece.size();
        iovTotal -= firstPiece.size();
        firstPiece = morePieces[0];
        morePieces = morePieces.slice(1, morePieces.size());
      }
    }
  }
};
597
598 // =======================================================================================
599
600 class SocketAddress {
601 public:
  SocketAddress(const void* sockaddr, uint len): addrlen(len) {
    // Copy a raw sockaddr of `len` bytes into the internal union; `len` must fit in `addr`.
    KJ_REQUIRE(len <= sizeof(addr), "Sorry, your sockaddr is too big for me.");
    memcpy(&addr.generic, sockaddr, len);
  }
606
operator <(const SocketAddress & other) const607 bool operator<(const SocketAddress& other) const {
608 // So we can use std::set<SocketAddress>... see DNS lookup code.
609
610 if (wildcard < other.wildcard) return true;
611 if (wildcard > other.wildcard) return false;
612
613 if (addrlen < other.addrlen) return true;
614 if (addrlen > other.addrlen) return false;
615
616 return memcmp(&addr.generic, &other.addr.generic, addrlen) < 0;
617 }
618
  // Raw accessors for passing this address directly to socket syscalls.
  const struct sockaddr* getRaw() const { return &addr.generic; }
  socklen_t getRawSize() const { return addrlen; }
621
  int socket(int type) const {
    // Create a socket of the given type (e.g. SOCK_STREAM) in this address's family and return
    // its fd. On Linux (non-Bionic), NONBLOCK and CLOEXEC are set atomically at creation.
    bool isStream = type == SOCK_STREAM;

    int result;
#if __linux__ && !__BIONIC__
    type |= SOCK_NONBLOCK | SOCK_CLOEXEC;
#endif
    KJ_SYSCALL(result = ::socket(addr.generic.sa_family, type, 0));

    if (isStream && (addr.generic.sa_family == AF_INET ||
                     addr.generic.sa_family == AF_INET6)) {
      // TODO(perf): As a hack for the 0.4 release we are always setting
      //   TCP_NODELAY because Nagle's algorithm pretty much kills Cap'n Proto's
      //   RPC protocol. Later, we should extend the interface to provide more
      //   control over this. Perhaps write() should have a flag which
      //   specifies whether to pass MSG_MORE.
      int one = 1;
      KJ_SYSCALL(setsockopt(
          result, IPPROTO_TCP, TCP_NODELAY, (char*)&one, sizeof(one)));
    }

    return result;
  }
645
  void bind(int sockfd) const {
    // Bind `sockfd` to this address; toString() is included in any failure for diagnostics.
#if !defined(__OpenBSD__)
    if (wildcard) {
      // Disable IPV6_V6ONLY because we want to handle both ipv4 and ipv6 on this socket. (The
      // default value of this option varies across platforms.)
      int value = 0;
      KJ_SYSCALL(setsockopt(sockfd, IPPROTO_IPV6, IPV6_V6ONLY, &value, sizeof(value)));
    }
#endif

    KJ_SYSCALL(::bind(sockfd, &addr.generic, addrlen), toString());
  }
658
getPort() const659 uint getPort() const {
660 switch (addr.generic.sa_family) {
661 case AF_INET: return ntohs(addr.inet4.sin_port);
662 case AF_INET6: return ntohs(addr.inet6.sin6_port);
663 default: return 0;
664 }
665 }
666
  String toString() const {
    // Human-readable rendering: "*:port" for wildcards, "a.b.c.d:port" for IPv4,
    // "[addr]:port" for IPv6, "unix:path" / "unix-abstract:name" for Unix sockets.
    if (wildcard) {
      return str("*:", getPort());
    }

    switch (addr.generic.sa_family) {
      case AF_INET: {
        char buffer[INET6_ADDRSTRLEN];
        if (inet_ntop(addr.inet4.sin_family, &addr.inet4.sin_addr,
                      buffer, sizeof(buffer)) == nullptr) {
          KJ_FAIL_SYSCALL("inet_ntop", errno) { break; }
          return heapString("(inet_ntop error)");
        }
        return str(buffer, ':', ntohs(addr.inet4.sin_port));
      }
      case AF_INET6: {
        char buffer[INET6_ADDRSTRLEN];
        if (inet_ntop(addr.inet6.sin6_family, &addr.inet6.sin6_addr,
                      buffer, sizeof(buffer)) == nullptr) {
          KJ_FAIL_SYSCALL("inet_ntop", errno) { break; }
          return heapString("(inet_ntop error)");
        }
        return str('[', buffer, "]:", ntohs(addr.inet6.sin6_port));
      }
      case AF_UNIX: {
        auto path = _::safeUnixPath(&addr.unixDomain, addrlen);
        // A leading NUL byte marks a Linux abstract-namespace socket.
        if (path.size() > 0 && path[0] == '\0') {
          return str("unix-abstract:", path.slice(1, path.size()));
        } else {
          return str("unix:", path);
        }
      }
      default:
        return str("(unknown address family ", addr.generic.sa_family, ")");
    }
  }
703
  static Promise<Array<SocketAddress>> lookupHost(
      LowLevelAsyncIoProvider& lowLevel, kj::String host, kj::String service, uint portHint,
      _::NetworkFilter& filter);
  // Perform a DNS lookup. (Defined elsewhere; `portHint` is used when `service` doesn't
  // specify a port, and results are checked against `filter`.)
708
  static Promise<Array<SocketAddress>> parse(
      LowLevelAsyncIoProvider& lowLevel, StringPtr str, uint portHint, _::NetworkFilter& filter) {
    // Parse an address string into one or more SocketAddresses. Recognized forms:
    //   "unix:/path", "unix-abstract:name", "*" (wildcard, optionally with ":port"),
    //   "1.2.3.4[:port]", "[::1][:port]", "::1", or a hostname (falls back to DNS).
    // Returns immediately for literal addresses; performs an async lookup otherwise.
    //
    // TODO(someday): Allow commas in `str`.

    SocketAddress result;

    if (str.startsWith("unix:")) {
      StringPtr path = str.slice(strlen("unix:"));
      KJ_REQUIRE(path.size() < sizeof(addr.unixDomain.sun_path),
                 "Unix domain socket address is too long.", str);
      KJ_REQUIRE(path.size() == strlen(path.cStr()),
                 "Unix domain socket address contains NULL. Use"
                 " 'unix-abstract:' for the abstract namespace.");
      result.addr.unixDomain.sun_family = AF_UNIX;
      strcpy(result.addr.unixDomain.sun_path, path.cStr());
      result.addrlen = offsetof(struct sockaddr_un, sun_path) + path.size() + 1;

      if (!result.parseAllowedBy(filter)) {
        KJ_FAIL_REQUIRE("unix sockets blocked by restrictPeers()");
        return Array<SocketAddress>();
      }

      auto array = kj::heapArrayBuilder<SocketAddress>(1);
      array.add(result);
      return array.finish();
    }

    if (str.startsWith("unix-abstract:")) {
      StringPtr path = str.slice(strlen("unix-abstract:"));
      // +1 accounts for the leading NUL byte that marks the abstract namespace.
      KJ_REQUIRE(path.size() + 1 < sizeof(addr.unixDomain.sun_path),
                 "Unix domain socket address is too long.", str);
      result.addr.unixDomain.sun_family = AF_UNIX;
      result.addr.unixDomain.sun_path[0] = '\0';
      // although not strictly required by Linux, also copy the trailing
      // NULL terminator so that we can safely read it back in toString
      memcpy(result.addr.unixDomain.sun_path + 1, path.cStr(), path.size() + 1);
      result.addrlen = offsetof(struct sockaddr_un, sun_path) + path.size() + 1;

      if (!result.parseAllowedBy(filter)) {
        KJ_FAIL_REQUIRE("abstract unix sockets blocked by restrictPeers()");
        return Array<SocketAddress>();
      }

      auto array = kj::heapArrayBuilder<SocketAddress>(1);
      array.add(result);
      return array.finish();
    }

    // Try to separate the address and port.
    ArrayPtr<const char> addrPart;
    Maybe<StringPtr> portPart;

    int af;

    if (str.startsWith("[")) {
      // Address starts with a bracket, which is a common way to write an ip6 address with a port,
      // since without brackets around the address part, the port looks like another segment of
      // the address.
      af = AF_INET6;
      size_t closeBracket = KJ_ASSERT_NONNULL(str.findLast(']'),
          "Unclosed '[' in address string.", str);

      addrPart = str.slice(1, closeBracket);
      if (str.size() > closeBracket + 1) {
        KJ_REQUIRE(str.slice(closeBracket + 1).startsWith(":"),
                   "Expected port suffix after ']'.", str);
        portPart = str.slice(closeBracket + 2);
      }
    } else {
      KJ_IF_MAYBE(colon, str.findFirst(':')) {
        if (str.slice(*colon + 1).findFirst(':') == nullptr) {
          // There is exactly one colon and no brackets, so it must be an ip4 address with port.
          af = AF_INET;
          addrPart = str.slice(0, *colon);
          portPart = str.slice(*colon + 1);
        } else {
          // There are two or more colons and no brackets, so the whole thing must be an ip6
          // address with no port.
          af = AF_INET6;
          addrPart = str;
        }
      } else {
        // No colons, so it must be an ip4 address without port.
        af = AF_INET;
        addrPart = str;
      }
    }

    // Parse the port.
    unsigned long port;
    KJ_IF_MAYBE(portText, portPart) {
      char* endptr;
      port = strtoul(portText->cStr(), &endptr, 0);
      if (portText->size() == 0 || *endptr != '\0') {
        // Not a number. Maybe it's a service name. Fall back to DNS.
        return lookupHost(lowLevel, kj::heapString(addrPart), kj::heapString(*portText), portHint,
                          filter);
      }
      KJ_REQUIRE(port < 65536, "Port number too large.");
    } else {
      port = portHint;
    }

    // Check for wildcard.
    if (addrPart.size() == 1 && addrPart[0] == '*') {
      result.wildcard = true;
#if defined(__OpenBSD__)
      // On OpenBSD, all sockets are either v4-only or v6-only, so use v4 as a
      // temporary workaround for wildcards.
      result.addrlen = sizeof(addr.inet4);
      result.addr.inet4.sin_family = AF_INET;
      result.addr.inet4.sin_port = htons(port);
#else
      // Create an ip6 socket and set IPV6_V6ONLY to 0 later.
      result.addrlen = sizeof(addr.inet6);
      result.addr.inet6.sin6_family = AF_INET6;
      result.addr.inet6.sin6_port = htons(port);
#endif

      auto array = kj::heapArrayBuilder<SocketAddress>(1);
      array.add(result);
      return array.finish();
    }

    // Fill in family/port now; the address bytes land in `addrTarget` via inet_pton() below.
    void* addrTarget;
    if (af == AF_INET6) {
      result.addrlen = sizeof(addr.inet6);
      result.addr.inet6.sin6_family = AF_INET6;
      result.addr.inet6.sin6_port = htons(port);
      addrTarget = &result.addr.inet6.sin6_addr;
    } else {
      result.addrlen = sizeof(addr.inet4);
      result.addr.inet4.sin_family = AF_INET;
      result.addr.inet4.sin_port = htons(port);
      addrTarget = &result.addr.inet4.sin_addr;
    }

    if (addrPart.size() < INET6_ADDRSTRLEN - 1) {
      // addrPart is not necessarily NUL-terminated so we have to make a copy.  :(
      char buffer[INET6_ADDRSTRLEN];
      memcpy(buffer, addrPart.begin(), addrPart.size());
      buffer[addrPart.size()] = '\0';

      // OK, parse it!
      switch (inet_pton(af, buffer, addrTarget)) {
        case 1: {
          // success.
          if (!result.parseAllowedBy(filter)) {
            KJ_FAIL_REQUIRE("address family blocked by restrictPeers()");
            return Array<SocketAddress>();
          }

          auto array = kj::heapArrayBuilder<SocketAddress>(1);
          array.add(result);
          return array.finish();
        }
        case 0:
          // It's apparently not a simple address...  fall back to DNS.
          break;
        default:
          KJ_FAIL_SYSCALL("inet_pton", errno, af, addrPart);
      }
    }

    return lookupHost(lowLevel, kj::heapString(addrPart), nullptr, port, filter);
  }
875
getLocalAddress(int sockfd)876 static SocketAddress getLocalAddress(int sockfd) {
877 SocketAddress result;
878 result.addrlen = sizeof(addr);
879 KJ_SYSCALL(getsockname(sockfd, &result.addr.generic, &result.addrlen));
880 return result;
881 }
882
  bool allowedBy(LowLevelAsyncIoProvider::NetworkFilter& filter) {
    // True if `filter` permits communicating with this address (used at connect/accept time).
    return filter.shouldAllow(&addr.generic, addrlen);
  }
886
  bool parseAllowedBy(_::NetworkFilter& filter) {
    // True if `filter` permits even *parsing* this address (a stricter check applied to
    // addresses produced by string parsing / DNS results).
    return filter.shouldAllowParse(&addr.generic, addrlen);
  }
890
private:
  SocketAddress() {
    // We need to memset the whole object 0 otherwise Valgrind gets unhappy when we write it to a
    // pipe, due to the padding bytes being uninitialized.
    memset(this, 0, sizeof(*this));
  }

  socklen_t addrlen;
  // Number of bytes of `addr` actually in use; passed to / filled in by the socket syscalls.

  bool wildcard = false;
  // True if this address was specified as "*" (bind to all interfaces).

  union {
    // Raw sockaddr storage; which view is valid depends on the sa_family that was filled in.
    struct sockaddr generic;
    struct sockaddr_in inet4;
    struct sockaddr_in6 inet6;
    struct sockaddr_un unixDomain;
    struct sockaddr_storage storage;
  } addr;

  struct LookupParams;  // Host/service pair handed off to the DNS lookup thread.
  class LookupReader;   // Reads lookup results back from that thread over a pipe.
};
911
class SocketAddress::LookupReader {
  // Reads SocketAddresses off of a pipe coming from another thread that is performing
  // getaddrinfo.  The writer thread sends one fixed-size SocketAddress record per result and
  // closes the pipe when done; EOF (a short read) signals completion.

public:
  LookupReader(kj::Own<Thread>&& thread, kj::Own<AsyncInputStream>&& input,
               _::NetworkFilter& filter)
      : thread(kj::mv(thread)), input(kj::mv(input)), filter(filter) {}

  ~LookupReader() {
    // If the reader is destroyed before the lookup finished (e.g. the promise was canceled),
    // detach the thread so it can finish on its own instead of blocking to join it here.
    if (thread) thread->detach();
  }

  Promise<Array<SocketAddress>> read() {
    // Read exactly one SocketAddress record, then recurse until EOF.
    return input->tryRead(&current, sizeof(current), sizeof(current)).then(
        [this](size_t n) -> Promise<Array<SocketAddress>> {
      if (n < sizeof(current)) {
        // Short read = writer closed the pipe; the lookup thread is done, so release our
        // Own<Thread> now (not via detach in the destructor).
        thread = nullptr;
        // getaddrinfo()'s docs seem to say it will never return an empty list, but let's check
        // anyway.
        KJ_REQUIRE(addresses.size() > 0, "DNS lookup returned no permitted addresses.") { break; }
        return addresses.releaseAsArray();
      } else {
        // getaddrinfo() can return multiple copies of the same address for several reasons.
        // A major one is that we don't give it a socket type (SOCK_STREAM vs. SOCK_DGRAM), so
        // it may return two copies of the same address, one for each type, unless it explicitly
        // knows that the service name given is specific to one type.  But we can't tell it a type,
        // because we don't actually know which one the user wants, and if we specify SOCK_STREAM
        // while the user specified a UDP service name then they'll get a resolution error which
        // is lame.  (At least, I think that's how it works.)
        //
        // So we instead resort to de-duping results.
        if (alreadySeen.insert(current).second) {
          // New (not previously seen) address; keep it only if the filter allows it.
          if (current.parseAllowedBy(filter)) {
            addresses.add(current);
          }
        }
        return read();
      }
    });
  }

private:
  kj::Own<Thread> thread;          // The getaddrinfo() worker thread.
  kj::Own<AsyncInputStream> input; // Read end of the pipe fed by that thread.
  _::NetworkFilter& filter;
  SocketAddress current;           // Scratch buffer for the record currently being read.
  kj::Vector<SocketAddress> addresses;    // Accumulated, filtered, de-duped results.
  std::set<SocketAddress> alreadySeen;    // For de-duping (see comment in read()).
};
962
struct SocketAddress::LookupParams {
  // Arguments moved into the getaddrinfo() worker thread.
  kj::String host;
  kj::String service;
};
967
lookupHost(LowLevelAsyncIoProvider & lowLevel,kj::String host,kj::String service,uint portHint,_::NetworkFilter & filter)968 Promise<Array<SocketAddress>> SocketAddress::lookupHost(
969 LowLevelAsyncIoProvider& lowLevel, kj::String host, kj::String service, uint portHint,
970 _::NetworkFilter& filter) {
971 // This shitty function spawns a thread to run getaddrinfo(). Unfortunately, getaddrinfo() is
972 // the only cross-platform DNS API and it is blocking.
973 //
974 // TODO(perf): Use a thread pool? Maybe kj::Thread should use a thread pool automatically?
975 // Maybe use the various platform-specific asynchronous DNS libraries? Please do not implement
976 // a custom DNS resolver...
977
978 int fds[2];
979 #if __linux__ && !__BIONIC__
980 KJ_SYSCALL(pipe2(fds, O_NONBLOCK | O_CLOEXEC));
981 #else
982 KJ_SYSCALL(pipe(fds));
983 #endif
984
985 auto input = lowLevel.wrapInputFd(fds[0], NEW_FD_FLAGS);
986
987 int outFd = fds[1];
988
989 LookupParams params = { kj::mv(host), kj::mv(service) };
990
991 auto thread = heap<Thread>(kj::mvCapture(params, [outFd,portHint](LookupParams&& params) {
992 FdOutputStream output((AutoCloseFd(outFd)));
993
994 struct addrinfo* list;
995 int status = getaddrinfo(
996 params.host == "*" ? nullptr : params.host.cStr(),
997 params.service == nullptr ? nullptr : params.service.cStr(),
998 nullptr, &list);
999 if (status == 0) {
1000 KJ_DEFER(freeaddrinfo(list));
1001
1002 struct addrinfo* cur = list;
1003 while (cur != nullptr) {
1004 if (params.service == nullptr) {
1005 switch (cur->ai_addr->sa_family) {
1006 case AF_INET:
1007 ((struct sockaddr_in*)cur->ai_addr)->sin_port = htons(portHint);
1008 break;
1009 case AF_INET6:
1010 ((struct sockaddr_in6*)cur->ai_addr)->sin6_port = htons(portHint);
1011 break;
1012 default:
1013 break;
1014 }
1015 }
1016
1017 SocketAddress addr;
1018 if (params.host == "*") {
1019 // Set up a wildcard SocketAddress. Only use the port number returned by getaddrinfo().
1020 addr.wildcard = true;
1021 addr.addrlen = sizeof(addr.addr.inet6);
1022 addr.addr.inet6.sin6_family = AF_INET6;
1023 switch (cur->ai_addr->sa_family) {
1024 case AF_INET:
1025 addr.addr.inet6.sin6_port = ((struct sockaddr_in*)cur->ai_addr)->sin_port;
1026 break;
1027 case AF_INET6:
1028 addr.addr.inet6.sin6_port = ((struct sockaddr_in6*)cur->ai_addr)->sin6_port;
1029 break;
1030 default:
1031 addr.addr.inet6.sin6_port = portHint;
1032 break;
1033 }
1034 } else {
1035 addr.addrlen = cur->ai_addrlen;
1036 memcpy(&addr.addr.generic, cur->ai_addr, cur->ai_addrlen);
1037 }
1038 KJ_ASSERT_CAN_MEMCPY(SocketAddress);
1039 output.write(&addr, sizeof(addr));
1040 cur = cur->ai_next;
1041 }
1042 } else if (status == EAI_SYSTEM) {
1043 KJ_FAIL_SYSCALL("getaddrinfo", errno, params.host, params.service) {
1044 return;
1045 }
1046 } else {
1047 KJ_FAIL_REQUIRE("DNS lookup failed.",
1048 params.host, params.service, gai_strerror(status)) {
1049 return;
1050 }
1051 }
1052 }));
1053
1054 auto reader = heap<LookupReader>(kj::mv(thread), kj::mv(input), filter);
1055 return reader->read().attach(kj::mv(reader));
1056 }
1057
1058 // =======================================================================================
1059
class FdConnectionReceiver final: public ConnectionReceiver, public OwnedFileDescriptor {
  // ConnectionReceiver wrapping a listening socket fd.  Uses a UnixEventPort::FdObserver for
  // readiness notification and applies a NetworkFilter to each accepted peer.
public:
  FdConnectionReceiver(UnixEventPort& eventPort, int fd,
                       LowLevelAsyncIoProvider::NetworkFilter& filter, uint flags)
      : OwnedFileDescriptor(fd, flags), eventPort(eventPort), filter(filter),
        observer(eventPort, fd, UnixEventPort::FdObserver::OBSERVE_READ) {}

  Promise<Own<AsyncIoStream>> accept() override {
    // Accepts one connection, waiting for readability if none is pending.  Connections from
    // peers rejected by `filter` are closed and the accept is silently retried.
    int newFd;

    struct sockaddr_storage addr;
    socklen_t addrlen = sizeof(addr);

  retry:
#if __linux__ && !__BIONIC__
    // accept4() sets non-blocking and close-on-exec atomically at accept time.
    newFd = ::accept4(fd, reinterpret_cast<struct sockaddr*>(&addr), &addrlen,
                      SOCK_NONBLOCK | SOCK_CLOEXEC);
#else
    newFd = ::accept(fd, reinterpret_cast<struct sockaddr*>(&addr), &addrlen);
#endif

    if (newFd >= 0) {
      if (!filter.shouldAllow(reinterpret_cast<struct sockaddr*>(&addr), addrlen)) {
        // Drop disallowed address.
        close(newFd);
        return accept();
      } else {
        // NEW_FD_FLAGS makes AsyncStreamFd take ownership and set non-blocking where accept4()
        // didn't already.
        return Own<AsyncIoStream>(heap<AsyncStreamFd>(eventPort, newFd, NEW_FD_FLAGS));
      }
    } else {
      int error = errno;

      switch (error) {
        case EAGAIN:
#if EAGAIN != EWOULDBLOCK
        case EWOULDBLOCK:
#endif
          // Not ready yet.
          return observer.whenBecomesReadable().then([this]() {
            return accept();
          });

        case EINTR:
        case ENETDOWN:
#ifdef EPROTO
        // EPROTO is not defined on OpenBSD.
        case EPROTO:
#endif
        case EHOSTDOWN:
        case EHOSTUNREACH:
        case ENETUNREACH:
        case ECONNABORTED:
        case ETIMEDOUT:
          // According to the Linux man page, accept() may report an error if the accepted
          // connection is already broken.  In this case, we really ought to just ignore it and
          // keep waiting.  But it's hard to say exactly what errors are such network errors and
          // which ones are permanent errors.  We've made a guess here.
          goto retry;

        default:
          KJ_FAIL_SYSCALL("accept", error);
      }

    }
  }

  uint getPort() override {
    // Port to which the listening socket is bound (useful when bound to port 0).
    return SocketAddress::getLocalAddress(fd).getPort();
  }

  void getsockopt(int level, int option, void* value, uint* length) override {
    // Thin pass-through to ::getsockopt(), converting the length in/out parameter.
    socklen_t socklen = *length;
    KJ_SYSCALL(::getsockopt(fd, level, option, value, &socklen));
    *length = socklen;
  }
  void setsockopt(int level, int option, const void* value, uint length) override {
    KJ_SYSCALL(::setsockopt(fd, level, option, value, length));
  }
  void getsockname(struct sockaddr* addr, uint* length) override {
    socklen_t socklen = *length;
    KJ_SYSCALL(::getsockname(fd, addr, &socklen));
    *length = socklen;
  }

public:
  UnixEventPort& eventPort;
  LowLevelAsyncIoProvider::NetworkFilter& filter;
  UnixEventPort::FdObserver observer;
};
1149
class DatagramPortImpl final: public DatagramPort, public OwnedFileDescriptor {
  // DatagramPort wrapping a bound datagram (e.g. UDP) socket fd.  send()/makeReceiver() are
  // defined out-of-line below, after NetworkAddressImpl.
public:
  DatagramPortImpl(LowLevelAsyncIoProvider& lowLevel, UnixEventPort& eventPort, int fd,
                   LowLevelAsyncIoProvider::NetworkFilter& filter, uint flags)
      : OwnedFileDescriptor(fd, flags), lowLevel(lowLevel), eventPort(eventPort), filter(filter),
        observer(eventPort, fd, UnixEventPort::FdObserver::OBSERVE_READ |
                               UnixEventPort::FdObserver::OBSERVE_WRITE) {}

  Promise<size_t> send(const void* buffer, size_t size, NetworkAddress& destination) override;
  Promise<size_t> send(
      ArrayPtr<const ArrayPtr<const byte>> pieces, NetworkAddress& destination) override;

  class ReceiverImpl;

  Own<DatagramReceiver> makeReceiver(DatagramReceiver::Capacity capacity) override;

  uint getPort() override {
    // Port to which the datagram socket is bound (useful when bound to port 0).
    return SocketAddress::getLocalAddress(fd).getPort();
  }

  void getsockopt(int level, int option, void* value, uint* length) override {
    // Thin pass-through to ::getsockopt(), converting the length in/out parameter.
    socklen_t socklen = *length;
    KJ_SYSCALL(::getsockopt(fd, level, option, value, &socklen));
    *length = socklen;
  }
  void setsockopt(int level, int option, const void* value, uint length) override {
    KJ_SYSCALL(::setsockopt(fd, level, option, value, length));
  }

public:
  LowLevelAsyncIoProvider& lowLevel;
  UnixEventPort& eventPort;
  LowLevelAsyncIoProvider::NetworkFilter& filter;
  UnixEventPort::FdObserver observer;
};
1185
class LowLevelAsyncIoProviderImpl final: public LowLevelAsyncIoProvider {
  // Unix implementation of LowLevelAsyncIoProvider.  Owns the UnixEventPort, EventLoop, and
  // WaitScope for the calling thread.
public:
  LowLevelAsyncIoProviderImpl()
      : eventLoop(eventPort), waitScope(eventLoop) {}

  inline WaitScope& getWaitScope() { return waitScope; }

  Own<AsyncInputStream> wrapInputFd(int fd, uint flags = 0) override {
    return heap<AsyncStreamFd>(eventPort, fd, flags);
  }
  Own<AsyncOutputStream> wrapOutputFd(int fd, uint flags = 0) override {
    return heap<AsyncStreamFd>(eventPort, fd, flags);
  }
  Own<AsyncIoStream> wrapSocketFd(int fd, uint flags = 0) override {
    return heap<AsyncStreamFd>(eventPort, fd, flags);
  }
  Own<AsyncCapabilityStream> wrapUnixSocketFd(Fd fd, uint flags = 0) override {
    return heap<AsyncStreamFd>(eventPort, fd, flags);
  }
  Promise<Own<AsyncIoStream>> wrapConnectingSocketFd(
      int fd, const struct sockaddr* addr, uint addrlen, uint flags = 0) override {
    // It's important that we construct the AsyncStreamFd first, so that `flags` are honored,
    // especially setting nonblocking mode and taking ownership.
    auto result = heap<AsyncStreamFd>(eventPort, fd, flags);

    // Unfortunately connect() doesn't fit the mold of KJ_NONBLOCKING_SYSCALL, since it indicates
    // non-blocking using EINPROGRESS.
    for (;;) {
      if (::connect(fd, addr, addrlen) < 0) {
        int error = errno;
        if (error == EINPROGRESS) {
          // Fine.
          break;
        } else if (error != EINTR) {
          // Hard failure.  (EINTR loops back and retries the connect().)
          KJ_FAIL_SYSCALL("connect()", error) { break; }
          return Own<AsyncIoStream>();
        }
      } else {
        // no error
        break;
      }
    }

    // Once the socket reports writable, the asynchronous connect has finished (successfully or
    // not); fetch its final status via SO_ERROR.
    auto connected = result->waitConnected();
    return connected.then(kj::mvCapture(result, [fd](Own<AsyncIoStream>&& stream) {
      int err;
      socklen_t errlen = sizeof(err);
      KJ_SYSCALL(getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &errlen));
      if (err != 0) {
        KJ_FAIL_SYSCALL("connect()", err) { break; }
      }
      return kj::mv(stream);
    }));
  }
  Own<ConnectionReceiver> wrapListenSocketFd(
      int fd, NetworkFilter& filter, uint flags = 0) override {
    return heap<FdConnectionReceiver>(eventPort, fd, filter, flags);
  }
  Own<DatagramPort> wrapDatagramSocketFd(
      int fd, NetworkFilter& filter, uint flags = 0) override {
    return heap<DatagramPortImpl>(*this, eventPort, fd, filter, flags);
  }

  Timer& getTimer() override { return eventPort.getTimer(); }

  UnixEventPort& getEventPort() { return eventPort; }

private:
  UnixEventPort eventPort;
  EventLoop eventLoop;   // Must be declared after eventPort: constructed with it, destroyed first.
  WaitScope waitScope;   // Likewise depends on eventLoop.
};
1258
1259 // =======================================================================================
1260
class NetworkAddressImpl final: public NetworkAddress {
  // NetworkAddress backed by one or more resolved SocketAddresses (e.g. multiple A/AAAA records
  // for one hostname).
public:
  NetworkAddressImpl(LowLevelAsyncIoProvider& lowLevel,
                     LowLevelAsyncIoProvider::NetworkFilter& filter,
                     Array<SocketAddress> addrs)
      : lowLevel(lowLevel), filter(filter), addrs(kj::mv(addrs)) {}

  Promise<Own<AsyncIoStream>> connect() override {
    // Copy the address list so the recursive fallback in connectImpl() has stable storage even
    // if this NetworkAddressImpl is destroyed; the copy is kept alive via attach().
    auto addrsCopy = heapArray(addrs.asPtr());
    auto promise = connectImpl(lowLevel, filter, addrsCopy);
    return promise.attach(kj::mv(addrsCopy));
  }

  Own<ConnectionReceiver> listen() override {
    if (addrs.size() > 1) {
      KJ_LOG(WARNING, "Bind address resolved to multiple addresses.  Only the first address will "
          "be used.  If this is incorrect, specify the address numerically.  This may be fixed "
          "in the future.", addrs[0].toString());
    }

    int fd = addrs[0].socket(SOCK_STREAM);

    {
      KJ_ON_SCOPE_FAILURE(close(fd));

      // We always enable SO_REUSEADDR because having to take your server down for five minutes
      // before it can restart really sucks.
      int optval = 1;
      KJ_SYSCALL(setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)));

      addrs[0].bind(fd);

      // TODO(someday):  Let queue size be specified explicitly in string addresses.
      KJ_SYSCALL(::listen(fd, SOMAXCONN));
    }

    return lowLevel.wrapListenSocketFd(fd, filter, NEW_FD_FLAGS);
  }

  Own<DatagramPort> bindDatagramPort() override {
    // Same setup as listen() but for a SOCK_DGRAM socket (and no listen() call).
    if (addrs.size() > 1) {
      KJ_LOG(WARNING, "Bind address resolved to multiple addresses.  Only the first address will "
          "be used.  If this is incorrect, specify the address numerically.  This may be fixed "
          "in the future.", addrs[0].toString());
    }

    int fd = addrs[0].socket(SOCK_DGRAM);

    {
      KJ_ON_SCOPE_FAILURE(close(fd));

      // We always enable SO_REUSEADDR because having to take your server down for five minutes
      // before it can restart really sucks.
      int optval = 1;
      KJ_SYSCALL(setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)));

      addrs[0].bind(fd);
    }

    return lowLevel.wrapDatagramSocketFd(fd, filter, NEW_FD_FLAGS);
  }

  Own<NetworkAddress> clone() override {
    return kj::heap<NetworkAddressImpl>(lowLevel, filter, kj::heapArray(addrs.asPtr()));
  }

  String toString() override {
    // Comma-separated list of all resolved addresses.
    return strArray(KJ_MAP(addr, addrs) { return addr.toString(); }, ",");
  }

  const SocketAddress& chooseOneAddress() {
    // Round-robin over the resolved addresses (used by datagram send()).
    KJ_REQUIRE(addrs.size() > 0, "No addresses available.");
    return addrs[counter++ % addrs.size()];
  }

private:
  LowLevelAsyncIoProvider& lowLevel;
  LowLevelAsyncIoProvider::NetworkFilter& filter;
  Array<SocketAddress> addrs;
  uint counter = 0;  // Round-robin index for chooseOneAddress().

  static Promise<Own<AsyncIoStream>> connectImpl(
      LowLevelAsyncIoProvider& lowLevel,
      LowLevelAsyncIoProvider::NetworkFilter& filter,
      ArrayPtr<SocketAddress> addrs) {
    // Tries addrs[0]; on failure, recurses on the remaining addresses until one succeeds or all
    // are exhausted.  `addrs` must point at storage that outlives the returned promise (the
    // caller attaches a heap copy).
    KJ_ASSERT(addrs.size() > 0);

    return kj::evalNow([&]() -> Promise<Own<AsyncIoStream>> {
      if (!addrs[0].allowedBy(filter)) {
        return KJ_EXCEPTION(FAILED, "connect() blocked by restrictPeers()");
      } else {
        int fd = addrs[0].socket(SOCK_STREAM);
        return lowLevel.wrapConnectingSocketFd(
            fd, addrs[0].getRaw(), addrs[0].getRawSize(), NEW_FD_FLAGS);
      }
    }).then([](Own<AsyncIoStream>&& stream) -> Promise<Own<AsyncIoStream>> {
      // Success, pass along.
      return kj::mv(stream);
    }, [&lowLevel,&filter,addrs](Exception&& exception) mutable -> Promise<Own<AsyncIoStream>> {
      // Connect failed.
      if (addrs.size() > 1) {
        // Try the next address instead.
        return connectImpl(lowLevel, filter, addrs.slice(1, addrs.size()));
      } else {
        // No more addresses to try, so propagate the exception.
        return kj::mv(exception);
      }
    });
  }
};
1371
class SocketNetwork final: public Network {
  // Network implementation backed by BSD sockets.  A restricted view (restrictPeers()) shares
  // the parent's LowLevelAsyncIoProvider but layers an additional NetworkFilter.
public:
  explicit SocketNetwork(LowLevelAsyncIoProvider& lowLevel): lowLevel(lowLevel) {}
  explicit SocketNetwork(SocketNetwork& parent,
                         kj::ArrayPtr<const kj::StringPtr> allow,
                         kj::ArrayPtr<const kj::StringPtr> deny)
      : lowLevel(parent.lowLevel), filter(allow, deny, parent.filter) {}

  Promise<Own<NetworkAddress>> parseAddress(StringPtr addr, uint portHint = 0) override {
    // Copy the string now (the caller's StringPtr may not outlive the promise), then parse
    // lazily; parsing may itself kick off an async DNS lookup.
    return evalLater(mvCapture(heapString(addr), [this,portHint](String&& addr) {
      return SocketAddress::parse(lowLevel, addr, portHint, filter);
    })).then([this](Array<SocketAddress> addresses) -> Own<NetworkAddress> {
      return heap<NetworkAddressImpl>(lowLevel, filter, kj::mv(addresses));
    });
  }

  Own<NetworkAddress> getSockaddr(const void* sockaddr, uint len) override {
    // Wraps an already-formed sockaddr; still subject to the filter.
    auto array = kj::heapArrayBuilder<SocketAddress>(1);
    array.add(SocketAddress(sockaddr, len));
    KJ_REQUIRE(array[0].allowedBy(filter), "address blocked by restrictPeers()") { break; }
    return Own<NetworkAddress>(heap<NetworkAddressImpl>(lowLevel, filter, array.finish()));
  }

  Own<Network> restrictPeers(
      kj::ArrayPtr<const kj::StringPtr> allow,
      kj::ArrayPtr<const kj::StringPtr> deny = nullptr) override {
    return heap<SocketNetwork>(*this, allow, deny);
  }

private:
  LowLevelAsyncIoProvider& lowLevel;
  _::NetworkFilter filter;
};
1405
1406 // =======================================================================================
1407
send(const void * buffer,size_t size,NetworkAddress & destination)1408 Promise<size_t> DatagramPortImpl::send(
1409 const void* buffer, size_t size, NetworkAddress& destination) {
1410 auto& addr = downcast<NetworkAddressImpl>(destination).chooseOneAddress();
1411
1412 ssize_t n;
1413 KJ_NONBLOCKING_SYSCALL(n = sendto(fd, buffer, size, 0, addr.getRaw(), addr.getRawSize()));
1414 if (n < 0) {
1415 // Write buffer full.
1416 return observer.whenBecomesWritable().then([this, buffer, size, &destination]() {
1417 return send(buffer, size, destination);
1418 });
1419 } else {
1420 // If less than the whole message was sent, then it got truncated, and there's nothing we can
1421 // do about it.
1422 return n;
1423 }
1424 }
1425
Promise<size_t> DatagramPortImpl::send(
    ArrayPtr<const ArrayPtr<const byte>> pieces, NetworkAddress& destination) {
  // Gather-send: transmits all `pieces` as a single datagram via sendmsg().  The caller must
  // keep `pieces` and `destination` alive until the returned promise resolves.
  struct msghdr msg;
  memset(&msg, 0, sizeof(msg));

  auto& addr = downcast<NetworkAddressImpl>(destination).chooseOneAddress();
  msg.msg_name = const_cast<void*>(implicitCast<const void*>(addr.getRaw()));
  msg.msg_namelen = addr.getRawSize();

  // The kernel caps the number of iovecs per call (IOV_MAX); allocate at most that many.
  const size_t iovmax = kj::miniposix::iovMax(pieces.size());
  KJ_STACK_ARRAY(struct iovec, iov, kj::min(pieces.size(), iovmax), 16, 64);

  for (size_t i: kj::indices(pieces)) {
    iov[i].iov_base = const_cast<void*>(implicitCast<const void*>(pieces[i].begin()));
    iov[i].iov_len = pieces[i].size();
  }

  Array<byte> extra;
  if (pieces.size() > iovmax) {
    // Too many pieces, but we can't use multiple syscalls because they'd send separate
    // datagrams.  We'll have to copy the trailing pieces into a temporary array.
    //
    // TODO(perf): On Linux we could use multiple syscalls via MSG_MORE.
    //
    // Pieces [0, iovmax-1) stay as-is; pieces [iovmax-1, end) are concatenated into `extra`,
    // which takes the place of the last iovec slot.
    size_t extraSize = 0;
    for (size_t i = iovmax - 1; i < pieces.size(); i++) {
      extraSize += pieces[i].size();
    }
    extra = kj::heapArray<byte>(extraSize);
    extraSize = 0;
    for (size_t i = iovmax - 1; i < pieces.size(); i++) {
      memcpy(extra.begin() + extraSize, pieces[i].begin(), pieces[i].size());
      extraSize += pieces[i].size();
    }
    iov[iovmax - 1].iov_base = extra.begin();
    iov[iovmax - 1].iov_len = extra.size();
  }

  msg.msg_iov = iov.begin();
  msg.msg_iovlen = iov.size();

  ssize_t n;
  KJ_NONBLOCKING_SYSCALL(n = sendmsg(fd, &msg, 0));
  if (n < 0) {
    // Write buffer full.  Wait for writability and retry.  (`extra` is rebuilt on retry, so it
    // is safe to let it go out of scope here.)
    return observer.whenBecomesWritable().then([this, pieces, &destination]() {
      return send(pieces, destination);
    });
  } else {
    // If less than the whole message was sent, then it was truncated, and there's nothing we can
    // do about that now.
    return n;
  }
}
1479
1480 class DatagramPortImpl::ReceiverImpl final: public DatagramReceiver {
1481 public:
ReceiverImpl(DatagramPortImpl & port,Capacity capacity)1482 explicit ReceiverImpl(DatagramPortImpl& port, Capacity capacity)
1483 : port(port),
1484 contentBuffer(heapArray<byte>(capacity.content)),
1485 ancillaryBuffer(capacity.ancillary > 0 ? heapArray<byte>(capacity.ancillary)
1486 : Array<byte>(nullptr)) {}
1487
receive()1488 Promise<void> receive() override {
1489 struct msghdr msg;
1490 memset(&msg, 0, sizeof(msg));
1491
1492 struct sockaddr_storage addr;
1493 memset(&addr, 0, sizeof(addr));
1494 msg.msg_name = &addr;
1495 msg.msg_namelen = sizeof(addr);
1496
1497 struct iovec iov;
1498 iov.iov_base = contentBuffer.begin();
1499 iov.iov_len = contentBuffer.size();
1500 msg.msg_iov = &iov;
1501 msg.msg_iovlen = 1;
1502 msg.msg_control = ancillaryBuffer.begin();
1503 msg.msg_controllen = ancillaryBuffer.size();
1504
1505 ssize_t n;
1506 KJ_NONBLOCKING_SYSCALL(n = recvmsg(port.fd, &msg, 0));
1507
1508 if (n < 0) {
1509 // No data available. Wait.
1510 return port.observer.whenBecomesReadable().then([this]() {
1511 return receive();
1512 });
1513 } else {
1514 if (!port.filter.shouldAllow(reinterpret_cast<const struct sockaddr*>(msg.msg_name),
1515 msg.msg_namelen)) {
1516 // Ignore message from disallowed source.
1517 return receive();
1518 }
1519
1520 receivedSize = n;
1521 contentTruncated = msg.msg_flags & MSG_TRUNC;
1522
1523 source.emplace(port.lowLevel, port.filter, msg.msg_name, msg.msg_namelen);
1524
1525 ancillaryList.resize(0);
1526 ancillaryTruncated = msg.msg_flags & MSG_CTRUNC;
1527
1528 for (struct cmsghdr* cmsg = CMSG_FIRSTHDR(&msg); cmsg != nullptr;
1529 cmsg = CMSG_NXTHDR(&msg, cmsg)) {
1530 // On some platforms (OSX), a cmsghdr's length may cross the end of the ancillary buffer
1531 // when truncated. On other platforms (Linux) the length in cmsghdr will itself be
1532 // truncated to fit within the buffer.
1533
1534 #if __APPLE__
1535 // On MacOS, `CMSG_SPACE(0)` triggers a bogus warning.
1536 #pragma GCC diagnostic ignored "-Wnull-pointer-arithmetic"
1537 #endif
1538 const byte* pos = reinterpret_cast<const byte*>(cmsg);
1539 size_t available = ancillaryBuffer.end() - pos;
1540 if (available < CMSG_SPACE(0)) {
1541 // The buffer ends in the middle of the header. We can't use this message.
1542 // (On Linux, this never happens, because the message is not included if there isn't
1543 // space for a header. I'm not sure how other systems behave, though, so let's be safe.)
1544 break;
1545 }
1546
1547 // OK, we know the cmsghdr is valid, at least.
1548
1549 // Find the start of the message payload.
1550 const byte* begin = (const byte *)CMSG_DATA(cmsg);
1551
1552 // Cap the message length to the available space.
1553 const byte* end = pos + kj::min(available, cmsg->cmsg_len);
1554
1555 ancillaryList.add(AncillaryMessage(
1556 cmsg->cmsg_level, cmsg->cmsg_type, arrayPtr(begin, end)));
1557 }
1558
1559 return READY_NOW;
1560 }
1561 }
1562
getContent()1563 MaybeTruncated<ArrayPtr<const byte>> getContent() override {
1564 return { contentBuffer.slice(0, receivedSize), contentTruncated };
1565 }
1566
getAncillary()1567 MaybeTruncated<ArrayPtr<const AncillaryMessage>> getAncillary() override {
1568 return { ancillaryList.asPtr(), ancillaryTruncated };
1569 }
1570
getSource()1571 NetworkAddress& getSource() override {
1572 return KJ_REQUIRE_NONNULL(source, "Haven't sent a message yet.").abstract;
1573 }
1574
1575 private:
1576 DatagramPortImpl& port;
1577 Array<byte> contentBuffer;
1578 Array<byte> ancillaryBuffer;
1579 Vector<AncillaryMessage> ancillaryList;
1580 size_t receivedSize = 0;
1581 bool contentTruncated = false;
1582 bool ancillaryTruncated = false;
1583
1584 struct StoredAddress {
StoredAddresskj::__anone7cf81ca0111::DatagramPortImpl::ReceiverImpl::StoredAddress1585 StoredAddress(LowLevelAsyncIoProvider& lowLevel, LowLevelAsyncIoProvider::NetworkFilter& filter,
1586 const void* sockaddr, uint length)
1587 : raw(sockaddr, length),
1588 abstract(lowLevel, filter, Array<SocketAddress>(&raw, 1, NullArrayDisposer::instance)) {}
1589
1590 SocketAddress raw;
1591 NetworkAddressImpl abstract;
1592 };
1593
1594 kj::Maybe<StoredAddress> source;
1595 };
1596
Own<DatagramReceiver> DatagramPortImpl::makeReceiver(DatagramReceiver::Capacity capacity) {
  // Creates a receiver with its own buffers; multiple receivers may exist for one port.
  return kj::heap<ReceiverImpl>(*this, capacity);
}
1600
1601 // =======================================================================================
1602
1603 class AsyncIoProviderImpl final: public AsyncIoProvider {
1604 public:
AsyncIoProviderImpl(LowLevelAsyncIoProvider & lowLevel)1605 AsyncIoProviderImpl(LowLevelAsyncIoProvider& lowLevel)
1606 : lowLevel(lowLevel), network(lowLevel) {}
1607
newOneWayPipe()1608 OneWayPipe newOneWayPipe() override {
1609 int fds[2];
1610 #if __linux__ && !__BIONIC__
1611 KJ_SYSCALL(pipe2(fds, O_NONBLOCK | O_CLOEXEC));
1612 #else
1613 KJ_SYSCALL(pipe(fds));
1614 #endif
1615 return OneWayPipe {
1616 lowLevel.wrapInputFd(fds[0], NEW_FD_FLAGS),
1617 lowLevel.wrapOutputFd(fds[1], NEW_FD_FLAGS)
1618 };
1619 }
1620
newTwoWayPipe()1621 TwoWayPipe newTwoWayPipe() override {
1622 int fds[2];
1623 int type = SOCK_STREAM;
1624 #if __linux__ && !__BIONIC__
1625 type |= SOCK_NONBLOCK | SOCK_CLOEXEC;
1626 #endif
1627 KJ_SYSCALL(socketpair(AF_UNIX, type, 0, fds));
1628 return TwoWayPipe { {
1629 lowLevel.wrapSocketFd(fds[0], NEW_FD_FLAGS),
1630 lowLevel.wrapSocketFd(fds[1], NEW_FD_FLAGS)
1631 } };
1632 }
1633
newCapabilityPipe()1634 CapabilityPipe newCapabilityPipe() override {
1635 int fds[2];
1636 int type = SOCK_STREAM;
1637 #if __linux__ && !__BIONIC__
1638 type |= SOCK_NONBLOCK | SOCK_CLOEXEC;
1639 #endif
1640 KJ_SYSCALL(socketpair(AF_UNIX, type, 0, fds));
1641 return CapabilityPipe { {
1642 lowLevel.wrapUnixSocketFd(fds[0], NEW_FD_FLAGS),
1643 lowLevel.wrapUnixSocketFd(fds[1], NEW_FD_FLAGS)
1644 } };
1645 }
1646
getNetwork()1647 Network& getNetwork() override {
1648 return network;
1649 }
1650
newPipeThread(Function<void (AsyncIoProvider &,AsyncIoStream &,WaitScope &)> startFunc)1651 PipeThread newPipeThread(
1652 Function<void(AsyncIoProvider&, AsyncIoStream&, WaitScope&)> startFunc) override {
1653 int fds[2];
1654 int type = SOCK_STREAM;
1655 #if __linux__ && !__BIONIC__
1656 type |= SOCK_NONBLOCK | SOCK_CLOEXEC;
1657 #endif
1658 KJ_SYSCALL(socketpair(AF_UNIX, type, 0, fds));
1659
1660 int threadFd = fds[1];
1661 KJ_ON_SCOPE_FAILURE(close(threadFd));
1662
1663 auto pipe = lowLevel.wrapSocketFd(fds[0], NEW_FD_FLAGS);
1664
1665 auto thread = heap<Thread>(kj::mvCapture(startFunc,
1666 [threadFd](Function<void(AsyncIoProvider&, AsyncIoStream&, WaitScope&)>&& startFunc) {
1667 LowLevelAsyncIoProviderImpl lowLevel;
1668 auto stream = lowLevel.wrapSocketFd(threadFd, NEW_FD_FLAGS);
1669 AsyncIoProviderImpl ioProvider(lowLevel);
1670 startFunc(ioProvider, *stream, lowLevel.getWaitScope());
1671 }));
1672
1673 return { kj::mv(thread), kj::mv(pipe) };
1674 }
1675
getTimer()1676 Timer& getTimer() override { return lowLevel.getTimer(); }
1677
1678 private:
1679 LowLevelAsyncIoProvider& lowLevel;
1680 SocketNetwork network;
1681 };
1682
1683 } // namespace
1684
Own<AsyncIoProvider> newAsyncIoProvider(LowLevelAsyncIoProvider& lowLevel) {
  // Public factory; `lowLevel` must outlive the returned provider (held by reference).
  return kj::heap<AsyncIoProviderImpl>(lowLevel);
}
1688
AsyncIoContext setupAsyncIo() {
  // One-stop setup for the calling thread: event port, event loop, and I/O provider.
  auto lowLevel = heap<LowLevelAsyncIoProviderImpl>();
  auto ioProvider = kj::heap<AsyncIoProviderImpl>(*lowLevel);
  // Grab these references before kj::mv(lowLevel) below invalidates the local Own; the objects
  // themselves live on inside the returned context.
  auto& waitScope = lowLevel->getWaitScope();
  auto& eventPort = lowLevel->getEventPort();
  return { kj::mv(lowLevel), kj::mv(ioProvider), waitScope, eventPort };
}
1696
1697 } // namespace kj
1698
1699 #endif // !_WIN32
1700