1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4  * License, v. 2.0. If a copy of the MPL was not distributed with this
5  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
6 
7 #include <algorithm>
8 #include "gtest/gtest.h"
9 #include "Helpers.h"
10 #include "mozilla/ReentrantMonitor.h"
11 #include "mozilla/Printf.h"
12 #include "nsCOMPtr.h"
13 #include "nsCRT.h"
14 #include "nsIAsyncInputStream.h"
15 #include "nsIAsyncOutputStream.h"
16 #include "nsIBufferedStreams.h"
17 #include "nsIClassInfo.h"
18 #include "nsICloneableInputStream.h"
19 #include "nsIInputStream.h"
20 #include "nsIOutputStream.h"
21 #include "nsIPipe.h"
22 #include "nsITellableStream.h"
23 #include "nsIThread.h"
24 #include "nsIRunnable.h"
25 #include "nsStreamUtils.h"
26 #include "nsString.h"
27 #include "nsThreadUtils.h"
28 #include "prinrval.h"
29 
30 using namespace mozilla;
31 
32 #define ITERATIONS 33333
33 char kTestPattern[] = "My hovercraft is full of eels.\n";
34 
35 bool gTrace = false;
36 
WriteAll(nsIOutputStream * os,const char * buf,uint32_t bufLen,uint32_t * lenWritten)37 static nsresult WriteAll(nsIOutputStream* os, const char* buf, uint32_t bufLen,
38                          uint32_t* lenWritten) {
39   const char* p = buf;
40   *lenWritten = 0;
41   while (bufLen) {
42     uint32_t n;
43     nsresult rv = os->Write(p, bufLen, &n);
44     if (NS_FAILED(rv)) return rv;
45     p += n;
46     bufLen -= n;
47     *lenWritten += n;
48   }
49   return NS_OK;
50 }
51 
52 class nsReceiver final : public Runnable {
53  public:
Run()54   NS_IMETHOD Run() override {
55     nsresult rv;
56     char buf[101];
57     uint32_t count;
58     PRIntervalTime start = PR_IntervalNow();
59     while (true) {
60       rv = mIn->Read(buf, 100, &count);
61       if (NS_FAILED(rv)) {
62         printf("read failed\n");
63         break;
64       }
65       if (count == 0) {
66         //                printf("EOF count = %d\n", mCount);
67         break;
68       }
69 
70       if (gTrace) {
71         buf[count] = '\0';
72         printf("read: %s\n", buf);
73       }
74       mCount += count;
75     }
76     PRIntervalTime end = PR_IntervalNow();
77     printf("read  %d bytes, time = %dms\n", mCount,
78            PR_IntervalToMilliseconds(end - start));
79     return rv;
80   }
81 
nsReceiver(nsIInputStream * in)82   explicit nsReceiver(nsIInputStream* in)
83       : Runnable("nsReceiver"), mIn(in), mCount(0) {}
84 
GetBytesRead()85   uint32_t GetBytesRead() { return mCount; }
86 
87  private:
88   ~nsReceiver() = default;
89 
90  protected:
91   nsCOMPtr<nsIInputStream> mIn;
92   uint32_t mCount;
93 };
94 
TestPipe(nsIInputStream * in,nsIOutputStream * out)95 static nsresult TestPipe(nsIInputStream* in, nsIOutputStream* out) {
96   RefPtr<nsReceiver> receiver = new nsReceiver(in);
97   nsresult rv;
98 
99   nsCOMPtr<nsIThread> thread;
100   rv = NS_NewNamedThread("TestPipe", getter_AddRefs(thread), receiver);
101   if (NS_FAILED(rv)) return rv;
102 
103   uint32_t total = 0;
104   PRIntervalTime start = PR_IntervalNow();
105   for (uint32_t i = 0; i < ITERATIONS; i++) {
106     uint32_t writeCount;
107     SmprintfPointer buf = mozilla::Smprintf("%d %s", i, kTestPattern);
108     uint32_t len = strlen(buf.get());
109     rv = WriteAll(out, buf.get(), len, &writeCount);
110     if (gTrace) {
111       printf("wrote: ");
112       for (uint32_t j = 0; j < writeCount; j++) {
113         putc(buf.get()[j], stdout);
114       }
115       printf("\n");
116     }
117     if (NS_FAILED(rv)) return rv;
118     total += writeCount;
119   }
120   rv = out->Close();
121   if (NS_FAILED(rv)) return rv;
122 
123   PRIntervalTime end = PR_IntervalNow();
124 
125   thread->Shutdown();
126 
127   printf("wrote %d bytes, time = %dms\n", total,
128          PR_IntervalToMilliseconds(end - start));
129   EXPECT_EQ(receiver->GetBytesRead(), total);
130 
131   return NS_OK;
132 }
133 
134 ////////////////////////////////////////////////////////////////////////////////
135 
136 class nsShortReader final : public Runnable {
137  public:
Run()138   NS_IMETHOD Run() override {
139     nsresult rv;
140     char buf[101];
141     uint32_t count;
142     uint32_t total = 0;
143     while (true) {
144       // if (gTrace)
145       //    printf("calling Read\n");
146       rv = mIn->Read(buf, 100, &count);
147       if (NS_FAILED(rv)) {
148         printf("read failed\n");
149         break;
150       }
151       if (count == 0) {
152         break;
153       }
154 
155       if (gTrace) {
156         // For next |printf()| call and possible others elsewhere.
157         buf[count] = '\0';
158 
159         printf("read %d bytes: %s\n", count, buf);
160       }
161 
162       Received(count);
163       total += count;
164     }
165     printf("read  %d bytes\n", total);
166     return rv;
167   }
168 
nsShortReader(nsIInputStream * in)169   explicit nsShortReader(nsIInputStream* in)
170       : Runnable("nsShortReader"), mIn(in), mReceived(0) {
171     mMon = new ReentrantMonitor("nsShortReader");
172   }
173 
Received(uint32_t count)174   void Received(uint32_t count) {
175     ReentrantMonitorAutoEnter mon(*mMon);
176     mReceived += count;
177     mon.Notify();
178   }
179 
WaitForReceipt(const uint32_t aWriteCount)180   uint32_t WaitForReceipt(const uint32_t aWriteCount) {
181     ReentrantMonitorAutoEnter mon(*mMon);
182     uint32_t result = mReceived;
183 
184     while (result < aWriteCount) {
185       mon.Wait();
186 
187       EXPECT_TRUE(mReceived > result);
188       result = mReceived;
189     }
190 
191     mReceived = 0;
192     return result;
193   }
194 
195  private:
196   ~nsShortReader() = default;
197 
198  protected:
199   nsCOMPtr<nsIInputStream> mIn;
200   uint32_t mReceived;
201   ReentrantMonitor* mMon;
202 };
203 
TestShortWrites(nsIInputStream * in,nsIOutputStream * out)204 static nsresult TestShortWrites(nsIInputStream* in, nsIOutputStream* out) {
205   RefPtr<nsShortReader> receiver = new nsShortReader(in);
206   nsresult rv;
207 
208   nsCOMPtr<nsIThread> thread;
209   rv = NS_NewNamedThread("TestShortWrites", getter_AddRefs(thread), receiver);
210   if (NS_FAILED(rv)) return rv;
211 
212   uint32_t total = 0;
213   for (uint32_t i = 0; i < ITERATIONS; i++) {
214     uint32_t writeCount;
215     SmprintfPointer buf = mozilla::Smprintf("%d %s", i, kTestPattern);
216     uint32_t len = strlen(buf.get());
217     len = len * rand() / RAND_MAX;
218     len = std::min(1u, len);
219     rv = WriteAll(out, buf.get(), len, &writeCount);
220     if (NS_FAILED(rv)) return rv;
221     EXPECT_EQ(writeCount, len);
222     total += writeCount;
223 
224     if (gTrace) printf("wrote %d bytes: %s\n", writeCount, buf.get());
225     // printf("calling Flush\n");
226     out->Flush();
227     // printf("calling WaitForReceipt\n");
228 
229 #ifdef DEBUG
230     const uint32_t received = receiver->WaitForReceipt(writeCount);
231     EXPECT_EQ(received, writeCount);
232 #endif
233   }
234   rv = out->Close();
235   if (NS_FAILED(rv)) return rv;
236 
237   thread->Shutdown();
238 
239   printf("wrote %d bytes\n", total);
240 
241   return NS_OK;
242 }
243 
244 ////////////////////////////////////////////////////////////////////////////////
245 
246 class nsPump final : public Runnable {
247  public:
Run()248   NS_IMETHOD Run() override {
249     nsresult rv;
250     uint32_t count;
251     while (true) {
252       rv = mOut->WriteFrom(mIn, ~0U, &count);
253       if (NS_FAILED(rv)) {
254         printf("Write failed\n");
255         break;
256       }
257       if (count == 0) {
258         printf("EOF count = %d\n", mCount);
259         break;
260       }
261 
262       if (gTrace) {
263         printf("Wrote: %d\n", count);
264       }
265       mCount += count;
266     }
267     mOut->Close();
268     return rv;
269   }
270 
nsPump(nsIInputStream * in,nsIOutputStream * out)271   nsPump(nsIInputStream* in, nsIOutputStream* out)
272       : Runnable("nsPump"), mIn(in), mOut(out), mCount(0) {}
273 
274  private:
275   ~nsPump() = default;
276 
277  protected:
278   nsCOMPtr<nsIInputStream> mIn;
279   nsCOMPtr<nsIOutputStream> mOut;
280   uint32_t mCount;
281 };
282 
TEST(Pipes,ChainedPipes)283 TEST(Pipes, ChainedPipes)
284 {
285   nsresult rv;
286   if (gTrace) {
287     printf("TestChainedPipes\n");
288   }
289 
290   nsCOMPtr<nsIInputStream> in1;
291   nsCOMPtr<nsIOutputStream> out1;
292   rv = NS_NewPipe(getter_AddRefs(in1), getter_AddRefs(out1), 20, 1999);
293   if (NS_FAILED(rv)) return;
294 
295   nsCOMPtr<nsIInputStream> in2;
296   nsCOMPtr<nsIOutputStream> out2;
297   rv = NS_NewPipe(getter_AddRefs(in2), getter_AddRefs(out2), 200, 401);
298   if (NS_FAILED(rv)) return;
299 
300   RefPtr<nsPump> pump = new nsPump(in1, out2);
301   if (pump == nullptr) return;
302 
303   nsCOMPtr<nsIThread> thread;
304   rv = NS_NewNamedThread("ChainedPipePump", getter_AddRefs(thread), pump);
305   if (NS_FAILED(rv)) return;
306 
307   RefPtr<nsReceiver> receiver = new nsReceiver(in2);
308   if (receiver == nullptr) return;
309 
310   nsCOMPtr<nsIThread> receiverThread;
311   rv = NS_NewNamedThread("ChainedPipeRecv", getter_AddRefs(receiverThread),
312                          receiver);
313   if (NS_FAILED(rv)) return;
314 
315   uint32_t total = 0;
316   for (uint32_t i = 0; i < ITERATIONS; i++) {
317     uint32_t writeCount;
318     SmprintfPointer buf = mozilla::Smprintf("%d %s", i, kTestPattern);
319     uint32_t len = strlen(buf.get());
320     len = len * rand() / RAND_MAX;
321     len = std::max(1u, len);
322     rv = WriteAll(out1, buf.get(), len, &writeCount);
323     if (NS_FAILED(rv)) return;
324     EXPECT_EQ(writeCount, len);
325     total += writeCount;
326 
327     if (gTrace) printf("wrote %d bytes: %s\n", writeCount, buf.get());
328   }
329   if (gTrace) {
330     printf("wrote total of %d bytes\n", total);
331   }
332   rv = out1->Close();
333   if (NS_FAILED(rv)) return;
334 
335   thread->Shutdown();
336   receiverThread->Shutdown();
337 }
338 
339 ////////////////////////////////////////////////////////////////////////////////
340 
RunTests(uint32_t segSize,uint32_t segCount)341 static void RunTests(uint32_t segSize, uint32_t segCount) {
342   nsresult rv;
343   nsCOMPtr<nsIInputStream> in;
344   nsCOMPtr<nsIOutputStream> out;
345   uint32_t bufSize = segSize * segCount;
346   if (gTrace) {
347     printf("Testing New Pipes: segment size %d buffer size %d\n", segSize,
348            bufSize);
349     printf("Testing long writes...\n");
350   }
351   rv = NS_NewPipe(getter_AddRefs(in), getter_AddRefs(out), segSize, bufSize);
352   EXPECT_TRUE(NS_SUCCEEDED(rv));
353   rv = TestPipe(in, out);
354   EXPECT_TRUE(NS_SUCCEEDED(rv));
355 
356   if (gTrace) {
357     printf("Testing short writes...\n");
358   }
359   rv = NS_NewPipe(getter_AddRefs(in), getter_AddRefs(out), segSize, bufSize);
360   EXPECT_TRUE(NS_SUCCEEDED(rv));
361   rv = TestShortWrites(in, out);
362   EXPECT_TRUE(NS_SUCCEEDED(rv));
363 }
364 
TEST(Pipes,Main)365 TEST(Pipes, Main)
366 {
367   RunTests(16, 1);
368   RunTests(4096, 16);
369 }
370 
371 ////////////////////////////////////////////////////////////////////////////////
372 
373 namespace {
374 
375 static const uint32_t DEFAULT_SEGMENT_SIZE = 4 * 1024;
376 
377 // An alternate pipe testing routing that uses NS_ConsumeStream() instead of
378 // manual read loop.
TestPipe2(uint32_t aNumBytes,uint32_t aSegmentSize=DEFAULT_SEGMENT_SIZE)379 static void TestPipe2(uint32_t aNumBytes,
380                       uint32_t aSegmentSize = DEFAULT_SEGMENT_SIZE) {
381   nsCOMPtr<nsIInputStream> reader;
382   nsCOMPtr<nsIOutputStream> writer;
383 
384   uint32_t maxSize = std::max(aNumBytes, aSegmentSize);
385 
386   nsresult rv = NS_NewPipe(getter_AddRefs(reader), getter_AddRefs(writer),
387                            aSegmentSize, maxSize);
388   ASSERT_TRUE(NS_SUCCEEDED(rv));
389 
390   nsTArray<char> inputData;
391   testing::CreateData(aNumBytes, inputData);
392   testing::WriteAllAndClose(writer, inputData);
393   testing::ConsumeAndValidateStream(reader, inputData);
394 }
395 
396 }  // namespace
397 
TEST(Pipes,Blocking_32k)398 TEST(Pipes, Blocking_32k)
399 { TestPipe2(32 * 1024); }
400 
TEST(Pipes,Blocking_64k)401 TEST(Pipes, Blocking_64k)
402 { TestPipe2(64 * 1024); }
403 
TEST(Pipes,Blocking_128k)404 TEST(Pipes, Blocking_128k)
405 { TestPipe2(128 * 1024); }
406 
407 ////////////////////////////////////////////////////////////////////////////////
408 
409 namespace {
410 
411 // Utility routine to validate pipe clone before.  There are many knobs.
412 //
413 // aTotalBytes              Total number of bytes to write to the pipe.
414 // aNumWrites               How many separate write calls should be made.  Bytes
415 //                          are evenly distributed over these write calls.
416 // aNumInitialClones        How many clones of the pipe input stream should be
417 //                          made before writing begins.
418 // aNumToCloseAfterWrite    How many streams should be closed after each write.
419 //                          One stream is always kept open.  This verifies that
420 //                          closing one stream does not effect other open
421 //                          streams.
422 // aNumToCloneAfterWrite    How many clones to create after each write.  Occurs
423 //                          after closing any streams.  This tests cloning
424 //                          active streams on a pipe that is being written to.
425 // aNumStreamToReadPerWrite How many streams to read fully after each write.
426 //                          This tests reading cloned streams at different rates
427 //                          while the pipe is being written to.
TestPipeClone(uint32_t aTotalBytes,uint32_t aNumWrites,uint32_t aNumInitialClones,uint32_t aNumToCloseAfterWrite,uint32_t aNumToCloneAfterWrite,uint32_t aNumStreamsToReadPerWrite,uint32_t aSegmentSize=DEFAULT_SEGMENT_SIZE)428 static void TestPipeClone(uint32_t aTotalBytes, uint32_t aNumWrites,
429                           uint32_t aNumInitialClones,
430                           uint32_t aNumToCloseAfterWrite,
431                           uint32_t aNumToCloneAfterWrite,
432                           uint32_t aNumStreamsToReadPerWrite,
433                           uint32_t aSegmentSize = DEFAULT_SEGMENT_SIZE) {
434   nsCOMPtr<nsIInputStream> reader;
435   nsCOMPtr<nsIOutputStream> writer;
436 
437   uint32_t maxSize = std::max(aTotalBytes, aSegmentSize);
438 
439   // Use async input streams so we can NS_ConsumeStream() the current data
440   // while the pipe is still being written to.
441   nsresult rv =
442       NS_NewPipe(getter_AddRefs(reader), getter_AddRefs(writer), aSegmentSize,
443                  maxSize, true, false);  // non-blocking - reader, writer
444   ASSERT_TRUE(NS_SUCCEEDED(rv));
445 
446   nsCOMPtr<nsICloneableInputStream> cloneable = do_QueryInterface(reader);
447   ASSERT_TRUE(cloneable);
448   ASSERT_TRUE(cloneable->GetCloneable());
449 
450   nsTArray<nsCString> outputDataList;
451 
452   nsTArray<nsCOMPtr<nsIInputStream>> streamList;
453 
454   // first stream is our original reader from the pipe
455   streamList.AppendElement(reader);
456   outputDataList.AppendElement();
457 
458   // Clone the initial input stream the specified number of times
459   // before performing any writes.
460   for (uint32_t i = 0; i < aNumInitialClones; ++i) {
461     nsCOMPtr<nsIInputStream>* clone = streamList.AppendElement();
462     rv = cloneable->Clone(getter_AddRefs(*clone));
463     ASSERT_TRUE(NS_SUCCEEDED(rv));
464     ASSERT_TRUE(*clone);
465 
466     outputDataList.AppendElement();
467   }
468 
469   nsTArray<char> inputData;
470   testing::CreateData(aTotalBytes, inputData);
471 
472   const uint32_t bytesPerWrite = ((aTotalBytes - 1) / aNumWrites) + 1;
473   uint32_t offset = 0;
474   uint32_t remaining = aTotalBytes;
475   uint32_t nextStreamToRead = 0;
476 
477   while (remaining) {
478     uint32_t numToWrite = std::min(bytesPerWrite, remaining);
479     testing::Write(writer, inputData, offset, numToWrite);
480     offset += numToWrite;
481     remaining -= numToWrite;
482 
483     // Close the specified number of streams.  This allows us to
484     // test that one closed clone does not break other open clones.
485     for (uint32_t i = 0; i < aNumToCloseAfterWrite && streamList.Length() > 1;
486          ++i) {
487       uint32_t lastIndex = streamList.Length() - 1;
488       streamList[lastIndex]->Close();
489       streamList.RemoveElementAt(lastIndex);
490       outputDataList.RemoveElementAt(lastIndex);
491 
492       if (nextStreamToRead >= streamList.Length()) {
493         nextStreamToRead = 0;
494       }
495     }
496 
497     // Create the specified number of clones.  This lets us verify
498     // that we can create clones in the middle of pipe reading and
499     // writing.
500     for (uint32_t i = 0; i < aNumToCloneAfterWrite; ++i) {
501       nsCOMPtr<nsIInputStream>* clone = streamList.AppendElement();
502       rv = cloneable->Clone(getter_AddRefs(*clone));
503       ASSERT_TRUE(NS_SUCCEEDED(rv));
504       ASSERT_TRUE(*clone);
505 
506       // Initialize the new output data to make whats been read to data for
507       // the original stream.  First stream is always the original stream.
508       nsCString* outputData = outputDataList.AppendElement();
509       *outputData = outputDataList[0];
510     }
511 
512     // Read the specified number of streams.  This lets us verify that we
513     // can read from the clones at different rates while the pipe is being
514     // written to.
515     for (uint32_t i = 0; i < aNumStreamsToReadPerWrite; ++i) {
516       nsCOMPtr<nsIInputStream>& stream = streamList[nextStreamToRead];
517       nsCString& outputData = outputDataList[nextStreamToRead];
518 
519       // Can't use ConsumeAndValidateStream() here because we're not
520       // guaranteed the exact amount read.  It should just be at least
521       // as many as numToWrite.
522       nsAutoCString tmpOutputData;
523       rv = NS_ConsumeStream(stream, UINT32_MAX, tmpOutputData);
524       ASSERT_TRUE(rv == NS_BASE_STREAM_WOULD_BLOCK || NS_SUCCEEDED(rv));
525       ASSERT_GE(tmpOutputData.Length(), numToWrite);
526 
527       outputData += tmpOutputData;
528 
529       nextStreamToRead += 1;
530       if (nextStreamToRead >= streamList.Length()) {
531         // Note: When we wrap around on the streams being read, its possible
532         //       we will trigger a segment to be deleted from the pipe.  It
533         //       would be nice to validate this here, but we don't have any
534         //       QI'able interface that would let us check easily.
535 
536         nextStreamToRead = 0;
537       }
538     }
539   }
540 
541   rv = writer->Close();
542   ASSERT_TRUE(NS_SUCCEEDED(rv));
543 
544   nsDependentCSubstring inputString(inputData.Elements(), inputData.Length());
545 
546   // Finally, read the remaining bytes from each stream.  This may be
547   // different amounts of data depending on how much reading we did while
548   // writing.  Verify that the end result matches the input data.
549   for (uint32_t i = 0; i < streamList.Length(); ++i) {
550     nsCOMPtr<nsIInputStream>& stream = streamList[i];
551     nsCString& outputData = outputDataList[i];
552 
553     nsAutoCString tmpOutputData;
554     rv = NS_ConsumeStream(stream, UINT32_MAX, tmpOutputData);
555     ASSERT_TRUE(rv == NS_BASE_STREAM_WOULD_BLOCK || NS_SUCCEEDED(rv));
556     stream->Close();
557 
558     // Append to total amount read from the stream
559     outputData += tmpOutputData;
560 
561     ASSERT_EQ(inputString.Length(), outputData.Length());
562     ASSERT_TRUE(inputString.Equals(outputData));
563   }
564 }
565 
566 }  // namespace
567 
TEST(Pipes,Clone_BeforeWrite_ReadAtEnd)568 TEST(Pipes, Clone_BeforeWrite_ReadAtEnd)
569 {
570   TestPipeClone(32 * 1024,  // total bytes
571                 16,         // num writes
572                 3,          // num initial clones
573                 0,          // num streams to close after each write
574                 0,          // num clones to add after each write
575                 0);         // num streams to read after each write
576 }
577 
TEST(Pipes,Clone_BeforeWrite_ReadDuringWrite)578 TEST(Pipes, Clone_BeforeWrite_ReadDuringWrite)
579 {
580   // Since this reads all streams on every write, it should trigger the
581   // pipe cursor roll back optimization.  Currently we can only verify
582   // this with logging.
583 
584   TestPipeClone(32 * 1024,  // total bytes
585                 16,         // num writes
586                 3,          // num initial clones
587                 0,          // num streams to close after each write
588                 0,          // num clones to add after each write
589                 4);         // num streams to read after each write
590 }
591 
TEST(Pipes,Clone_DuringWrite_ReadAtEnd)592 TEST(Pipes, Clone_DuringWrite_ReadAtEnd)
593 {
594   TestPipeClone(32 * 1024,  // total bytes
595                 16,         // num writes
596                 0,          // num initial clones
597                 0,          // num streams to close after each write
598                 1,          // num clones to add after each write
599                 0);         // num streams to read after each write
600 }
601 
TEST(Pipes,Clone_DuringWrite_ReadDuringWrite)602 TEST(Pipes, Clone_DuringWrite_ReadDuringWrite)
603 {
604   TestPipeClone(32 * 1024,  // total bytes
605                 16,         // num writes
606                 0,          // num initial clones
607                 0,          // num streams to close after each write
608                 1,          // num clones to add after each write
609                 1);         // num streams to read after each write
610 }
611 
TEST(Pipes,Clone_DuringWrite_ReadDuringWrite_CloseDuringWrite)612 TEST(Pipes, Clone_DuringWrite_ReadDuringWrite_CloseDuringWrite)
613 {
614   // Since this reads streams faster than we clone new ones, it should
615   // trigger pipe segment deletion periodically.  Currently we can
616   // only verify this with logging.
617 
618   TestPipeClone(32 * 1024,  // total bytes
619                 16,         // num writes
620                 1,          // num initial clones
621                 1,          // num streams to close after each write
622                 2,          // num clones to add after each write
623                 3);         // num streams to read after each write
624 }
625 
TEST(Pipes,Write_AsyncWait)626 TEST(Pipes, Write_AsyncWait)
627 {
628   nsCOMPtr<nsIAsyncInputStream> reader;
629   nsCOMPtr<nsIAsyncOutputStream> writer;
630 
631   const uint32_t segmentSize = 1024;
632   const uint32_t numSegments = 1;
633 
634   nsresult rv = NS_NewPipe2(getter_AddRefs(reader), getter_AddRefs(writer),
635                             true, true,  // non-blocking - reader, writer
636                             segmentSize, numSegments);
637   ASSERT_TRUE(NS_SUCCEEDED(rv));
638 
639   nsTArray<char> inputData;
640   testing::CreateData(segmentSize, inputData);
641 
642   uint32_t numWritten = 0;
643   rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
644   ASSERT_TRUE(NS_SUCCEEDED(rv));
645 
646   rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
647   ASSERT_EQ(NS_BASE_STREAM_WOULD_BLOCK, rv);
648 
649   RefPtr<testing::OutputStreamCallback> cb =
650       new testing::OutputStreamCallback();
651 
652   rv = writer->AsyncWait(cb, 0, 0, nullptr);
653   ASSERT_TRUE(NS_SUCCEEDED(rv));
654 
655   ASSERT_FALSE(cb->Called());
656 
657   testing::ConsumeAndValidateStream(reader, inputData);
658 
659   ASSERT_TRUE(cb->Called());
660 }
661 
TEST(Pipes,Write_AsyncWait_Clone)662 TEST(Pipes, Write_AsyncWait_Clone)
663 {
664   nsCOMPtr<nsIAsyncInputStream> reader;
665   nsCOMPtr<nsIAsyncOutputStream> writer;
666 
667   const uint32_t segmentSize = 1024;
668   const uint32_t numSegments = 1;
669 
670   nsresult rv = NS_NewPipe2(getter_AddRefs(reader), getter_AddRefs(writer),
671                             true, true,  // non-blocking - reader, writer
672                             segmentSize, numSegments);
673   ASSERT_TRUE(NS_SUCCEEDED(rv));
674 
675   nsCOMPtr<nsIInputStream> clone;
676   rv = NS_CloneInputStream(reader, getter_AddRefs(clone));
677   ASSERT_TRUE(NS_SUCCEEDED(rv));
678 
679   nsTArray<char> inputData;
680   testing::CreateData(segmentSize, inputData);
681 
682   uint32_t numWritten = 0;
683   rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
684   ASSERT_TRUE(NS_SUCCEEDED(rv));
685 
686   // This attempts to write data beyond the original pipe size limit.  It
687   // should fail since neither side of the clone has been read yet.
688   rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
689   ASSERT_EQ(NS_BASE_STREAM_WOULD_BLOCK, rv);
690 
691   RefPtr<testing::OutputStreamCallback> cb =
692       new testing::OutputStreamCallback();
693 
694   rv = writer->AsyncWait(cb, 0, 0, nullptr);
695   ASSERT_TRUE(NS_SUCCEEDED(rv));
696 
697   ASSERT_FALSE(cb->Called());
698 
699   // Consume data on the original stream, but the clone still has not been read.
700   testing::ConsumeAndValidateStream(reader, inputData);
701 
702   // A clone that is not being read should not stall the other input stream
703   // reader.  Therefore the writer callback should trigger when the fastest
704   // reader drains the other input stream.
705   ASSERT_TRUE(cb->Called());
706 
707   // Attempt to write data.  This will buffer data beyond the pipe size limit in
708   // order for the clone stream to still work.  This is allowed because the
709   // other input stream has drained its buffered segments and is ready for more
710   // data.
711   rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
712   ASSERT_TRUE(NS_SUCCEEDED(rv));
713 
714   // Again, this should fail since the origin stream has not been read again.
715   // The pipe size should still restrict how far ahead we can buffer even
716   // when there is a cloned stream not being read.
717   rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
718   ASSERT_TRUE(NS_FAILED(rv));
719 
720   cb = new testing::OutputStreamCallback();
721   rv = writer->AsyncWait(cb, 0, 0, nullptr);
722   ASSERT_TRUE(NS_SUCCEEDED(rv));
723 
724   // The write should again be blocked since we have written data and the
725   // main reader is at its maximum advance buffer.
726   ASSERT_FALSE(cb->Called());
727 
728   nsTArray<char> expectedCloneData;
729   expectedCloneData.AppendElements(inputData);
730   expectedCloneData.AppendElements(inputData);
731 
732   // We should now be able to consume the entire backlog of buffered data on
733   // the cloned stream.
734   testing::ConsumeAndValidateStream(clone, expectedCloneData);
735 
736   // Draining the clone side should also trigger the AsyncWait() writer
737   // callback
738   ASSERT_TRUE(cb->Called());
739 
740   // Finally, we should be able to consume the remaining data on the original
741   // reader.
742   testing::ConsumeAndValidateStream(reader, inputData);
743 }
744 
TEST(Pipes,Write_AsyncWait_Clone_CloseOriginal)745 TEST(Pipes, Write_AsyncWait_Clone_CloseOriginal)
746 {
747   nsCOMPtr<nsIAsyncInputStream> reader;
748   nsCOMPtr<nsIAsyncOutputStream> writer;
749 
750   const uint32_t segmentSize = 1024;
751   const uint32_t numSegments = 1;
752 
753   nsresult rv = NS_NewPipe2(getter_AddRefs(reader), getter_AddRefs(writer),
754                             true, true,  // non-blocking - reader, writer
755                             segmentSize, numSegments);
756   ASSERT_TRUE(NS_SUCCEEDED(rv));
757 
758   nsCOMPtr<nsIInputStream> clone;
759   rv = NS_CloneInputStream(reader, getter_AddRefs(clone));
760   ASSERT_TRUE(NS_SUCCEEDED(rv));
761 
762   nsTArray<char> inputData;
763   testing::CreateData(segmentSize, inputData);
764 
765   uint32_t numWritten = 0;
766   rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
767   ASSERT_TRUE(NS_SUCCEEDED(rv));
768 
769   // This attempts to write data beyond the original pipe size limit.  It
770   // should fail since neither side of the clone has been read yet.
771   rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
772   ASSERT_EQ(NS_BASE_STREAM_WOULD_BLOCK, rv);
773 
774   RefPtr<testing::OutputStreamCallback> cb =
775       new testing::OutputStreamCallback();
776 
777   rv = writer->AsyncWait(cb, 0, 0, nullptr);
778   ASSERT_TRUE(NS_SUCCEEDED(rv));
779 
780   ASSERT_FALSE(cb->Called());
781 
782   // Consume data on the original stream, but the clone still has not been read.
783   testing::ConsumeAndValidateStream(reader, inputData);
784 
785   // A clone that is not being read should not stall the other input stream
786   // reader.  Therefore the writer callback should trigger when the fastest
787   // reader drains the other input stream.
788   ASSERT_TRUE(cb->Called());
789 
790   // Attempt to write data.  This will buffer data beyond the pipe size limit in
791   // order for the clone stream to still work.  This is allowed because the
792   // other input stream has drained its buffered segments and is ready for more
793   // data.
794   rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
795   ASSERT_TRUE(NS_SUCCEEDED(rv));
796 
797   // Again, this should fail since the origin stream has not been read again.
798   // The pipe size should still restrict how far ahead we can buffer even
799   // when there is a cloned stream not being read.
800   rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
801   ASSERT_TRUE(NS_FAILED(rv));
802 
803   cb = new testing::OutputStreamCallback();
804   rv = writer->AsyncWait(cb, 0, 0, nullptr);
805   ASSERT_TRUE(NS_SUCCEEDED(rv));
806 
807   // The write should again be blocked since we have written data and the
808   // main reader is at its maximum advance buffer.
809   ASSERT_FALSE(cb->Called());
810 
811   // Close the original reader input stream.  This was the fastest reader,
812   // so we should have a single stream that is buffered beyond our nominal
813   // limit.
814   reader->Close();
815 
816   // Because the clone stream is still buffered the writable callback should
817   // not be fired.
818   ASSERT_FALSE(cb->Called());
819 
820   // And we should not be able to perform a write.
821   rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
822   ASSERT_TRUE(NS_FAILED(rv));
823 
824   // Create another clone stream.  Now we have two streams that exceed our
825   // maximum size limit
826   nsCOMPtr<nsIInputStream> clone2;
827   rv = NS_CloneInputStream(clone, getter_AddRefs(clone2));
828   ASSERT_TRUE(NS_SUCCEEDED(rv));
829 
830   nsTArray<char> expectedCloneData;
831   expectedCloneData.AppendElements(inputData);
832   expectedCloneData.AppendElements(inputData);
833 
834   // We should now be able to consume the entire backlog of buffered data on
835   // the cloned stream.
836   testing::ConsumeAndValidateStream(clone, expectedCloneData);
837 
838   // The pipe should now be writable because we have two open streams, one of
839   // which is completely drained.
840   ASSERT_TRUE(cb->Called());
841 
842   // Write again to reach our limit again.
843   rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
844   ASSERT_TRUE(NS_SUCCEEDED(rv));
845 
846   // The stream is again non-writeable.
847   cb = new testing::OutputStreamCallback();
848   rv = writer->AsyncWait(cb, 0, 0, nullptr);
849   ASSERT_TRUE(NS_SUCCEEDED(rv));
850   ASSERT_FALSE(cb->Called());
851 
852   // Close the empty stream.  This is different from our previous close since
853   // before we were closing a stream with some data still buffered.
854   clone->Close();
855 
856   // The pipe should not be writable.  The second clone is still fully buffered
857   // over our limit.
858   ASSERT_FALSE(cb->Called());
859   rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
860   ASSERT_TRUE(NS_FAILED(rv));
861 
862   // Finally consume all of the buffered data on the second clone.
863   expectedCloneData.AppendElements(inputData);
864   testing::ConsumeAndValidateStream(clone2, expectedCloneData);
865 
866   // Draining the final clone should make the pipe writable again.
867   ASSERT_TRUE(cb->Called());
868 }
869 
TEST(Pipes,Read_AsyncWait)870 TEST(Pipes, Read_AsyncWait)
871 {
872   nsCOMPtr<nsIAsyncInputStream> reader;
873   nsCOMPtr<nsIAsyncOutputStream> writer;
874 
875   const uint32_t segmentSize = 1024;
876   const uint32_t numSegments = 1;
877 
878   nsresult rv = NS_NewPipe2(getter_AddRefs(reader), getter_AddRefs(writer),
879                             true, true,  // non-blocking - reader, writer
880                             segmentSize, numSegments);
881   ASSERT_TRUE(NS_SUCCEEDED(rv));
882 
883   nsTArray<char> inputData;
884   testing::CreateData(segmentSize, inputData);
885 
886   RefPtr<testing::InputStreamCallback> cb = new testing::InputStreamCallback();
887 
888   rv = reader->AsyncWait(cb, 0, 0, nullptr);
889   ASSERT_TRUE(NS_SUCCEEDED(rv));
890 
891   ASSERT_FALSE(cb->Called());
892 
893   uint32_t numWritten = 0;
894   rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
895   ASSERT_TRUE(NS_SUCCEEDED(rv));
896 
897   ASSERT_TRUE(cb->Called());
898 
899   testing::ConsumeAndValidateStream(reader, inputData);
900 }
901 
TEST(Pipes,Read_AsyncWait_Clone)902 TEST(Pipes, Read_AsyncWait_Clone)
903 {
904   nsCOMPtr<nsIAsyncInputStream> reader;
905   nsCOMPtr<nsIAsyncOutputStream> writer;
906 
907   const uint32_t segmentSize = 1024;
908   const uint32_t numSegments = 1;
909 
910   nsresult rv = NS_NewPipe2(getter_AddRefs(reader), getter_AddRefs(writer),
911                             true, true,  // non-blocking - reader, writer
912                             segmentSize, numSegments);
913   ASSERT_TRUE(NS_SUCCEEDED(rv));
914 
915   nsCOMPtr<nsIInputStream> clone;
916   rv = NS_CloneInputStream(reader, getter_AddRefs(clone));
917   ASSERT_TRUE(NS_SUCCEEDED(rv));
918 
919   nsCOMPtr<nsIAsyncInputStream> asyncClone = do_QueryInterface(clone);
920   ASSERT_TRUE(asyncClone);
921 
922   nsTArray<char> inputData;
923   testing::CreateData(segmentSize, inputData);
924 
925   RefPtr<testing::InputStreamCallback> cb = new testing::InputStreamCallback();
926 
927   RefPtr<testing::InputStreamCallback> cb2 = new testing::InputStreamCallback();
928 
929   rv = reader->AsyncWait(cb, 0, 0, nullptr);
930   ASSERT_TRUE(NS_SUCCEEDED(rv));
931 
932   ASSERT_FALSE(cb->Called());
933 
934   rv = asyncClone->AsyncWait(cb2, 0, 0, nullptr);
935   ASSERT_TRUE(NS_SUCCEEDED(rv));
936 
937   ASSERT_FALSE(cb2->Called());
938 
939   uint32_t numWritten = 0;
940   rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
941   ASSERT_TRUE(NS_SUCCEEDED(rv));
942 
943   ASSERT_TRUE(cb->Called());
944   ASSERT_TRUE(cb2->Called());
945 
946   testing::ConsumeAndValidateStream(reader, inputData);
947 }
948 
949 namespace {
950 
CloseDuringReadFunc(nsIInputStream * aReader,void * aClosure,const char * aFromSegment,uint32_t aToOffset,uint32_t aCount,uint32_t * aWriteCountOut)951 nsresult CloseDuringReadFunc(nsIInputStream* aReader, void* aClosure,
952                              const char* aFromSegment, uint32_t aToOffset,
953                              uint32_t aCount, uint32_t* aWriteCountOut) {
954   MOZ_RELEASE_ASSERT(aReader);
955   MOZ_RELEASE_ASSERT(aClosure);
956   MOZ_RELEASE_ASSERT(aFromSegment);
957   MOZ_RELEASE_ASSERT(aWriteCountOut);
958   MOZ_RELEASE_ASSERT(aToOffset == 0);
959 
960   // This is insanity and you probably should not do this under normal
961   // conditions.  We want to simulate the case where the pipe is closed
962   // (possibly from other end on another thread) simultaneously with the
963   // read.  This is the easiest way to do trigger this case in a synchronous
964   // gtest.
965   MOZ_ALWAYS_SUCCEEDS(aReader->Close());
966 
967   nsTArray<char>* buffer = static_cast<nsTArray<char>*>(aClosure);
968   buffer->AppendElements(aFromSegment, aCount);
969 
970   *aWriteCountOut = aCount;
971 
972   return NS_OK;
973 }
974 
TestCloseDuringRead(uint32_t aSegmentSize,uint32_t aDataSize)975 void TestCloseDuringRead(uint32_t aSegmentSize, uint32_t aDataSize) {
976   nsCOMPtr<nsIInputStream> reader;
977   nsCOMPtr<nsIOutputStream> writer;
978 
979   const uint32_t maxSize = aSegmentSize;
980 
981   nsresult rv = NS_NewPipe(getter_AddRefs(reader), getter_AddRefs(writer),
982                            aSegmentSize, maxSize);
983   ASSERT_TRUE(NS_SUCCEEDED(rv));
984 
985   nsTArray<char> inputData;
986 
987   testing::CreateData(aDataSize, inputData);
988 
989   uint32_t numWritten = 0;
990   rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
991   ASSERT_TRUE(NS_SUCCEEDED(rv));
992 
993   nsTArray<char> outputData;
994 
995   uint32_t numRead = 0;
996   rv = reader->ReadSegments(CloseDuringReadFunc, &outputData,
997                             inputData.Length(), &numRead);
998   ASSERT_TRUE(NS_SUCCEEDED(rv));
999   ASSERT_EQ(inputData.Length(), numRead);
1000 
1001   ASSERT_EQ(inputData, outputData);
1002 
1003   uint64_t available;
1004   rv = reader->Available(&available);
1005   ASSERT_EQ(NS_BASE_STREAM_CLOSED, rv);
1006 }
1007 
1008 }  // namespace
1009 
TEST(Pipes,Close_During_Read_Partial_Segment)1010 TEST(Pipes, Close_During_Read_Partial_Segment)
1011 { TestCloseDuringRead(1024, 512); }
1012 
TEST(Pipes,Close_During_Read_Full_Segment)1013 TEST(Pipes, Close_During_Read_Full_Segment)
1014 { TestCloseDuringRead(1024, 1024); }
1015 
TEST(Pipes,Interfaces)1016 TEST(Pipes, Interfaces)
1017 {
1018   nsCOMPtr<nsIInputStream> reader;
1019   nsCOMPtr<nsIOutputStream> writer;
1020 
1021   nsresult rv = NS_NewPipe(getter_AddRefs(reader), getter_AddRefs(writer));
1022   ASSERT_TRUE(NS_SUCCEEDED(rv));
1023 
1024   nsCOMPtr<nsIAsyncInputStream> readerType1 = do_QueryInterface(reader);
1025   ASSERT_TRUE(readerType1);
1026 
1027   nsCOMPtr<nsITellableStream> readerType2 = do_QueryInterface(reader);
1028   ASSERT_TRUE(readerType2);
1029 
1030   nsCOMPtr<nsISearchableInputStream> readerType3 = do_QueryInterface(reader);
1031   ASSERT_TRUE(readerType3);
1032 
1033   nsCOMPtr<nsICloneableInputStream> readerType4 = do_QueryInterface(reader);
1034   ASSERT_TRUE(readerType4);
1035 
1036   nsCOMPtr<nsIClassInfo> readerType5 = do_QueryInterface(reader);
1037   ASSERT_TRUE(readerType5);
1038 
1039   nsCOMPtr<nsIBufferedInputStream> readerType6 = do_QueryInterface(reader);
1040   ASSERT_TRUE(readerType6);
1041 }
1042