1 // Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors
2 // Licensed under the MIT License:
3 //
4 // Permission is hereby granted, free of charge, to any person obtaining a copy
5 // of this software and associated documentation files (the "Software"), to deal
6 // in the Software without restriction, including without limitation the rights
7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8 // copies of the Software, and to permit persons to whom the Software is
9 // furnished to do so, subject to the following conditions:
10 //
11 // The above copyright notice and this permission notice shall be included in
12 // all copies or substantial portions of the Software.
13 //
14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
20 // THE SOFTWARE.
21 
22 #if !_WIN32
23 
24 #include "async-unix.h"
25 #include "thread.h"
26 #include "debug.h"
27 #include "io.h"
28 #include <unistd.h>
29 #include <fcntl.h>
30 #include <sys/types.h>
31 #include <sys/socket.h>
32 #include <sys/stat.h>
33 #include <netinet/in.h>
34 #include <kj/compat/gtest.h>
35 #include <pthread.h>
36 #include <algorithm>
37 #include <sys/wait.h>
38 #include <sys/time.h>
39 #include <errno.h>
40 #include "mutex.h"
41 
42 #if __BIONIC__
43 // Android's Bionic defines SIGRTMIN but using it in sigaddset() throws EINVAL, which means we
44 // definitely can't actually use RT signals.
45 #undef SIGRTMIN
46 #endif
47 
48 namespace kj {
49 namespace {
50 
delay()51 inline void delay() { usleep(10000); }
52 
53 // On OSX, si_code seems to be zero when SI_USER is expected.
54 #if __linux__ || __CYGWIN__
55 #define EXPECT_SI_CODE EXPECT_EQ
56 #else
57 #define EXPECT_SI_CODE(a,b)
58 #endif
59 
captureSignals()60 void captureSignals() {
61   static bool captured = false;
62   if (!captured) {
63     // We use SIGIO and SIGURG as our test signals because they're two signals that we can be
64     // reasonably confident won't otherwise be delivered to any KJ or Cap'n Proto test.  We can't
65     // use SIGUSR1 because it is reserved by UnixEventPort and SIGUSR2 is used by Valgrind on OSX.
66     UnixEventPort::captureSignal(SIGURG);
67     UnixEventPort::captureSignal(SIGIO);
68 
69 #ifdef SIGRTMIN
70     UnixEventPort::captureSignal(SIGRTMIN);
71 #endif
72 
73     UnixEventPort::captureChildExit();
74 
75     captured = true;
76   }
77 }
78 
TEST(AsyncUnixTest,Signals)79 TEST(AsyncUnixTest, Signals) {
80   captureSignals();
81   UnixEventPort port;
82   EventLoop loop(port);
83   WaitScope waitScope(loop);
84 
85   kill(getpid(), SIGURG);
86 
87   siginfo_t info = port.onSignal(SIGURG).wait(waitScope);
88   EXPECT_EQ(SIGURG, info.si_signo);
89   EXPECT_SI_CODE(SI_USER, info.si_code);
90 }
91 
92 #if defined(SIGRTMIN) && !__BIONIC__ && !(__linux__ && __mips__)
TEST(AsyncUnixTest,SignalWithValue)93 TEST(AsyncUnixTest, SignalWithValue) {
94   // This tests that if we use sigqueue() to attach a value to the signal, that value is received
95   // correctly.  Note that this only works on platforms that support real-time signals -- even
96   // though the signal we're sending is SIGURG, the sigqueue() system call is introduced by RT
97   // signals.  Hence this test won't run on e.g. Mac OSX.
98   //
99   // Also, Android's bionic does not appear to support sigqueue() even though the kernel does.
100   //
101   // Also, this test fails on Linux on mipsel. si_value comes back as zero. No one with a mips
102   // machine wants to debug the problem but they demand a patch fixing it, so we disable the test.
103   // Sad. https://github.com/sandstorm-io/capnproto/issues/204
104 
105   captureSignals();
106   UnixEventPort port;
107   EventLoop loop(port);
108   WaitScope waitScope(loop);
109 
110   union sigval value;
111   memset(&value, 0, sizeof(value));
112   value.sival_int = 123;
113   KJ_SYSCALL_HANDLE_ERRORS(sigqueue(getpid(), SIGURG, value)) {
114     case ENOSYS:
115       // sigqueue() not supported. Maybe running on WSL.
116       KJ_LOG(WARNING, "sigqueue() is not implemented by your system; skipping test");
117       return;
118     default:
119       KJ_FAIL_SYSCALL("sigqueue(getpid(), SIGURG, value)", error);
120   }
121 
122   siginfo_t info = port.onSignal(SIGURG).wait(waitScope);
123   EXPECT_EQ(SIGURG, info.si_signo);
124   EXPECT_SI_CODE(SI_QUEUE, info.si_code);
125   EXPECT_EQ(123, info.si_value.sival_int);
126 }
127 
TEST(AsyncUnixTest,SignalWithPointerValue)128 TEST(AsyncUnixTest, SignalWithPointerValue) {
129   // This tests that if we use sigqueue() to attach a value to the signal, that value is received
130   // correctly.  Note that this only works on platforms that support real-time signals -- even
131   // though the signal we're sending is SIGURG, the sigqueue() system call is introduced by RT
132   // signals.  Hence this test won't run on e.g. Mac OSX.
133   //
134   // Also, Android's bionic does not appear to support sigqueue() even though the kernel does.
135   //
136   // Also, this test fails on Linux on mipsel. si_value comes back as zero. No one with a mips
137   // machine wants to debug the problem but they demand a patch fixing it, so we disable the test.
138   // Sad. https://github.com/sandstorm-io/capnproto/issues/204
139 
140   captureSignals();
141   UnixEventPort port;
142   EventLoop loop(port);
143   WaitScope waitScope(loop);
144 
145   union sigval value;
146   memset(&value, 0, sizeof(value));
147   value.sival_ptr = &port;
148   KJ_SYSCALL_HANDLE_ERRORS(sigqueue(getpid(), SIGURG, value)) {
149     case ENOSYS:
150       // sigqueue() not supported. Maybe running on WSL.
151       KJ_LOG(WARNING, "sigqueue() is not implemented by your system; skipping test");
152       return;
153     default:
154       KJ_FAIL_SYSCALL("sigqueue(getpid(), SIGURG, value)", error);
155   }
156 
157   siginfo_t info = port.onSignal(SIGURG).wait(waitScope);
158   EXPECT_EQ(SIGURG, info.si_signo);
159   EXPECT_SI_CODE(SI_QUEUE, info.si_code);
160   EXPECT_EQ(&port, info.si_value.sival_ptr);
161 }
162 #endif
163 
TEST(AsyncUnixTest,SignalsMultiListen)164 TEST(AsyncUnixTest, SignalsMultiListen) {
165   captureSignals();
166   UnixEventPort port;
167   EventLoop loop(port);
168   WaitScope waitScope(loop);
169 
170   port.onSignal(SIGIO).then([](siginfo_t&&) {
171     KJ_FAIL_EXPECT("Received wrong signal.");
172   }).detach([](kj::Exception&& exception) {
173     KJ_FAIL_EXPECT(exception);
174   });
175 
176   kill(getpid(), SIGURG);
177 
178   siginfo_t info = port.onSignal(SIGURG).wait(waitScope);
179   EXPECT_EQ(SIGURG, info.si_signo);
180   EXPECT_SI_CODE(SI_USER, info.si_code);
181 }
182 
183 #if !__CYGWIN32__
184 // Cygwin32 (but not Cygwin64) appears not to deliver SIGURG in the following test (but it does
185 // deliver SIGIO, if you reverse the order of the waits).  Since this doesn't occur on any other
186 // platform I'm assuming it's a Cygwin bug.
187 
TEST(AsyncUnixTest,SignalsMultiReceive)188 TEST(AsyncUnixTest, SignalsMultiReceive) {
189   captureSignals();
190   UnixEventPort port;
191   EventLoop loop(port);
192   WaitScope waitScope(loop);
193 
194   kill(getpid(), SIGURG);
195   kill(getpid(), SIGIO);
196 
197   siginfo_t info = port.onSignal(SIGURG).wait(waitScope);
198   EXPECT_EQ(SIGURG, info.si_signo);
199   EXPECT_SI_CODE(SI_USER, info.si_code);
200 
201   info = port.onSignal(SIGIO).wait(waitScope);
202   EXPECT_EQ(SIGIO, info.si_signo);
203   EXPECT_SI_CODE(SI_USER, info.si_code);
204 }
205 
206 #endif  // !__CYGWIN32__
207 
TEST(AsyncUnixTest,SignalsAsync)208 TEST(AsyncUnixTest, SignalsAsync) {
209   captureSignals();
210   UnixEventPort port;
211   EventLoop loop(port);
212   WaitScope waitScope(loop);
213 
214   // Arrange for a signal to be sent from another thread.
215   pthread_t mainThread = pthread_self();
216   Thread thread([&]() {
217     delay();
218     pthread_kill(mainThread, SIGURG);
219   });
220 
221   siginfo_t info = port.onSignal(SIGURG).wait(waitScope);
222   EXPECT_EQ(SIGURG, info.si_signo);
223 #if __linux__
224   EXPECT_SI_CODE(SI_TKILL, info.si_code);
225 #endif
226 }
227 
228 #if !__CYGWIN32__
229 // Cygwin32 (but not Cygwin64) appears not to deliver SIGURG in the following test (but it does
230 // deliver SIGIO, if you reverse the order of the waits).  Since this doesn't occur on any other
231 // platform I'm assuming it's a Cygwin bug.
232 
TEST(AsyncUnixTest,SignalsNoWait)233 TEST(AsyncUnixTest, SignalsNoWait) {
234   // Verify that UnixEventPort::poll() correctly receives pending signals.
235 
236   captureSignals();
237   UnixEventPort port;
238   EventLoop loop(port);
239   WaitScope waitScope(loop);
240 
241   bool receivedSigurg = false;
242   bool receivedSigio = false;
243   port.onSignal(SIGURG).then([&](siginfo_t&& info) {
244     receivedSigurg = true;
245     EXPECT_EQ(SIGURG, info.si_signo);
246     EXPECT_SI_CODE(SI_USER, info.si_code);
247   }).detach([](Exception&& e) { KJ_FAIL_EXPECT(e); });
248   port.onSignal(SIGIO).then([&](siginfo_t&& info) {
249     receivedSigio = true;
250     EXPECT_EQ(SIGIO, info.si_signo);
251     EXPECT_SI_CODE(SI_USER, info.si_code);
252   }).detach([](Exception&& e) { KJ_FAIL_EXPECT(e); });
253 
254   kill(getpid(), SIGURG);
255   kill(getpid(), SIGIO);
256 
257   EXPECT_FALSE(receivedSigurg);
258   EXPECT_FALSE(receivedSigio);
259 
260   loop.run();
261 
262   EXPECT_FALSE(receivedSigurg);
263   EXPECT_FALSE(receivedSigio);
264 
265   port.poll();
266 
267   EXPECT_FALSE(receivedSigurg);
268   EXPECT_FALSE(receivedSigio);
269 
270   loop.run();
271 
272   EXPECT_TRUE(receivedSigurg);
273   EXPECT_TRUE(receivedSigio);
274 }
275 
276 #endif  // !__CYGWIN32__
277 
TEST(AsyncUnixTest,ReadObserver)278 TEST(AsyncUnixTest, ReadObserver) {
279   captureSignals();
280   UnixEventPort port;
281   EventLoop loop(port);
282   WaitScope waitScope(loop);
283 
284   int pipefds[2];
285   KJ_SYSCALL(pipe(pipefds));
286   kj::AutoCloseFd infd(pipefds[0]), outfd(pipefds[1]);
287 
288   UnixEventPort::FdObserver observer(port, infd, UnixEventPort::FdObserver::OBSERVE_READ);
289 
290   KJ_SYSCALL(write(outfd, "foo", 3));
291 
292   observer.whenBecomesReadable().wait(waitScope);
293 
294 #if __linux__  // platform known to support POLLRDHUP
295   EXPECT_FALSE(KJ_ASSERT_NONNULL(observer.atEndHint()));
296 
297   char buffer[4096];
298   ssize_t n;
299   KJ_SYSCALL(n = read(infd, &buffer, sizeof(buffer)));
300   EXPECT_EQ(3, n);
301 
302   KJ_SYSCALL(write(outfd, "bar", 3));
303   outfd = nullptr;
304 
305   observer.whenBecomesReadable().wait(waitScope);
306 
307   EXPECT_TRUE(KJ_ASSERT_NONNULL(observer.atEndHint()));
308 #endif
309 }
310 
TEST(AsyncUnixTest,ReadObserverMultiListen)311 TEST(AsyncUnixTest, ReadObserverMultiListen) {
312   captureSignals();
313   UnixEventPort port;
314   EventLoop loop(port);
315   WaitScope waitScope(loop);
316 
317   int bogusPipefds[2];
318   KJ_SYSCALL(pipe(bogusPipefds));
319   KJ_DEFER({ close(bogusPipefds[1]); close(bogusPipefds[0]); });
320 
321   UnixEventPort::FdObserver bogusObserver(port, bogusPipefds[0],
322       UnixEventPort::FdObserver::OBSERVE_READ);
323 
324   bogusObserver.whenBecomesReadable().then([]() {
325     ADD_FAILURE() << "Received wrong poll.";
326   }).detach([](kj::Exception&& exception) {
327     ADD_FAILURE() << kj::str(exception).cStr();
328   });
329 
330   int pipefds[2];
331   KJ_SYSCALL(pipe(pipefds));
332   KJ_DEFER({ close(pipefds[1]); close(pipefds[0]); });
333 
334   UnixEventPort::FdObserver observer(port, pipefds[0],
335       UnixEventPort::FdObserver::OBSERVE_READ);
336   KJ_SYSCALL(write(pipefds[1], "foo", 3));
337 
338   observer.whenBecomesReadable().wait(waitScope);
339 }
340 
TEST(AsyncUnixTest,ReadObserverMultiReceive)341 TEST(AsyncUnixTest, ReadObserverMultiReceive) {
342   captureSignals();
343   UnixEventPort port;
344   EventLoop loop(port);
345   WaitScope waitScope(loop);
346 
347   int pipefds[2];
348   KJ_SYSCALL(pipe(pipefds));
349   KJ_DEFER({ close(pipefds[1]); close(pipefds[0]); });
350 
351   UnixEventPort::FdObserver observer(port, pipefds[0],
352       UnixEventPort::FdObserver::OBSERVE_READ);
353   KJ_SYSCALL(write(pipefds[1], "foo", 3));
354 
355   int pipefds2[2];
356   KJ_SYSCALL(pipe(pipefds2));
357   KJ_DEFER({ close(pipefds2[1]); close(pipefds2[0]); });
358 
359   UnixEventPort::FdObserver observer2(port, pipefds2[0],
360       UnixEventPort::FdObserver::OBSERVE_READ);
361   KJ_SYSCALL(write(pipefds2[1], "bar", 3));
362 
363   auto promise1 = observer.whenBecomesReadable();
364   auto promise2 = observer2.whenBecomesReadable();
365   promise1.wait(waitScope);
366   promise2.wait(waitScope);
367 }
368 
TEST(AsyncUnixTest,ReadObserverAsync)369 TEST(AsyncUnixTest, ReadObserverAsync) {
370   captureSignals();
371   UnixEventPort port;
372   EventLoop loop(port);
373   WaitScope waitScope(loop);
374 
375   // Make a pipe and wait on its read end while another thread writes to it.
376   int pipefds[2];
377   KJ_SYSCALL(pipe(pipefds));
378   KJ_DEFER({ close(pipefds[1]); close(pipefds[0]); });
379   UnixEventPort::FdObserver observer(port, pipefds[0],
380       UnixEventPort::FdObserver::OBSERVE_READ);
381 
382   Thread thread([&]() {
383     delay();
384     KJ_SYSCALL(write(pipefds[1], "foo", 3));
385   });
386 
387   // Wait for the event in this thread.
388   observer.whenBecomesReadable().wait(waitScope);
389 }
390 
TEST(AsyncUnixTest,ReadObserverNoWait)391 TEST(AsyncUnixTest, ReadObserverNoWait) {
392   // Verify that UnixEventPort::poll() correctly receives pending FD events.
393 
394   captureSignals();
395   UnixEventPort port;
396   EventLoop loop(port);
397   WaitScope waitScope(loop);
398 
399   int pipefds[2];
400   KJ_SYSCALL(pipe(pipefds));
401   KJ_DEFER({ close(pipefds[1]); close(pipefds[0]); });
402   UnixEventPort::FdObserver observer(port, pipefds[0],
403       UnixEventPort::FdObserver::OBSERVE_READ);
404 
405   int pipefds2[2];
406   KJ_SYSCALL(pipe(pipefds2));
407   KJ_DEFER({ close(pipefds2[1]); close(pipefds2[0]); });
408   UnixEventPort::FdObserver observer2(port, pipefds2[0],
409       UnixEventPort::FdObserver::OBSERVE_READ);
410 
411   int receivedCount = 0;
412   observer.whenBecomesReadable().then([&]() {
413     receivedCount++;
414   }).detach([](Exception&& e) { ADD_FAILURE() << str(e).cStr(); });
415   observer2.whenBecomesReadable().then([&]() {
416     receivedCount++;
417   }).detach([](Exception&& e) { ADD_FAILURE() << str(e).cStr(); });
418 
419   KJ_SYSCALL(write(pipefds[1], "foo", 3));
420   KJ_SYSCALL(write(pipefds2[1], "bar", 3));
421 
422   EXPECT_EQ(0, receivedCount);
423 
424   loop.run();
425 
426   EXPECT_EQ(0, receivedCount);
427 
428   port.poll();
429 
430   EXPECT_EQ(0, receivedCount);
431 
432   loop.run();
433 
434   EXPECT_EQ(2, receivedCount);
435 }
436 
setNonblocking(int fd)437 static void setNonblocking(int fd) {
438   int flags;
439   KJ_SYSCALL(flags = fcntl(fd, F_GETFL));
440   if ((flags & O_NONBLOCK) == 0) {
441     KJ_SYSCALL(fcntl(fd, F_SETFL, flags | O_NONBLOCK));
442   }
443 }
444 
TEST(AsyncUnixTest,WriteObserver)445 TEST(AsyncUnixTest, WriteObserver) {
446   captureSignals();
447   UnixEventPort port;
448   EventLoop loop(port);
449   WaitScope waitScope(loop);
450 
451   int pipefds[2];
452   KJ_SYSCALL(pipe(pipefds));
453   kj::AutoCloseFd infd(pipefds[0]), outfd(pipefds[1]);
454   setNonblocking(outfd);
455   setNonblocking(infd);
456 
457   UnixEventPort::FdObserver observer(port, outfd, UnixEventPort::FdObserver::OBSERVE_WRITE);
458 
459   // Fill buffer.
460   ssize_t n;
461   do {
462     KJ_NONBLOCKING_SYSCALL(n = write(outfd, "foo", 3));
463   } while (n >= 0);
464 
465   bool writable = false;
466   auto promise = observer.whenBecomesWritable()
467       .then([&]() { writable = true; }).eagerlyEvaluate(nullptr);
468 
469   loop.run();
470   port.poll();
471   loop.run();
472 
473   EXPECT_FALSE(writable);
474 
475   // Empty the read end so that the write end becomes writable. Note that Linux implements a
476   // high watermark / low watermark heuristic which means that only reading one byte is not
477   // sufficient. The amount we have to read is in fact architecture-dependent -- it appears to be
478   // 1 page. To be safe, we read everything.
479   char buffer[4096];
480   do {
481     KJ_NONBLOCKING_SYSCALL(n = read(infd, &buffer, sizeof(buffer)));
482   } while (n > 0);
483 
484   loop.run();
485   port.poll();
486   loop.run();
487 
488   EXPECT_TRUE(writable);
489 }
490 
491 #if !__APPLE__
492 // Disabled on macOS due to https://github.com/sandstorm-io/capnproto/issues/374.
TEST(AsyncUnixTest,UrgentObserver)493 TEST(AsyncUnixTest, UrgentObserver) {
494   // Verify that FdObserver correctly detects availability of out-of-band data.
495   // Availability of out-of-band data is implementation-specific.
496   // Linux's and OS X's TCP/IP stack supports out-of-band messages for TCP sockets, which is used
497   // for this test.
498 
499   UnixEventPort port;
500   EventLoop loop(port);
501   WaitScope waitScope(loop);
502   int tmpFd;
503   char c;
504 
505   // Spawn a TCP server
506   KJ_SYSCALL(tmpFd = socket(AF_INET, SOCK_STREAM, 0));
507   kj::AutoCloseFd serverFd(tmpFd);
508   sockaddr_in saddr;
509   memset(&saddr, 0, sizeof(saddr));
510   saddr.sin_family = AF_INET;
511   saddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
512   KJ_SYSCALL(bind(serverFd, reinterpret_cast<sockaddr*>(&saddr), sizeof(saddr)));
513   socklen_t saddrLen = sizeof(saddr);
514   KJ_SYSCALL(getsockname(serverFd, reinterpret_cast<sockaddr*>(&saddr), &saddrLen));
515   KJ_SYSCALL(listen(serverFd, 1));
516 
517   // Create a pipe that we'll use to signal if MSG_OOB return EINVAL.
518   int failpipe[2];
519   KJ_SYSCALL(pipe(failpipe));
520   KJ_DEFER({
521     close(failpipe[0]);
522     close(failpipe[1]);
523   });
524 
525   // Accept one connection, send in-band and OOB byte, wait for a quit message
526   Thread thread([&]() {
527     int tmpFd;
528     char c;
529 
530     sockaddr_in caddr;
531     socklen_t caddrLen = sizeof(caddr);
532     KJ_SYSCALL(tmpFd = accept(serverFd, reinterpret_cast<sockaddr*>(&caddr), &caddrLen));
533     kj::AutoCloseFd clientFd(tmpFd);
534     delay();
535 
536     // Workaround: OS X won't signal POLLPRI without POLLIN. Also enqueue some in-band data.
537     c = 'i';
538     KJ_SYSCALL(send(clientFd, &c, 1, 0));
539     c = 'o';
540     KJ_SYSCALL_HANDLE_ERRORS(send(clientFd, &c, 1, MSG_OOB)) {
541       case EINVAL:
542         // Looks like MSG_OOB is not supported. (This is the case e.g. on WSL.)
543         KJ_SYSCALL(write(failpipe[1], &c, 1));
544         break;
545       default:
546         KJ_FAIL_SYSCALL("send(..., MSG_OOB)", error);
547     }
548 
549     KJ_SYSCALL(recv(clientFd, &c, 1, 0));
550     EXPECT_EQ('q', c);
551   });
552   KJ_DEFER({ shutdown(serverFd, SHUT_RDWR); serverFd = nullptr; });
553 
554   KJ_SYSCALL(tmpFd = socket(AF_INET, SOCK_STREAM, 0));
555   kj::AutoCloseFd clientFd(tmpFd);
556   KJ_SYSCALL(connect(clientFd, reinterpret_cast<sockaddr*>(&saddr), saddrLen));
557 
558   UnixEventPort::FdObserver observer(port, clientFd,
559       UnixEventPort::FdObserver::OBSERVE_READ | UnixEventPort::FdObserver::OBSERVE_URGENT);
560   UnixEventPort::FdObserver failObserver(port, failpipe[0],
561       UnixEventPort::FdObserver::OBSERVE_READ | UnixEventPort::FdObserver::OBSERVE_URGENT);
562 
563   auto promise = observer.whenUrgentDataAvailable().then([]() { return true; });
564   auto failPromise = failObserver.whenBecomesReadable().then([]() { return false; });
565 
566   bool oobSupported = promise.exclusiveJoin(kj::mv(failPromise)).wait(waitScope);
567   if (oobSupported) {
568 #if __CYGWIN__
569     // On Cygwin, reading the urgent byte first causes the subsequent regular read to block until
570     // such a time as the connection closes -- and then the byte is successfully returned. This
571     // seems to be a cygwin bug.
572     KJ_SYSCALL(recv(clientFd, &c, 1, 0));
573     EXPECT_EQ('i', c);
574     KJ_SYSCALL(recv(clientFd, &c, 1, MSG_OOB));
575     EXPECT_EQ('o', c);
576 #else
577     // Attempt to read the urgent byte prior to reading the in-band byte.
578     KJ_SYSCALL(recv(clientFd, &c, 1, MSG_OOB));
579     EXPECT_EQ('o', c);
580     KJ_SYSCALL(recv(clientFd, &c, 1, 0));
581     EXPECT_EQ('i', c);
582 #endif
583   } else {
584     KJ_LOG(WARNING, "MSG_OOB doesn't seem to be supported on your platform.");
585   }
586 
587   // Allow server thread to let its clientFd go out of scope.
588   c = 'q';
589   KJ_SYSCALL(send(clientFd, &c, 1, 0));
590   KJ_SYSCALL(shutdown(clientFd, SHUT_RDWR));
591 }
592 #endif
593 
TEST(AsyncUnixTest,SteadyTimers)594 TEST(AsyncUnixTest, SteadyTimers) {
595   captureSignals();
596   UnixEventPort port;
597   EventLoop loop(port);
598   WaitScope waitScope(loop);
599 
600   auto& timer = port.getTimer();
601 
602   auto start = timer.now();
603   kj::Vector<TimePoint> expected;
604   kj::Vector<TimePoint> actual;
605 
606   auto addTimer = [&](Duration delay) {
607     expected.add(max(start + delay, start));
608     timer.atTime(start + delay).then([&]() {
609       actual.add(timer.now());
610     }).detach([](Exception&& e) { ADD_FAILURE() << str(e).cStr(); });
611   };
612 
613   addTimer(30 * MILLISECONDS);
614   addTimer(40 * MILLISECONDS);
615   addTimer(20350 * MICROSECONDS);
616   addTimer(30 * MILLISECONDS);
617   addTimer(-10 * MILLISECONDS);
618 
619   std::sort(expected.begin(), expected.end());
620   timer.atTime(expected.back() + MILLISECONDS).wait(waitScope);
621 
622   ASSERT_EQ(expected.size(), actual.size());
623   for (int i = 0; i < expected.size(); ++i) {
624     KJ_EXPECT(expected[i] <= actual[i], "Actual time for timer i is too early.",
625               i, ((expected[i] - actual[i]) / NANOSECONDS));
626   }
627 }
628 
629 bool dummySignalHandlerCalled = false;
dummySignalHandler(int)630 void dummySignalHandler(int) {
631   dummySignalHandlerCalled = true;
632 }
633 
TEST(AsyncUnixTest,InterruptedTimer)634 TEST(AsyncUnixTest, InterruptedTimer) {
635   captureSignals();
636   UnixEventPort port;
637   EventLoop loop(port);
638   WaitScope waitScope(loop);
639 
640 #if __linux__
641   // Linux timeslices are 1ms.
642   constexpr auto OS_SLOWNESS_FACTOR = 1;
643 #else
644   // OSX timeslices are 10ms, so we need longer timeouts to avoid flakiness.
645   // To be safe we'll assume other OS's are similar.
646   constexpr auto OS_SLOWNESS_FACTOR = 10;
647 #endif
648 
649   // Schedule a timer event in 100ms.
650   auto& timer = port.getTimer();
651   auto start = timer.now();
652   constexpr auto timeout = 100 * MILLISECONDS * OS_SLOWNESS_FACTOR;
653 
654   // Arrange SIGALRM to be delivered in 50ms, handled in an empty signal handler. This will cause
655   // our wait to be interrupted with EINTR. We should nevertheless continue waiting for the right
656   // amount of time.
657   dummySignalHandlerCalled = false;
658   if (signal(SIGALRM, &dummySignalHandler) == SIG_ERR) {
659     KJ_FAIL_SYSCALL("signal(SIGALRM)", errno);
660   }
661   struct itimerval itv;
662   memset(&itv, 0, sizeof(itv));
663   itv.it_value.tv_usec = 50000 * OS_SLOWNESS_FACTOR;  // signal after 50ms
664   setitimer(ITIMER_REAL, &itv, nullptr);
665 
666   timer.afterDelay(timeout).wait(waitScope);
667 
668   KJ_EXPECT(dummySignalHandlerCalled);
669   KJ_EXPECT(timer.now() - start >= timeout);
670   KJ_EXPECT(timer.now() - start <= timeout + (timeout / 5));  // allow 20ms error
671 }
672 
TEST(AsyncUnixTest,Wake)673 TEST(AsyncUnixTest, Wake) {
674   captureSignals();
675   UnixEventPort port;
676   EventLoop loop(port);
677   WaitScope waitScope(loop);
678 
679   EXPECT_FALSE(port.poll());
680   port.wake();
681   EXPECT_TRUE(port.poll());
682   EXPECT_FALSE(port.poll());
683 
684   port.wake();
685   EXPECT_TRUE(port.wait());
686 
687   {
688     auto promise = port.getTimer().atTime(port.getTimer().now());
689     EXPECT_FALSE(port.wait());
690   }
691 
692   // Test wake() when already wait()ing.
693   {
694     Thread thread([&]() {
695       delay();
696       port.wake();
697     });
698 
699     EXPECT_TRUE(port.wait());
700   }
701 
702   // Test wait() after wake() already happened.
703   {
704     Thread thread([&]() {
705       port.wake();
706     });
707 
708     delay();
709     EXPECT_TRUE(port.wait());
710   }
711 
712   // Test wake() during poll() busy loop.
713   {
714     Thread thread([&]() {
715       delay();
716       port.wake();
717     });
718 
719     EXPECT_FALSE(port.poll());
720     while (!port.poll()) {}
721   }
722 
723   // Test poll() when wake() already delivered.
724   {
725     EXPECT_FALSE(port.poll());
726 
727     Thread thread([&]() {
728       port.wake();
729     });
730 
731     do {
732       delay();
733     } while (!port.poll());
734   }
735 }
736 
737 int exitCodeForSignal = 0;
exitSignalHandler(int)738 void exitSignalHandler(int) {
739   _exit(exitCodeForSignal);
740 }
741 
742 struct TestChild {
743   kj::Maybe<pid_t> pid;
744   kj::Promise<int> promise = nullptr;
745 
TestChildkj::__anon6de57eb20111::TestChild746   TestChild(UnixEventPort& port, int exitCode) {
747     pid_t p;
748     KJ_SYSCALL(p = fork());
749     if (p == 0) {
750       // Arrange for SIGTERM to cause the process to exit normally.
751       exitCodeForSignal = exitCode;
752       signal(SIGTERM, &exitSignalHandler);
753       sigset_t sigs;
754       sigemptyset(&sigs);
755       sigaddset(&sigs, SIGTERM);
756       pthread_sigmask(SIG_UNBLOCK, &sigs, nullptr);
757 
758       for (;;) pause();
759     }
760     pid = p;
761     promise = port.onChildExit(pid);
762   }
763 
~TestChildkj::__anon6de57eb20111::TestChild764   ~TestChild() noexcept(false) {
765     KJ_IF_MAYBE(p, pid) {
766       KJ_SYSCALL(::kill(*p, SIGKILL)) { return; }
767       int status;
768       KJ_SYSCALL(waitpid(*p, &status, 0)) { return; }
769     }
770   }
771 
killkj::__anon6de57eb20111::TestChild772   void kill(int signo) {
773     KJ_SYSCALL(::kill(KJ_REQUIRE_NONNULL(pid), signo));
774   }
775 
776   KJ_DISALLOW_COPY(TestChild);
777 };
778 
TEST(AsyncUnixTest,ChildProcess)779 TEST(AsyncUnixTest, ChildProcess) {
780   captureSignals();
781   UnixEventPort port;
782   EventLoop loop(port);
783   WaitScope waitScope(loop);
784 
785   // Block SIGTERM so that we can carefully un-block it in children.
786   sigset_t sigs, oldsigs;
787   KJ_SYSCALL(sigemptyset(&sigs));
788   KJ_SYSCALL(sigaddset(&sigs, SIGTERM));
789   KJ_SYSCALL(pthread_sigmask(SIG_BLOCK, &sigs, &oldsigs));
790   KJ_DEFER(KJ_SYSCALL(pthread_sigmask(SIG_SETMASK, &oldsigs, nullptr)) { break; });
791 
792   TestChild child1(port, 123);
793   KJ_EXPECT(!child1.promise.poll(waitScope));
794 
795   child1.kill(SIGTERM);
796 
797   {
798     int status = child1.promise.wait(waitScope);
799     KJ_EXPECT(WIFEXITED(status));
800     KJ_EXPECT(WEXITSTATUS(status) == 123);
801   }
802 
803   TestChild child2(port, 234);
804   TestChild child3(port, 345);
805 
806   KJ_EXPECT(!child2.promise.poll(waitScope));
807   KJ_EXPECT(!child3.promise.poll(waitScope));
808 
809   child2.kill(SIGKILL);
810 
811   {
812     int status = child2.promise.wait(waitScope);
813     KJ_EXPECT(!WIFEXITED(status));
814     KJ_EXPECT(WIFSIGNALED(status));
815     KJ_EXPECT(WTERMSIG(status) == SIGKILL);
816   }
817 
818   KJ_EXPECT(!child3.promise.poll(waitScope));
819 
820   // child3 will be killed and synchronously waited on the way out.
821 }
822 
823 #if !__CYGWIN__
824 // TODO(someday): Figure out why whenWriteDisconnected() never resolves on Cygwin.
825 
826 KJ_TEST("UnixEventPort whenWriteDisconnected()") {
827   captureSignals();
828   UnixEventPort port;
829   EventLoop loop(port);
830   WaitScope waitScope(loop);
831 
832   int fds_[2];
833   KJ_SYSCALL(socketpair(AF_UNIX, SOCK_STREAM, 0, fds_));
834   kj::AutoCloseFd fds[2] = { kj::AutoCloseFd(fds_[0]), kj::AutoCloseFd(fds_[1]) };
835 
836   UnixEventPort::FdObserver observer(port, fds[0], UnixEventPort::FdObserver::OBSERVE_READ);
837 
838   // At one point, the poll()-based version of UnixEventPort had a bug where if some other event
839   // had completed previously, whenWriteDisconnected() would stop being watched for. So we watch
840   // for readability as well and check that that goes away first.
841   auto readablePromise = observer.whenBecomesReadable();
842   auto hupPromise = observer.whenWriteDisconnected();
843 
844   KJ_EXPECT(!readablePromise.poll(waitScope));
845   KJ_EXPECT(!hupPromise.poll(waitScope));
846 
847   KJ_SYSCALL(write(fds[1], "foo", 3));
848 
849   KJ_ASSERT(readablePromise.poll(waitScope));
850   readablePromise.wait(waitScope);
851 
852   {
853     char junk[16];
854     ssize_t n;
855     KJ_SYSCALL(n = read(fds[0], junk, 16));
856     KJ_EXPECT(n == 3);
857   }
858 
859   KJ_EXPECT(!hupPromise.poll(waitScope));
860 
861   fds[1] = nullptr;
862   KJ_ASSERT(hupPromise.poll(waitScope));
863   hupPromise.wait(waitScope);
864 }
865 
866 KJ_TEST("UnixEventPort FdObserver(..., flags=0)::whenWriteDisconnected()") {
867   // Verifies that given `0' as a `flags' argument,
868   // FdObserver still observes whenWriteDisconnected().
869   //
870   // This can be useful to watch disconnection on a blocking file descriptor.
871   // See discussion: https://github.com/capnproto/capnproto/issues/924
872 
873   captureSignals();
874   UnixEventPort port;
875   EventLoop loop(port);
876   WaitScope waitScope(loop);
877 
878   int pipefds[2];
879   KJ_SYSCALL(pipe(pipefds));
880   kj::AutoCloseFd infd(pipefds[0]), outfd(pipefds[1]);
881 
882   UnixEventPort::FdObserver observer(port, outfd, 0);
883 
884   auto hupPromise = observer.whenWriteDisconnected();
885 
886   KJ_EXPECT(!hupPromise.poll(waitScope));
887 
888   infd = nullptr;
889   KJ_ASSERT(hupPromise.poll(waitScope));
890   hupPromise.wait(waitScope);
891 }
892 
893 #endif
894 
895 KJ_TEST("UnixEventPort poll for signals") {
896   captureSignals();
897   UnixEventPort port;
898   EventLoop loop(port);
899   WaitScope waitScope(loop);
900 
901   auto promise1 = port.onSignal(SIGURG);
902   auto promise2 = port.onSignal(SIGIO);
903 
904   KJ_EXPECT(!promise1.poll(waitScope));
905   KJ_EXPECT(!promise2.poll(waitScope));
906 
907   KJ_SYSCALL(raise(SIGURG));
908   KJ_SYSCALL(raise(SIGIO));
909   port.wake();
910 
911   KJ_EXPECT(port.poll());
912   KJ_EXPECT(promise1.poll(waitScope));
913   KJ_EXPECT(promise2.poll(waitScope));
914 
915   promise1.wait(waitScope);
916   promise2.wait(waitScope);
917 }
918 
919 #if defined(SIGRTMIN) && !__CYGWIN__
920 // TODO(someday): Figure out why RT signals don't seem to work correctly on Cygwin. It looks like
921 //   only the first signal is delivered, like how non-RT signals work. Is it possible Cygwin
922 //   advertites RT signal support but doesn't actually implement them correctly? I can't find any
923 //   information on the internet about this and TBH I don't care about Cygwin enough to dig in.
924 
testRtSignals(UnixEventPort & port,WaitScope & waitScope,bool doPoll)925 void testRtSignals(UnixEventPort& port, WaitScope& waitScope, bool doPoll) {
926   union sigval value;
927   memset(&value, 0, sizeof(value));
928 
929   // Queue three copies of the signal upfront.
930   for (uint i = 0; i < 3; i++) {
931     value.sival_int = 123 + i;
932     KJ_SYSCALL(sigqueue(getpid(), SIGRTMIN, value));
933   }
934 
935   // Now wait for them.
936   for (uint i = 0; i < 3; i++) {
937     auto promise = port.onSignal(SIGRTMIN);
938     if (doPoll) {
939       KJ_ASSERT(promise.poll(waitScope));
940     }
941     auto info = promise.wait(waitScope);
942     KJ_EXPECT(info.si_value.sival_int == 123 + i);
943   }
944 
945   KJ_EXPECT(!port.onSignal(SIGRTMIN).poll(waitScope));
946 }
947 
948 KJ_TEST("UnixEventPort can receive multiple queued instances of an RT signal") {
949   captureSignals();
950   UnixEventPort port;
951   EventLoop loop(port);
952   WaitScope waitScope(loop);
953 
954   testRtSignals(port, waitScope, true);
955 
956   // Test again, but don't poll() the promises. This may test a different code path, if poll() and
957   // wait() are very different in how they read signals. (For the poll(2)-based implementation of
958   // UnixEventPort, they are indeed pretty different.)
959   testRtSignals(port, waitScope, false);
960 }
961 #endif
962 
963 }  // namespace
964 }  // namespace kj
965 
966 #endif  // !_WIN32
967