1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this
5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
6
7 #include <algorithm>
8 #include "gtest/gtest.h"
9 #include "Helpers.h"
10 #include "mozilla/ReentrantMonitor.h"
11 #include "mozilla/Printf.h"
12 #include "nsCOMPtr.h"
13 #include "nsCRT.h"
14 #include "nsIAsyncInputStream.h"
15 #include "nsIAsyncOutputStream.h"
16 #include "nsIBufferedStreams.h"
17 #include "nsIClassInfo.h"
18 #include "nsICloneableInputStream.h"
19 #include "nsIInputStream.h"
20 #include "nsIOutputStream.h"
21 #include "nsIPipe.h"
22 #include "nsITellableStream.h"
23 #include "nsIThread.h"
24 #include "nsIRunnable.h"
25 #include "nsStreamUtils.h"
26 #include "nsString.h"
27 #include "nsThreadUtils.h"
28 #include "prinrval.h"
29
30 using namespace mozilla;
31
// Number of formatted pattern lines each threaded pipe test writes.
#define ITERATIONS 33333

// Text appended to the iteration counter on every write. The tests only ever
// read it, so it is declared const.
const char kTestPattern[] = "My hovercraft is full of eels.\n";

// Set to true to enable the verbose printf() tracing in the tests below.
bool gTrace = false;
36
WriteAll(nsIOutputStream * os,const char * buf,uint32_t bufLen,uint32_t * lenWritten)37 static nsresult WriteAll(nsIOutputStream* os, const char* buf, uint32_t bufLen,
38 uint32_t* lenWritten) {
39 const char* p = buf;
40 *lenWritten = 0;
41 while (bufLen) {
42 uint32_t n;
43 nsresult rv = os->Write(p, bufLen, &n);
44 if (NS_FAILED(rv)) return rv;
45 p += n;
46 bufLen -= n;
47 *lenWritten += n;
48 }
49 return NS_OK;
50 }
51
52 class nsReceiver final : public Runnable {
53 public:
Run()54 NS_IMETHOD Run() override {
55 nsresult rv;
56 char buf[101];
57 uint32_t count;
58 PRIntervalTime start = PR_IntervalNow();
59 while (true) {
60 rv = mIn->Read(buf, 100, &count);
61 if (NS_FAILED(rv)) {
62 printf("read failed\n");
63 break;
64 }
65 if (count == 0) {
66 // printf("EOF count = %d\n", mCount);
67 break;
68 }
69
70 if (gTrace) {
71 buf[count] = '\0';
72 printf("read: %s\n", buf);
73 }
74 mCount += count;
75 }
76 PRIntervalTime end = PR_IntervalNow();
77 printf("read %d bytes, time = %dms\n", mCount,
78 PR_IntervalToMilliseconds(end - start));
79 return rv;
80 }
81
nsReceiver(nsIInputStream * in)82 explicit nsReceiver(nsIInputStream* in)
83 : Runnable("nsReceiver"), mIn(in), mCount(0) {}
84
GetBytesRead()85 uint32_t GetBytesRead() { return mCount; }
86
87 private:
88 ~nsReceiver() = default;
89
90 protected:
91 nsCOMPtr<nsIInputStream> mIn;
92 uint32_t mCount;
93 };
94
TestPipe(nsIInputStream * in,nsIOutputStream * out)95 static nsresult TestPipe(nsIInputStream* in, nsIOutputStream* out) {
96 RefPtr<nsReceiver> receiver = new nsReceiver(in);
97 nsresult rv;
98
99 nsCOMPtr<nsIThread> thread;
100 rv = NS_NewNamedThread("TestPipe", getter_AddRefs(thread), receiver);
101 if (NS_FAILED(rv)) return rv;
102
103 uint32_t total = 0;
104 PRIntervalTime start = PR_IntervalNow();
105 for (uint32_t i = 0; i < ITERATIONS; i++) {
106 uint32_t writeCount;
107 SmprintfPointer buf = mozilla::Smprintf("%d %s", i, kTestPattern);
108 uint32_t len = strlen(buf.get());
109 rv = WriteAll(out, buf.get(), len, &writeCount);
110 if (gTrace) {
111 printf("wrote: ");
112 for (uint32_t j = 0; j < writeCount; j++) {
113 putc(buf.get()[j], stdout);
114 }
115 printf("\n");
116 }
117 if (NS_FAILED(rv)) return rv;
118 total += writeCount;
119 }
120 rv = out->Close();
121 if (NS_FAILED(rv)) return rv;
122
123 PRIntervalTime end = PR_IntervalNow();
124
125 thread->Shutdown();
126
127 printf("wrote %d bytes, time = %dms\n", total,
128 PR_IntervalToMilliseconds(end - start));
129 EXPECT_EQ(receiver->GetBytesRead(), total);
130
131 return NS_OK;
132 }
133
134 ////////////////////////////////////////////////////////////////////////////////
135
136 class nsShortReader final : public Runnable {
137 public:
Run()138 NS_IMETHOD Run() override {
139 nsresult rv;
140 char buf[101];
141 uint32_t count;
142 uint32_t total = 0;
143 while (true) {
144 // if (gTrace)
145 // printf("calling Read\n");
146 rv = mIn->Read(buf, 100, &count);
147 if (NS_FAILED(rv)) {
148 printf("read failed\n");
149 break;
150 }
151 if (count == 0) {
152 break;
153 }
154
155 if (gTrace) {
156 // For next |printf()| call and possible others elsewhere.
157 buf[count] = '\0';
158
159 printf("read %d bytes: %s\n", count, buf);
160 }
161
162 Received(count);
163 total += count;
164 }
165 printf("read %d bytes\n", total);
166 return rv;
167 }
168
nsShortReader(nsIInputStream * in)169 explicit nsShortReader(nsIInputStream* in)
170 : Runnable("nsShortReader"), mIn(in), mReceived(0) {
171 mMon = new ReentrantMonitor("nsShortReader");
172 }
173
Received(uint32_t count)174 void Received(uint32_t count) {
175 ReentrantMonitorAutoEnter mon(*mMon);
176 mReceived += count;
177 mon.Notify();
178 }
179
WaitForReceipt(const uint32_t aWriteCount)180 uint32_t WaitForReceipt(const uint32_t aWriteCount) {
181 ReentrantMonitorAutoEnter mon(*mMon);
182 uint32_t result = mReceived;
183
184 while (result < aWriteCount) {
185 mon.Wait();
186
187 EXPECT_TRUE(mReceived > result);
188 result = mReceived;
189 }
190
191 mReceived = 0;
192 return result;
193 }
194
195 private:
196 ~nsShortReader() = default;
197
198 protected:
199 nsCOMPtr<nsIInputStream> mIn;
200 uint32_t mReceived;
201 ReentrantMonitor* mMon;
202 };
203
TestShortWrites(nsIInputStream * in,nsIOutputStream * out)204 static nsresult TestShortWrites(nsIInputStream* in, nsIOutputStream* out) {
205 RefPtr<nsShortReader> receiver = new nsShortReader(in);
206 nsresult rv;
207
208 nsCOMPtr<nsIThread> thread;
209 rv = NS_NewNamedThread("TestShortWrites", getter_AddRefs(thread), receiver);
210 if (NS_FAILED(rv)) return rv;
211
212 uint32_t total = 0;
213 for (uint32_t i = 0; i < ITERATIONS; i++) {
214 uint32_t writeCount;
215 SmprintfPointer buf = mozilla::Smprintf("%d %s", i, kTestPattern);
216 uint32_t len = strlen(buf.get());
217 len = len * rand() / RAND_MAX;
218 len = std::min(1u, len);
219 rv = WriteAll(out, buf.get(), len, &writeCount);
220 if (NS_FAILED(rv)) return rv;
221 EXPECT_EQ(writeCount, len);
222 total += writeCount;
223
224 if (gTrace) printf("wrote %d bytes: %s\n", writeCount, buf.get());
225 // printf("calling Flush\n");
226 out->Flush();
227 // printf("calling WaitForReceipt\n");
228
229 #ifdef DEBUG
230 const uint32_t received = receiver->WaitForReceipt(writeCount);
231 EXPECT_EQ(received, writeCount);
232 #endif
233 }
234 rv = out->Close();
235 if (NS_FAILED(rv)) return rv;
236
237 thread->Shutdown();
238
239 printf("wrote %d bytes\n", total);
240
241 return NS_OK;
242 }
243
244 ////////////////////////////////////////////////////////////////////////////////
245
246 class nsPump final : public Runnable {
247 public:
Run()248 NS_IMETHOD Run() override {
249 nsresult rv;
250 uint32_t count;
251 while (true) {
252 rv = mOut->WriteFrom(mIn, ~0U, &count);
253 if (NS_FAILED(rv)) {
254 printf("Write failed\n");
255 break;
256 }
257 if (count == 0) {
258 printf("EOF count = %d\n", mCount);
259 break;
260 }
261
262 if (gTrace) {
263 printf("Wrote: %d\n", count);
264 }
265 mCount += count;
266 }
267 mOut->Close();
268 return rv;
269 }
270
nsPump(nsIInputStream * in,nsIOutputStream * out)271 nsPump(nsIInputStream* in, nsIOutputStream* out)
272 : Runnable("nsPump"), mIn(in), mOut(out), mCount(0) {}
273
274 private:
275 ~nsPump() = default;
276
277 protected:
278 nsCOMPtr<nsIInputStream> mIn;
279 nsCOMPtr<nsIOutputStream> mOut;
280 uint32_t mCount;
281 };
282
// Chains two pipes together: this thread writes into pipe 1, an nsPump thread
// moves the data from pipe 1 into pipe 2, and an nsReceiver thread drains
// pipe 2. Failures are reported through gtest instead of silently returning,
// and the receiver's byte count is checked against what was written.
TEST(Pipes, ChainedPipes)
{
  if (gTrace) {
    printf("TestChainedPipes\n");
  }

  nsCOMPtr<nsIInputStream> in1;
  nsCOMPtr<nsIOutputStream> out1;
  nsresult rv =
      NS_NewPipe(getter_AddRefs(in1), getter_AddRefs(out1), 20, 1999);
  ASSERT_TRUE(NS_SUCCEEDED(rv));

  nsCOMPtr<nsIInputStream> in2;
  nsCOMPtr<nsIOutputStream> out2;
  rv = NS_NewPipe(getter_AddRefs(in2), getter_AddRefs(out2), 200, 401);
  ASSERT_TRUE(NS_SUCCEEDED(rv));

  RefPtr<nsPump> pump = new nsPump(in1, out2);
  ASSERT_TRUE(pump);

  nsCOMPtr<nsIThread> thread;
  rv = NS_NewNamedThread("ChainedPipePump", getter_AddRefs(thread), pump);
  ASSERT_TRUE(NS_SUCCEEDED(rv));

  RefPtr<nsReceiver> receiver = new nsReceiver(in2);

  nsCOMPtr<nsIThread> receiverThread;
  rv = NS_NewNamedThread("ChainedPipeRecv", getter_AddRefs(receiverThread),
                         receiver);
  if (NS_FAILED(rv)) {
    // The pump thread is already running. Close the first pipe so it stops
    // waiting for input, then join it before failing.
    out1->Close();
    thread->Shutdown();
    FAIL() << "could not start the receiver thread";
  }

  uint32_t total = 0;
  for (uint32_t i = 0; i < ITERATIONS; i++) {
    uint32_t writeCount;
    SmprintfPointer buf = mozilla::Smprintf("%u %s", i, kTestPattern);
    const uint32_t fullLen = strlen(buf.get());
    // Scale in 64 bits: fullLen * rand() can exceed 32 bits when RAND_MAX is
    // large.
    uint32_t len = static_cast<uint32_t>(static_cast<uint64_t>(fullLen) *
                                         static_cast<uint64_t>(rand()) /
                                         RAND_MAX);
    len = std::max(1u, len);
    rv = WriteAll(out1, buf.get(), len, &writeCount);
    // Do not return here: both helper threads still have to be unblocked and
    // joined below.
    if (NS_FAILED(rv)) break;
    EXPECT_EQ(writeCount, len);
    total += writeCount;

    if (gTrace) printf("wrote %u bytes: %s\n", writeCount, buf.get());
  }
  if (gTrace) {
    printf("wrote total of %u bytes\n", total);
  }

  // Close the first pipe even after a write failure; nsPump closes the second
  // pipe once its WriteFrom() loop ends.
  const nsresult closeRv = out1->Close();
  // If the close failed, the threads may still be waiting for data, so do not
  // try to join them.
  ASSERT_TRUE(NS_SUCCEEDED(closeRv));

  thread->Shutdown();
  receiverThread->Shutdown();

  EXPECT_TRUE(NS_SUCCEEDED(rv));
  if (NS_SUCCEEDED(rv)) {
    // Everything written into the first pipe must have come out of the second.
    EXPECT_EQ(receiver->GetBytesRead(), total);
  }
}
338
339 ////////////////////////////////////////////////////////////////////////////////
340
RunTests(uint32_t segSize,uint32_t segCount)341 static void RunTests(uint32_t segSize, uint32_t segCount) {
342 nsresult rv;
343 nsCOMPtr<nsIInputStream> in;
344 nsCOMPtr<nsIOutputStream> out;
345 uint32_t bufSize = segSize * segCount;
346 if (gTrace) {
347 printf("Testing New Pipes: segment size %d buffer size %d\n", segSize,
348 bufSize);
349 printf("Testing long writes...\n");
350 }
351 rv = NS_NewPipe(getter_AddRefs(in), getter_AddRefs(out), segSize, bufSize);
352 EXPECT_TRUE(NS_SUCCEEDED(rv));
353 rv = TestPipe(in, out);
354 EXPECT_TRUE(NS_SUCCEEDED(rv));
355
356 if (gTrace) {
357 printf("Testing short writes...\n");
358 }
359 rv = NS_NewPipe(getter_AddRefs(in), getter_AddRefs(out), segSize, bufSize);
360 EXPECT_TRUE(NS_SUCCEEDED(rv));
361 rv = TestShortWrites(in, out);
362 EXPECT_TRUE(NS_SUCCEEDED(rv));
363 }
364
// Runs the threaded long-write and short-write tests on two pipe geometries.
TEST(Pipes, Main)
{
  // Each row is {segment size, segment count}.
  const uint32_t kGeometries[][2] = {{16, 1}, {4096, 16}};
  for (const auto& geometry : kGeometries) {
    RunTests(geometry[0], geometry[1]);
  }
}
370
371 ////////////////////////////////////////////////////////////////////////////////
372
373 namespace {
374
375 static const uint32_t DEFAULT_SEGMENT_SIZE = 4 * 1024;
376
377 // An alternate pipe testing routing that uses NS_ConsumeStream() instead of
378 // manual read loop.
TestPipe2(uint32_t aNumBytes,uint32_t aSegmentSize=DEFAULT_SEGMENT_SIZE)379 static void TestPipe2(uint32_t aNumBytes,
380 uint32_t aSegmentSize = DEFAULT_SEGMENT_SIZE) {
381 nsCOMPtr<nsIInputStream> reader;
382 nsCOMPtr<nsIOutputStream> writer;
383
384 uint32_t maxSize = std::max(aNumBytes, aSegmentSize);
385
386 nsresult rv = NS_NewPipe(getter_AddRefs(reader), getter_AddRefs(writer),
387 aSegmentSize, maxSize);
388 ASSERT_TRUE(NS_SUCCEEDED(rv));
389
390 nsTArray<char> inputData;
391 testing::CreateData(aNumBytes, inputData);
392 testing::WriteAllAndClose(writer, inputData);
393 testing::ConsumeAndValidateStream(reader, inputData);
394 }
395
396 } // namespace
397
// Pushes 32 KiB through a pipe with the default segment size.
TEST(Pipes, Blocking_32k)
{
  const uint32_t payloadBytes = 32 * 1024;
  TestPipe2(payloadBytes);
}
400
// Pushes 64 KiB through a pipe with the default segment size.
TEST(Pipes, Blocking_64k)
{
  const uint32_t payloadBytes = 64 * 1024;
  TestPipe2(payloadBytes);
}
403
// Pushes 128 KiB through a pipe with the default segment size.
TEST(Pipes, Blocking_128k)
{
  const uint32_t payloadBytes = 128 * 1024;
  TestPipe2(payloadBytes);
}
406
407 ////////////////////////////////////////////////////////////////////////////////
408
409 namespace {
410
411 // Utility routine to validate pipe clone before. There are many knobs.
412 //
413 // aTotalBytes Total number of bytes to write to the pipe.
414 // aNumWrites How many separate write calls should be made. Bytes
415 // are evenly distributed over these write calls.
416 // aNumInitialClones How many clones of the pipe input stream should be
417 // made before writing begins.
418 // aNumToCloseAfterWrite How many streams should be closed after each write.
419 // One stream is always kept open. This verifies that
420 // closing one stream does not effect other open
421 // streams.
422 // aNumToCloneAfterWrite How many clones to create after each write. Occurs
423 // after closing any streams. This tests cloning
424 // active streams on a pipe that is being written to.
425 // aNumStreamToReadPerWrite How many streams to read fully after each write.
426 // This tests reading cloned streams at different rates
427 // while the pipe is being written to.
TestPipeClone(uint32_t aTotalBytes,uint32_t aNumWrites,uint32_t aNumInitialClones,uint32_t aNumToCloseAfterWrite,uint32_t aNumToCloneAfterWrite,uint32_t aNumStreamsToReadPerWrite,uint32_t aSegmentSize=DEFAULT_SEGMENT_SIZE)428 static void TestPipeClone(uint32_t aTotalBytes, uint32_t aNumWrites,
429 uint32_t aNumInitialClones,
430 uint32_t aNumToCloseAfterWrite,
431 uint32_t aNumToCloneAfterWrite,
432 uint32_t aNumStreamsToReadPerWrite,
433 uint32_t aSegmentSize = DEFAULT_SEGMENT_SIZE) {
434 nsCOMPtr<nsIInputStream> reader;
435 nsCOMPtr<nsIOutputStream> writer;
436
437 uint32_t maxSize = std::max(aTotalBytes, aSegmentSize);
438
439 // Use async input streams so we can NS_ConsumeStream() the current data
440 // while the pipe is still being written to.
441 nsresult rv =
442 NS_NewPipe(getter_AddRefs(reader), getter_AddRefs(writer), aSegmentSize,
443 maxSize, true, false); // non-blocking - reader, writer
444 ASSERT_TRUE(NS_SUCCEEDED(rv));
445
446 nsCOMPtr<nsICloneableInputStream> cloneable = do_QueryInterface(reader);
447 ASSERT_TRUE(cloneable);
448 ASSERT_TRUE(cloneable->GetCloneable());
449
450 nsTArray<nsCString> outputDataList;
451
452 nsTArray<nsCOMPtr<nsIInputStream>> streamList;
453
454 // first stream is our original reader from the pipe
455 streamList.AppendElement(reader);
456 outputDataList.AppendElement();
457
458 // Clone the initial input stream the specified number of times
459 // before performing any writes.
460 for (uint32_t i = 0; i < aNumInitialClones; ++i) {
461 nsCOMPtr<nsIInputStream>* clone = streamList.AppendElement();
462 rv = cloneable->Clone(getter_AddRefs(*clone));
463 ASSERT_TRUE(NS_SUCCEEDED(rv));
464 ASSERT_TRUE(*clone);
465
466 outputDataList.AppendElement();
467 }
468
469 nsTArray<char> inputData;
470 testing::CreateData(aTotalBytes, inputData);
471
472 const uint32_t bytesPerWrite = ((aTotalBytes - 1) / aNumWrites) + 1;
473 uint32_t offset = 0;
474 uint32_t remaining = aTotalBytes;
475 uint32_t nextStreamToRead = 0;
476
477 while (remaining) {
478 uint32_t numToWrite = std::min(bytesPerWrite, remaining);
479 testing::Write(writer, inputData, offset, numToWrite);
480 offset += numToWrite;
481 remaining -= numToWrite;
482
483 // Close the specified number of streams. This allows us to
484 // test that one closed clone does not break other open clones.
485 for (uint32_t i = 0; i < aNumToCloseAfterWrite && streamList.Length() > 1;
486 ++i) {
487 uint32_t lastIndex = streamList.Length() - 1;
488 streamList[lastIndex]->Close();
489 streamList.RemoveElementAt(lastIndex);
490 outputDataList.RemoveElementAt(lastIndex);
491
492 if (nextStreamToRead >= streamList.Length()) {
493 nextStreamToRead = 0;
494 }
495 }
496
497 // Create the specified number of clones. This lets us verify
498 // that we can create clones in the middle of pipe reading and
499 // writing.
500 for (uint32_t i = 0; i < aNumToCloneAfterWrite; ++i) {
501 nsCOMPtr<nsIInputStream>* clone = streamList.AppendElement();
502 rv = cloneable->Clone(getter_AddRefs(*clone));
503 ASSERT_TRUE(NS_SUCCEEDED(rv));
504 ASSERT_TRUE(*clone);
505
506 // Initialize the new output data to make whats been read to data for
507 // the original stream. First stream is always the original stream.
508 nsCString* outputData = outputDataList.AppendElement();
509 *outputData = outputDataList[0];
510 }
511
512 // Read the specified number of streams. This lets us verify that we
513 // can read from the clones at different rates while the pipe is being
514 // written to.
515 for (uint32_t i = 0; i < aNumStreamsToReadPerWrite; ++i) {
516 nsCOMPtr<nsIInputStream>& stream = streamList[nextStreamToRead];
517 nsCString& outputData = outputDataList[nextStreamToRead];
518
519 // Can't use ConsumeAndValidateStream() here because we're not
520 // guaranteed the exact amount read. It should just be at least
521 // as many as numToWrite.
522 nsAutoCString tmpOutputData;
523 rv = NS_ConsumeStream(stream, UINT32_MAX, tmpOutputData);
524 ASSERT_TRUE(rv == NS_BASE_STREAM_WOULD_BLOCK || NS_SUCCEEDED(rv));
525 ASSERT_GE(tmpOutputData.Length(), numToWrite);
526
527 outputData += tmpOutputData;
528
529 nextStreamToRead += 1;
530 if (nextStreamToRead >= streamList.Length()) {
531 // Note: When we wrap around on the streams being read, its possible
532 // we will trigger a segment to be deleted from the pipe. It
533 // would be nice to validate this here, but we don't have any
534 // QI'able interface that would let us check easily.
535
536 nextStreamToRead = 0;
537 }
538 }
539 }
540
541 rv = writer->Close();
542 ASSERT_TRUE(NS_SUCCEEDED(rv));
543
544 nsDependentCSubstring inputString(inputData.Elements(), inputData.Length());
545
546 // Finally, read the remaining bytes from each stream. This may be
547 // different amounts of data depending on how much reading we did while
548 // writing. Verify that the end result matches the input data.
549 for (uint32_t i = 0; i < streamList.Length(); ++i) {
550 nsCOMPtr<nsIInputStream>& stream = streamList[i];
551 nsCString& outputData = outputDataList[i];
552
553 nsAutoCString tmpOutputData;
554 rv = NS_ConsumeStream(stream, UINT32_MAX, tmpOutputData);
555 ASSERT_TRUE(rv == NS_BASE_STREAM_WOULD_BLOCK || NS_SUCCEEDED(rv));
556 stream->Close();
557
558 // Append to total amount read from the stream
559 outputData += tmpOutputData;
560
561 ASSERT_EQ(inputString.Length(), outputData.Length());
562 ASSERT_TRUE(inputString.Equals(outputData));
563 }
564 }
565
566 } // namespace
567
// Clones are created up front; nothing is read until all writes are done.
TEST(Pipes, Clone_BeforeWrite_ReadAtEnd)
{
  const uint32_t totalBytes = 32 * 1024;
  const uint32_t numWrites = 16;
  const uint32_t initialClones = 3;
  const uint32_t closesPerWrite = 0;
  const uint32_t clonesPerWrite = 0;
  const uint32_t readsPerWrite = 0;

  TestPipeClone(totalBytes, numWrites, initialClones, closesPerWrite,
                clonesPerWrite, readsPerWrite);
}
577
// Clones are created up front and every stream is read after each write.
TEST(Pipes, Clone_BeforeWrite_ReadDuringWrite)
{
  // Since this reads all streams on every write, it should trigger the
  // pipe cursor roll back optimization. Currently we can only verify
  // this with logging.
  const uint32_t totalBytes = 32 * 1024;
  const uint32_t numWrites = 16;
  const uint32_t initialClones = 3;
  const uint32_t closesPerWrite = 0;
  const uint32_t clonesPerWrite = 0;
  const uint32_t readsPerWrite = 4;

  TestPipeClone(totalBytes, numWrites, initialClones, closesPerWrite,
                clonesPerWrite, readsPerWrite);
}
591
// One clone is added after each write; nothing is read until the end.
TEST(Pipes, Clone_DuringWrite_ReadAtEnd)
{
  const uint32_t totalBytes = 32 * 1024;
  const uint32_t numWrites = 16;
  const uint32_t initialClones = 0;
  const uint32_t closesPerWrite = 0;
  const uint32_t clonesPerWrite = 1;
  const uint32_t readsPerWrite = 0;

  TestPipeClone(totalBytes, numWrites, initialClones, closesPerWrite,
                clonesPerWrite, readsPerWrite);
}
601
// One clone is added and one stream is read after each write.
TEST(Pipes, Clone_DuringWrite_ReadDuringWrite)
{
  const uint32_t totalBytes = 32 * 1024;
  const uint32_t numWrites = 16;
  const uint32_t initialClones = 0;
  const uint32_t closesPerWrite = 0;
  const uint32_t clonesPerWrite = 1;
  const uint32_t readsPerWrite = 1;

  TestPipeClone(totalBytes, numWrites, initialClones, closesPerWrite,
                clonesPerWrite, readsPerWrite);
}
611
// After each write one stream is closed, two clones are added and three
// streams are read.
TEST(Pipes, Clone_DuringWrite_ReadDuringWrite_CloseDuringWrite)
{
  // Since this reads streams faster than we clone new ones, it should
  // trigger pipe segment deletion periodically. Currently we can
  // only verify this with logging.
  const uint32_t totalBytes = 32 * 1024;
  const uint32_t numWrites = 16;
  const uint32_t initialClones = 1;
  const uint32_t closesPerWrite = 1;
  const uint32_t clonesPerWrite = 2;
  const uint32_t readsPerWrite = 3;

  TestPipeClone(totalBytes, numWrites, initialClones, closesPerWrite,
                clonesPerWrite, readsPerWrite);
}
625
// Fills a one-segment non-blocking pipe, checks that a second write reports
// NS_BASE_STREAM_WOULD_BLOCK, and checks that the AsyncWait() callback on the
// writer has been called once the reader has consumed the data.
TEST(Pipes, Write_AsyncWait)
{
  const uint32_t kSegmentSize = 1024;
  const uint32_t kSegmentCount = 1;

  nsCOMPtr<nsIAsyncInputStream> pipeIn;
  nsCOMPtr<nsIAsyncOutputStream> pipeOut;
  nsresult status =
      NS_NewPipe2(getter_AddRefs(pipeIn), getter_AddRefs(pipeOut),
                  true, true,  // non-blocking - reader, writer
                  kSegmentSize, kSegmentCount);
  ASSERT_TRUE(NS_SUCCEEDED(status));

  nsTArray<char> payload;
  testing::CreateData(kSegmentSize, payload);

  uint32_t written = 0;
  auto writePayload = [&]() {
    return pipeOut->Write(payload.Elements(), payload.Length(), &written);
  };

  // The first write fills the pipe; the second write is expected to report
  // that it would block.
  status = writePayload();
  ASSERT_TRUE(NS_SUCCEEDED(status));

  status = writePayload();
  ASSERT_EQ(NS_BASE_STREAM_WOULD_BLOCK, status);

  RefPtr<testing::OutputStreamCallback> writable =
      new testing::OutputStreamCallback();

  status = pipeOut->AsyncWait(writable, 0, 0, nullptr);
  ASSERT_TRUE(NS_SUCCEEDED(status));

  ASSERT_FALSE(writable->Called());

  testing::ConsumeAndValidateStream(pipeIn, payload);

  ASSERT_TRUE(writable->Called());
}
661
// Same scenario as Write_AsyncWait, but with an unread clone of the reader
// attached to the pipe.
TEST(Pipes, Write_AsyncWait_Clone)
{
  const uint32_t kSegmentSize = 1024;
  const uint32_t kSegmentCount = 1;

  nsCOMPtr<nsIAsyncInputStream> pipeIn;
  nsCOMPtr<nsIAsyncOutputStream> pipeOut;
  nsresult status =
      NS_NewPipe2(getter_AddRefs(pipeIn), getter_AddRefs(pipeOut),
                  true, true,  // non-blocking - reader, writer
                  kSegmentSize, kSegmentCount);
  ASSERT_TRUE(NS_SUCCEEDED(status));

  nsCOMPtr<nsIInputStream> clonedIn;
  status = NS_CloneInputStream(pipeIn, getter_AddRefs(clonedIn));
  ASSERT_TRUE(NS_SUCCEEDED(status));

  nsTArray<char> payload;
  testing::CreateData(kSegmentSize, payload);

  uint32_t written = 0;
  auto writePayload = [&]() {
    return pipeOut->Write(payload.Elements(), payload.Length(), &written);
  };

  status = writePayload();
  ASSERT_TRUE(NS_SUCCEEDED(status));

  // This attempts to write data beyond the original pipe size limit. It
  // should fail since neither side of the clone has been read yet.
  status = writePayload();
  ASSERT_EQ(NS_BASE_STREAM_WOULD_BLOCK, status);

  RefPtr<testing::OutputStreamCallback> writable =
      new testing::OutputStreamCallback();

  status = pipeOut->AsyncWait(writable, 0, 0, nullptr);
  ASSERT_TRUE(NS_SUCCEEDED(status));

  ASSERT_FALSE(writable->Called());

  // Consume data on the original stream, but the clone still has not been
  // read.
  testing::ConsumeAndValidateStream(pipeIn, payload);

  // A clone that is not being read should not stall the other input stream
  // reader. Therefore the writer callback should trigger when the fastest
  // reader drains the other input stream.
  ASSERT_TRUE(writable->Called());

  // Attempt to write data. This will buffer data beyond the pipe size limit
  // in order for the clone stream to still work. This is allowed because the
  // other input stream has drained its buffered segments and is ready for
  // more data.
  status = writePayload();
  ASSERT_TRUE(NS_SUCCEEDED(status));

  // Again, this should fail since the original stream has not been read
  // again. The pipe size should still restrict how far ahead we can buffer
  // even when there is a cloned stream not being read.
  status = writePayload();
  ASSERT_TRUE(NS_FAILED(status));

  writable = new testing::OutputStreamCallback();
  status = pipeOut->AsyncWait(writable, 0, 0, nullptr);
  ASSERT_TRUE(NS_SUCCEEDED(status));

  // The write should again be blocked since we have written data and the
  // main reader is at its maximum advance buffer.
  ASSERT_FALSE(writable->Called());

  // The clone has seen none of the two successful writes yet.
  nsTArray<char> cloneBacklog;
  cloneBacklog.AppendElements(payload);
  cloneBacklog.AppendElements(payload);

  // We should now be able to consume the entire backlog of buffered data on
  // the cloned stream.
  testing::ConsumeAndValidateStream(clonedIn, cloneBacklog);

  // Draining the clone side should also trigger the AsyncWait() writer
  // callback.
  ASSERT_TRUE(writable->Called());

  // Finally, we should be able to consume the remaining data on the original
  // reader.
  testing::ConsumeAndValidateStream(pipeIn, payload);
}
744
// Like Write_AsyncWait_Clone, but the original (fastest) reader is closed
// part-way through and a second clone is taken from the first clone.
TEST(Pipes, Write_AsyncWait_Clone_CloseOriginal)
{
  const uint32_t kSegmentSize = 1024;
  const uint32_t kSegmentCount = 1;

  nsCOMPtr<nsIAsyncInputStream> pipeIn;
  nsCOMPtr<nsIAsyncOutputStream> pipeOut;
  nsresult status =
      NS_NewPipe2(getter_AddRefs(pipeIn), getter_AddRefs(pipeOut),
                  true, true,  // non-blocking - reader, writer
                  kSegmentSize, kSegmentCount);
  ASSERT_TRUE(NS_SUCCEEDED(status));

  nsCOMPtr<nsIInputStream> firstClone;
  status = NS_CloneInputStream(pipeIn, getter_AddRefs(firstClone));
  ASSERT_TRUE(NS_SUCCEEDED(status));

  nsTArray<char> payload;
  testing::CreateData(kSegmentSize, payload);

  uint32_t written = 0;
  auto writePayload = [&]() {
    return pipeOut->Write(payload.Elements(), payload.Length(), &written);
  };

  status = writePayload();
  ASSERT_TRUE(NS_SUCCEEDED(status));

  // This attempts to write data beyond the original pipe size limit. It
  // should fail since neither side of the clone has been read yet.
  status = writePayload();
  ASSERT_EQ(NS_BASE_STREAM_WOULD_BLOCK, status);

  RefPtr<testing::OutputStreamCallback> writable =
      new testing::OutputStreamCallback();

  status = pipeOut->AsyncWait(writable, 0, 0, nullptr);
  ASSERT_TRUE(NS_SUCCEEDED(status));

  ASSERT_FALSE(writable->Called());

  // Consume data on the original stream, but the clone still has not been
  // read.
  testing::ConsumeAndValidateStream(pipeIn, payload);

  // A clone that is not being read should not stall the other input stream
  // reader. Therefore the writer callback should trigger when the fastest
  // reader drains the other input stream.
  ASSERT_TRUE(writable->Called());

  // Attempt to write data. This will buffer data beyond the pipe size limit
  // in order for the clone stream to still work. This is allowed because the
  // other input stream has drained its buffered segments and is ready for
  // more data.
  status = writePayload();
  ASSERT_TRUE(NS_SUCCEEDED(status));

  // Again, this should fail since the original stream has not been read
  // again. The pipe size should still restrict how far ahead we can buffer
  // even when there is a cloned stream not being read.
  status = writePayload();
  ASSERT_TRUE(NS_FAILED(status));

  writable = new testing::OutputStreamCallback();
  status = pipeOut->AsyncWait(writable, 0, 0, nullptr);
  ASSERT_TRUE(NS_SUCCEEDED(status));

  // The write should again be blocked since we have written data and the
  // main reader is at its maximum advance buffer.
  ASSERT_FALSE(writable->Called());

  // Close the original reader input stream. This was the fastest reader,
  // so we should have a single stream that is buffered beyond our nominal
  // limit.
  pipeIn->Close();

  // Because the clone stream is still buffered the writable callback should
  // not be fired.
  ASSERT_FALSE(writable->Called());

  // And we should not be able to perform a write.
  status = writePayload();
  ASSERT_TRUE(NS_FAILED(status));

  // Create another clone stream. Now we have two streams that exceed our
  // maximum size limit.
  nsCOMPtr<nsIInputStream> secondClone;
  status = NS_CloneInputStream(firstClone, getter_AddRefs(secondClone));
  ASSERT_TRUE(NS_SUCCEEDED(status));

  // Neither clone has read either of the two successful writes so far.
  nsTArray<char> cloneBacklog;
  cloneBacklog.AppendElements(payload);
  cloneBacklog.AppendElements(payload);

  // We should now be able to consume the entire backlog of buffered data on
  // the cloned stream.
  testing::ConsumeAndValidateStream(firstClone, cloneBacklog);

  // The pipe should now be writable because we have two open streams, one of
  // which is completely drained.
  ASSERT_TRUE(writable->Called());

  // Write again to reach our limit again.
  status = writePayload();
  ASSERT_TRUE(NS_SUCCEEDED(status));

  // The stream is again non-writeable.
  writable = new testing::OutputStreamCallback();
  status = pipeOut->AsyncWait(writable, 0, 0, nullptr);
  ASSERT_TRUE(NS_SUCCEEDED(status));
  ASSERT_FALSE(writable->Called());

  // Close the empty stream. This is different from our previous close since
  // before we were closing a stream with some data still buffered.
  firstClone->Close();

  // The pipe should not be writable. The second clone is still fully
  // buffered over our limit.
  ASSERT_FALSE(writable->Called());
  status = writePayload();
  ASSERT_TRUE(NS_FAILED(status));

  // Finally consume all of the buffered data on the second clone, which now
  // also includes the third successful write.
  cloneBacklog.AppendElements(payload);
  testing::ConsumeAndValidateStream(secondClone, cloneBacklog);

  // Draining the final clone should make the pipe writable again.
  ASSERT_TRUE(writable->Called());
}
869
// Registers an AsyncWait() callback on the reader of an empty non-blocking
// pipe and checks that it has been called after data is written.
TEST(Pipes, Read_AsyncWait)
{
  const uint32_t kSegmentSize = 1024;
  const uint32_t kSegmentCount = 1;

  nsCOMPtr<nsIAsyncInputStream> pipeIn;
  nsCOMPtr<nsIAsyncOutputStream> pipeOut;
  nsresult status =
      NS_NewPipe2(getter_AddRefs(pipeIn), getter_AddRefs(pipeOut),
                  true, true,  // non-blocking - reader, writer
                  kSegmentSize, kSegmentCount);
  ASSERT_TRUE(NS_SUCCEEDED(status));

  nsTArray<char> payload;
  testing::CreateData(kSegmentSize, payload);

  RefPtr<testing::InputStreamCallback> readable =
      new testing::InputStreamCallback();

  status = pipeIn->AsyncWait(readable, 0, 0, nullptr);
  ASSERT_TRUE(NS_SUCCEEDED(status));

  // Nothing has been written yet.
  ASSERT_FALSE(readable->Called());

  uint32_t written = 0;
  status = pipeOut->Write(payload.Elements(), payload.Length(), &written);
  ASSERT_TRUE(NS_SUCCEEDED(status));

  ASSERT_TRUE(readable->Called());

  testing::ConsumeAndValidateStream(pipeIn, payload);
}
901
// Registers AsyncWait() callbacks on both the original reader and a clone of
// it, and checks that a single write results in both having been called.
TEST(Pipes, Read_AsyncWait_Clone)
{
  const uint32_t kSegmentSize = 1024;
  const uint32_t kSegmentCount = 1;

  nsCOMPtr<nsIAsyncInputStream> pipeIn;
  nsCOMPtr<nsIAsyncOutputStream> pipeOut;
  nsresult status =
      NS_NewPipe2(getter_AddRefs(pipeIn), getter_AddRefs(pipeOut),
                  true, true,  // non-blocking - reader, writer
                  kSegmentSize, kSegmentCount);
  ASSERT_TRUE(NS_SUCCEEDED(status));

  nsCOMPtr<nsIInputStream> clonedIn;
  status = NS_CloneInputStream(pipeIn, getter_AddRefs(clonedIn));
  ASSERT_TRUE(NS_SUCCEEDED(status));

  // AsyncWait() is only available through the async interface of the clone.
  nsCOMPtr<nsIAsyncInputStream> asyncClonedIn = do_QueryInterface(clonedIn);
  ASSERT_TRUE(asyncClonedIn);

  nsTArray<char> payload;
  testing::CreateData(kSegmentSize, payload);

  RefPtr<testing::InputStreamCallback> originalReadable =
      new testing::InputStreamCallback();

  RefPtr<testing::InputStreamCallback> cloneReadable =
      new testing::InputStreamCallback();

  status = pipeIn->AsyncWait(originalReadable, 0, 0, nullptr);
  ASSERT_TRUE(NS_SUCCEEDED(status));

  ASSERT_FALSE(originalReadable->Called());

  status = asyncClonedIn->AsyncWait(cloneReadable, 0, 0, nullptr);
  ASSERT_TRUE(NS_SUCCEEDED(status));

  ASSERT_FALSE(cloneReadable->Called());

  uint32_t written = 0;
  status = pipeOut->Write(payload.Elements(), payload.Length(), &written);
  ASSERT_TRUE(NS_SUCCEEDED(status));

  ASSERT_TRUE(originalReadable->Called());
  ASSERT_TRUE(cloneReadable->Called());

  testing::ConsumeAndValidateStream(pipeIn, payload);
}
948
949 namespace {
950
CloseDuringReadFunc(nsIInputStream * aReader,void * aClosure,const char * aFromSegment,uint32_t aToOffset,uint32_t aCount,uint32_t * aWriteCountOut)951 nsresult CloseDuringReadFunc(nsIInputStream* aReader, void* aClosure,
952 const char* aFromSegment, uint32_t aToOffset,
953 uint32_t aCount, uint32_t* aWriteCountOut) {
954 MOZ_RELEASE_ASSERT(aReader);
955 MOZ_RELEASE_ASSERT(aClosure);
956 MOZ_RELEASE_ASSERT(aFromSegment);
957 MOZ_RELEASE_ASSERT(aWriteCountOut);
958 MOZ_RELEASE_ASSERT(aToOffset == 0);
959
960 // This is insanity and you probably should not do this under normal
961 // conditions. We want to simulate the case where the pipe is closed
962 // (possibly from other end on another thread) simultaneously with the
963 // read. This is the easiest way to do trigger this case in a synchronous
964 // gtest.
965 MOZ_ALWAYS_SUCCEEDS(aReader->Close());
966
967 nsTArray<char>* buffer = static_cast<nsTArray<char>*>(aClosure);
968 buffer->AppendElements(aFromSegment, aCount);
969
970 *aWriteCountOut = aCount;
971
972 return NS_OK;
973 }
974
TestCloseDuringRead(uint32_t aSegmentSize,uint32_t aDataSize)975 void TestCloseDuringRead(uint32_t aSegmentSize, uint32_t aDataSize) {
976 nsCOMPtr<nsIInputStream> reader;
977 nsCOMPtr<nsIOutputStream> writer;
978
979 const uint32_t maxSize = aSegmentSize;
980
981 nsresult rv = NS_NewPipe(getter_AddRefs(reader), getter_AddRefs(writer),
982 aSegmentSize, maxSize);
983 ASSERT_TRUE(NS_SUCCEEDED(rv));
984
985 nsTArray<char> inputData;
986
987 testing::CreateData(aDataSize, inputData);
988
989 uint32_t numWritten = 0;
990 rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
991 ASSERT_TRUE(NS_SUCCEEDED(rv));
992
993 nsTArray<char> outputData;
994
995 uint32_t numRead = 0;
996 rv = reader->ReadSegments(CloseDuringReadFunc, &outputData,
997 inputData.Length(), &numRead);
998 ASSERT_TRUE(NS_SUCCEEDED(rv));
999 ASSERT_EQ(inputData.Length(), numRead);
1000
1001 ASSERT_EQ(inputData, outputData);
1002
1003 uint64_t available;
1004 rv = reader->Available(&available);
1005 ASSERT_EQ(NS_BASE_STREAM_CLOSED, rv);
1006 }
1007
1008 } // namespace
1009
// Closes the reader mid-read when the data fills only half of the segment.
TEST(Pipes, Close_During_Read_Partial_Segment)
{
  const uint32_t segmentBytes = 1024;
  const uint32_t dataBytes = 512;
  TestCloseDuringRead(segmentBytes, dataBytes);
}
1012
// Closes the reader mid-read when the data fills the segment exactly.
TEST(Pipes, Close_During_Read_Full_Segment)
{
  const uint32_t segmentBytes = 1024;
  const uint32_t dataBytes = 1024;
  TestCloseDuringRead(segmentBytes, dataBytes);
}
1015
// Checks that the pipe's input stream can be queried for each of the
// interfaces listed below.
TEST(Pipes, Interfaces)
{
  nsCOMPtr<nsIInputStream> pipeIn;
  nsCOMPtr<nsIOutputStream> pipeOut;

  const nsresult status =
      NS_NewPipe(getter_AddRefs(pipeIn), getter_AddRefs(pipeOut));
  ASSERT_TRUE(NS_SUCCEEDED(status));

  nsCOMPtr<nsIAsyncInputStream> asAsync = do_QueryInterface(pipeIn);
  ASSERT_TRUE(asAsync);

  nsCOMPtr<nsITellableStream> asTellable = do_QueryInterface(pipeIn);
  ASSERT_TRUE(asTellable);

  nsCOMPtr<nsISearchableInputStream> asSearchable = do_QueryInterface(pipeIn);
  ASSERT_TRUE(asSearchable);

  nsCOMPtr<nsICloneableInputStream> asCloneable = do_QueryInterface(pipeIn);
  ASSERT_TRUE(asCloneable);

  nsCOMPtr<nsIClassInfo> asClassInfo = do_QueryInterface(pipeIn);
  ASSERT_TRUE(asClassInfo);

  nsCOMPtr<nsIBufferedInputStream> asBuffered = do_QueryInterface(pipeIn);
  ASSERT_TRUE(asBuffered);
}
1042