1 // Copyright (c) 2019 Cloudflare, Inc. and contributors
2 // Licensed under the MIT License:
3 //
4 // Permission is hereby granted, free of charge, to any person obtaining a copy
5 // of this software and associated documentation files (the "Software"), to deal
6 // in the Software without restriction, including without limitation the rights
7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8 // copies of the Software, and to permit persons to whom the Software is
9 // furnished to do so, subject to the following conditions:
10 //
11 // The above copyright notice and this permission notice shall be included in
12 // all copies or substantial portions of the Software.
13 //
14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
20 // THE SOFTWARE.
21
22 #include "byte-stream.h"
23 #include <kj/test.h>
24 #include <capnp/rpc-twoparty.h>
25 #include <stdlib.h>
26
27 namespace capnp {
28 namespace {
29
expectRead(kj::AsyncInputStream & in,kj::StringPtr expected)30 kj::Promise<void> expectRead(kj::AsyncInputStream& in, kj::StringPtr expected) {
31 if (expected.size() == 0) return kj::READY_NOW;
32
33 auto buffer = kj::heapArray<char>(expected.size());
34
35 auto promise = in.tryRead(buffer.begin(), 1, buffer.size());
36 return promise.then(kj::mvCapture(buffer, [&in,expected](kj::Array<char> buffer, size_t amount) {
37 if (amount == 0) {
38 KJ_FAIL_ASSERT("expected data never sent", expected);
39 }
40
41 auto actual = buffer.slice(0, amount);
42 if (memcmp(actual.begin(), expected.begin(), actual.size()) != 0) {
43 KJ_FAIL_ASSERT("data from stream doesn't match expected", expected, actual);
44 }
45
46 return expectRead(in, expected.slice(amount));
47 }));
48 }
49
makeString(size_t size)50 kj::String makeString(size_t size) {
51 auto bytes = kj::heapArray<char>(size);
52 for (char& c: bytes) {
53 c = 'a' + rand() % 26;
54 }
55 bytes[bytes.size() - 1] = 0;
56 return kj::String(kj::mv(bytes));
57 };
58
59 KJ_TEST("KJ -> ByteStream -> KJ without shortening") {
60 kj::EventLoop eventLoop;
61 kj::WaitScope waitScope(eventLoop);
62
63 ByteStreamFactory factory1;
64 ByteStreamFactory factory2;
65
66 auto pipe = kj::newOneWayPipe();
67
68 auto wrapped = factory1.capnpToKj(factory2.kjToCapnp(kj::mv(pipe.out)));
69
70 {
71 auto promise = wrapped->write("foo", 3);
72 KJ_EXPECT(!promise.poll(waitScope));
73 expectRead(*pipe.in, "foo").wait(waitScope);
74 promise.wait(waitScope);
75 }
76
77 {
78 // Write more than 1 << 16 bytes at once to exercise write splitting.
79 auto str = makeString(1 << 17);
80 auto promise = wrapped->write(str.begin(), str.size());
81 KJ_EXPECT(!promise.poll(waitScope));
82 expectRead(*pipe.in, str).wait(waitScope);
83 promise.wait(waitScope);
84 }
85
86 {
87 // Write more than 1 << 16 bytes via an array to exercise write splitting.
88 auto str = makeString(1 << 18);
89 auto pieces = kj::heapArrayBuilder<kj::ArrayPtr<const kj::byte>>(4);
90
91 // Two 2^15 pieces will be combined.
92 pieces.add(kj::arrayPtr(reinterpret_cast<kj::byte*>(str.begin()), 1 << 15));
93 pieces.add(kj::arrayPtr(reinterpret_cast<kj::byte*>(str.begin() + (1 << 15)), 1 << 15));
94
95 // One 2^16 piece will be written alone.
96 pieces.add(kj::arrayPtr(reinterpret_cast<kj::byte*>(
97 str.begin() + (1 << 16)), 1 << 16));
98
99 // One 2^17 piece will be split.
100 pieces.add(kj::arrayPtr(reinterpret_cast<kj::byte*>(
101 str.begin() + (1 << 17)), str.size() - (1 << 17)));
102
103 auto promise = wrapped->write(pieces);
104 KJ_EXPECT(!promise.poll(waitScope));
105 expectRead(*pipe.in, str).wait(waitScope);
106 promise.wait(waitScope);
107 }
108
109 wrapped = nullptr;
110 KJ_EXPECT(pipe.in->readAllText().wait(waitScope) == "");
111 }
112
113 class ExactPointerWriter: public kj::AsyncOutputStream {
114 public:
115 kj::ArrayPtr<const char> receivedBuffer;
116
fulfill()117 void fulfill() {
118 KJ_ASSERT_NONNULL(fulfiller)->fulfill();
119 fulfiller = nullptr;
120 receivedBuffer = nullptr;
121 }
122
write(const void * buffer,size_t size)123 kj::Promise<void> write(const void* buffer, size_t size) override {
124 KJ_ASSERT(fulfiller == nullptr);
125 receivedBuffer = kj::arrayPtr(reinterpret_cast<const char*>(buffer), size);
126 auto paf = kj::newPromiseAndFulfiller<void>();
127 fulfiller = kj::mv(paf.fulfiller);
128 return kj::mv(paf.promise);
129 }
write(kj::ArrayPtr<const kj::ArrayPtr<const byte>> pieces)130 kj::Promise<void> write(kj::ArrayPtr<const kj::ArrayPtr<const byte>> pieces) override {
131 KJ_UNIMPLEMENTED("not implemented for test");
132 }
whenWriteDisconnected()133 kj::Promise<void> whenWriteDisconnected() override {
134 return kj::NEVER_DONE;
135 }
136
expectBuffer(kj::StringPtr expected)137 void expectBuffer(kj::StringPtr expected) {
138 KJ_EXPECT(receivedBuffer == expected.asArray(), receivedBuffer, expected);
139 }
140
141 private:
142 kj::Maybe<kj::Own<kj::PromiseFulfiller<void>>> fulfiller;
143 };
144
145 KJ_TEST("KJ -> ByteStream -> KJ with shortening") {
146 kj::EventLoop eventLoop;
147 kj::WaitScope waitScope(eventLoop);
148
149 ByteStreamFactory factory;
150
151 auto pipe = kj::newOneWayPipe();
152
153 ExactPointerWriter exactPointerWriter;
154 auto pumpPromise = pipe.in->pumpTo(exactPointerWriter);
155
156 auto wrapped = factory.capnpToKj(factory.kjToCapnp(kj::mv(pipe.out)));
157
158 {
159 char buffer[4] = "foo";
160 auto promise = wrapped->write(buffer, 3);
161 KJ_EXPECT(!promise.poll(waitScope));
162
163 // This first write won't have been path-shortened because we didn't know about the shorter
164 // path yet when it started.
165 KJ_EXPECT(exactPointerWriter.receivedBuffer.begin() != buffer);
166 KJ_EXPECT(kj::str(exactPointerWriter.receivedBuffer) == "foo");
167 exactPointerWriter.fulfill();
168 promise.wait(waitScope);
169 }
170
171 {
172 char buffer[4] = "foo";
173 auto promise = wrapped->write(buffer, 3);
174 KJ_EXPECT(!promise.poll(waitScope));
175
176 // The second write was path-shortened so passes through the exact buffer!
177 KJ_EXPECT(exactPointerWriter.receivedBuffer.begin() == buffer);
178 KJ_EXPECT(exactPointerWriter.receivedBuffer.size() == 3);
179 exactPointerWriter.fulfill();
180 promise.wait(waitScope);
181 }
182
183 wrapped = nullptr;
184 KJ_EXPECT(pipe.in->readAllText().wait(waitScope) == "");
185 }
186
187 KJ_TEST("KJ -> ByteStream -> KJ -> ByteStream -> KJ with shortening") {
188 kj::EventLoop eventLoop;
189 kj::WaitScope waitScope(eventLoop);
190
191 ByteStreamFactory factory;
192
193 auto pipe = kj::newOneWayPipe();
194
195 ExactPointerWriter exactPointerWriter;
196 auto pumpPromise = pipe.in->pumpTo(exactPointerWriter);
197
198 auto wrapped = factory.capnpToKj(factory.kjToCapnp(
199 factory.capnpToKj(factory.kjToCapnp(kj::mv(pipe.out)))));
200
201 {
202 char buffer[4] = "foo";
203 auto promise = wrapped->write(buffer, 3);
204 KJ_EXPECT(!promise.poll(waitScope));
205
206 // This first write won't have been path-shortened because we didn't know about the shorter
207 // path yet when it started.
208 KJ_EXPECT(exactPointerWriter.receivedBuffer.begin() != buffer);
209 KJ_EXPECT(kj::str(exactPointerWriter.receivedBuffer) == "foo");
210 exactPointerWriter.fulfill();
211 promise.wait(waitScope);
212 }
213
214 {
215 char buffer[4] = "bar";
216 auto promise = wrapped->write(buffer, 3);
217 KJ_EXPECT(!promise.poll(waitScope));
218
219 // The second write was path-shortened so passes through the exact buffer!
220 KJ_EXPECT(exactPointerWriter.receivedBuffer.begin() == buffer);
221 KJ_EXPECT(exactPointerWriter.receivedBuffer.size() == 3);
222 exactPointerWriter.fulfill();
223 promise.wait(waitScope);
224 }
225
226 wrapped = nullptr;
227 KJ_EXPECT(pumpPromise.wait(waitScope) == 6);
228 }
229
230 KJ_TEST("KJ -> ByteStream -> KJ pipe -> ByteStream -> KJ with shortening") {
231 kj::EventLoop eventLoop;
232 kj::WaitScope waitScope(eventLoop);
233
234 ByteStreamFactory factory;
235
236 auto backPipe = kj::newOneWayPipe();
237 auto middlePipe = kj::newOneWayPipe();
238
239 ExactPointerWriter exactPointerWriter;
240 auto backPumpPromise = backPipe.in->pumpTo(exactPointerWriter);
241
242 auto backWrapped = factory.capnpToKj(factory.kjToCapnp(kj::mv(backPipe.out)));
243 auto midPumpPormise = middlePipe.in->pumpTo(*backWrapped, 3);
244
245 auto wrapped = factory.capnpToKj(factory.kjToCapnp(kj::mv(middlePipe.out)));
246
247 // Poll whenWriteDisconnected(), mainly as a way to let all the path-shortening settle.
248 auto disconnectPromise = wrapped->whenWriteDisconnected();
249 KJ_EXPECT(!disconnectPromise.poll(waitScope));
250
251 char buffer[7] = "foobar";
252 auto writePromise = wrapped->write(buffer, 6);
253 KJ_EXPECT(!writePromise.poll(waitScope));
254
255 // The first three bytes will tunnel all the way down to the destination.
256 KJ_EXPECT(exactPointerWriter.receivedBuffer.begin() == buffer);
257 KJ_EXPECT(exactPointerWriter.receivedBuffer.size() == 3);
258 exactPointerWriter.fulfill();
259
260 KJ_EXPECT(midPumpPormise.wait(waitScope) == 3);
261
262 ExactPointerWriter exactPointerWriter2;
263 midPumpPormise = middlePipe.in->pumpTo(exactPointerWriter2, 6);
264 KJ_EXPECT(!writePromise.poll(waitScope));
265
266 // The second half of the "foobar" write will have taken a slow path, because the write was
267 // restarted in the middle of the stream re-resolving itself.
268 KJ_EXPECT(kj::str(exactPointerWriter2.receivedBuffer) == "bar");
269 exactPointerWriter2.fulfill();
270
271 // Now that write is done.
272 writePromise.wait(waitScope);
273 KJ_EXPECT(!midPumpPormise.poll(waitScope));
274
275 // If we write again, it'll hit the fast path.
276 char buffer2[4] = "baz";
277 writePromise = wrapped->write(buffer2, 3);
278 KJ_EXPECT(!writePromise.poll(waitScope));
279 KJ_EXPECT(exactPointerWriter2.receivedBuffer.begin() == buffer2);
280 KJ_EXPECT(exactPointerWriter2.receivedBuffer.size() == 3);
281 exactPointerWriter2.fulfill();
282
283 KJ_EXPECT(midPumpPormise.wait(waitScope) == 6);
284 writePromise.wait(waitScope);
285 }
286
287 KJ_TEST("KJ -> ByteStream RPC -> KJ pipe -> ByteStream RPC -> KJ with shortening") {
288 // For this test, we're going to verify that if we have ByteStreams over RPC in both directions
289 // and we pump a ByteStream to another ByteStream at one end of the connection, it gets shortened
290 // all the way to the other end!
291
292 kj::EventLoop eventLoop;
293 kj::WaitScope waitScope(eventLoop);
294
295 ByteStreamFactory clientFactory;
296 ByteStreamFactory serverFactory;
297
298 auto backPipe = kj::newOneWayPipe();
299 auto middlePipe = kj::newOneWayPipe();
300
301 ExactPointerWriter exactPointerWriter;
302 auto backPumpPromise = backPipe.in->pumpTo(exactPointerWriter);
303
304 auto rpcConnection = kj::newTwoWayPipe();
305 capnp::TwoPartyClient client(*rpcConnection.ends[0],
306 clientFactory.kjToCapnp(kj::mv(backPipe.out)),
307 rpc::twoparty::Side::CLIENT);
308 capnp::TwoPartyClient server(*rpcConnection.ends[1],
309 serverFactory.kjToCapnp(kj::mv(middlePipe.out)),
310 rpc::twoparty::Side::CLIENT);
311
312 auto backWrapped = serverFactory.capnpToKj(server.bootstrap().castAs<ByteStream>());
313 auto midPumpPormise = middlePipe.in->pumpTo(*backWrapped, 3);
314
315 auto wrapped = clientFactory.capnpToKj(client.bootstrap().castAs<ByteStream>());
316
317 // Poll whenWriteDisconnected(), mainly as a way to let all the path-shortening settle.
318 auto disconnectPromise = wrapped->whenWriteDisconnected();
319 KJ_EXPECT(!disconnectPromise.poll(waitScope));
320
321 char buffer[7] = "foobar";
322 auto writePromise = wrapped->write(buffer, 6);
323
324 // The server side did a 3-byte pump. Path-shortening magic kicks in, and the first three bytes
325 // of the write on the client side go *directly* to the endpoint without a copy!
326 KJ_EXPECT(exactPointerWriter.receivedBuffer.begin() == buffer);
327 KJ_EXPECT(exactPointerWriter.receivedBuffer.size() == 3);
328 exactPointerWriter.fulfill();
329
330 KJ_EXPECT(midPumpPormise.wait(waitScope) == 3);
331
332 ExactPointerWriter exactPointerWriter2;
333 midPumpPormise = middlePipe.in->pumpTo(exactPointerWriter2, 6);
334 midPumpPormise.poll(waitScope);
335
336 // The second half of the "foobar" write will have taken a slow path, because the write was
337 // restarted in the middle of the stream re-resolving itself.
338 KJ_EXPECT(kj::str(exactPointerWriter2.receivedBuffer) == "bar");
339 exactPointerWriter2.fulfill();
340
341 // Now that write is done.
342 writePromise.wait(waitScope);
343 KJ_EXPECT(!midPumpPormise.poll(waitScope));
344
345 // If we write again, it'll finish the server-side pump (but won't be a zero-copy write since
346 // it has to go over RPC).
347 char buffer2[4] = "baz";
348 writePromise = wrapped->write(buffer2, 3);
349 KJ_EXPECT(!midPumpPormise.poll(waitScope));
350 KJ_EXPECT(kj::str(exactPointerWriter2.receivedBuffer) == "baz");
351 exactPointerWriter2.fulfill();
352
353 KJ_EXPECT(midPumpPormise.wait(waitScope) == 6);
354 writePromise.wait(waitScope);
355 }
356
357 KJ_TEST("KJ -> ByteStream RPC -> KJ pipe -> ByteStream RPC -> KJ with concurrent shortening") {
358 // This is similar to the previous test, but we start writing before the path-shortening has
359 // settled. This should result in some writes optimistically bouncing back and forth before
360 // the stream settles in.
361
362 kj::EventLoop eventLoop;
363 kj::WaitScope waitScope(eventLoop);
364
365 ByteStreamFactory clientFactory;
366 ByteStreamFactory serverFactory;
367
368 auto backPipe = kj::newOneWayPipe();
369 auto middlePipe = kj::newOneWayPipe();
370
371 ExactPointerWriter exactPointerWriter;
372 auto backPumpPromise = backPipe.in->pumpTo(exactPointerWriter);
373
374 auto rpcConnection = kj::newTwoWayPipe();
375 capnp::TwoPartyClient client(*rpcConnection.ends[0],
376 clientFactory.kjToCapnp(kj::mv(backPipe.out)),
377 rpc::twoparty::Side::CLIENT);
378 capnp::TwoPartyClient server(*rpcConnection.ends[1],
379 serverFactory.kjToCapnp(kj::mv(middlePipe.out)),
380 rpc::twoparty::Side::CLIENT);
381
382 auto backWrapped = serverFactory.capnpToKj(server.bootstrap().castAs<ByteStream>());
383 auto midPumpPormise = middlePipe.in->pumpTo(*backWrapped);
384
385 auto wrapped = clientFactory.capnpToKj(client.bootstrap().castAs<ByteStream>());
386
387 char buffer[7] = "foobar";
388 auto writePromise = wrapped->write(buffer, 6);
389
390 // The write went to RPC so it's not immediately received.
391 KJ_EXPECT(exactPointerWriter.receivedBuffer == nullptr);
392
393 // Write should be received after we turn the event loop.
394 waitScope.poll();
395 KJ_EXPECT(exactPointerWriter.receivedBuffer != nullptr);
396
397 // Note that the promise that write() returned above has already resolved, because it hit RPC
398 // and went into the streaming window.
399 KJ_ASSERT(writePromise.poll(waitScope));
400 writePromise.wait(waitScope);
401
402 // Let's start a second write. Even though the first write technically isn't done yet, it's
403 // legal for us to start a second one because the first write's returned promise optimistically
404 // resolved for streaming window reasons. This ends up being a very tricky case for our code!
405 char buffer2[7] = "bazqux";
406 auto writePromise2 = wrapped->write(buffer2, 6);
407
408 // Now check the first write was correct, and close it out.
409 KJ_EXPECT(kj::str(exactPointerWriter.receivedBuffer) == "foobar");
410 exactPointerWriter.fulfill();
411
412 // Turn event loop again. Now the second write arrives.
413 waitScope.poll();
414 KJ_EXPECT(kj::str(exactPointerWriter.receivedBuffer) == "bazqux");
415 exactPointerWriter.fulfill();
416 writePromise2.wait(waitScope);
417
418 // If we do another write now, it should be zero-copy, because everything has settled.
419 char buffer3[6] = "corge";
420 auto writePromise3 = wrapped->write(buffer3, 5);
421 KJ_EXPECT(exactPointerWriter.receivedBuffer.begin() == buffer3);
422 KJ_EXPECT(exactPointerWriter.receivedBuffer.size() == 5);
423 KJ_EXPECT(!writePromise3.poll(waitScope));
424 exactPointerWriter.fulfill();
425 writePromise3.wait(waitScope);
426 }
427
428 KJ_TEST("KJ -> KJ pipe -> ByteStream RPC -> KJ pipe -> ByteStream RPC -> KJ with concurrent shortening") {
429 // Same as previous test, except we add a KJ pipe at the beginning and pump it into the top of
430 // the pipe, which invokes tryPumpFrom() on the KjToCapnpStreamAdapter.
431
432 kj::EventLoop eventLoop;
433 kj::WaitScope waitScope(eventLoop);
434
435 ByteStreamFactory clientFactory;
436 ByteStreamFactory serverFactory;
437
438 auto backPipe = kj::newOneWayPipe();
439 auto middlePipe = kj::newOneWayPipe();
440 auto frontPipe = kj::newOneWayPipe();
441
442 ExactPointerWriter exactPointerWriter;
443 auto backPumpPromise = backPipe.in->pumpTo(exactPointerWriter);
444
445 auto rpcConnection = kj::newTwoWayPipe();
446 capnp::TwoPartyClient client(*rpcConnection.ends[0],
447 clientFactory.kjToCapnp(kj::mv(backPipe.out)),
448 rpc::twoparty::Side::CLIENT);
449 capnp::TwoPartyClient server(*rpcConnection.ends[1],
450 serverFactory.kjToCapnp(kj::mv(middlePipe.out)),
451 rpc::twoparty::Side::CLIENT);
452
453 auto backWrapped = serverFactory.capnpToKj(server.bootstrap().castAs<ByteStream>());
454 auto midPumpPormise = middlePipe.in->pumpTo(*backWrapped);
455
456 auto wrapped = clientFactory.capnpToKj(client.bootstrap().castAs<ByteStream>());
457 auto frontPumpPromise = frontPipe.in->pumpTo(*wrapped);
458
459 char buffer[7] = "foobar";
460 auto writePromise = frontPipe.out->write(buffer, 6);
461
462 // The write went to RPC so it's not immediately received.
463 KJ_EXPECT(exactPointerWriter.receivedBuffer == nullptr);
464
465 // Write should be received after we turn the event loop.
466 waitScope.poll();
467 KJ_EXPECT(exactPointerWriter.receivedBuffer != nullptr);
468
469 // Note that the promise that write() returned above has already resolved, because it hit RPC
470 // and went into the streaming window.
471 KJ_ASSERT(writePromise.poll(waitScope));
472 writePromise.wait(waitScope);
473
474 // Let's start a second write. Even though the first write technically isn't done yet, it's
475 // legal for us to start a second one because the first write's returned promise optimistically
476 // resolved for streaming window reasons. This ends up being a very tricky case for our code!
477 char buffer2[7] = "bazqux";
478 auto writePromise2 = frontPipe.out->write(buffer2, 6);
479
480 // Now check the first write was correct, and close it out.
481 KJ_EXPECT(kj::str(exactPointerWriter.receivedBuffer) == "foobar");
482 exactPointerWriter.fulfill();
483
484 // Turn event loop again. Now the second write arrives.
485 waitScope.poll();
486 KJ_EXPECT(kj::str(exactPointerWriter.receivedBuffer) == "bazqux");
487 exactPointerWriter.fulfill();
488 writePromise2.wait(waitScope);
489
490 // If we do another write now, it should be zero-copy, because everything has settled.
491 char buffer3[6] = "corge";
492 auto writePromise3 = frontPipe.out->write(buffer3, 5);
493 KJ_EXPECT(exactPointerWriter.receivedBuffer.begin() == buffer3);
494 KJ_EXPECT(exactPointerWriter.receivedBuffer.size() == 5);
495 KJ_EXPECT(!writePromise3.poll(waitScope));
496 exactPointerWriter.fulfill();
497 writePromise3.wait(waitScope);
498 }
499
500 KJ_TEST("Two Substreams on one destination") {
501 kj::EventLoop eventLoop;
502 kj::WaitScope waitScope(eventLoop);
503
504 ByteStreamFactory factory;
505
506 auto backPipe = kj::newOneWayPipe();
507 auto middlePipe1 = kj::newOneWayPipe();
508 auto middlePipe2 = kj::newOneWayPipe();
509
510 ExactPointerWriter exactPointerWriter;
511 auto backPumpPromise = backPipe.in->pumpTo(exactPointerWriter);
512
513 auto backWrapped = factory.capnpToKj(factory.kjToCapnp(kj::mv(backPipe.out)));
514
515 auto wrapped1 = factory.capnpToKj(factory.kjToCapnp(kj::mv(middlePipe1.out)));
516 auto wrapped2 = factory.capnpToKj(factory.kjToCapnp(kj::mv(middlePipe2.out)));
517
518 // Declare these buffers out here so that they can't possibly end up with the same address.
519 char buffer1[4] = "foo";
520 char buffer2[4] = "bar";
521
522 {
523 auto wrapped = kj::mv(wrapped1);
524
525 // First pump 3 bytes from the first stream.
526 auto midPumpPormise = middlePipe1.in->pumpTo(*backWrapped, 3);
527
528 // Poll whenWriteDisconnected(), mainly as a way to let all the path-shortening settle.
529 auto disconnectPromise = wrapped->whenWriteDisconnected();
530 KJ_EXPECT(!disconnectPromise.poll(waitScope));
531
532 auto writePromise = wrapped->write(buffer1, 3);
533 KJ_EXPECT(!writePromise.poll(waitScope));
534
535 // The first write will tunnel all the way down to the destination.
536 KJ_EXPECT(exactPointerWriter.receivedBuffer.begin() == buffer1);
537 KJ_EXPECT(exactPointerWriter.receivedBuffer.size() == 3);
538 exactPointerWriter.fulfill();
539
540 writePromise.wait(waitScope);
541 KJ_EXPECT(midPumpPormise.wait(waitScope) == 3);
542 }
543
544 {
545 auto wrapped = kj::mv(wrapped2);
546
547 // Now pump another 3 bytes from the second stream.
548 auto midPumpPormise = middlePipe2.in->pumpTo(*backWrapped, 3);
549
550 // Poll whenWriteDisconnected(), mainly as a way to let all the path-shortening settle.
551 auto disconnectPromise = wrapped->whenWriteDisconnected();
552 KJ_EXPECT(!disconnectPromise.poll(waitScope));
553
554 auto writePromise = wrapped->write(buffer2, 3);
555 KJ_EXPECT(!writePromise.poll(waitScope));
556
557 // The second write will also tunnel all the way down to the destination.
558 KJ_EXPECT(exactPointerWriter.receivedBuffer.begin() == buffer2);
559 KJ_EXPECT(exactPointerWriter.receivedBuffer.size() == 3);
560 exactPointerWriter.fulfill();
561
562 writePromise.wait(waitScope);
563 KJ_EXPECT(midPumpPormise.wait(waitScope) == 3);
564 }
565 }
566
567 KJ_TEST("Two Substreams on one destination no limits (pump to EOF)") {
568 kj::EventLoop eventLoop;
569 kj::WaitScope waitScope(eventLoop);
570
571 ByteStreamFactory factory;
572
573 auto backPipe = kj::newOneWayPipe();
574 auto middlePipe1 = kj::newOneWayPipe();
575 auto middlePipe2 = kj::newOneWayPipe();
576
577 ExactPointerWriter exactPointerWriter;
578 auto backPumpPromise = backPipe.in->pumpTo(exactPointerWriter);
579
580 auto backWrapped = factory.capnpToKj(factory.kjToCapnp(kj::mv(backPipe.out)));
581
582 auto wrapped1 = factory.capnpToKj(factory.kjToCapnp(kj::mv(middlePipe1.out)));
583 auto wrapped2 = factory.capnpToKj(factory.kjToCapnp(kj::mv(middlePipe2.out)));
584
585 // Declare these buffers out here so that they can't possibly end up with the same address.
586 char buffer1[4] = "foo";
587 char buffer2[4] = "bar";
588
589 {
590 auto wrapped = kj::mv(wrapped1);
591
592 // First pump from the first stream until EOF.
593 auto midPumpPormise = middlePipe1.in->pumpTo(*backWrapped);
594
595 // Poll whenWriteDisconnected(), mainly as a way to let all the path-shortening settle.
596 auto disconnectPromise = wrapped->whenWriteDisconnected();
597 KJ_EXPECT(!disconnectPromise.poll(waitScope));
598
599 auto writePromise = wrapped->write(buffer1, 3);
600 KJ_EXPECT(!writePromise.poll(waitScope));
601
602 // The first write will tunnel all the way down to the destination.
603 KJ_EXPECT(exactPointerWriter.receivedBuffer.begin() == buffer1);
604 KJ_EXPECT(exactPointerWriter.receivedBuffer.size() == 3);
605 exactPointerWriter.fulfill();
606
607 writePromise.wait(waitScope);
608 { auto drop = kj::mv(wrapped); }
609 KJ_EXPECT(midPumpPormise.wait(waitScope) == 3);
610 }
611
612 {
613 auto wrapped = kj::mv(wrapped2);
614
615 // Now pump from the second stream until EOF.
616 auto midPumpPormise = middlePipe2.in->pumpTo(*backWrapped);
617
618 // Poll whenWriteDisconnected(), mainly as a way to let all the path-shortening settle.
619 auto disconnectPromise = wrapped->whenWriteDisconnected();
620 KJ_EXPECT(!disconnectPromise.poll(waitScope));
621
622 auto writePromise = wrapped->write(buffer2, 3);
623 KJ_EXPECT(!writePromise.poll(waitScope));
624
625 // The second write will also tunnel all the way down to the destination.
626 KJ_EXPECT(exactPointerWriter.receivedBuffer.begin() == buffer2);
627 KJ_EXPECT(exactPointerWriter.receivedBuffer.size() == 3);
628 exactPointerWriter.fulfill();
629
630 writePromise.wait(waitScope);
631 { auto drop = kj::mv(wrapped); }
632 KJ_EXPECT(midPumpPormise.wait(waitScope) == 3);
633 }
634 }
635
636 KJ_TEST("KJ -> ByteStream RPC -> KJ promise stream -> ByteStream -> KJ") {
637 // Test what happens if we queue up several requests on a ByteStream and then it resolves to
638 // a shorter path.
639
640 kj::EventLoop eventLoop;
641 kj::WaitScope waitScope(eventLoop);
642
643 ByteStreamFactory factory;
644 ExactPointerWriter exactPointerWriter;
645
646 auto paf = kj::newPromiseAndFulfiller<kj::Own<kj::AsyncOutputStream>>();
647 auto backCap = factory.kjToCapnp(kj::newPromisedStream(kj::mv(paf.promise)));
648
649 auto rpcPipe = kj::newTwoWayPipe();
650 capnp::TwoPartyClient client(*rpcPipe.ends[0]);
651 capnp::TwoPartyClient server(*rpcPipe.ends[1], kj::mv(backCap), rpc::twoparty::Side::SERVER);
652 auto front = factory.capnpToKj(client.bootstrap().castAs<ByteStream>());
653
654 // These will all queue up in the RPC layer.
655 front->write("foo", 3).wait(waitScope);
656 front->write("bar", 3).wait(waitScope);
657 front->write("baz", 3).wait(waitScope);
658 front->write("qux", 3).wait(waitScope);
659
660 // Make sure those writes manage to get all the way through the RPC system and queue up in the
661 // LocalClient wrapping the CapnpToKjStreamAdapter at the other end.
662 waitScope.poll();
663
664 // Fulfill the promise.
665 paf.fulfiller->fulfill(factory.capnpToKj(factory.kjToCapnp(kj::attachRef(exactPointerWriter))));
666 waitScope.poll();
667
668 // Now:
669 // - "foo" should have made it all the way down to the final output stream.
670 // - "bar", "baz", and "qux" are queued on the CapnpToKjStreamAdapter immediately wrapping the
671 // KJ promise stream.
672 // - But that stream adapter has discovered that there's another capnp stream downstream and has
673 // resolved itself to the later stream.
674 // - A new call at this time should NOT be allowed to hop the queue.
675
676 exactPointerWriter.expectBuffer("foo");
677
678 front->write("corge", 5).wait(waitScope);
679 waitScope.poll();
680
681 exactPointerWriter.fulfill();
682
683 waitScope.poll();
684 exactPointerWriter.expectBuffer("bar");
685 exactPointerWriter.fulfill();
686
687 waitScope.poll();
688 exactPointerWriter.expectBuffer("baz");
689 exactPointerWriter.fulfill();
690
691 waitScope.poll();
692 exactPointerWriter.expectBuffer("qux");
693 exactPointerWriter.fulfill();
694
695 waitScope.poll();
696 exactPointerWriter.expectBuffer("corge");
697 exactPointerWriter.fulfill();
698
699 // There may still be some detach()ed promises holding on to some capabilities that transitively
700 // hold a fake Own<AsyncOutputStream> pointing at exactPointerWriter, which is actually on the
701 // stack. We created a fake Own pointing to a stack variable by using
702 // kj::attachRef(exactPointerWriter), above; it does not actually own the object it points to.
703 // We need to make sure those Owns are dropped before exactPoniterWriter is destroyed, otherwise
704 // ASAN will flag some invalid reads (of exactPointerWriter's vtable, in particular).
705 waitScope.cancelAllDetached();
706 }
707
708 // TODO:
709 // - Parallel writes (requires streaming)
710 // - Write to KJ -> capnp -> RPC -> capnp -> KJ loopback without shortening, verify we can write
711 // several things to buffer (requires streaming).
712 // - Again, but with shortening which only occurs after some promise resolve.
713
714 } // namespace
715 } // namespace capnp
716