1 // Copyright (c) 2019 Cloudflare, Inc. and contributors
2 // Licensed under the MIT License:
3 //
4 // Permission is hereby granted, free of charge, to any person obtaining a copy
5 // of this software and associated documentation files (the "Software"), to deal
6 // in the Software without restriction, including without limitation the rights
7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8 // copies of the Software, and to permit persons to whom the Software is
9 // furnished to do so, subject to the following conditions:
10 //
11 // The above copyright notice and this permission notice shall be included in
12 // all copies or substantial portions of the Software.
13 //
14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
20 // THE SOFTWARE.
21 
22 #include "byte-stream.h"
23 #include <kj/test.h>
24 #include <capnp/rpc-twoparty.h>
25 #include <stdlib.h>
26 
27 namespace capnp {
28 namespace {
29 
expectRead(kj::AsyncInputStream & in,kj::StringPtr expected)30 kj::Promise<void> expectRead(kj::AsyncInputStream& in, kj::StringPtr expected) {
31   if (expected.size() == 0) return kj::READY_NOW;
32 
33   auto buffer = kj::heapArray<char>(expected.size());
34 
35   auto promise = in.tryRead(buffer.begin(), 1, buffer.size());
36   return promise.then(kj::mvCapture(buffer, [&in,expected](kj::Array<char> buffer, size_t amount) {
37     if (amount == 0) {
38       KJ_FAIL_ASSERT("expected data never sent", expected);
39     }
40 
41     auto actual = buffer.slice(0, amount);
42     if (memcmp(actual.begin(), expected.begin(), actual.size()) != 0) {
43       KJ_FAIL_ASSERT("data from stream doesn't match expected", expected, actual);
44     }
45 
46     return expectRead(in, expected.slice(amount));
47   }));
48 }
49 
makeString(size_t size)50 kj::String makeString(size_t size) {
51   auto bytes = kj::heapArray<char>(size);
52   for (char& c: bytes) {
53     c = 'a' + rand() % 26;
54   }
55   bytes[bytes.size() - 1] = 0;
56   return kj::String(kj::mv(bytes));
57 };
58 
59 KJ_TEST("KJ -> ByteStream -> KJ without shortening") {
60   kj::EventLoop eventLoop;
61   kj::WaitScope waitScope(eventLoop);
62 
63   ByteStreamFactory factory1;
64   ByteStreamFactory factory2;
65 
66   auto pipe = kj::newOneWayPipe();
67 
68   auto wrapped = factory1.capnpToKj(factory2.kjToCapnp(kj::mv(pipe.out)));
69 
70   {
71     auto promise = wrapped->write("foo", 3);
72     KJ_EXPECT(!promise.poll(waitScope));
73     expectRead(*pipe.in, "foo").wait(waitScope);
74     promise.wait(waitScope);
75   }
76 
77   {
78     // Write more than 1 << 16 bytes at once to exercise write splitting.
79     auto str = makeString(1 << 17);
80     auto promise = wrapped->write(str.begin(), str.size());
81     KJ_EXPECT(!promise.poll(waitScope));
82     expectRead(*pipe.in, str).wait(waitScope);
83     promise.wait(waitScope);
84   }
85 
86   {
87     // Write more than 1 << 16 bytes via an array to exercise write splitting.
88     auto str = makeString(1 << 18);
89     auto pieces = kj::heapArrayBuilder<kj::ArrayPtr<const kj::byte>>(4);
90 
91     // Two 2^15 pieces will be combined.
92     pieces.add(kj::arrayPtr(reinterpret_cast<kj::byte*>(str.begin()), 1 << 15));
93     pieces.add(kj::arrayPtr(reinterpret_cast<kj::byte*>(str.begin() + (1 << 15)), 1 << 15));
94 
95     // One 2^16 piece will be written alone.
96     pieces.add(kj::arrayPtr(reinterpret_cast<kj::byte*>(
97         str.begin() + (1 << 16)), 1 << 16));
98 
99     // One 2^17 piece will be split.
100     pieces.add(kj::arrayPtr(reinterpret_cast<kj::byte*>(
101         str.begin() + (1 << 17)), str.size() - (1 << 17)));
102 
103     auto promise = wrapped->write(pieces);
104     KJ_EXPECT(!promise.poll(waitScope));
105     expectRead(*pipe.in, str).wait(waitScope);
106     promise.wait(waitScope);
107   }
108 
109   wrapped = nullptr;
110   KJ_EXPECT(pipe.in->readAllText().wait(waitScope) == "");
111 }
112 
113 class ExactPointerWriter: public kj::AsyncOutputStream {
114 public:
115   kj::ArrayPtr<const char> receivedBuffer;
116 
fulfill()117   void fulfill() {
118     KJ_ASSERT_NONNULL(fulfiller)->fulfill();
119     fulfiller = nullptr;
120     receivedBuffer = nullptr;
121   }
122 
write(const void * buffer,size_t size)123   kj::Promise<void> write(const void* buffer, size_t size) override {
124     KJ_ASSERT(fulfiller == nullptr);
125     receivedBuffer = kj::arrayPtr(reinterpret_cast<const char*>(buffer), size);
126     auto paf = kj::newPromiseAndFulfiller<void>();
127     fulfiller = kj::mv(paf.fulfiller);
128     return kj::mv(paf.promise);
129   }
write(kj::ArrayPtr<const kj::ArrayPtr<const byte>> pieces)130   kj::Promise<void> write(kj::ArrayPtr<const kj::ArrayPtr<const byte>> pieces) override {
131     KJ_UNIMPLEMENTED("not implemented for test");
132   }
whenWriteDisconnected()133   kj::Promise<void> whenWriteDisconnected() override {
134     return kj::NEVER_DONE;
135   }
136 
expectBuffer(kj::StringPtr expected)137   void expectBuffer(kj::StringPtr expected) {
138     KJ_EXPECT(receivedBuffer == expected.asArray(), receivedBuffer, expected);
139   }
140 
141 private:
142   kj::Maybe<kj::Own<kj::PromiseFulfiller<void>>> fulfiller;
143 };
144 
145 KJ_TEST("KJ -> ByteStream -> KJ with shortening") {
146   kj::EventLoop eventLoop;
147   kj::WaitScope waitScope(eventLoop);
148 
149   ByteStreamFactory factory;
150 
151   auto pipe = kj::newOneWayPipe();
152 
153   ExactPointerWriter exactPointerWriter;
154   auto pumpPromise = pipe.in->pumpTo(exactPointerWriter);
155 
156   auto wrapped = factory.capnpToKj(factory.kjToCapnp(kj::mv(pipe.out)));
157 
158   {
159     char buffer[4] = "foo";
160     auto promise = wrapped->write(buffer, 3);
161     KJ_EXPECT(!promise.poll(waitScope));
162 
163     // This first write won't have been path-shortened because we didn't know about the shorter
164     // path yet when it started.
165     KJ_EXPECT(exactPointerWriter.receivedBuffer.begin() != buffer);
166     KJ_EXPECT(kj::str(exactPointerWriter.receivedBuffer) == "foo");
167     exactPointerWriter.fulfill();
168     promise.wait(waitScope);
169   }
170 
171   {
172     char buffer[4] = "foo";
173     auto promise = wrapped->write(buffer, 3);
174     KJ_EXPECT(!promise.poll(waitScope));
175 
176     // The second write was path-shortened so passes through the exact buffer!
177     KJ_EXPECT(exactPointerWriter.receivedBuffer.begin() == buffer);
178     KJ_EXPECT(exactPointerWriter.receivedBuffer.size() == 3);
179     exactPointerWriter.fulfill();
180     promise.wait(waitScope);
181   }
182 
183   wrapped = nullptr;
184   KJ_EXPECT(pipe.in->readAllText().wait(waitScope) == "");
185 }
186 
187 KJ_TEST("KJ -> ByteStream -> KJ -> ByteStream -> KJ with shortening") {
188   kj::EventLoop eventLoop;
189   kj::WaitScope waitScope(eventLoop);
190 
191   ByteStreamFactory factory;
192 
193   auto pipe = kj::newOneWayPipe();
194 
195   ExactPointerWriter exactPointerWriter;
196   auto pumpPromise = pipe.in->pumpTo(exactPointerWriter);
197 
198   auto wrapped = factory.capnpToKj(factory.kjToCapnp(
199                  factory.capnpToKj(factory.kjToCapnp(kj::mv(pipe.out)))));
200 
201   {
202     char buffer[4] = "foo";
203     auto promise = wrapped->write(buffer, 3);
204     KJ_EXPECT(!promise.poll(waitScope));
205 
206     // This first write won't have been path-shortened because we didn't know about the shorter
207     // path yet when it started.
208     KJ_EXPECT(exactPointerWriter.receivedBuffer.begin() != buffer);
209     KJ_EXPECT(kj::str(exactPointerWriter.receivedBuffer) == "foo");
210     exactPointerWriter.fulfill();
211     promise.wait(waitScope);
212   }
213 
214   {
215     char buffer[4] = "bar";
216     auto promise = wrapped->write(buffer, 3);
217     KJ_EXPECT(!promise.poll(waitScope));
218 
219     // The second write was path-shortened so passes through the exact buffer!
220     KJ_EXPECT(exactPointerWriter.receivedBuffer.begin() == buffer);
221     KJ_EXPECT(exactPointerWriter.receivedBuffer.size() == 3);
222     exactPointerWriter.fulfill();
223     promise.wait(waitScope);
224   }
225 
226   wrapped = nullptr;
227   KJ_EXPECT(pumpPromise.wait(waitScope) == 6);
228 }
229 
230 KJ_TEST("KJ -> ByteStream -> KJ pipe -> ByteStream -> KJ with shortening") {
231   kj::EventLoop eventLoop;
232   kj::WaitScope waitScope(eventLoop);
233 
234   ByteStreamFactory factory;
235 
236   auto backPipe = kj::newOneWayPipe();
237   auto middlePipe = kj::newOneWayPipe();
238 
239   ExactPointerWriter exactPointerWriter;
240   auto backPumpPromise = backPipe.in->pumpTo(exactPointerWriter);
241 
242   auto backWrapped = factory.capnpToKj(factory.kjToCapnp(kj::mv(backPipe.out)));
243   auto midPumpPormise = middlePipe.in->pumpTo(*backWrapped, 3);
244 
245   auto wrapped = factory.capnpToKj(factory.kjToCapnp(kj::mv(middlePipe.out)));
246 
247   // Poll whenWriteDisconnected(), mainly as a way to let all the path-shortening settle.
248   auto disconnectPromise = wrapped->whenWriteDisconnected();
249   KJ_EXPECT(!disconnectPromise.poll(waitScope));
250 
251   char buffer[7] = "foobar";
252   auto writePromise = wrapped->write(buffer, 6);
253   KJ_EXPECT(!writePromise.poll(waitScope));
254 
255   // The first three bytes will tunnel all the way down to the destination.
256   KJ_EXPECT(exactPointerWriter.receivedBuffer.begin() == buffer);
257   KJ_EXPECT(exactPointerWriter.receivedBuffer.size() == 3);
258   exactPointerWriter.fulfill();
259 
260   KJ_EXPECT(midPumpPormise.wait(waitScope) == 3);
261 
262   ExactPointerWriter exactPointerWriter2;
263   midPumpPormise = middlePipe.in->pumpTo(exactPointerWriter2, 6);
264   KJ_EXPECT(!writePromise.poll(waitScope));
265 
266   // The second half of the "foobar" write will have taken a slow path, because the write was
267   // restarted in the middle of the stream re-resolving itself.
268   KJ_EXPECT(kj::str(exactPointerWriter2.receivedBuffer) == "bar");
269   exactPointerWriter2.fulfill();
270 
271   // Now that write is done.
272   writePromise.wait(waitScope);
273   KJ_EXPECT(!midPumpPormise.poll(waitScope));
274 
275   // If we write again, it'll hit the fast path.
276   char buffer2[4] = "baz";
277   writePromise = wrapped->write(buffer2, 3);
278   KJ_EXPECT(!writePromise.poll(waitScope));
279   KJ_EXPECT(exactPointerWriter2.receivedBuffer.begin() == buffer2);
280   KJ_EXPECT(exactPointerWriter2.receivedBuffer.size() == 3);
281   exactPointerWriter2.fulfill();
282 
283   KJ_EXPECT(midPumpPormise.wait(waitScope) == 6);
284   writePromise.wait(waitScope);
285 }
286 
287 KJ_TEST("KJ -> ByteStream RPC -> KJ pipe -> ByteStream RPC -> KJ with shortening") {
288   // For this test, we're going to verify that if we have ByteStreams over RPC in both directions
289   // and we pump a ByteStream to another ByteStream at one end of the connection, it gets shortened
290   // all the way to the other end!
291 
292   kj::EventLoop eventLoop;
293   kj::WaitScope waitScope(eventLoop);
294 
295   ByteStreamFactory clientFactory;
296   ByteStreamFactory serverFactory;
297 
298   auto backPipe = kj::newOneWayPipe();
299   auto middlePipe = kj::newOneWayPipe();
300 
301   ExactPointerWriter exactPointerWriter;
302   auto backPumpPromise = backPipe.in->pumpTo(exactPointerWriter);
303 
304   auto rpcConnection = kj::newTwoWayPipe();
305   capnp::TwoPartyClient client(*rpcConnection.ends[0],
306       clientFactory.kjToCapnp(kj::mv(backPipe.out)),
307       rpc::twoparty::Side::CLIENT);
308   capnp::TwoPartyClient server(*rpcConnection.ends[1],
309       serverFactory.kjToCapnp(kj::mv(middlePipe.out)),
310       rpc::twoparty::Side::CLIENT);
311 
312   auto backWrapped = serverFactory.capnpToKj(server.bootstrap().castAs<ByteStream>());
313   auto midPumpPormise = middlePipe.in->pumpTo(*backWrapped, 3);
314 
315   auto wrapped = clientFactory.capnpToKj(client.bootstrap().castAs<ByteStream>());
316 
317   // Poll whenWriteDisconnected(), mainly as a way to let all the path-shortening settle.
318   auto disconnectPromise = wrapped->whenWriteDisconnected();
319   KJ_EXPECT(!disconnectPromise.poll(waitScope));
320 
321   char buffer[7] = "foobar";
322   auto writePromise = wrapped->write(buffer, 6);
323 
324   // The server side did a 3-byte pump. Path-shortening magic kicks in, and the first three bytes
325   // of the write on the client side go *directly* to the endpoint without a copy!
326   KJ_EXPECT(exactPointerWriter.receivedBuffer.begin() == buffer);
327   KJ_EXPECT(exactPointerWriter.receivedBuffer.size() == 3);
328   exactPointerWriter.fulfill();
329 
330   KJ_EXPECT(midPumpPormise.wait(waitScope) == 3);
331 
332   ExactPointerWriter exactPointerWriter2;
333   midPumpPormise = middlePipe.in->pumpTo(exactPointerWriter2, 6);
334   midPumpPormise.poll(waitScope);
335 
336   // The second half of the "foobar" write will have taken a slow path, because the write was
337   // restarted in the middle of the stream re-resolving itself.
338   KJ_EXPECT(kj::str(exactPointerWriter2.receivedBuffer) == "bar");
339   exactPointerWriter2.fulfill();
340 
341   // Now that write is done.
342   writePromise.wait(waitScope);
343   KJ_EXPECT(!midPumpPormise.poll(waitScope));
344 
345   // If we write again, it'll finish the server-side pump (but won't be a zero-copy write since
346   // it has to go over RPC).
347   char buffer2[4] = "baz";
348   writePromise = wrapped->write(buffer2, 3);
349   KJ_EXPECT(!midPumpPormise.poll(waitScope));
350   KJ_EXPECT(kj::str(exactPointerWriter2.receivedBuffer) == "baz");
351   exactPointerWriter2.fulfill();
352 
353   KJ_EXPECT(midPumpPormise.wait(waitScope) == 6);
354   writePromise.wait(waitScope);
355 }
356 
357 KJ_TEST("KJ -> ByteStream RPC -> KJ pipe -> ByteStream RPC -> KJ with concurrent shortening") {
358   // This is similar to the previous test, but we start writing before the path-shortening has
359   // settled. This should result in some writes optimistically bouncing back and forth before
360   // the stream settles in.
361 
362   kj::EventLoop eventLoop;
363   kj::WaitScope waitScope(eventLoop);
364 
365   ByteStreamFactory clientFactory;
366   ByteStreamFactory serverFactory;
367 
368   auto backPipe = kj::newOneWayPipe();
369   auto middlePipe = kj::newOneWayPipe();
370 
371   ExactPointerWriter exactPointerWriter;
372   auto backPumpPromise = backPipe.in->pumpTo(exactPointerWriter);
373 
374   auto rpcConnection = kj::newTwoWayPipe();
375   capnp::TwoPartyClient client(*rpcConnection.ends[0],
376       clientFactory.kjToCapnp(kj::mv(backPipe.out)),
377       rpc::twoparty::Side::CLIENT);
378   capnp::TwoPartyClient server(*rpcConnection.ends[1],
379       serverFactory.kjToCapnp(kj::mv(middlePipe.out)),
380       rpc::twoparty::Side::CLIENT);
381 
382   auto backWrapped = serverFactory.capnpToKj(server.bootstrap().castAs<ByteStream>());
383   auto midPumpPormise = middlePipe.in->pumpTo(*backWrapped);
384 
385   auto wrapped = clientFactory.capnpToKj(client.bootstrap().castAs<ByteStream>());
386 
387   char buffer[7] = "foobar";
388   auto writePromise = wrapped->write(buffer, 6);
389 
390   // The write went to RPC so it's not immediately received.
391   KJ_EXPECT(exactPointerWriter.receivedBuffer == nullptr);
392 
393   // Write should be received after we turn the event loop.
394   waitScope.poll();
395   KJ_EXPECT(exactPointerWriter.receivedBuffer != nullptr);
396 
397   // Note that the promise that write() returned above has already resolved, because it hit RPC
398   // and went into the streaming window.
399   KJ_ASSERT(writePromise.poll(waitScope));
400   writePromise.wait(waitScope);
401 
402   // Let's start a second write. Even though the first write technically isn't done yet, it's
403   // legal for us to start a second one because the first write's returned promise optimistically
404   // resolved for streaming window reasons. This ends up being a very tricky case for our code!
405   char buffer2[7] = "bazqux";
406   auto writePromise2 = wrapped->write(buffer2, 6);
407 
408   // Now check the first write was correct, and close it out.
409   KJ_EXPECT(kj::str(exactPointerWriter.receivedBuffer) == "foobar");
410   exactPointerWriter.fulfill();
411 
412   // Turn event loop again. Now the second write arrives.
413   waitScope.poll();
414   KJ_EXPECT(kj::str(exactPointerWriter.receivedBuffer) == "bazqux");
415   exactPointerWriter.fulfill();
416   writePromise2.wait(waitScope);
417 
418   // If we do another write now, it should be zero-copy, because everything has settled.
419   char buffer3[6] = "corge";
420   auto writePromise3 = wrapped->write(buffer3, 5);
421   KJ_EXPECT(exactPointerWriter.receivedBuffer.begin() == buffer3);
422   KJ_EXPECT(exactPointerWriter.receivedBuffer.size() == 5);
423   KJ_EXPECT(!writePromise3.poll(waitScope));
424   exactPointerWriter.fulfill();
425   writePromise3.wait(waitScope);
426 }
427 
428 KJ_TEST("KJ -> KJ pipe -> ByteStream RPC -> KJ pipe -> ByteStream RPC -> KJ with concurrent shortening") {
429   // Same as previous test, except we add a KJ pipe at the beginning and pump it into the top of
430   // the pipe, which invokes tryPumpFrom() on the KjToCapnpStreamAdapter.
431 
432   kj::EventLoop eventLoop;
433   kj::WaitScope waitScope(eventLoop);
434 
435   ByteStreamFactory clientFactory;
436   ByteStreamFactory serverFactory;
437 
438   auto backPipe = kj::newOneWayPipe();
439   auto middlePipe = kj::newOneWayPipe();
440   auto frontPipe = kj::newOneWayPipe();
441 
442   ExactPointerWriter exactPointerWriter;
443   auto backPumpPromise = backPipe.in->pumpTo(exactPointerWriter);
444 
445   auto rpcConnection = kj::newTwoWayPipe();
446   capnp::TwoPartyClient client(*rpcConnection.ends[0],
447       clientFactory.kjToCapnp(kj::mv(backPipe.out)),
448       rpc::twoparty::Side::CLIENT);
449   capnp::TwoPartyClient server(*rpcConnection.ends[1],
450       serverFactory.kjToCapnp(kj::mv(middlePipe.out)),
451       rpc::twoparty::Side::CLIENT);
452 
453   auto backWrapped = serverFactory.capnpToKj(server.bootstrap().castAs<ByteStream>());
454   auto midPumpPormise = middlePipe.in->pumpTo(*backWrapped);
455 
456   auto wrapped = clientFactory.capnpToKj(client.bootstrap().castAs<ByteStream>());
457   auto frontPumpPromise = frontPipe.in->pumpTo(*wrapped);
458 
459   char buffer[7] = "foobar";
460   auto writePromise = frontPipe.out->write(buffer, 6);
461 
462   // The write went to RPC so it's not immediately received.
463   KJ_EXPECT(exactPointerWriter.receivedBuffer == nullptr);
464 
465   // Write should be received after we turn the event loop.
466   waitScope.poll();
467   KJ_EXPECT(exactPointerWriter.receivedBuffer != nullptr);
468 
469   // Note that the promise that write() returned above has already resolved, because it hit RPC
470   // and went into the streaming window.
471   KJ_ASSERT(writePromise.poll(waitScope));
472   writePromise.wait(waitScope);
473 
474   // Let's start a second write. Even though the first write technically isn't done yet, it's
475   // legal for us to start a second one because the first write's returned promise optimistically
476   // resolved for streaming window reasons. This ends up being a very tricky case for our code!
477   char buffer2[7] = "bazqux";
478   auto writePromise2 = frontPipe.out->write(buffer2, 6);
479 
480   // Now check the first write was correct, and close it out.
481   KJ_EXPECT(kj::str(exactPointerWriter.receivedBuffer) == "foobar");
482   exactPointerWriter.fulfill();
483 
484   // Turn event loop again. Now the second write arrives.
485   waitScope.poll();
486   KJ_EXPECT(kj::str(exactPointerWriter.receivedBuffer) == "bazqux");
487   exactPointerWriter.fulfill();
488   writePromise2.wait(waitScope);
489 
490   // If we do another write now, it should be zero-copy, because everything has settled.
491   char buffer3[6] = "corge";
492   auto writePromise3 = frontPipe.out->write(buffer3, 5);
493   KJ_EXPECT(exactPointerWriter.receivedBuffer.begin() == buffer3);
494   KJ_EXPECT(exactPointerWriter.receivedBuffer.size() == 5);
495   KJ_EXPECT(!writePromise3.poll(waitScope));
496   exactPointerWriter.fulfill();
497   writePromise3.wait(waitScope);
498 }
499 
500 KJ_TEST("Two Substreams on one destination") {
501   kj::EventLoop eventLoop;
502   kj::WaitScope waitScope(eventLoop);
503 
504   ByteStreamFactory factory;
505 
506   auto backPipe = kj::newOneWayPipe();
507   auto middlePipe1 = kj::newOneWayPipe();
508   auto middlePipe2 = kj::newOneWayPipe();
509 
510   ExactPointerWriter exactPointerWriter;
511   auto backPumpPromise = backPipe.in->pumpTo(exactPointerWriter);
512 
513   auto backWrapped = factory.capnpToKj(factory.kjToCapnp(kj::mv(backPipe.out)));
514 
515   auto wrapped1 = factory.capnpToKj(factory.kjToCapnp(kj::mv(middlePipe1.out)));
516   auto wrapped2 = factory.capnpToKj(factory.kjToCapnp(kj::mv(middlePipe2.out)));
517 
518   // Declare these buffers out here so that they can't possibly end up with the same address.
519   char buffer1[4] = "foo";
520   char buffer2[4] = "bar";
521 
522   {
523     auto wrapped = kj::mv(wrapped1);
524 
525     // First pump 3 bytes from the first stream.
526     auto midPumpPormise = middlePipe1.in->pumpTo(*backWrapped, 3);
527 
528     // Poll whenWriteDisconnected(), mainly as a way to let all the path-shortening settle.
529     auto disconnectPromise = wrapped->whenWriteDisconnected();
530     KJ_EXPECT(!disconnectPromise.poll(waitScope));
531 
532     auto writePromise = wrapped->write(buffer1, 3);
533     KJ_EXPECT(!writePromise.poll(waitScope));
534 
535     // The first write will tunnel all the way down to the destination.
536     KJ_EXPECT(exactPointerWriter.receivedBuffer.begin() == buffer1);
537     KJ_EXPECT(exactPointerWriter.receivedBuffer.size() == 3);
538     exactPointerWriter.fulfill();
539 
540     writePromise.wait(waitScope);
541     KJ_EXPECT(midPumpPormise.wait(waitScope) == 3);
542   }
543 
544   {
545     auto wrapped = kj::mv(wrapped2);
546 
547     // Now pump another 3 bytes from the second stream.
548     auto midPumpPormise = middlePipe2.in->pumpTo(*backWrapped, 3);
549 
550     // Poll whenWriteDisconnected(), mainly as a way to let all the path-shortening settle.
551     auto disconnectPromise = wrapped->whenWriteDisconnected();
552     KJ_EXPECT(!disconnectPromise.poll(waitScope));
553 
554     auto writePromise = wrapped->write(buffer2, 3);
555     KJ_EXPECT(!writePromise.poll(waitScope));
556 
557     // The second write will also tunnel all the way down to the destination.
558     KJ_EXPECT(exactPointerWriter.receivedBuffer.begin() == buffer2);
559     KJ_EXPECT(exactPointerWriter.receivedBuffer.size() == 3);
560     exactPointerWriter.fulfill();
561 
562     writePromise.wait(waitScope);
563     KJ_EXPECT(midPumpPormise.wait(waitScope) == 3);
564   }
565 }
566 
567 KJ_TEST("Two Substreams on one destination no limits (pump to EOF)") {
568   kj::EventLoop eventLoop;
569   kj::WaitScope waitScope(eventLoop);
570 
571   ByteStreamFactory factory;
572 
573   auto backPipe = kj::newOneWayPipe();
574   auto middlePipe1 = kj::newOneWayPipe();
575   auto middlePipe2 = kj::newOneWayPipe();
576 
577   ExactPointerWriter exactPointerWriter;
578   auto backPumpPromise = backPipe.in->pumpTo(exactPointerWriter);
579 
580   auto backWrapped = factory.capnpToKj(factory.kjToCapnp(kj::mv(backPipe.out)));
581 
582   auto wrapped1 = factory.capnpToKj(factory.kjToCapnp(kj::mv(middlePipe1.out)));
583   auto wrapped2 = factory.capnpToKj(factory.kjToCapnp(kj::mv(middlePipe2.out)));
584 
585   // Declare these buffers out here so that they can't possibly end up with the same address.
586   char buffer1[4] = "foo";
587   char buffer2[4] = "bar";
588 
589   {
590     auto wrapped = kj::mv(wrapped1);
591 
592     // First pump from the first stream until EOF.
593     auto midPumpPormise = middlePipe1.in->pumpTo(*backWrapped);
594 
595     // Poll whenWriteDisconnected(), mainly as a way to let all the path-shortening settle.
596     auto disconnectPromise = wrapped->whenWriteDisconnected();
597     KJ_EXPECT(!disconnectPromise.poll(waitScope));
598 
599     auto writePromise = wrapped->write(buffer1, 3);
600     KJ_EXPECT(!writePromise.poll(waitScope));
601 
602     // The first write will tunnel all the way down to the destination.
603     KJ_EXPECT(exactPointerWriter.receivedBuffer.begin() == buffer1);
604     KJ_EXPECT(exactPointerWriter.receivedBuffer.size() == 3);
605     exactPointerWriter.fulfill();
606 
607     writePromise.wait(waitScope);
608     { auto drop = kj::mv(wrapped); }
609     KJ_EXPECT(midPumpPormise.wait(waitScope) == 3);
610   }
611 
612   {
613     auto wrapped = kj::mv(wrapped2);
614 
615     // Now pump from the second stream until EOF.
616     auto midPumpPormise = middlePipe2.in->pumpTo(*backWrapped);
617 
618     // Poll whenWriteDisconnected(), mainly as a way to let all the path-shortening settle.
619     auto disconnectPromise = wrapped->whenWriteDisconnected();
620     KJ_EXPECT(!disconnectPromise.poll(waitScope));
621 
622     auto writePromise = wrapped->write(buffer2, 3);
623     KJ_EXPECT(!writePromise.poll(waitScope));
624 
625     // The second write will also tunnel all the way down to the destination.
626     KJ_EXPECT(exactPointerWriter.receivedBuffer.begin() == buffer2);
627     KJ_EXPECT(exactPointerWriter.receivedBuffer.size() == 3);
628     exactPointerWriter.fulfill();
629 
630     writePromise.wait(waitScope);
631     { auto drop = kj::mv(wrapped); }
632     KJ_EXPECT(midPumpPormise.wait(waitScope) == 3);
633   }
634 }
635 
636 KJ_TEST("KJ -> ByteStream RPC -> KJ promise stream -> ByteStream -> KJ") {
637   // Test what happens if we queue up several requests on a ByteStream and then it resolves to
638   // a shorter path.
639 
640   kj::EventLoop eventLoop;
641   kj::WaitScope waitScope(eventLoop);
642 
643   ByteStreamFactory factory;
644   ExactPointerWriter exactPointerWriter;
645 
646   auto paf = kj::newPromiseAndFulfiller<kj::Own<kj::AsyncOutputStream>>();
647   auto backCap = factory.kjToCapnp(kj::newPromisedStream(kj::mv(paf.promise)));
648 
649   auto rpcPipe = kj::newTwoWayPipe();
650   capnp::TwoPartyClient client(*rpcPipe.ends[0]);
651   capnp::TwoPartyClient server(*rpcPipe.ends[1], kj::mv(backCap), rpc::twoparty::Side::SERVER);
652   auto front = factory.capnpToKj(client.bootstrap().castAs<ByteStream>());
653 
654   // These will all queue up in the RPC layer.
655   front->write("foo", 3).wait(waitScope);
656   front->write("bar", 3).wait(waitScope);
657   front->write("baz", 3).wait(waitScope);
658   front->write("qux", 3).wait(waitScope);
659 
660   // Make sure those writes manage to get all the way through the RPC system and queue up in the
661   // LocalClient wrapping the CapnpToKjStreamAdapter at the other end.
662   waitScope.poll();
663 
664   // Fulfill the promise.
665   paf.fulfiller->fulfill(factory.capnpToKj(factory.kjToCapnp(kj::attachRef(exactPointerWriter))));
666   waitScope.poll();
667 
668   // Now:
669   // - "foo" should have made it all the way down to the final output stream.
670   // - "bar", "baz", and "qux" are queued on the CapnpToKjStreamAdapter immediately wrapping the
671   //   KJ promise stream.
672   // - But that stream adapter has discovered that there's another capnp stream downstream and has
673   //   resolved itself to the later stream.
674   // - A new call at this time should NOT be allowed to hop the queue.
675 
676   exactPointerWriter.expectBuffer("foo");
677 
678   front->write("corge", 5).wait(waitScope);
679   waitScope.poll();
680 
681   exactPointerWriter.fulfill();
682 
683   waitScope.poll();
684   exactPointerWriter.expectBuffer("bar");
685   exactPointerWriter.fulfill();
686 
687   waitScope.poll();
688   exactPointerWriter.expectBuffer("baz");
689   exactPointerWriter.fulfill();
690 
691   waitScope.poll();
692   exactPointerWriter.expectBuffer("qux");
693   exactPointerWriter.fulfill();
694 
695   waitScope.poll();
696   exactPointerWriter.expectBuffer("corge");
697   exactPointerWriter.fulfill();
698 
699   // There may still be some detach()ed promises holding on to some capabilities that transitively
700   // hold a fake Own<AsyncOutputStream> pointing at exactPointerWriter, which is actually on the
701   // stack. We created a fake Own pointing to a stack variable by using
702   // kj::attachRef(exactPointerWriter), above; it does not actually own the object it points to.
703   // We need to make sure those Owns are dropped before exactPoniterWriter is destroyed, otherwise
704   // ASAN will flag some invalid reads (of exactPointerWriter's vtable, in particular).
705   waitScope.cancelAllDetached();
706 }
707 
708 // TODO:
709 // - Parallel writes (requires streaming)
710 // - Write to KJ -> capnp -> RPC -> capnp -> KJ loopback without shortening, verify we can write
711 //   several things to buffer (requires streaming).
712 // - Again, but with shortening which only occurs after some promise resolve.
713 
714 }  // namespace
715 }  // namespace capnp
716