1 // Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors
2 // Licensed under the MIT License:
3 //
4 // Permission is hereby granted, free of charge, to any person obtaining a copy
5 // of this software and associated documentation files (the "Software"), to deal
6 // in the Software without restriction, including without limitation the rights
7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8 // copies of the Software, and to permit persons to whom the Software is
9 // furnished to do so, subject to the following conditions:
10 //
11 // The above copyright notice and this permission notice shall be included in
12 // all copies or substantial portions of the Software.
13 //
14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
20 // THE SOFTWARE.
21
22 #if !_WIN32
23
24 #include "async-unix.h"
25 #include "thread.h"
26 #include "debug.h"
27 #include "io.h"
28 #include <unistd.h>
29 #include <fcntl.h>
30 #include <sys/types.h>
31 #include <sys/socket.h>
32 #include <sys/stat.h>
33 #include <netinet/in.h>
34 #include <kj/compat/gtest.h>
35 #include <pthread.h>
36 #include <algorithm>
37 #include <sys/wait.h>
38 #include <sys/time.h>
39 #include <errno.h>
40 #include "mutex.h"
41
42 #if __BIONIC__
// Android's Bionic defines SIGRTMIN, but passing it to sigaddset() fails with EINVAL, which means
// we definitely can't actually use RT signals there.
45 #undef SIGRTMIN
46 #endif
47
48 namespace kj {
49 namespace {
50
// Pauses the calling thread for 10 milliseconds. Tests in this file call it (mostly from helper
// threads) to stagger work between threads.
inline void delay() {
  constexpr useconds_t PAUSE_MICROSECONDS = 10000;
  usleep(PAUSE_MICROSECONDS);
}
52
// EXPECT_SI_CODE(expected, actual) checks the si_code of a received siginfo_t. It is EXPECT_EQ on
// Linux and Cygwin and expands to nothing everywhere else, because:
// On OSX, si_code seems to be zero when SI_USER is expected.
#if __linux__ || __CYGWIN__
#define EXPECT_SI_CODE EXPECT_EQ
#else
#define EXPECT_SI_CODE(a,b)
#endif
59
captureSignals()60 void captureSignals() {
61 static bool captured = false;
62 if (!captured) {
63 // We use SIGIO and SIGURG as our test signals because they're two signals that we can be
64 // reasonably confident won't otherwise be delivered to any KJ or Cap'n Proto test. We can't
65 // use SIGUSR1 because it is reserved by UnixEventPort and SIGUSR2 is used by Valgrind on OSX.
66 UnixEventPort::captureSignal(SIGURG);
67 UnixEventPort::captureSignal(SIGIO);
68
69 #ifdef SIGRTMIN
70 UnixEventPort::captureSignal(SIGRTMIN);
71 #endif
72
73 UnixEventPort::captureChildExit();
74
75 captured = true;
76 }
77 }
78
TEST(AsyncUnixTest, Signals) {
  // Sends SIGURG to our own process and checks that UnixEventPort::onSignal() reports it.
  captureSignals();
  UnixEventPort port;
  EventLoop loop(port);
  WaitScope waitScope(loop);

  // Check the result: if the signal could not be sent, the wait() below would never return.
  KJ_SYSCALL(kill(getpid(), SIGURG));

  siginfo_t info = port.onSignal(SIGURG).wait(waitScope);
  EXPECT_EQ(SIGURG, info.si_signo);
  EXPECT_SI_CODE(SI_USER, info.si_code);
}
91
92 #if defined(SIGRTMIN) && !__BIONIC__ && !(__linux__ && __mips__)
TEST(AsyncUnixTest, SignalWithValue) {
  // Queues SIGURG to ourselves with sigqueue(), attaching an integer value, and checks that the
  // value arrives intact in the siginfo_t returned by UnixEventPort::onSignal().
  //
  // sigqueue() was introduced along with real-time signals, so although the signal sent here is
  // plain SIGURG, this only works on platforms that support RT signals. Hence this test won't
  // run on e.g. Mac OSX.
  //
  // Android's bionic does not appear to support sigqueue() even though the kernel does.
  //
  // The test also fails on Linux on mipsel, where si_value comes back as zero. No one with a
  // mips machine wants to debug the problem but they demand a patch fixing it, so the test is
  // disabled there. Sad. https://github.com/sandstorm-io/capnproto/issues/204

  captureSignals();
  UnixEventPort port;
  EventLoop loop(port);
  WaitScope waitScope(loop);

  // Zero the whole union before setting the one member this test uses.
  union sigval value;
  memset(&value, 0, sizeof(value));
  value.sival_int = 123;

  KJ_SYSCALL_HANDLE_ERRORS(sigqueue(getpid(), SIGURG, value)) {
    case ENOSYS:
      // sigqueue() not supported. Maybe running on WSL.
      KJ_LOG(WARNING, "sigqueue() is not implemented by your system; skipping test");
      return;
    default:
      KJ_FAIL_SYSCALL("sigqueue(getpid(), SIGURG, value)", error);
  }

  auto signalPromise = port.onSignal(SIGURG);
  siginfo_t info = signalPromise.wait(waitScope);
  EXPECT_EQ(SIGURG, info.si_signo);
  EXPECT_SI_CODE(SI_QUEUE, info.si_code);
  EXPECT_EQ(123, info.si_value.sival_int);
}
127
TEST(AsyncUnixTest, SignalWithPointerValue) {
  // Same as SignalWithValue, except that the value attached with sigqueue() is a pointer
  // (the address of the local UnixEventPort) rather than an integer.
  //
  // sigqueue() was introduced along with real-time signals, so although the signal sent here is
  // plain SIGURG, this only works on platforms that support RT signals. Hence this test won't
  // run on e.g. Mac OSX.
  //
  // Android's bionic does not appear to support sigqueue() even though the kernel does.
  //
  // The test also fails on Linux on mipsel, where si_value comes back as zero. No one with a
  // mips machine wants to debug the problem but they demand a patch fixing it, so the test is
  // disabled there. Sad. https://github.com/sandstorm-io/capnproto/issues/204

  captureSignals();
  UnixEventPort port;
  EventLoop loop(port);
  WaitScope waitScope(loop);

  // Zero the whole union before setting the one member this test uses.
  union sigval value;
  memset(&value, 0, sizeof(value));
  value.sival_ptr = &port;

  KJ_SYSCALL_HANDLE_ERRORS(sigqueue(getpid(), SIGURG, value)) {
    case ENOSYS:
      // sigqueue() not supported. Maybe running on WSL.
      KJ_LOG(WARNING, "sigqueue() is not implemented by your system; skipping test");
      return;
    default:
      KJ_FAIL_SYSCALL("sigqueue(getpid(), SIGURG, value)", error);
  }

  auto signalPromise = port.onSignal(SIGURG);
  siginfo_t info = signalPromise.wait(waitScope);
  EXPECT_EQ(SIGURG, info.si_signo);
  EXPECT_SI_CODE(SI_QUEUE, info.si_code);
  EXPECT_EQ(&port, info.si_value.sival_ptr);
}
162 #endif
163
TEST(AsyncUnixTest, SignalsMultiListen) {
  // Registers listeners for two different signals, sends only one of them, and checks that the
  // listener for the other signal does not fire.
  captureSignals();
  UnixEventPort port;
  EventLoop loop(port);
  WaitScope waitScope(loop);

  // SIGIO is never sent in this test, so this continuation running at all is a failure.
  port.onSignal(SIGIO).then([](siginfo_t&&) {
    KJ_FAIL_EXPECT("Received wrong signal.");
  }).detach([](kj::Exception&& exception) {
    KJ_FAIL_EXPECT(exception);
  });

  // Check the result: if the signal could not be sent, the wait() below would never return.
  KJ_SYSCALL(kill(getpid(), SIGURG));

  siginfo_t info = port.onSignal(SIGURG).wait(waitScope);
  EXPECT_EQ(SIGURG, info.si_signo);
  EXPECT_SI_CODE(SI_USER, info.si_code);
}
182
183 #if !__CYGWIN32__
184 // Cygwin32 (but not Cygwin64) appears not to deliver SIGURG in the following test (but it does
185 // deliver SIGIO, if you reverse the order of the waits). Since this doesn't occur on any other
186 // platform I'm assuming it's a Cygwin bug.
187
TEST(AsyncUnixTest, SignalsMultiReceive) {
  // Sends two different signals before waiting on either, then waits for each in turn.
  captureSignals();
  UnixEventPort port;
  EventLoop loop(port);
  WaitScope waitScope(loop);

  // Check the results: if a signal could not be sent, the matching wait() below would never
  // return.
  KJ_SYSCALL(kill(getpid(), SIGURG));
  KJ_SYSCALL(kill(getpid(), SIGIO));

  siginfo_t info = port.onSignal(SIGURG).wait(waitScope);
  EXPECT_EQ(SIGURG, info.si_signo);
  EXPECT_SI_CODE(SI_USER, info.si_code);

  info = port.onSignal(SIGIO).wait(waitScope);
  EXPECT_EQ(SIGIO, info.si_signo);
  EXPECT_SI_CODE(SI_USER, info.si_code);
}
205
206 #endif // !__CYGWIN32__
207
TEST(AsyncUnixTest, SignalsAsync) {
  // Checks delivery of a signal that a second thread directs at this thread with pthread_kill().
  captureSignals();
  UnixEventPort port;
  EventLoop loop(port);
  WaitScope waitScope(loop);

  // The helper thread pauses briefly, then signals the thread that is running this test.
  const pthread_t testThread = pthread_self();
  Thread signaler([testThread]() {
    delay();
    pthread_kill(testThread, SIGURG);
  });

  auto signalPromise = port.onSignal(SIGURG);
  siginfo_t info = signalPromise.wait(waitScope);
  EXPECT_EQ(SIGURG, info.si_signo);
#if __linux__
  EXPECT_SI_CODE(SI_TKILL, info.si_code);
#endif
}
227
228 #if !__CYGWIN32__
229 // Cygwin32 (but not Cygwin64) appears not to deliver SIGURG in the following test (but it does
230 // deliver SIGIO, if you reverse the order of the waits). Since this doesn't occur on any other
231 // platform I'm assuming it's a Cygwin bug.
232
TEST(AsyncUnixTest, SignalsNoWait) {
  // Verify that UnixEventPort::poll() correctly receives pending signals.

  captureSignals();
  UnixEventPort port;
  EventLoop loop(port);
  WaitScope waitScope(loop);

  bool receivedSigurg = false;
  bool receivedSigio = false;
  port.onSignal(SIGURG).then([&](siginfo_t&& info) {
    receivedSigurg = true;
    EXPECT_EQ(SIGURG, info.si_signo);
    EXPECT_SI_CODE(SI_USER, info.si_code);
  }).detach([](Exception&& e) { KJ_FAIL_EXPECT(e); });
  port.onSignal(SIGIO).then([&](siginfo_t&& info) {
    receivedSigio = true;
    EXPECT_EQ(SIGIO, info.si_signo);
    EXPECT_SI_CODE(SI_USER, info.si_code);
  }).detach([](Exception&& e) { KJ_FAIL_EXPECT(e); });

  // Check the results so that a failure to send is reported here, at its cause, rather than as
  // a failed expectation at the end of the test.
  KJ_SYSCALL(kill(getpid(), SIGURG));
  KJ_SYSCALL(kill(getpid(), SIGIO));

  // Sending the signals alone must not run the continuations.
  EXPECT_FALSE(receivedSigurg);
  EXPECT_FALSE(receivedSigio);

  // Running the loop without polling the port must not run them either.
  loop.run();

  EXPECT_FALSE(receivedSigurg);
  EXPECT_FALSE(receivedSigio);

  // poll() picks the signals up, but the continuations only run once the loop runs again.
  port.poll();

  EXPECT_FALSE(receivedSigurg);
  EXPECT_FALSE(receivedSigio);

  loop.run();

  EXPECT_TRUE(receivedSigurg);
  EXPECT_TRUE(receivedSigio);
}
275
276 #endif // !__CYGWIN32__
277
TEST(AsyncUnixTest, ReadObserver) {
  // Checks that an FdObserver on the read end of a pipe reports readability after a write and,
  // on Linux, that atEndHint() flips once the write end has been closed.
  using FdObserver = UnixEventPort::FdObserver;

  captureSignals();
  UnixEventPort port;
  EventLoop loop(port);
  WaitScope waitScope(loop);

  int pipefds[2];
  KJ_SYSCALL(pipe(pipefds));
  kj::AutoCloseFd infd(pipefds[0]);
  kj::AutoCloseFd outfd(pipefds[1]);

  FdObserver observer(port, infd, FdObserver::OBSERVE_READ);

  KJ_SYSCALL(write(outfd, "foo", 3));

  auto readable = observer.whenBecomesReadable();
  readable.wait(waitScope);

#if __linux__  // platform known to support POLLRDHUP
  EXPECT_FALSE(KJ_ASSERT_NONNULL(observer.atEndHint()));

  // Drain what was written above.
  char buffer[4096];
  ssize_t n;
  KJ_SYSCALL(n = read(infd, &buffer, sizeof(buffer)));
  EXPECT_EQ(3, n);

  // Write once more and close the write end before waiting again.
  KJ_SYSCALL(write(outfd, "bar", 3));
  outfd = nullptr;

  readable = observer.whenBecomesReadable();
  readable.wait(waitScope);

  EXPECT_TRUE(KJ_ASSERT_NONNULL(observer.atEndHint()));
#endif
}
310
TEST(AsyncUnixTest, ReadObserverMultiListen) {
  // Observes two pipes but writes to only one of them; the observer of the untouched pipe must
  // not fire.
  using FdObserver = UnixEventPort::FdObserver;

  captureSignals();
  UnixEventPort port;
  EventLoop loop(port);
  WaitScope waitScope(loop);

  // Nothing is ever written to this pipe.
  int bogusPipefds[2];
  KJ_SYSCALL(pipe(bogusPipefds));
  KJ_DEFER({ close(bogusPipefds[1]); close(bogusPipefds[0]); });

  FdObserver bogusObserver(port, bogusPipefds[0], FdObserver::OBSERVE_READ);

  bogusObserver.whenBecomesReadable().then([]() {
    ADD_FAILURE() << "Received wrong poll.";
  }).detach([](kj::Exception&& exception) {
    ADD_FAILURE() << kj::str(exception).cStr();
  });

  // This is the pipe that actually receives data.
  int pipefds[2];
  KJ_SYSCALL(pipe(pipefds));
  KJ_DEFER({ close(pipefds[1]); close(pipefds[0]); });

  FdObserver observer(port, pipefds[0], FdObserver::OBSERVE_READ);
  KJ_SYSCALL(write(pipefds[1], "foo", 3));

  auto readable = observer.whenBecomesReadable();
  readable.wait(waitScope);
}
340
TEST(AsyncUnixTest, ReadObserverMultiReceive) {
  // Writes to two observed pipes before waiting on either, then waits for both observers in turn.
  using FdObserver = UnixEventPort::FdObserver;

  captureSignals();
  UnixEventPort port;
  EventLoop loop(port);
  WaitScope waitScope(loop);

  int pipefds[2];
  KJ_SYSCALL(pipe(pipefds));
  KJ_DEFER({ close(pipefds[1]); close(pipefds[0]); });

  FdObserver observer(port, pipefds[0], FdObserver::OBSERVE_READ);
  KJ_SYSCALL(write(pipefds[1], "foo", 3));

  int pipefds2[2];
  KJ_SYSCALL(pipe(pipefds2));
  KJ_DEFER({ close(pipefds2[1]); close(pipefds2[0]); });

  FdObserver observer2(port, pipefds2[0], FdObserver::OBSERVE_READ);
  KJ_SYSCALL(write(pipefds2[1], "bar", 3));

  // Request both notifications up front, then wait for them one after the other.
  auto firstReadable = observer.whenBecomesReadable();
  auto secondReadable = observer2.whenBecomesReadable();
  firstReadable.wait(waitScope);
  secondReadable.wait(waitScope);
}
368
TEST(AsyncUnixTest, ReadObserverAsync) {
  // Waits for a pipe to become readable while a second thread performs the write.
  using FdObserver = UnixEventPort::FdObserver;

  captureSignals();
  UnixEventPort port;
  EventLoop loop(port);
  WaitScope waitScope(loop);

  // Observe the read end of a fresh pipe.
  int pipefds[2];
  KJ_SYSCALL(pipe(pipefds));
  KJ_DEFER({ close(pipefds[1]); close(pipefds[0]); });
  FdObserver observer(port, pipefds[0], FdObserver::OBSERVE_READ);

  // The writer pauses briefly before writing to the other end.
  Thread writer([&]() {
    delay();
    KJ_SYSCALL(write(pipefds[1], "foo", 3));
  });

  // Wait for the event in this thread.
  auto readable = observer.whenBecomesReadable();
  readable.wait(waitScope);
}
390
TEST(AsyncUnixTest, ReadObserverNoWait) {
  // Verify that UnixEventPort::poll() correctly receives pending FD events.
  using FdObserver = UnixEventPort::FdObserver;

  captureSignals();
  UnixEventPort port;
  EventLoop loop(port);
  WaitScope waitScope(loop);

  int pipefds[2];
  KJ_SYSCALL(pipe(pipefds));
  KJ_DEFER({ close(pipefds[1]); close(pipefds[0]); });
  FdObserver observer(port, pipefds[0], FdObserver::OBSERVE_READ);

  int pipefds2[2];
  KJ_SYSCALL(pipe(pipefds2));
  KJ_DEFER({ close(pipefds2[1]); close(pipefds2[0]); });
  FdObserver observer2(port, pipefds2[0], FdObserver::OBSERVE_READ);

  // Each observer bumps the counter once when its pipe becomes readable.
  int receivedCount = 0;
  observer.whenBecomesReadable().then([&receivedCount]() {
    ++receivedCount;
  }).detach([](Exception&& exception) { ADD_FAILURE() << str(exception).cStr(); });
  observer2.whenBecomesReadable().then([&receivedCount]() {
    ++receivedCount;
  }).detach([](Exception&& exception) { ADD_FAILURE() << str(exception).cStr(); });

  KJ_SYSCALL(write(pipefds[1], "foo", 3));
  KJ_SYSCALL(write(pipefds2[1], "bar", 3));

  // Writing alone must not run the continuations.
  EXPECT_EQ(0, receivedCount);

  // Running the loop without polling the port must not run them either.
  loop.run();

  EXPECT_EQ(0, receivedCount);

  // poll() picks the events up, but the continuations only run once the loop runs again.
  port.poll();

  EXPECT_EQ(0, receivedCount);

  loop.run();

  EXPECT_EQ(2, receivedCount);
}
436
setNonblocking(int fd)437 static void setNonblocking(int fd) {
438 int flags;
439 KJ_SYSCALL(flags = fcntl(fd, F_GETFL));
440 if ((flags & O_NONBLOCK) == 0) {
441 KJ_SYSCALL(fcntl(fd, F_SETFL, flags | O_NONBLOCK));
442 }
443 }
444
TEST(AsyncUnixTest, WriteObserver) {
  // Fills a non-blocking pipe, checks that its write end is not reported writable, then drains
  // the pipe and checks that it is.
  using FdObserver = UnixEventPort::FdObserver;

  captureSignals();
  UnixEventPort port;
  EventLoop loop(port);
  WaitScope waitScope(loop);

  int pipefds[2];
  KJ_SYSCALL(pipe(pipefds));
  kj::AutoCloseFd infd(pipefds[0]);
  kj::AutoCloseFd outfd(pipefds[1]);
  setNonblocking(outfd);
  setNonblocking(infd);

  FdObserver observer(port, outfd, FdObserver::OBSERVE_WRITE);

  // Fill buffer: keep writing until write() no longer yields a non-negative count.
  ssize_t n;
  do {
    KJ_NONBLOCKING_SYSCALL(n = write(outfd, "foo", 3));
  } while (n >= 0);

  bool writable = false;
  auto promise = observer.whenBecomesWritable()
      .then([&writable]() { writable = true; })
      .eagerlyEvaluate(nullptr);

  loop.run();
  port.poll();
  loop.run();

  EXPECT_FALSE(writable);

  // Empty the read end so that the write end becomes writable. Note that Linux implements a
  // high watermark / low watermark heuristic which means that only reading one byte is not
  // sufficient. The amount we have to read is in fact architecture-dependent -- it appears to be
  // 1 page. To be safe, we read everything.
  char buffer[4096];
  do {
    KJ_NONBLOCKING_SYSCALL(n = read(infd, &buffer, sizeof(buffer)));
  } while (n > 0);

  loop.run();
  port.poll();
  loop.run();

  EXPECT_TRUE(writable);
}
490
491 #if !__APPLE__
492 // Disabled on macOS due to https://github.com/sandstorm-io/capnproto/issues/374.
TEST(AsyncUnixTest, UrgentObserver) {
  // Verify that FdObserver correctly detects availability of out-of-band data.
  // Availability of out-of-band data is implementation-specific.
  // Linux's and OS X's TCP/IP stack supports out-of-band messages for TCP sockets, which is used
  // for this test.
  using FdObserver = UnixEventPort::FdObserver;

  UnixEventPort port;
  EventLoop loop(port);
  WaitScope waitScope(loop);
  int tmpFd;
  char c;

  // Spawn a TCP server on the loopback address. sin_port is left as zero; getsockname() then
  // reads back the address that was actually bound so the client below can connect to it.
  KJ_SYSCALL(tmpFd = socket(AF_INET, SOCK_STREAM, 0));
  kj::AutoCloseFd serverFd(tmpFd);
  sockaddr_in saddr;
  memset(&saddr, 0, sizeof(saddr));
  saddr.sin_family = AF_INET;
  saddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
  KJ_SYSCALL(bind(serverFd, reinterpret_cast<sockaddr*>(&saddr), sizeof(saddr)));
  socklen_t saddrLen = sizeof(saddr);
  KJ_SYSCALL(getsockname(serverFd, reinterpret_cast<sockaddr*>(&saddr), &saddrLen));
  KJ_SYSCALL(listen(serverFd, 1));

  // Create a pipe that the server thread writes to if sending with MSG_OOB fails with EINVAL.
  int failpipe[2];
  KJ_SYSCALL(pipe(failpipe));
  KJ_DEFER({
    close(failpipe[0]);
    close(failpipe[1]);
  });

  // Accept one connection, send in-band and OOB byte, wait for a quit message
  Thread server([&]() {
    int tmpFd;
    char c;

    sockaddr_in caddr;
    socklen_t caddrLen = sizeof(caddr);
    KJ_SYSCALL(tmpFd = accept(serverFd, reinterpret_cast<sockaddr*>(&caddr), &caddrLen));
    kj::AutoCloseFd clientFd(tmpFd);
    delay();

    // Workaround: OS X won't signal POLLPRI without POLLIN. Also enqueue some in-band data.
    c = 'i';
    KJ_SYSCALL(send(clientFd, &c, 1, 0));
    c = 'o';
    KJ_SYSCALL_HANDLE_ERRORS(send(clientFd, &c, 1, MSG_OOB)) {
      case EINVAL:
        // Looks like MSG_OOB is not supported. (This is the case e.g. on WSL.)
        KJ_SYSCALL(write(failpipe[1], &c, 1));
        break;
      default:
        KJ_FAIL_SYSCALL("send(..., MSG_OOB)", error);
    }

    KJ_SYSCALL(recv(clientFd, &c, 1, 0));
    EXPECT_EQ('q', c);
  });
  // Declared after the thread, so this runs before the thread object is destroyed.
  KJ_DEFER({ shutdown(serverFd, SHUT_RDWR); serverFd = nullptr; });

  KJ_SYSCALL(tmpFd = socket(AF_INET, SOCK_STREAM, 0));
  kj::AutoCloseFd clientFd(tmpFd);
  KJ_SYSCALL(connect(clientFd, reinterpret_cast<sockaddr*>(&saddr), saddrLen));

  // Both observers watch for ordinary readability as well as urgent data.
  const uint observeFlags = FdObserver::OBSERVE_READ | FdObserver::OBSERVE_URGENT;
  FdObserver observer(port, clientFd, observeFlags);
  FdObserver failObserver(port, failpipe[0], observeFlags);

  // Whichever happens first decides the outcome: urgent data on the socket (true), or the
  // server thread reporting through the pipe that MSG_OOB did not work (false).
  auto urgentPromise = observer.whenUrgentDataAvailable().then([]() { return true; });
  auto unsupportedPromise = failObserver.whenBecomesReadable().then([]() { return false; });
  auto firstOutcome = urgentPromise.exclusiveJoin(kj::mv(unsupportedPromise));

  bool oobSupported = firstOutcome.wait(waitScope);
  if (oobSupported) {
#if __CYGWIN__
    // On Cygwin, reading the urgent byte first causes the subsequent regular read to block until
    // such a time as the connection closes -- and then the byte is successfully returned. This
    // seems to be a cygwin bug.
    KJ_SYSCALL(recv(clientFd, &c, 1, 0));
    EXPECT_EQ('i', c);
    KJ_SYSCALL(recv(clientFd, &c, 1, MSG_OOB));
    EXPECT_EQ('o', c);
#else
    // Attempt to read the urgent byte prior to reading the in-band byte.
    KJ_SYSCALL(recv(clientFd, &c, 1, MSG_OOB));
    EXPECT_EQ('o', c);
    KJ_SYSCALL(recv(clientFd, &c, 1, 0));
    EXPECT_EQ('i', c);
#endif
  } else {
    KJ_LOG(WARNING, "MSG_OOB doesn't seem to be supported on your platform.");
  }

  // Allow server thread to let its clientFd go out of scope.
  c = 'q';
  KJ_SYSCALL(send(clientFd, &c, 1, 0));
  KJ_SYSCALL(shutdown(clientFd, SHUT_RDWR));
}
592 #endif
593
TEST(AsyncUnixTest, SteadyTimers) {
  // Schedules several timers (out of order, including one in the past) and checks that none of
  // them fires before its scheduled time.
  captureSignals();
  UnixEventPort port;
  EventLoop loop(port);
  WaitScope waitScope(loop);

  auto& timer = port.getTimer();

  auto start = timer.now();
  kj::Vector<TimePoint> expected;
  kj::Vector<TimePoint> actual;

  // Schedules a timer at `start + offset`, recording both the earliest acceptable firing time
  // and the time at which it actually fires. (The parameter is deliberately not called `delay`,
  // which would shadow the delay() helper defined in this file.)
  auto addTimer = [&](Duration offset) {
    // The expected time is clamped so that it is never earlier than `start`.
    expected.add(max(start + offset, start));
    timer.atTime(start + offset).then([&]() {
      actual.add(timer.now());
    }).detach([](Exception&& e) { ADD_FAILURE() << str(e).cStr(); });
  };

  addTimer(30 * MILLISECONDS);
  addTimer(40 * MILLISECONDS);
  addTimer(20350 * MICROSECONDS);
  addTimer(30 * MILLISECONDS);
  addTimer(-10 * MILLISECONDS);

  // Wait until just past the latest expected time, so every timer has had the chance to fire.
  std::sort(expected.begin(), expected.end());
  timer.atTime(expected.back() + MILLISECONDS).wait(waitScope);

  ASSERT_EQ(expected.size(), actual.size());
  // Use an unsigned index: size() is unsigned, and comparing it with `int` mixes signedness.
  for (size_t i = 0; i < expected.size(); ++i) {
    KJ_EXPECT(expected[i] <= actual[i], "Actual time for timer i is too early.",
              i, ((expected[i] - actual[i]) / NANOSECONDS));
  }
}
628
// Set by dummySignalHandler(); a test resets it before installing the handler and checks it
// afterwards.
bool dummySignalHandlerCalled = false;

// Signal handler that does nothing except record that it ran. The signal number is ignored.
void dummySignalHandler(int /*signo*/) {
  dummySignalHandlerCalled = true;
}
633
TEST(AsyncUnixTest, InterruptedTimer) {
  // Checks that a timer wait still lasts the full timeout when a signal handled outside of
  // UnixEventPort (SIGALRM, with a do-nothing handler) arrives part-way through it.
  captureSignals();
  UnixEventPort port;
  EventLoop loop(port);
  WaitScope waitScope(loop);

#if __linux__
  // Linux timeslices are 1ms.
  constexpr auto OS_SLOWNESS_FACTOR = 1;
#else
  // OSX timeslices are 10ms, so we need longer timeouts to avoid flakiness.
  // To be safe we'll assume other OS's are similar.
  constexpr auto OS_SLOWNESS_FACTOR = 10;
#endif

  // Schedule a timer event in 100ms (scaled by OS_SLOWNESS_FACTOR).
  auto& timer = port.getTimer();
  auto start = timer.now();
  constexpr auto timeout = 100 * MILLISECONDS * OS_SLOWNESS_FACTOR;

  // Arrange SIGALRM to be delivered half-way through the timeout, handled in an empty signal
  // handler. This will cause our wait to be interrupted with EINTR. We should nevertheless
  // continue waiting for the right amount of time.
  dummySignalHandlerCalled = false;
  if (signal(SIGALRM, &dummySignalHandler) == SIG_ERR) {
    KJ_FAIL_SYSCALL("signal(SIGALRM)", errno);
  }
  struct itimerval itv;
  memset(&itv, 0, sizeof(itv));
  itv.it_value.tv_usec = 50000 * OS_SLOWNESS_FACTOR;  // 50ms, scaled like the timeout
  // Check the result: if the interval timer could not be armed, SIGALRM never arrives and the
  // test would otherwise fail later with a much less helpful message.
  KJ_SYSCALL(setitimer(ITIMER_REAL, &itv, nullptr));

  timer.afterDelay(timeout).wait(waitScope);

  KJ_EXPECT(dummySignalHandlerCalled);
  KJ_EXPECT(timer.now() - start >= timeout);
  KJ_EXPECT(timer.now() - start <= timeout + (timeout / 5));  // allow 20% (20ms on Linux) error
}
672
TEST(AsyncUnixTest, Wake) {
  // Exercises UnixEventPort::wake() in combination with poll() and wait(), with the wake-up
  // coming both from this thread and from helper threads.
  captureSignals();
  UnixEventPort port;
  EventLoop loop(port);
  WaitScope waitScope(loop);

  // A wake() from this thread is reported by exactly one subsequent poll().
  EXPECT_FALSE(port.poll());
  port.wake();
  EXPECT_TRUE(port.poll());
  EXPECT_FALSE(port.poll());

  // ...and likewise by wait().
  port.wake();
  EXPECT_TRUE(port.wait());

  // A wait() that ends for a timer scheduled at the current time, with no wake(), reports false.
  {
    auto dueTimer = port.getTimer().atTime(port.getTimer().now());
    EXPECT_FALSE(port.wait());
  }

  // Test wake() when already wait()ing.
  {
    Thread waker([&port]() {
      delay();
      port.wake();
    });

    EXPECT_TRUE(port.wait());
  }

  // Test wait() after wake() already happened.
  {
    Thread waker([&port]() {
      port.wake();
    });

    delay();
    EXPECT_TRUE(port.wait());
  }

  // Test wake() during poll() busy loop.
  {
    Thread waker([&port]() {
      delay();
      port.wake();
    });

    EXPECT_FALSE(port.poll());
    while (!port.poll()) {}
  }

  // Test poll() when wake() already delivered.
  {
    EXPECT_FALSE(port.poll());

    Thread waker([&port]() {
      port.wake();
    });

    do {
      delay();
    } while (!port.poll());
  }
}
736
// Exit status that exitSignalHandler() hands to _exit().
int exitCodeForSignal = 0;

// Signal handler that terminates the process immediately via _exit(), using exitCodeForSignal
// as the status. The signal number is ignored.
void exitSignalHandler(int /*signo*/) {
  _exit(exitCodeForSignal);
}
741
742 struct TestChild {
743 kj::Maybe<pid_t> pid;
744 kj::Promise<int> promise = nullptr;
745
TestChildkj::__anon6de57eb20111::TestChild746 TestChild(UnixEventPort& port, int exitCode) {
747 pid_t p;
748 KJ_SYSCALL(p = fork());
749 if (p == 0) {
750 // Arrange for SIGTERM to cause the process to exit normally.
751 exitCodeForSignal = exitCode;
752 signal(SIGTERM, &exitSignalHandler);
753 sigset_t sigs;
754 sigemptyset(&sigs);
755 sigaddset(&sigs, SIGTERM);
756 pthread_sigmask(SIG_UNBLOCK, &sigs, nullptr);
757
758 for (;;) pause();
759 }
760 pid = p;
761 promise = port.onChildExit(pid);
762 }
763
~TestChildkj::__anon6de57eb20111::TestChild764 ~TestChild() noexcept(false) {
765 KJ_IF_MAYBE(p, pid) {
766 KJ_SYSCALL(::kill(*p, SIGKILL)) { return; }
767 int status;
768 KJ_SYSCALL(waitpid(*p, &status, 0)) { return; }
769 }
770 }
771
killkj::__anon6de57eb20111::TestChild772 void kill(int signo) {
773 KJ_SYSCALL(::kill(KJ_REQUIRE_NONNULL(pid), signo));
774 }
775
776 KJ_DISALLOW_COPY(TestChild);
777 };
778
TEST(AsyncUnixTest, ChildProcess) {
  // Checks that the promises held by TestChild resolve with the child's wait status, both for a
  // child that exits by itself (after SIGTERM) and for one that is killed by SIGKILL, and that
  // they do not resolve while the child is still running.
  captureSignals();
  UnixEventPort port;
  EventLoop loop(port);
  WaitScope waitScope(loop);

  // Block SIGTERM so that we can carefully un-block it in children.
  sigset_t sigs, oldsigs;
  KJ_SYSCALL(sigemptyset(&sigs));
  KJ_SYSCALL(sigaddset(&sigs, SIGTERM));

  // pthread_sigmask() reports failure by returning an error number, not by returning -1 and
  // setting errno, so KJ_SYSCALL would never notice a failure. Check the return value directly.
  {
    int error = pthread_sigmask(SIG_BLOCK, &sigs, &oldsigs);
    if (error != 0) {
      KJ_FAIL_SYSCALL("pthread_sigmask(SIG_BLOCK)", error);
    }
  }
  // Restore the original mask on the way out. A failure here is reported as a recoverable error.
  KJ_DEFER({
    int error = pthread_sigmask(SIG_SETMASK, &oldsigs, nullptr);
    if (error != 0) {
      KJ_FAIL_SYSCALL("pthread_sigmask(SIG_SETMASK)", error) { break; }
    }
  });

  TestChild child1(port, 123);
  KJ_EXPECT(!child1.promise.poll(waitScope));

  child1.kill(SIGTERM);

  {
    int status = child1.promise.wait(waitScope);
    KJ_EXPECT(WIFEXITED(status));
    KJ_EXPECT(WEXITSTATUS(status) == 123);
  }

  TestChild child2(port, 234);
  TestChild child3(port, 345);

  KJ_EXPECT(!child2.promise.poll(waitScope));
  KJ_EXPECT(!child3.promise.poll(waitScope));

  child2.kill(SIGKILL);

  {
    int status = child2.promise.wait(waitScope);
    KJ_EXPECT(!WIFEXITED(status));
    KJ_EXPECT(WIFSIGNALED(status));
    KJ_EXPECT(WTERMSIG(status) == SIGKILL);
  }

  KJ_EXPECT(!child3.promise.poll(waitScope));

  // child3 will be killed and synchronously waited on the way out.
}
822
823 #if !__CYGWIN__
824 // TODO(someday): Figure out why whenWriteDisconnected() never resolves on Cygwin.
825
// Checks that whenWriteDisconnected() on one end of a socketpair resolves once the other end is
// closed, and not before -- even if a readability event on the same observer completed earlier.
KJ_TEST("UnixEventPort whenWriteDisconnected()") {
  captureSignals();
  UnixEventPort port;
  EventLoop loop(port);
  WaitScope waitScope(loop);

  int fds_[2];
  KJ_SYSCALL(socketpair(AF_UNIX, SOCK_STREAM, 0, fds_));
  kj::AutoCloseFd fds[2] = { kj::AutoCloseFd(fds_[0]), kj::AutoCloseFd(fds_[1]) };

  UnixEventPort::FdObserver observer(port, fds[0], UnixEventPort::FdObserver::OBSERVE_READ);

  // At one point, the poll()-based version of UnixEventPort had a bug where if some other event
  // had completed previously, whenWriteDisconnected() would stop being watched for. So we watch
  // for readability as well and check that that goes away first.
  auto readablePromise = observer.whenBecomesReadable();
  auto hupPromise = observer.whenWriteDisconnected();

  // Nothing has happened on the socket yet, so neither promise may be ready.
  KJ_EXPECT(!readablePromise.poll(waitScope));
  KJ_EXPECT(!hupPromise.poll(waitScope));

  KJ_SYSCALL(write(fds[1], "foo", 3));

  KJ_ASSERT(readablePromise.poll(waitScope));
  readablePromise.wait(waitScope);

  // Consume the bytes written above.
  {
    char junk[16];
    ssize_t n;
    KJ_SYSCALL(n = read(fds[0], junk, 16));
    KJ_EXPECT(n == 3);
  }

  // The peer is still open, so the disconnect promise must still be pending.
  KJ_EXPECT(!hupPromise.poll(waitScope));

  // Close the peer end; now the disconnect promise must resolve.
  fds[1] = nullptr;
  KJ_ASSERT(hupPromise.poll(waitScope));
  hupPromise.wait(waitScope);
}
865
KJ_TEST("UnixEventPort FdObserver(..., flags=0)::whenWriteDisconnected()") {
  // Verifies that given `0' as a `flags' argument,
  // FdObserver still observes whenWriteDisconnected().
  //
  // This can be useful to watch disconnection on a blocking file descriptor.
  // See discussion: https://github.com/capnproto/capnproto/issues/924

  captureSignals();
  UnixEventPort port;
  EventLoop loop(port);
  WaitScope waitScope(loop);

  int pipefds[2];
  KJ_SYSCALL(pipe(pipefds));
  kj::AutoCloseFd infd(pipefds[0]), outfd(pipefds[1]);

  // Observe the write end of the pipe with no OBSERVE_* flags at all.
  UnixEventPort::FdObserver observer(port, outfd, 0);

  auto hupPromise = observer.whenWriteDisconnected();

  // The read end is still open, so the promise must be pending.
  KJ_EXPECT(!hupPromise.poll(waitScope));

  // Close the read end; now the promise must resolve.
  infd = nullptr;
  KJ_ASSERT(hupPromise.poll(waitScope));
  hupPromise.wait(waitScope);
}
892
893 #endif
894
// Checks that signals raised while two onSignal() promises are outstanding make both promises
// ready after the port has been polled.
KJ_TEST("UnixEventPort poll for signals") {
  captureSignals();
  UnixEventPort port;
  EventLoop loop(port);
  WaitScope waitScope(loop);

  auto promise1 = port.onSignal(SIGURG);
  auto promise2 = port.onSignal(SIGIO);

  // No signal has been raised yet.
  KJ_EXPECT(!promise1.poll(waitScope));
  KJ_EXPECT(!promise2.poll(waitScope));

  KJ_SYSCALL(raise(SIGURG));
  KJ_SYSCALL(raise(SIGIO));
  port.wake();

  // The wake() above is what makes this poll() return true.
  KJ_EXPECT(port.poll());
  KJ_EXPECT(promise1.poll(waitScope));
  KJ_EXPECT(promise2.poll(waitScope));

  promise1.wait(waitScope);
  promise2.wait(waitScope);
}
918
919 #if defined(SIGRTMIN) && !__CYGWIN__
// TODO(someday): Figure out why RT signals don't seem to work correctly on Cygwin. It looks like
//   only the first signal is delivered, like how non-RT signals work. Is it possible Cygwin
//   advertises RT signal support but doesn't actually implement them correctly? I can't find any
//   information on the internet about this and TBH I don't care about Cygwin enough to dig in.
924
testRtSignals(UnixEventPort & port,WaitScope & waitScope,bool doPoll)925 void testRtSignals(UnixEventPort& port, WaitScope& waitScope, bool doPoll) {
926 union sigval value;
927 memset(&value, 0, sizeof(value));
928
929 // Queue three copies of the signal upfront.
930 for (uint i = 0; i < 3; i++) {
931 value.sival_int = 123 + i;
932 KJ_SYSCALL(sigqueue(getpid(), SIGRTMIN, value));
933 }
934
935 // Now wait for them.
936 for (uint i = 0; i < 3; i++) {
937 auto promise = port.onSignal(SIGRTMIN);
938 if (doPoll) {
939 KJ_ASSERT(promise.poll(waitScope));
940 }
941 auto info = promise.wait(waitScope);
942 KJ_EXPECT(info.si_value.sival_int == 123 + i);
943 }
944
945 KJ_EXPECT(!port.onSignal(SIGRTMIN).poll(waitScope));
946 }
947
// Runs testRtSignals() twice against the same port: once polling each promise before waiting
// on it, and once waiting directly.
KJ_TEST("UnixEventPort can receive multiple queued instances of an RT signal") {
  captureSignals();
  UnixEventPort port;
  EventLoop loop(port);
  WaitScope waitScope(loop);

  // First pass: doPoll = true.
  testRtSignals(port, waitScope, true);

  // Test again, but don't poll() the promises. This may test a different code path, if poll() and
  // wait() are very different in how they read signals. (For the poll(2)-based implementation of
  // UnixEventPort, they are indeed pretty different.)
  testRtSignals(port, waitScope, false);
}
961 #endif
962
963 } // namespace
964 } // namespace kj
965
966 #endif // !_WIN32
967