1 // Copyright 2019 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "third_party/blink/renderer/core/streams/readable_stream.h"
6 
7 #include "base/stl_util.h"
8 #include "third_party/blink/renderer/bindings/core/v8/script_function.h"
9 #include "third_party/blink/renderer/bindings/core/v8/v8_abort_signal.h"
10 #include "third_party/blink/renderer/bindings/core/v8/v8_iterator_result_value.h"
11 #include "third_party/blink/renderer/bindings/core/v8/v8_readable_stream.h"
12 #include "third_party/blink/renderer/bindings/core/v8/v8_throw_dom_exception.h"
13 #include "third_party/blink/renderer/bindings/core/v8/v8_writable_stream.h"
14 #include "third_party/blink/renderer/core/dom/abort_signal.h"
15 #include "third_party/blink/renderer/core/execution_context/execution_context.h"
16 #include "third_party/blink/renderer/core/frame/web_feature.h"
17 #include "third_party/blink/renderer/core/streams/miscellaneous_operations.h"
18 #include "third_party/blink/renderer/core/streams/promise_handler.h"
19 #include "third_party/blink/renderer/core/streams/readable_stream_default_controller.h"
20 #include "third_party/blink/renderer/core/streams/readable_stream_reader.h"
21 #include "third_party/blink/renderer/core/streams/stream_algorithms.h"
22 #include "third_party/blink/renderer/core/streams/stream_promise_resolver.h"
23 #include "third_party/blink/renderer/core/streams/transferable_streams.h"
24 #include "third_party/blink/renderer/core/streams/underlying_source_base.h"
25 #include "third_party/blink/renderer/core/streams/writable_stream.h"
26 #include "third_party/blink/renderer/core/streams/writable_stream_default_controller.h"
27 #include "third_party/blink/renderer/core/streams/writable_stream_default_writer.h"
28 #include "third_party/blink/renderer/platform/bindings/exception_code.h"
29 #include "third_party/blink/renderer/platform/bindings/exception_state.h"
30 #include "third_party/blink/renderer/platform/bindings/script_state.h"
31 #include "third_party/blink/renderer/platform/bindings/v8_binding.h"
32 #include "third_party/blink/renderer/platform/heap/garbage_collected.h"
33 #include "third_party/blink/renderer/platform/heap/heap_allocator.h"
34 #include "third_party/blink/renderer/platform/heap/persistent.h"
35 #include "third_party/blink/renderer/platform/heap/visitor.h"
36 #include "third_party/blink/renderer/platform/instrumentation/use_counter.h"
37 #include "third_party/blink/renderer/platform/wtf/assertions.h"
38 #include "third_party/blink/renderer/platform/wtf/deque.h"
39 
40 namespace blink {
41 
PipeOptions()42 ReadableStream::PipeOptions::PipeOptions()
43     : prevent_close_(false), prevent_abort_(false), prevent_cancel_(false) {}
44 
PipeOptions(ScriptState * script_state,ScriptValue options,ExceptionState & exception_state)45 ReadableStream::PipeOptions::PipeOptions(ScriptState* script_state,
46                                          ScriptValue options,
47                                          ExceptionState& exception_state) {
48   auto* isolate = script_state->GetIsolate();
49   v8::TryCatch block(isolate);
50   v8::Local<v8::Value> options_value = options.V8Value();
51   v8::Local<v8::Object> options_object;
52   if (options_value->IsUndefined()) {
53     options_object = v8::Object::New(isolate);
54   } else if (!options_value->ToObject(script_state->GetContext())
55                   .ToLocal(&options_object)) {
56     exception_state.RethrowV8Exception(block.Exception());
57     return;
58   }
59 
60   // 4. Set preventClose to ! ToBoolean(preventClose), set preventAbort to !
61   // ToBoolean(preventAbort), and set preventCancel to !
62   // ToBoolean(preventCancel).
63   prevent_close_ =
64       GetBoolean(script_state, options_object, "preventClose", exception_state);
65   if (exception_state.HadException()) {
66     return;
67   }
68 
69   prevent_abort_ =
70       GetBoolean(script_state, options_object, "preventAbort", exception_state);
71   if (exception_state.HadException()) {
72     return;
73   }
74 
75   prevent_cancel_ = GetBoolean(script_state, options_object, "preventCancel",
76                                exception_state);
77   if (exception_state.HadException()) {
78     return;
79   }
80 
81   v8::Local<v8::Value> signal_value;
82   if (!options_object
83            ->Get(script_state->GetContext(), V8AtomicString(isolate, "signal"))
84            .ToLocal(&signal_value)) {
85     exception_state.RethrowV8Exception(block.Exception());
86     return;
87   }
88 
89   // 5. If signal is not undefined, and signal is not an instance of the
90   // AbortSignal interface, throw a TypeError exception.
91   if (signal_value->IsUndefined())
92     return;
93 
94   signal_ = V8AbortSignal::ToImplWithTypeCheck(isolate, signal_value);
95   if (!signal_) {
96     exception_state.ThrowTypeError(
97         "'signal' must be an AbortSignal object or undefined");
98     return;
99   }
100 }
101 
Trace(Visitor * visitor)102 void ReadableStream::PipeOptions::Trace(Visitor* visitor) {
103   visitor->Trace(signal_);
104 }
105 
GetBoolean(ScriptState * script_state,v8::Local<v8::Object> dictionary,const char * property_name,ExceptionState & exception_state)106 bool ReadableStream::PipeOptions::GetBoolean(ScriptState* script_state,
107                                              v8::Local<v8::Object> dictionary,
108                                              const char* property_name,
109                                              ExceptionState& exception_state) {
110   auto* isolate = script_state->GetIsolate();
111   v8::TryCatch block(isolate);
112   v8::Local<v8::Value> property_value;
113   if (!dictionary
114            ->Get(script_state->GetContext(),
115                  V8AtomicString(isolate, property_name))
116            .ToLocal(&property_value)) {
117     exception_state.RethrowV8Exception(block.Exception());
118     return false;
119   }
120   return property_value->ToBoolean(isolate)->Value();
121 }
122 
123 // PipeToEngine implements PipeTo(). All standard steps in this class come from
124 // https://streams.spec.whatwg.org/#readable-stream-pipe-to
125 //
126 // This implementation is simple but suboptimal because it uses V8 promises to
127 // drive its asynchronous state machine, allocating a lot of temporary V8
128 // objects as a result.
129 //
130 // TODO(ricea): Create internal versions of ReadableStreamDefaultReader::Read()
131 // and WritableStreamDefaultWriter::Write() to bypass promise creation and so
132 // reduce the number of allocations on the hot path.
133 class ReadableStream::PipeToEngine final
134     : public GarbageCollected<PipeToEngine> {
135  public:
PipeToEngine(ScriptState * script_state,PipeOptions * pipe_options)136   PipeToEngine(ScriptState* script_state, PipeOptions* pipe_options)
137       : script_state_(script_state), pipe_options_(pipe_options) {}
138 
139   // This is the main entrypoint for ReadableStreamPipeTo().
Start(ReadableStream * readable,WritableStream * destination)140   ScriptPromise Start(ReadableStream* readable, WritableStream* destination) {
141     // 1. Assert: ! IsReadableStream(source) is true.
142     DCHECK(readable);
143 
144     // 2. Assert: ! IsWritableStream(dest) is true.
145     DCHECK(destination);
146 
147     // Not relevant to C++ implementation:
148     // 3. Assert: Type(preventClose) is Boolean, Type(preventAbort) is Boolean,
149     //    and Type(preventCancel) is Boolean.
150 
151     // TODO(ricea): Implement |signal|.
152     // 4. Assert: signal is undefined or signal is an instance of the
153     //    AbortSignal interface.
154 
155     // 5. Assert: ! IsReadableStreamLocked(source) is false.
156     DCHECK(!ReadableStream::IsLocked(readable));
157 
158     // 6. Assert: ! IsWritableStreamLocked(dest) is false.
159     DCHECK(!WritableStream::IsLocked(destination));
160 
161     auto* isolate = script_state_->GetIsolate();
162     ExceptionState exception_state(isolate, ExceptionState::kUnknownContext, "",
163                                    "");
164 
165     // 7. If !
166     //    IsReadableByteStreamController(source.[[readableStreamController]]) is
167     //    true, let reader be either ! AcquireReadableStreamBYOBReader(source)
168     //    or ! AcquireReadableStreamDefaultReader(source), at the user agent’s
169     //    discretion.
170     // 8. Otherwise, let reader be ! AcquireReadableStreamDefaultReader(source).
171     reader_ = ReadableStream::AcquireDefaultReader(script_state_, readable,
172                                                    false, exception_state);
173     DCHECK(!exception_state.HadException());
174 
175     // 9. Let writer be ! AcquireWritableStreamDefaultWriter(dest).
176     writer_ = WritableStream::AcquireDefaultWriter(script_state_, destination,
177                                                    exception_state);
178     DCHECK(!exception_state.HadException());
179 
180     // 10. Let shuttingDown be false.
181     DCHECK(!is_shutting_down_);
182 
183     // 11. Let promise be a new promise.
184     promise_ = MakeGarbageCollected<StreamPromiseResolver>(script_state_);
185 
186     // 12. If signal is not undefined,
187     if (auto* signal = pipe_options_->Signal()) {
188       //   b. If signal’s aborted flag is set, perform abortAlgorithm and
189       //      return promise.
190       if (signal->aborted()) {
191         AbortAlgorithm();
192         return promise_->GetScriptPromise(script_state_);
193       }
194 
195       //   c. Add abortAlgorithm to signal.
196       signal->AddAlgorithm(
197           WTF::Bind(&PipeToEngine::AbortAlgorithm, WrapWeakPersistent(this)));
198     }
199 
200     // 13. In parallel ...
201     // The rest of the algorithm is described in terms of a series of
202     // constraints rather than as explicit steps.
203     if (CheckInitialState()) {
204       // Need to detect closing and error when we are not reading. This
205       // corresponds to the following conditions from the standard:
206       //     1. Errors must be propagated forward: if source.[[state]] is or
207       //        becomes "errored", ...
208       // and
209       //     3. Closing must be propagated forward: if source.[[state]] is or
210       //        becomes "closed", ...
211       ThenPromise(reader_->ClosedPromise()->V8Promise(isolate),
212                   &PipeToEngine::OnReaderClosed, &PipeToEngine::ReadableError);
213 
214       // Need to detect error when we are not writing. This corresponds to this
215       // condition from the standard:
216       //    2. Errors must be propagated backward: if dest.[[state]] is or
217       //       becomes "errored", ...
218       // We do not need to detect closure of the writable end of the pipe,
219       // because we have it locked and so it can only be closed by us.
220       ThenPromise(writer_->ClosedPromise()->V8Promise(isolate), nullptr,
221                   &PipeToEngine::WritableError);
222 
223       // Start the main read / write loop.
224       HandleNextEvent(Undefined());
225     }
226 
227     // 14. Return promise.
228     return promise_->GetScriptPromise(script_state_);
229   }
230 
Trace(Visitor * visitor)231   void Trace(Visitor* visitor) {
232     visitor->Trace(script_state_);
233     visitor->Trace(pipe_options_);
234     visitor->Trace(reader_);
235     visitor->Trace(writer_);
236     visitor->Trace(promise_);
237     visitor->Trace(last_write_);
238     visitor->Trace(shutdown_error_);
239   }
240 
241  private:
242   // The implementation uses method pointers to maximise code reuse.
243 
244   // |Action| represents an action that can be passed to the "Shutdown with an
245   // action" operation. Each Action is implemented as a method which delegates
246   // to some abstract operation, inferring the arguments from the state of
247   // |this|.
248   using Action = v8::Local<v8::Promise> (PipeToEngine::*)();
249 
250   // This implementation uses ThenPromise() 7 times. Instead of creating a dozen
251   // separate subclasses of ScriptFunction, we use a single implementation and
252   // pass a method pointer at runtime to control the behaviour. Most
253   // PromiseReaction methods don't need to return a value, but because some do,
254   // the rest have to return undefined so that they can have the same method
255   // signature. Similarly, many of the methods ignore the argument that is
256   // passed to them.
257   using PromiseReaction =
258       v8::Local<v8::Value> (PipeToEngine::*)(v8::Local<v8::Value>);
259 
260   class WrappedPromiseReaction final : public PromiseHandlerWithValue {
261    public:
WrappedPromiseReaction(ScriptState * script_state,PipeToEngine * instance,PromiseReaction method)262     WrappedPromiseReaction(ScriptState* script_state,
263                            PipeToEngine* instance,
264                            PromiseReaction method)
265         : PromiseHandlerWithValue(script_state),
266           instance_(instance),
267           method_(method) {}
268 
CallWithLocal(v8::Local<v8::Value> value)269     v8::Local<v8::Value> CallWithLocal(v8::Local<v8::Value> value) override {
270       return (instance_->*method_)(value);
271     }
272 
Trace(Visitor * visitor)273     void Trace(Visitor* visitor) override {
274       visitor->Trace(instance_);
275       ScriptFunction::Trace(visitor);
276     }
277 
278    private:
279     Member<PipeToEngine> instance_;
280     PromiseReaction method_;
281   };
282 
283   // Checks the state of the streams and executes the shutdown handlers if
284   // necessary. Returns true if piping can continue.
CheckInitialState()285   bool CheckInitialState() {
286     auto* isolate = script_state_->GetIsolate();
287     const auto state = Readable()->state_;
288 
289     // Both streams can be errored or closed. To perform the right action the
290     // order of the checks must match the standard: "the following conditions
291     // must be applied in order." This method only checks the initial state;
292     // detection of state changes elsewhere is done through checking promise
293     // reactions.
294 
295     // a. Errors must be propagated forward: if source.[[state]] is or
296     //    becomes "errored",
297     if (state == kErrored) {
298       ReadableError(Readable()->GetStoredError(isolate));
299       return false;
300     }
301 
302     // 2. Errors must be propagated backward: if dest.[[state]] is or becomes
303     //    "errored",
304     if (Destination()->IsErrored()) {
305       WritableError(Destination()->GetStoredError(isolate));
306       return false;
307     }
308 
309     // 3. Closing must be propagated forward: if source.[[state]] is or
310     //    becomes "closed", then
311     if (state == kClosed) {
312       ReadableClosed();
313       return false;
314     }
315 
316     // 4. Closing must be propagated backward: if !
317     //    WritableStreamCloseQueuedOrInFlight(dest) is true or dest.[[state]]
318     //    is "closed",
319     if (Destination()->IsClosingOrClosed()) {
320       WritableStartedClosed();
321       return false;
322     }
323 
324     return true;
325   }
326 
AbortAlgorithm()327   void AbortAlgorithm() {
328     // a. Let abortAlgorithm be the following steps:
329     //    i. Let error be a new "AbortError" DOMException.
330     v8::Local<v8::Value> error = V8ThrowDOMException::CreateOrEmpty(
331         script_state_->GetIsolate(), DOMExceptionCode::kAbortError,
332         "Pipe aborted.");
333 
334     // Steps ii. to iv. are implemented in AbortAlgorithmAction.
335 
336     //    v. Shutdown with an action consisting of getting a promise to wait for
337     //       all of the actions in actions, and with error.
338     ShutdownWithAction(&PipeToEngine::AbortAlgorithmAction, error);
339   }
340 
AbortAlgorithmAction()341   v8::Local<v8::Promise> AbortAlgorithmAction() {
342     v8::Local<v8::Value> error =
343         shutdown_error_.NewLocal(script_state_->GetIsolate());
344 
345     // ii. Let actions be an empty ordered set.
346     HeapVector<ScriptPromise> actions;
347 
348     // This method runs later than the equivalent steps in the standard. This
349     // means that it is safe to do the checks of the state of the destination
350     // and source synchronously, simplifying the logic.
351 
352     // iii. If preventAbort is false, append the following action to actions:
353     //      1. If dest.[[state]] is "writable", return !
354     //         WritableStreamAbort(dest, error).
355     //      2. Otherwise, return a promise resolved with undefined.
356     if (!pipe_options_->PreventAbort() && Destination()->IsWritable()) {
357       actions.push_back(ScriptPromise(
358           script_state_,
359           WritableStream::Abort(script_state_, Destination(), error)));
360     }
361 
362     //  iv. If preventCancel is false, append the following action action to
363     //      actions:
364     //      1. If source.[[state]] is "readable", return !
365     //         ReadableStreamCancel(source, error).
366     //      2. Otherwise, return a promise resolved with undefined.
367     if (!pipe_options_->PreventCancel() && IsReadable(Readable())) {
368       actions.push_back(ScriptPromise(
369           script_state_,
370           ReadableStream::Cancel(script_state_, Readable(), error)));
371     }
372 
373     return ScriptPromise::All(script_state_, actions)
374         .V8Value()
375         .As<v8::Promise>();
376   }
377 
378   // HandleNextEvent() has an unused argument and return value because it is a
379   // PromiseReaction. HandleNextEvent() and ReadFulfilled() call each other
380   // asynchronously in a loop until the pipe completes.
HandleNextEvent(v8::Local<v8::Value>)381   v8::Local<v8::Value> HandleNextEvent(v8::Local<v8::Value>) {
382     DCHECK(!is_reading_);
383     if (is_shutting_down_) {
384       return Undefined();
385     }
386 
387     base::Optional<double> desired_size = writer_->GetDesiredSizeInternal();
388     if (!desired_size.has_value()) {
389       // This can happen if abort() is queued but not yet started when
390       // pipeTo() is called. In that case [[storedError]] is not set yet, and
391       // we need to wait until it is before we can cancel the pipe. Once
392       // [[storedError]] has been set, the rejection handler set on the writer
393       // closed promise above will detect it, so all we need to do here is
394       // nothing.
395       return Undefined();
396     }
397 
398     if (desired_size.value() <= 0) {
399       // Need to wait for backpressure to go away.
400       ThenPromise(
401           writer_->ReadyPromise()->V8Promise(script_state_->GetIsolate()),
402           &PipeToEngine::HandleNextEvent, &PipeToEngine::WritableError);
403       return Undefined();
404     }
405 
406     is_reading_ = true;
407     ThenPromise(ReadableStreamReader::Read(script_state_, reader_)
408                     ->V8Promise(script_state_->GetIsolate()),
409                 &PipeToEngine::ReadFulfilled, &PipeToEngine::ReadRejected);
410     return Undefined();
411   }
412 
ReadFulfilled(v8::Local<v8::Value> result)413   v8::Local<v8::Value> ReadFulfilled(v8::Local<v8::Value> result) {
414     is_reading_ = false;
415     DCHECK(result->IsObject());
416     auto* isolate = script_state_->GetIsolate();
417     v8::Local<v8::Value> value;
418     bool done = false;
419     bool unpack_succeeded =
420         V8UnpackIteratorResult(script_state_, result.As<v8::Object>(), &done)
421             .ToLocal(&value);
422     DCHECK(unpack_succeeded);
423     if (done) {
424       ReadableClosed();
425       return Undefined();
426     }
427     const auto write =
428         WritableStreamDefaultWriter::Write(script_state_, writer_, value);
429     last_write_.Set(isolate, write);
430     ThenPromise(write, nullptr, &PipeToEngine::WritableError);
431     HandleNextEvent(Undefined());
432     return Undefined();
433   }
434 
ReadRejected(v8::Local<v8::Value>)435   v8::Local<v8::Value> ReadRejected(v8::Local<v8::Value>) {
436     is_reading_ = false;
437     ReadableError(Readable()->GetStoredError(script_state_->GetIsolate()));
438     return Undefined();
439   }
440 
441   // If read() is in progress, then wait for it to tell us that the stream is
442   // closed so that we write all the data before shutdown.
OnReaderClosed(v8::Local<v8::Value>)443   v8::Local<v8::Value> OnReaderClosed(v8::Local<v8::Value>) {
444     if (!is_reading_) {
445       ReadableClosed();
446     }
447     return Undefined();
448   }
449 
450   // 1. Errors must be propagated forward: if source.[[state]] is or
451   //    becomes "errored", then
ReadableError(v8::Local<v8::Value> error)452   v8::Local<v8::Value> ReadableError(v8::Local<v8::Value> error) {
453     // This function can be called during shutdown when the lock is released.
454     // Exit early in that case.
455     if (is_shutting_down_) {
456       return Undefined();
457     }
458 
459     // a. If preventAbort is false, shutdown with an action of !
460     //    WritableStreamAbort(dest, source.[[storedError]]) and with
461     //    source.[[storedError]].
462     DCHECK(error->SameValue(
463         Readable()->GetStoredError(script_state_->GetIsolate())));
464     if (!pipe_options_->PreventAbort()) {
465       ShutdownWithAction(&PipeToEngine::WritableStreamAbortAction, error);
466     } else {
467       // b. Otherwise, shutdown with source.[[storedError]].
468       Shutdown(error);
469     }
470     return Undefined();
471   }
472 
473   // 2. Errors must be propagated backward: if dest.[[state]] is or becomes
474   //    "errored", then
WritableError(v8::Local<v8::Value> error)475   v8::Local<v8::Value> WritableError(v8::Local<v8::Value> error) {
476     // This function can be called during shutdown when the lock is released.
477     // Exit early in that case.
478     if (is_shutting_down_) {
479       return Undefined();
480     }
481 
482     // a. If preventCancel is false, shutdown with an action of !
483     //    ReadableStreamCancel(source, dest.[[storedError]]) and with
484     //    dest.[[storedError]].
485     DCHECK(error->SameValue(
486         Destination()->GetStoredError(script_state_->GetIsolate())));
487     if (!pipe_options_->PreventCancel()) {
488       ShutdownWithAction(&PipeToEngine::ReadableStreamCancelAction, error);
489     } else {
490       // b. Otherwise, shutdown with dest.[[storedError]].
491       Shutdown(error);
492     }
493     return Undefined();
494   }
495 
496   // 3. Closing must be propagated forward: if source.[[state]] is or
497   //    becomes "closed", then
ReadableClosed()498   void ReadableClosed() {
499     // a. If preventClose is false, shutdown with an action of !
500     //    WritableStreamDefaultWriterCloseWithErrorPropagation(writer).
501     if (!pipe_options_->PreventClose()) {
502       ShutdownWithAction(
503           &PipeToEngine::
504               WritableStreamDefaultWriterCloseWithErrorPropagationAction,
505           v8::MaybeLocal<v8::Value>());
506     } else {
507       // b. Otherwise, shutdown.
508       Shutdown(v8::MaybeLocal<v8::Value>());
509     }
510   }
511 
512   // 4. Closing must be propagated backward: if !
513   //    WritableStreamCloseQueuedOrInFlight(dest) is true or dest.[[state]] is
514   //    "closed", then
WritableStartedClosed()515   void WritableStartedClosed() {
516     // a. Assert: no chunks have been read or written.
517     // This is trivially true because this method is only called from
518     // CheckInitialState().
519 
520     // b. Let destClosed be a new TypeError.
521     const auto dest_closed = v8::Exception::TypeError(
522         V8String(script_state_->GetIsolate(), "Destination stream closed"));
523 
524     // c. If preventCancel is false, shutdown with an action of !
525     //    ReadableStreamCancel(source, destClosed) and with destClosed.
526     if (!pipe_options_->PreventCancel()) {
527       ShutdownWithAction(&PipeToEngine::ReadableStreamCancelAction,
528                          dest_closed);
529     } else {
530       // d. Otherwise, shutdown with destClosed.
531       Shutdown(dest_closed);
532     }
533   }
534 
535   // * Shutdown with an action: if any of the above requirements ask to shutdown
536   //   with an action |action|, optionally with an error |originalError|, then:
ShutdownWithAction(Action action,v8::MaybeLocal<v8::Value> original_error)537   void ShutdownWithAction(Action action,
538                           v8::MaybeLocal<v8::Value> original_error) {
539     // a. If shuttingDown is true, abort these substeps.
540     if (is_shutting_down_) {
541       return;
542     }
543 
544     // b. Set shuttingDown to true.
545     is_shutting_down_ = true;
546 
547     // Store the action in case we need to call it asynchronously. This is safe
548     // because the |is_shutting_down_| guard flag ensures that we can only reach
549     // this assignment once.
550     shutdown_action_ = action;
551 
552     // Store |original_error| as |shutdown_error_| if it was supplied.
553     v8::Local<v8::Value> original_error_local;
554     if (original_error.ToLocal(&original_error_local)) {
555       shutdown_error_.Set(script_state_->GetIsolate(), original_error_local);
556     }
557     v8::Local<v8::Promise> p;
558 
559     // c. If dest.[[state]] is "writable" and !
560     //    WritableStreamCloseQueuedOrInFlight(dest) is false,
561     if (ShouldWriteQueuedChunks()) {
562       //  i. If any chunks have been read but not yet written, write them to
563       //     dest.
564       // ii. Wait until every chunk that has been read has been written
565       //     (i.e. the corresponding promises have settled).
566       p = ThenPromise(WriteQueuedChunks(), &PipeToEngine::InvokeShutdownAction);
567     } else {
568       // d. Let p be the result of performing action.
569       p = InvokeShutdownAction();
570     }
571 
572     // e. Upon fulfillment of p, finalize, passing along originalError if it
573     //    was given.
574     // f. Upon rejection of p with reason newError, finalize with newError.
575     ThenPromise(p, &PipeToEngine::FinalizeWithOriginalErrorIfSet,
576                 &PipeToEngine::FinalizeWithNewError);
577   }
578 
579   // * Shutdown: if any of the above requirements or steps ask to shutdown,
580   //   optionally with an error error, then:
Shutdown(v8::MaybeLocal<v8::Value> error_maybe)581   void Shutdown(v8::MaybeLocal<v8::Value> error_maybe) {
582     // a. If shuttingDown is true, abort these substeps.
583     if (is_shutting_down_) {
584       return;
585     }
586 
587     // b. Set shuttingDown to true.
588     is_shutting_down_ = true;
589 
590     // c. If dest.[[state]] is "writable" and !
591     //    WritableStreamCloseQueuedOrInFlight(dest) is false,
592     if (ShouldWriteQueuedChunks()) {
593       // Need to stash the value of |error_maybe| since we are calling
594       // Finalize() asynchronously.
595       v8::Local<v8::Value> error;
596       if (error_maybe.ToLocal(&error)) {
597         shutdown_error_.Set(script_state_->GetIsolate(), error);
598       }
599 
600       //  i. If any chunks have been read but not yet written, write them to
601       //     dest.
602       // ii. Wait until every chunk that has been read has been written
603       //     (i.e. the corresponding promises have settled).
604       // d. Finalize, passing along error if it was given.
605       ThenPromise(WriteQueuedChunks(),
606                   &PipeToEngine::FinalizeWithOriginalErrorIfSet);
607     } else {
608       // d. Finalize, passing along error if it was given.
609       Finalize(error_maybe);
610     }
611   }
612 
613   // Calls Finalize(), using the stored shutdown error rather than the value
614   // that was passed.
FinalizeWithOriginalErrorIfSet(v8::Local<v8::Value>)615   v8::Local<v8::Value> FinalizeWithOriginalErrorIfSet(v8::Local<v8::Value>) {
616     v8::MaybeLocal<v8::Value> error_maybe;
617     if (!shutdown_error_.IsEmpty()) {
618       error_maybe = shutdown_error_.NewLocal(script_state_->GetIsolate());
619     }
620     Finalize(error_maybe);
621     return Undefined();
622   }
623 
624   // Calls Finalize(), using the value that was passed as the error.
FinalizeWithNewError(v8::Local<v8::Value> new_error)625   v8::Local<v8::Value> FinalizeWithNewError(v8::Local<v8::Value> new_error) {
626     Finalize(new_error);
627     return Undefined();
628   }
629 
630   // * Finalize: both forms of shutdown will eventually ask to finalize,
631   //   optionally with an error error, which means to perform the following
632   //   steps:
Finalize(v8::MaybeLocal<v8::Value> error_maybe)633   void Finalize(v8::MaybeLocal<v8::Value> error_maybe) {
634     // a. Perform ! WritableStreamDefaultWriterRelease(writer).
635     WritableStreamDefaultWriter::Release(script_state_, writer_);
636 
637     // b. Perform ! ReadableStreamReaderGenericRelease(reader).
638     ReadableStreamReader::GenericRelease(script_state_, reader_);
639 
640     // TODO(ricea): Implement signal.
641     // c. If signal is not undefined, remove abortAlgorithm from signal.
642 
643     v8::Local<v8::Value> error;
644     if (error_maybe.ToLocal(&error)) {
645       // d. If error was given, reject promise with error.
646       promise_->Reject(script_state_, error);
647     } else {
648       // e. Otherwise, resolve promise with undefined.
649       promise_->ResolveWithUndefined(script_state_);
650     }
651   }
652 
ShouldWriteQueuedChunks() const653   bool ShouldWriteQueuedChunks() const {
654     // "If dest.[[state]] is "writable" and !
655     // WritableStreamCloseQueuedOrInFlight(dest) is false"
656     return Destination()->IsWritable() &&
657            !WritableStream::CloseQueuedOrInFlight(Destination());
658   }
659 
WriteQueuedChunks()660   v8::Local<v8::Promise> WriteQueuedChunks() {
661     if (!last_write_.IsEmpty()) {
662       // "Wait until every chunk that has been read has been written (i.e.
663       // the corresponding promises have settled)"
664       // This implies that we behave the same whether the promise fulfills or
665       // rejects. IgnoreErrors() will convert a rejection into a successful
666       // resolution.
667       return ThenPromise(last_write_.NewLocal(script_state_->GetIsolate()),
668                          nullptr, &PipeToEngine::IgnoreErrors);
669     }
670     return PromiseResolveWithUndefined(script_state_);
671   }
672 
IgnoreErrors(v8::Local<v8::Value>)673   v8::Local<v8::Value> IgnoreErrors(v8::Local<v8::Value>) {
674     return Undefined();
675   }
676 
677   // InvokeShutdownAction(), version for calling directly.
InvokeShutdownAction()678   v8::Local<v8::Promise> InvokeShutdownAction() {
679     return (this->*shutdown_action_)();
680   }
681 
682   // InvokeShutdownAction(), version for use as a PromiseReaction.
InvokeShutdownAction(v8::Local<v8::Value>)683   v8::Local<v8::Value> InvokeShutdownAction(v8::Local<v8::Value>) {
684     return InvokeShutdownAction();
685   }
686 
ShutdownError() const687   v8::Local<v8::Value> ShutdownError() const {
688     DCHECK(!shutdown_error_.IsEmpty());
689     return shutdown_error_.NewLocal(script_state_->GetIsolate());
690   }
691 
WritableStreamAbortAction()692   v8::Local<v8::Promise> WritableStreamAbortAction() {
693     return WritableStream::Abort(script_state_, Destination(), ShutdownError());
694   }
695 
ReadableStreamCancelAction()696   v8::Local<v8::Promise> ReadableStreamCancelAction() {
697     return ReadableStream::Cancel(script_state_, Readable(), ShutdownError());
698   }
699 
700   v8::Local<v8::Promise>
WritableStreamDefaultWriterCloseWithErrorPropagationAction()701   WritableStreamDefaultWriterCloseWithErrorPropagationAction() {
702     return WritableStreamDefaultWriter::CloseWithErrorPropagation(script_state_,
703                                                                   writer_);
704   }
705 
706   // Reduces the visual noise when we are returning an undefined value.
Undefined()707   v8::Local<v8::Value> Undefined() {
708     return v8::Undefined(script_state_->GetIsolate());
709   }
710 
Destination()711   WritableStream* Destination() { return writer_->OwnerWritableStream(); }
712 
Destination() const713   const WritableStream* Destination() const {
714     return writer_->OwnerWritableStream();
715   }
716 
Readable()717   ReadableStream* Readable() { return reader_->owner_readable_stream_; }
718 
719   // Performs promise.then(on_fulfilled, on_rejected). It behaves like
720   // StreamPromiseThen(). Only the types are different.
ThenPromise(v8::Local<v8::Promise> promise,PromiseReaction on_fulfilled,PromiseReaction on_rejected=nullptr)721   v8::Local<v8::Promise> ThenPromise(v8::Local<v8::Promise> promise,
722                                      PromiseReaction on_fulfilled,
723                                      PromiseReaction on_rejected = nullptr) {
724     return StreamThenPromise(
725         script_state_->GetContext(), promise,
726         on_fulfilled ? MakeGarbageCollected<WrappedPromiseReaction>(
727                            script_state_, this, on_fulfilled)
728                      : nullptr,
729         on_rejected ? MakeGarbageCollected<WrappedPromiseReaction>(
730                           script_state_, this, on_rejected)
731                     : nullptr);
732   }
733 
734   Member<ScriptState> script_state_;
735   Member<PipeOptions> pipe_options_;
736   Member<ReadableStreamReader> reader_;
737   Member<WritableStreamDefaultWriter> writer_;
738   Member<StreamPromiseResolver> promise_;
739   TraceWrapperV8Reference<v8::Promise> last_write_;
740   Action shutdown_action_;
741   TraceWrapperV8Reference<v8::Value> shutdown_error_;
742   bool is_shutting_down_ = false;
743   bool is_reading_ = false;
744 
745   DISALLOW_COPY_AND_ASSIGN(PipeToEngine);
746 };
747 
748 class ReadableStream::TeeEngine final : public GarbageCollected<TeeEngine> {
749  public:
750   TeeEngine() = default;
751 
752   // Create the streams and start copying data.
753   void Start(ScriptState*, ReadableStream*, ExceptionState&);
754 
755   // Branch1() and Branch2() are null until Start() is called.
Branch1() const756   ReadableStream* Branch1() const { return branch_[0]; }
Branch2() const757   ReadableStream* Branch2() const { return branch_[1]; }
758 
Trace(Visitor * visitor)759   void Trace(Visitor* visitor) {
760     visitor->Trace(stream_);
761     visitor->Trace(reader_);
762     visitor->Trace(reason_[0]);
763     visitor->Trace(reason_[1]);
764     visitor->Trace(branch_[0]);
765     visitor->Trace(branch_[1]);
766     visitor->Trace(controller_[0]);
767     visitor->Trace(controller_[1]);
768     visitor->Trace(cancel_promise_);
769   }
770 
771  private:
772   class PullAlgorithm;
773   class CancelAlgorithm;
774 
775   Member<ReadableStream> stream_;
776   Member<ReadableStreamReader> reader_;
777   Member<StreamPromiseResolver> cancel_promise_;
778   bool closed_ = false;
779 
780   // The standard contains a number of pairs of variables with one for each
781   // stream. These are implemented as arrays here. While they are 1-indexed in
782   // the standard, they are 0-indexed here; ie. "canceled_[0]" here corresponds
783   // to "canceled1" in the standard.
784   bool canceled_[2] = {false, false};
785   TraceWrapperV8Reference<v8::Value> reason_[2];
786   Member<ReadableStream> branch_[2];
787   Member<ReadableStreamDefaultController> controller_[2];
788 
789   DISALLOW_COPY_AND_ASSIGN(TeeEngine);
790 };
791 
792 class ReadableStream::TeeEngine::PullAlgorithm final : public StreamAlgorithm {
793  public:
PullAlgorithm(TeeEngine * engine)794   explicit PullAlgorithm(TeeEngine* engine) : engine_(engine) {}
795 
Run(ScriptState * script_state,int,v8::Local<v8::Value>[])796   v8::Local<v8::Promise> Run(ScriptState* script_state,
797                              int,
798                              v8::Local<v8::Value>[]) override {
799     // https://streams.spec.whatwg.org/#readable-stream-tee
800     // 12. Let pullAlgorithm be the following steps:
801     //   a. Return the result of transforming ! ReadableStreamDefaultReaderRead(
802     //      reader) with a fulfillment handler which takes the argument result
803     //      and performs the following steps:
804     return StreamThenPromise(
805         script_state->GetContext(),
806         ReadableStreamReader::Read(script_state, engine_->reader_)
807             ->V8Promise(script_state->GetIsolate()),
808         MakeGarbageCollected<ResolveFunction>(script_state, engine_));
809   }
810 
Trace(Visitor * visitor)811   void Trace(Visitor* visitor) override {
812     visitor->Trace(engine_);
813     StreamAlgorithm::Trace(visitor);
814   }
815 
816  private:
817   class ResolveFunction final : public PromiseHandler {
818    public:
ResolveFunction(ScriptState * script_state,TeeEngine * engine)819     ResolveFunction(ScriptState* script_state, TeeEngine* engine)
820         : PromiseHandler(script_state), engine_(engine) {}
821 
CallWithLocal(v8::Local<v8::Value> result)822     void CallWithLocal(v8::Local<v8::Value> result) override {
823       //    i. If closed is true, return.
824       if (engine_->closed_) {
825         return;
826       }
827 
828       //   ii. Assert: Type(result) is Object.
829       DCHECK(result->IsObject());
830 
831       auto* script_state = GetScriptState();
832       auto* isolate = script_state->GetIsolate();
833 
834       //  iii. Let done be ! Get(result, "done").
835       //   vi. Let value be ! Get(result, "value").
836       // The precise order of operations is not important here, because |result|
837       // is guaranteed to have own properties of "value" and "done" and so the
838       // "Get" operations cannot have side-effects.
839       v8::Local<v8::Value> value;
840       bool done = false;
841       bool unpack_succeeded =
842           V8UnpackIteratorResult(script_state, result.As<v8::Object>(), &done)
843               .ToLocal(&value);
844       CHECK(unpack_succeeded);
845 
846       //   vi. Assert: Type(done) is Boolean.
847       //    v. If done is true,
848       if (done) {
849         //    1. If canceled1 is false,
850         //        a. Perform ! ReadableStreamDefaultControllerClose(branch1.
851         //           [[readableStreamController]]).
852         //    2. If canceled2 is false,
853         //        b. Perform ! ReadableStreamDefaultControllerClose(branch2.
854         //           [[readableStreamController]]).
855         for (int branch = 0; branch < 2; ++branch) {
856           if (!engine_->canceled_[branch] &&
857               ReadableStreamDefaultController::CanCloseOrEnqueue(
858                   engine_->controller_[branch])) {
859             ReadableStreamDefaultController::Close(
860                 script_state, engine_->controller_[branch]);
861           }
862         }
863         //    3. Set closed to true.
864         engine_->closed_ = true;
865 
866         //    4. Return.
867         return;
868       }
869       ExceptionState exception_state(isolate, ExceptionState::kUnknownContext,
870                                      "", "");
871       //  vii. Let value1 and value2 be value.
872       // viii. If canceled2 is false and cloneForBranch2 is true, set value2 to
873       //       ? StructuredDeserialize(? StructuredSerialize(value2), the
874       //       current Realm Record).
875       // TODO(ricea): Support cloneForBranch2
876 
877       //   ix. If canceled1 is false, perform ?
878       //       ReadableStreamDefaultControllerEnqueue(branch1.
879       //       [[readableStreamController]], value1).
880       //    x. If canceled2 is false, perform ?
881       //       ReadableStreamDefaultControllerEnqueue(branch2.
882       //       [[readableStreamController]], value2).
883       for (int branch = 0; branch < 2; ++branch) {
884         if (!engine_->canceled_[branch] &&
885             ReadableStreamDefaultController::CanCloseOrEnqueue(
886                 engine_->controller_[branch])) {
887           ReadableStreamDefaultController::Enqueue(script_state,
888                                                    engine_->controller_[branch],
889                                                    value, exception_state);
890           if (exception_state.HadException()) {
891             // Instead of returning a rejection, which is inconvenient here,
892             // call ControllerError(). The only difference this makes is that
893             // it happens synchronously, but that should not be observable.
894             ReadableStreamDefaultController::Error(
895                 script_state, engine_->controller_[branch],
896                 exception_state.GetException());
897             exception_state.ClearException();
898             return;
899           }
900         }
901       }
902     }
903 
Trace(Visitor * visitor)904     void Trace(Visitor* visitor) override {
905       visitor->Trace(engine_);
906       PromiseHandler::Trace(visitor);
907     }
908 
909    private:
910     Member<TeeEngine> engine_;
911   };
912 
913   Member<TeeEngine> engine_;
914 };
915 
916 class ReadableStream::TeeEngine::CancelAlgorithm final
917     : public StreamAlgorithm {
918  public:
CancelAlgorithm(TeeEngine * engine,int branch)919   CancelAlgorithm(TeeEngine* engine, int branch)
920       : engine_(engine), branch_(branch) {
921     DCHECK(branch == 0 || branch == 1);
922   }
923 
Run(ScriptState * script_state,int argc,v8::Local<v8::Value> argv[])924   v8::Local<v8::Promise> Run(ScriptState* script_state,
925                              int argc,
926                              v8::Local<v8::Value> argv[]) override {
927     // https://streams.spec.whatwg.org/#readable-stream-tee
928     // This implements both cancel1Algorithm and cancel2Algorithm as they are
929     // identical except for the index they operate on. Standard comments are
930     // from cancel1Algorithm.
931     // 13. Let cancel1Algorithm be the following steps, taking a reason
932     //     argument:
933     auto* isolate = script_state->GetIsolate();
934 
935     // a. Set canceled1 to true.
936     engine_->canceled_[branch_] = true;
937     DCHECK_EQ(argc, 1);
938 
939     // b. Set reason1 to reason.
940     engine_->reason_[branch_].Set(isolate, argv[0]);
941 
942     const int other_branch = 1 - branch_;
943 
944     // c. If canceled2 is true,
945     if (engine_->canceled_[other_branch]) {
946       // i. Let compositeReason be ! CreateArrayFromList(« reason1, reason2 »).
947       v8::Local<v8::Value> reason[] = {engine_->reason_[0].NewLocal(isolate),
948                                        engine_->reason_[1].NewLocal(isolate)};
949       v8::Local<v8::Value> composite_reason =
950           v8::Array::New(script_state->GetIsolate(), reason, 2);
951 
952       // ii. Let cancelResult be ! ReadableStreamCancel(stream,
953       //    compositeReason).
954       auto cancel_result = ReadableStream::Cancel(
955           script_state, engine_->stream_, composite_reason);
956 
957       // iii. Resolve cancelPromise with cancelResult.
958       engine_->cancel_promise_->Resolve(script_state, cancel_result);
959     }
960     return engine_->cancel_promise_->V8Promise(isolate);
961   }
962 
Trace(Visitor * visitor)963   void Trace(Visitor* visitor) override {
964     visitor->Trace(engine_);
965     StreamAlgorithm::Trace(visitor);
966   }
967 
968  private:
969   Member<TeeEngine> engine_;
970   const int branch_;
971 };
972 
Start(ScriptState * script_state,ReadableStream * stream,ExceptionState & exception_state)973 void ReadableStream::TeeEngine::Start(ScriptState* script_state,
974                                       ReadableStream* stream,
975                                       ExceptionState& exception_state) {
976   // https://streams.spec.whatwg.org/#readable-stream-tee
977   //  1. Assert: ! IsReadableStream(stream) is true.
978   DCHECK(stream);
979 
980   // TODO(ricea):  2. Assert: Type(cloneForBranch2) is Boolean.
981 
982   stream_ = stream;
983 
984   // 3. Let reader be ? AcquireReadableStreamDefaultReader(stream).
985   reader_ = ReadableStream::AcquireDefaultReader(script_state, stream, false,
986                                                  exception_state);
987   if (exception_state.HadException()) {
988     return;
989   }
990 
991   // These steps are performed by the constructor:
992   //  4. Let closed be false.
993   DCHECK(!closed_);
994 
995   //  5. Let canceled1 be false.
996   DCHECK(!canceled_[0]);
997 
998   //  6. Let canceled2 be false.
999   DCHECK(!canceled_[1]);
1000 
1001   //  7. Let reason1 be undefined.
1002   DCHECK(reason_[0].IsEmpty());
1003 
1004   //  8. Let reason2 be undefined.
1005   DCHECK(reason_[1].IsEmpty());
1006 
1007   //  9. Let branch1 be undefined.
1008   DCHECK(!branch_[0]);
1009 
1010   // 10. Let branch2 be undefined.
1011   DCHECK(!branch_[1]);
1012 
1013   // 11. Let cancelPromise be a new promise.
1014   cancel_promise_ = MakeGarbageCollected<StreamPromiseResolver>(script_state);
1015 
1016   // 12. Let pullAlgorithm be the following steps:
1017   // (steps are defined in PullAlgorithm::Run()).
1018   auto* pull_algorithm = MakeGarbageCollected<PullAlgorithm>(this);
1019 
1020   // 13. Let cancel1Algorithm be the following steps, taking a reason argument:
1021   // (see CancelAlgorithm::Run()).
1022   auto* cancel1_algorithm = MakeGarbageCollected<CancelAlgorithm>(this, 0);
1023 
1024   // 14. Let cancel2Algorithm be the following steps, taking a reason argument:
1025   // (both algorithms share a single implementation).
1026   auto* cancel2_algorithm = MakeGarbageCollected<CancelAlgorithm>(this, 1);
1027 
1028   // 15. Let startAlgorithm be an algorithm that returns undefined.
1029   auto* start_algorithm = CreateTrivialStartAlgorithm();
1030 
1031   auto* size_algorithm = CreateDefaultSizeAlgorithm();
1032 
1033   // 16. Set branch1 to ! CreateReadableStream(startAlgorithm, pullAlgorithm,
1034   //   cancel1Algorithm).
1035   branch_[0] = ReadableStream::Create(script_state, start_algorithm,
1036                                       pull_algorithm, cancel1_algorithm, 1.0,
1037                                       size_algorithm, exception_state);
1038   if (exception_state.HadException()) {
1039     return;
1040   }
1041 
1042   // 17. Set branch2 to ! CreateReadableStream(startAlgorithm, pullAlgorithm,
1043   //   cancel2Algorithm).
1044   branch_[1] = ReadableStream::Create(script_state, start_algorithm,
1045                                       pull_algorithm, cancel2_algorithm, 1.0,
1046                                       size_algorithm, exception_state);
1047   if (exception_state.HadException()) {
1048     return;
1049   }
1050 
1051   for (int branch = 0; branch < 2; ++branch) {
1052     controller_[branch] = branch_[branch]->readable_stream_controller_;
1053   }
1054 
1055   class RejectFunction final : public PromiseHandler {
1056    public:
1057     RejectFunction(ScriptState* script_state, TeeEngine* engine)
1058         : PromiseHandler(script_state), engine_(engine) {}
1059 
1060     void CallWithLocal(v8::Local<v8::Value> r) override {
1061       // 18. Upon rejection of reader.[[closedPromise]] with reason r,
1062       //   a. Perform ! ReadableStreamDefaultControllerError(branch1.
1063       //      [[readableStreamController]], r).
1064       ReadableStreamDefaultController::Error(GetScriptState(),
1065                                              engine_->controller_[0], r);
1066 
1067       //   b. Perform ! ReadableStreamDefaultControllerError(branch2.
1068       //      [[readableStreamController]], r).
1069       ReadableStreamDefaultController::Error(GetScriptState(),
1070                                              engine_->controller_[1], r);
1071     }
1072 
1073     void Trace(Visitor* visitor) override {
1074       visitor->Trace(engine_);
1075       PromiseHandler::Trace(visitor);
1076     }
1077 
1078    private:
1079     Member<TeeEngine> engine_;
1080   };
1081 
1082   // 18. Upon rejection of reader.[[closedPromise]] with reason r,
1083   StreamThenPromise(
1084       script_state->GetContext(),
1085       reader_->closed_promise_->V8Promise(script_state->GetIsolate()), nullptr,
1086       MakeGarbageCollected<RejectFunction>(script_state, this));
1087 
1088   // Step "19. Return « branch1, branch2 »."
1089   // is performed by the caller.
1090 }
1091 
Create(ScriptState * script_state,ExceptionState & exception_state)1092 ReadableStream* ReadableStream::Create(ScriptState* script_state,
1093                                        ExceptionState& exception_state) {
1094   return Create(script_state,
1095                 ScriptValue(script_state->GetIsolate(),
1096                             v8::Undefined(script_state->GetIsolate())),
1097                 ScriptValue(script_state->GetIsolate(),
1098                             v8::Undefined(script_state->GetIsolate())),
1099                 exception_state);
1100 }
1101 
Create(ScriptState * script_state,ScriptValue underlying_source,ExceptionState & exception_state)1102 ReadableStream* ReadableStream::Create(ScriptState* script_state,
1103                                        ScriptValue underlying_source,
1104                                        ExceptionState& exception_state) {
1105   return Create(script_state, underlying_source,
1106                 ScriptValue(script_state->GetIsolate(),
1107                             v8::Undefined(script_state->GetIsolate())),
1108                 exception_state);
1109 }
1110 
Create(ScriptState * script_state,ScriptValue underlying_source,ScriptValue strategy,ExceptionState & exception_state)1111 ReadableStream* ReadableStream::Create(ScriptState* script_state,
1112                                        ScriptValue underlying_source,
1113                                        ScriptValue strategy,
1114                                        ExceptionState& exception_state) {
1115   auto* stream = MakeGarbageCollected<ReadableStream>();
1116   stream->InitInternal(script_state, underlying_source, strategy, false,
1117                        exception_state);
1118   if (exception_state.HadException()) {
1119     return nullptr;
1120   }
1121 
1122   return stream;
1123 }
1124 
CreateWithCountQueueingStrategy(ScriptState * script_state,UnderlyingSourceBase * underlying_source,size_t high_water_mark)1125 ReadableStream* ReadableStream::CreateWithCountQueueingStrategy(
1126     ScriptState* script_state,
1127     UnderlyingSourceBase* underlying_source,
1128     size_t high_water_mark) {
1129   auto* isolate = script_state->GetIsolate();
1130 
1131   // It's safer to use a workalike rather than a real CountQueuingStrategy
1132   // object. We use the default "size" function as it is implemented in C++ and
1133   // so much faster than calling into JavaScript. Since the create object has a
1134   // null prototype, there is no danger of us finding some other "size" function
1135   // via the prototype chain.
1136   v8::Local<v8::Name> high_water_mark_string =
1137       V8AtomicString(isolate, "highWaterMark");
1138   v8::Local<v8::Value> high_water_mark_value =
1139       v8::Number::New(isolate, high_water_mark);
1140 
1141   auto strategy_object =
1142       v8::Object::New(isolate, v8::Null(isolate), &high_water_mark_string,
1143                       &high_water_mark_value, 1);
1144 
1145   ExceptionState exception_state(script_state->GetIsolate(),
1146                                  ExceptionState::kConstructionContext,
1147                                  "ReadableStream");
1148 
1149   v8::Local<v8::Value> underlying_source_v8 =
1150       ToV8(underlying_source, script_state);
1151 
1152   auto* stream = MakeGarbageCollected<ReadableStream>();
1153   stream->InitInternal(
1154       script_state,
1155       ScriptValue(script_state->GetIsolate(), underlying_source_v8),
1156       ScriptValue(script_state->GetIsolate(), strategy_object), true,
1157       exception_state);
1158 
1159   if (exception_state.HadException()) {
1160     exception_state.ClearException();
1161     DLOG(WARNING)
1162         << "Ignoring an exception in CreateWithCountQueuingStrategy().";
1163   }
1164 
1165   return stream;
1166 }
1167 
Create(ScriptState * script_state,StreamStartAlgorithm * start_algorithm,StreamAlgorithm * pull_algorithm,StreamAlgorithm * cancel_algorithm,double high_water_mark,StrategySizeAlgorithm * size_algorithm,ExceptionState & exception_state)1168 ReadableStream* ReadableStream::Create(ScriptState* script_state,
1169                                        StreamStartAlgorithm* start_algorithm,
1170                                        StreamAlgorithm* pull_algorithm,
1171                                        StreamAlgorithm* cancel_algorithm,
1172                                        double high_water_mark,
1173                                        StrategySizeAlgorithm* size_algorithm,
1174                                        ExceptionState& exception_state) {
1175   // https://streams.spec.whatwg.org/#create-readable-stream
1176   // All arguments are compulsory in this implementation, so the first two steps
1177   // are skipped:
1178   // 1. If highWaterMark was not passed, set it to 1.
1179   // 2. If sizeAlgorithm was not passed, set it to an algorithm that returns 1.
1180 
1181   // 3. Assert: ! IsNonNegativeNumber(highWaterMark) is true.
1182   DCHECK_GE(high_water_mark, 0);
1183 
1184   // 4. Let stream be ObjectCreate(the original value of ReadableStream's
1185   //    prototype property).
1186   auto* stream = MakeGarbageCollected<ReadableStream>();
1187 
1188   // 5. Perform ! InitializeReadableStream(stream).
1189   Initialize(stream);
1190 
1191   // 6. Let controller be ObjectCreate(the original value of
1192   //    ReadableStreamDefaultController's prototype property).
1193   auto* controller = MakeGarbageCollected<ReadableStreamDefaultController>();
1194 
1195   // 7. Perform ? SetUpReadableStreamDefaultController(stream, controller,
1196   //    startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark,
1197   //    sizeAlgorithm).
1198   ReadableStreamDefaultController::SetUp(
1199       script_state, stream, controller, start_algorithm, pull_algorithm,
1200       cancel_algorithm, high_water_mark, size_algorithm, exception_state);
1201   if (exception_state.HadException()) {
1202     return nullptr;
1203   }
1204 
1205   // 8. Return stream.
1206   return stream;
1207 }
1208 
1209 ReadableStream::ReadableStream() = default;
1210 
1211 ReadableStream::~ReadableStream() = default;
1212 
locked(ScriptState * script_state,ExceptionState & exception_state) const1213 bool ReadableStream::locked(ScriptState* script_state,
1214                             ExceptionState& exception_state) const {
1215   // https://streams.spec.whatwg.org/#rs-locked
1216   // 2. Return ! IsReadableStreamLocked(this).
1217   return IsLocked(this);
1218 }
1219 
cancel(ScriptState * script_state,ExceptionState & exception_state)1220 ScriptPromise ReadableStream::cancel(ScriptState* script_state,
1221                                      ExceptionState& exception_state) {
1222   return cancel(script_state,
1223                 ScriptValue(script_state->GetIsolate(),
1224                             v8::Undefined(script_state->GetIsolate())),
1225                 exception_state);
1226 }
1227 
cancel(ScriptState * script_state,ScriptValue reason,ExceptionState & exception_state)1228 ScriptPromise ReadableStream::cancel(ScriptState* script_state,
1229                                      ScriptValue reason,
1230                                      ExceptionState& exception_state) {
1231   // https://streams.spec.whatwg.org/#rs-cancel
1232   // 2. If ! IsReadableStreamLocked(this) is true, return a promise rejected
1233   //    with a TypeError exception.
1234   if (IsLocked(this)) {
1235     exception_state.ThrowTypeError("Cannot cancel a locked stream");
1236     return ScriptPromise();
1237   }
1238 
1239   // 3. Return ! ReadableStreamCancel(this, reason).
1240   v8::Local<v8::Promise> result = Cancel(script_state, this, reason.V8Value());
1241   return ScriptPromise(script_state, result);
1242 }
1243 
getReader(ScriptState * script_state,ExceptionState & exception_state)1244 ReadableStreamDefaultReader* ReadableStream::getReader(
1245     ScriptState* script_state,
1246     ExceptionState& exception_state) {
1247   // https://streams.spec.whatwg.org/#rs-get-reader
1248   // 2. If mode is undefined, return ? AcquireReadableStreamDefaultReader(this,
1249   //    true).
1250   return AcquireDefaultReader(script_state, this, true, exception_state);
1251 }
1252 
getReader(ScriptState * script_state,ScriptValue options,ExceptionState & exception_state)1253 ReadableStreamDefaultReader* ReadableStream::getReader(
1254     ScriptState* script_state,
1255     ScriptValue options,
1256     ExceptionState& exception_state) {
1257   // https://streams.spec.whatwg.org/#rs-get-reader
1258   // Since we don't support byob readers, the only thing
1259   // GetReaderValidateOptions() needs to do is throw an exception if
1260   // |options.mode| is invalid.
1261   GetReaderValidateOptions(script_state, options, exception_state);
1262   if (exception_state.HadException()) {
1263     return nullptr;
1264   }
1265 
1266   return getReader(script_state, exception_state);
1267 }
1268 
pipeThrough(ScriptState * script_state,ScriptValue transform_stream,ExceptionState & exception_state)1269 ScriptValue ReadableStream::pipeThrough(ScriptState* script_state,
1270                                         ScriptValue transform_stream,
1271                                         ExceptionState& exception_state) {
1272   return pipeThrough(script_state, transform_stream,
1273                      ScriptValue(script_state->GetIsolate(),
1274                                  v8::Undefined(script_state->GetIsolate())),
1275                      exception_state);
1276 }
1277 
1278 // https://streams.spec.whatwg.org/#rs-pipe-through
pipeThrough(ScriptState * script_state,ScriptValue transform_stream,ScriptValue options,ExceptionState & exception_state)1279 ScriptValue ReadableStream::pipeThrough(ScriptState* script_state,
1280                                         ScriptValue transform_stream,
1281                                         ScriptValue options,
1282                                         ExceptionState& exception_state) {
1283   // https://streams.spec.whatwg.org/#rs-pipe-through
1284   // The first part of this function implements the unpacking of the {readable,
1285   // writable} argument to the method.
1286   v8::Local<v8::Value> pair_value = transform_stream.V8Value();
1287   v8::Local<v8::Context> context = script_state->GetContext();
1288 
1289   constexpr char kWritableIsNotWritableStream[] =
1290       "parameter 1's 'writable' property is not a WritableStream.";
1291   constexpr char kReadableIsNotReadableStream[] =
1292       "parameter 1's 'readable' property is not a ReadableStream.";
1293   constexpr char kWritableIsLocked[] = "parameter 1's 'writable' is locked.";
1294 
1295   v8::Local<v8::Object> pair;
1296   if (!pair_value->ToObject(context).ToLocal(&pair)) {
1297     exception_state.ThrowTypeError(kWritableIsNotWritableStream);
1298     return ScriptValue();
1299   }
1300 
1301   v8::Isolate* isolate = script_state->GetIsolate();
1302   v8::Local<v8::Value> writable, readable;
1303   {
1304     v8::TryCatch block(isolate);
1305     if (!pair->Get(context, V8String(isolate, "writable")).ToLocal(&writable)) {
1306       exception_state.RethrowV8Exception(block.Exception());
1307       return ScriptValue();
1308     }
1309     DCHECK(!block.HasCaught());
1310 
1311     if (!pair->Get(context, V8String(isolate, "readable")).ToLocal(&readable)) {
1312       exception_state.RethrowV8Exception(block.Exception());
1313       return ScriptValue();
1314     }
1315     DCHECK(!block.HasCaught());
1316   }
1317 
1318   // 2. If ! IsWritableStream(_writable_) is *false*, throw a *TypeError*
1319   //    exception.
1320   WritableStream* writable_stream =
1321       V8WritableStream::ToImplWithTypeCheck(isolate, writable);
1322   if (!writable_stream) {
1323     exception_state.ThrowTypeError(kWritableIsNotWritableStream);
1324     return ScriptValue();
1325   }
1326 
1327   // 3. If ! IsReadableStream(_readable_) is *false*, throw a *TypeError*
1328   //    exception.
1329   if (!V8ReadableStream::HasInstance(readable, isolate)) {
1330     exception_state.ThrowTypeError(kReadableIsNotReadableStream);
1331     return ScriptValue();
1332   }
1333 
1334   // 4. Set preventClose to ! ToBoolean(preventClose), set preventAbort to !
1335   //    ToBoolean(preventAbort), and set preventCancel to !
1336   //    ToBoolean(preventCancel).
1337 
1338   // 5. If signal is not undefined, and signal is not an instance of the
1339   //    AbortSignal interface, throw a TypeError exception.
1340   auto* pipe_options =
1341       MakeGarbageCollected<PipeOptions>(script_state, options, exception_state);
1342   if (exception_state.HadException()) {
1343     return ScriptValue();
1344   }
1345 
1346   // 6. If ! IsReadableStreamLocked(*this*) is *true*, throw a *TypeError*
1347   //    exception.
1348   if (IsLocked(this)) {
1349     exception_state.ThrowTypeError("Cannot pipe a locked stream");
1350     return ScriptValue();
1351   }
1352 
1353   // 7. If ! IsWritableStreamLocked(_writable_) is *true*, throw a *TypeError*
1354   //    exception.
1355   if (WritableStream::IsLocked(writable_stream)) {
1356     exception_state.ThrowTypeError(kWritableIsLocked);
1357     return ScriptValue();
1358   }
1359 
1360   // 8. Let _promise_ be ! ReadableStreamPipeTo(*this*, _writable_,
1361   //    _preventClose_, _preventAbort_, _preventCancel_,
1362   //   _signal_).
1363 
1364   ScriptPromise promise =
1365       PipeTo(script_state, this, writable_stream, pipe_options);
1366 
1367   // 9. Set _promise_.[[PromiseIsHandled]] to *true*.
1368   promise.MarkAsHandled();
1369 
1370   // 10. Return _readable_.
1371   return ScriptValue(script_state->GetIsolate(), readable);
1372 }
1373 
pipeTo(ScriptState * script_state,ScriptValue destination,ExceptionState & exception_state)1374 ScriptPromise ReadableStream::pipeTo(ScriptState* script_state,
1375                                      ScriptValue destination,
1376                                      ExceptionState& exception_state) {
1377   return pipeTo(script_state, destination,
1378                 ScriptValue(script_state->GetIsolate(),
1379                             v8::Undefined(script_state->GetIsolate())),
1380                 exception_state);
1381 }
1382 
pipeTo(ScriptState * script_state,ScriptValue destination_value,ScriptValue options,ExceptionState & exception_state)1383 ScriptPromise ReadableStream::pipeTo(ScriptState* script_state,
1384                                      ScriptValue destination_value,
1385                                      ScriptValue options,
1386                                      ExceptionState& exception_state) {
1387   // https://streams.spec.whatwg.org/#rs-pipe-to
1388   // 2. If ! IsWritableStream(dest) is false, return a promise rejected with a
1389   //    TypeError exception.
1390   // TODO(ricea): Do this in the IDL instead.
1391   WritableStream* destination = V8WritableStream::ToImplWithTypeCheck(
1392       script_state->GetIsolate(), destination_value.V8Value());
1393 
1394   if (!destination) {
1395     exception_state.ThrowTypeError("Illegal invocation");
1396     return ScriptPromise();
1397   }
1398 
1399   // 3. Set preventClose to ! ToBoolean(preventClose), set preventAbort to !
1400   //    ToBoolean(preventAbort), and set preventCancel to !
1401   //    ToBoolean(preventCancel).
1402   // 4. If signal is not undefined, and signal is not an instance of the
1403   //    AbortSignal interface, return a promise rejected with a TypeError
1404   //    exception.
1405   auto* pipe_options =
1406       MakeGarbageCollected<PipeOptions>(script_state, options, exception_state);
1407   if (exception_state.HadException()) {
1408     return ScriptPromise();
1409   }
1410 
1411   // 5. If ! IsReadableStreamLocked(this) is true, return a promise rejected
1412   // with a TypeError exception.
1413   if (IsLocked(this)) {
1414     exception_state.ThrowTypeError("Cannot pipe a locked stream");
1415     return ScriptPromise();
1416   }
1417 
1418   // 6. If ! IsWritableStreamLocked(dest) is true, return a promise rejected
1419   // with a TypeError exception.
1420   if (WritableStream::IsLocked(destination)) {
1421     exception_state.ThrowTypeError("Cannot pipe to a locked stream");
1422     return ScriptPromise();
1423   }
1424 
1425   return PipeTo(script_state, this, destination, pipe_options);
1426 }
1427 
tee(ScriptState * script_state,ExceptionState & exception_state)1428 ScriptValue ReadableStream::tee(ScriptState* script_state,
1429                                 ExceptionState& exception_state) {
1430   return CallTeeAndReturnBranchArray(script_state, this, exception_state);
1431 }
1432 
1433 // Unlike in the standard, this is defined as a separate method from the
1434 // constructor. This prevents problems when garbage collection happens
1435 // re-entrantly during construction.
InitInternal(ScriptState * script_state,ScriptValue raw_underlying_source,ScriptValue raw_strategy,bool created_by_ua,ExceptionState & exception_state)1436 void ReadableStream::InitInternal(ScriptState* script_state,
1437                                   ScriptValue raw_underlying_source,
1438                                   ScriptValue raw_strategy,
1439                                   bool created_by_ua,
1440                                   ExceptionState& exception_state) {
1441   if (!created_by_ua) {
1442     // TODO(ricea): Move this to IDL once blink::ReadableStreamOperations is
1443     // no longer using the public constructor.
1444     UseCounter::Count(ExecutionContext::From(script_state),
1445                       WebFeature::kReadableStreamConstructor);
1446   }
1447 
1448   // https://streams.spec.whatwg.org/#rs-constructor
1449   //  1. Perform ! InitializeReadableStream(this).
1450   Initialize(this);
1451 
1452   // The next part of this constructor corresponds to the object conversions
1453   // that are implicit in the definition in the standard.
1454   DCHECK(!raw_underlying_source.IsEmpty());
1455   DCHECK(!raw_strategy.IsEmpty());
1456 
1457   auto context = script_state->GetContext();
1458   auto* isolate = script_state->GetIsolate();
1459 
1460   v8::Local<v8::Object> underlying_source;
1461   ScriptValueToObject(script_state, raw_underlying_source, &underlying_source,
1462                       exception_state);
1463   if (exception_state.HadException()) {
1464     return;
1465   }
1466 
1467   // 2. Let size be ? GetV(strategy, "size").
1468   // 3. Let highWaterMark be ? GetV(strategy, "highWaterMark").
1469   StrategyUnpacker strategy_unpacker(script_state, raw_strategy,
1470                                      exception_state);
1471   if (exception_state.HadException()) {
1472     return;
1473   }
1474 
1475   // 4. Let type be ? GetV(underlyingSource, "type").
1476   v8::TryCatch try_catch(isolate);
1477   v8::Local<v8::Value> type;
1478   if (!underlying_source->Get(context, V8AtomicString(isolate, "type"))
1479            .ToLocal(&type)) {
1480     exception_state.RethrowV8Exception(try_catch.Exception());
1481     return;
1482   }
1483 
1484   if (!type->IsUndefined()) {
1485     // 5. Let typeString be ? ToString(type).
1486     v8::Local<v8::String> type_string;
1487     if (!type->ToString(context).ToLocal(&type_string)) {
1488       exception_state.RethrowV8Exception(try_catch.Exception());
1489       return;
1490     }
1491 
1492     // 6. If typeString is "bytes",
1493     if (type_string == V8AtomicString(isolate, "bytes")) {
1494       // TODO(ricea): Implement bytes type.
1495       exception_state.ThrowRangeError("bytes type is not yet implemented");
1496       return;
1497     }
1498 
1499     // 8. Otherwise, throw a RangeError exception.
1500     exception_state.ThrowRangeError("Invalid type is specified");
1501     return;
1502   }
1503 
1504   // 7. Otherwise, if type is undefined,
1505   //   a. Let sizeAlgorithm be ? MakeSizeAlgorithmFromSizeFunction(size).
1506   auto* size_algorithm =
1507       strategy_unpacker.MakeSizeAlgorithm(script_state, exception_state);
1508   if (exception_state.HadException()) {
1509     return;
1510   }
1511   DCHECK(size_algorithm);
1512 
1513   //   b. If highWaterMark is undefined, let highWaterMark be 1.
1514   //   c. Set highWaterMark to ? ValidateAndNormalizeHighWaterMark(
1515   //      highWaterMark).
1516   double high_water_mark =
1517       strategy_unpacker.GetHighWaterMark(script_state, 1, exception_state);
1518   if (exception_state.HadException()) {
1519     return;
1520   }
1521 
1522   // 4. Perform ? SetUpReadableStreamDefaultControllerFromUnderlyingSource
1523   //  (this, underlyingSource, highWaterMark, sizeAlgorithm).
1524   ReadableStreamDefaultController::SetUpFromUnderlyingSource(
1525       script_state, this, underlying_source, high_water_mark, size_algorithm,
1526       exception_state);
1527 }
1528 
1529 //
1530 // Readable stream abstract operations
1531 //
AcquireDefaultReader(ScriptState * script_state,ReadableStream * stream,bool for_author_code,ExceptionState & exception_state)1532 ReadableStreamReader* ReadableStream::AcquireDefaultReader(
1533     ScriptState* script_state,
1534     ReadableStream* stream,
1535     bool for_author_code,
1536     ExceptionState& exception_state) {
1537   // https://streams.spec.whatwg.org/#acquire-readable-stream-reader
1538   // for_author_code is compulsory in this implementation
1539   // 1. If forAuthorCode was not passed, set it to false.
1540 
1541   // 2. Let reader be ? Construct(ReadableStreamDefaultReader, « stream »).
1542   auto* reader = MakeGarbageCollected<ReadableStreamReader>(
1543       script_state, stream, exception_state);
1544   if (exception_state.HadException()) {
1545     return nullptr;
1546   }
1547 
1548   // 3. Set reader.[[forAuthorCode]] to forAuthorCode.
1549   reader->for_author_code_ = for_author_code;
1550 
1551   // 4. Return reader.
1552   return reader;
1553 }
1554 
Initialize(ReadableStream * stream)1555 void ReadableStream::Initialize(ReadableStream* stream) {
1556   // Fields are initialised by the constructor, so we only check that they were
1557   // initialised correctly.
1558   // https://streams.spec.whatwg.org/#initialize-readable-stream
1559   // 1. Set stream.[[state]] to "readable".
1560   CHECK_EQ(stream->state_, kReadable);
1561   // 2. Set stream.[[reader]] and stream.[[storedError]] to undefined.
1562   DCHECK(!stream->reader_);
1563   DCHECK(stream->stored_error_.IsEmpty());
1564   // 3. Set stream.[[disturbed]] to false.
1565   DCHECK(!stream->is_disturbed_);
1566 }
1567 
1568 // TODO(domenic): cloneForBranch2 argument from spec not supported yet
Tee(ScriptState * script_state,ReadableStream ** branch1,ReadableStream ** branch2,ExceptionState & exception_state)1569 void ReadableStream::Tee(ScriptState* script_state,
1570                          ReadableStream** branch1,
1571                          ReadableStream** branch2,
1572                          ExceptionState& exception_state) {
1573   auto* engine = MakeGarbageCollected<TeeEngine>();
1574   engine->Start(script_state, this, exception_state);
1575   if (exception_state.HadException()) {
1576     return;
1577   }
1578 
1579   // Instead of returning a List like ReadableStreamTee in the standard, the
1580   // branches are returned via output parameters.
1581   *branch1 = engine->Branch1();
1582   *branch2 = engine->Branch2();
1583 }
1584 
LockAndDisturb(ScriptState * script_state,ExceptionState & exception_state)1585 void ReadableStream::LockAndDisturb(ScriptState* script_state,
1586                                     ExceptionState& exception_state) {
1587   ScriptState::Scope scope(script_state);
1588 
1589   if (reader_) {
1590     return;
1591   }
1592 
1593   ReadableStreamReader* reader =
1594       AcquireDefaultReader(script_state, this, false, exception_state);
1595   if (!reader) {
1596     return;
1597   }
1598 
1599   is_disturbed_ = true;
1600 }
1601 
Serialize(ScriptState * script_state,MessagePort * port,ExceptionState & exception_state)1602 void ReadableStream::Serialize(ScriptState* script_state,
1603                                MessagePort* port,
1604                                ExceptionState& exception_state) {
1605   if (IsLocked(this)) {
1606     exception_state.ThrowTypeError("Cannot transfer a locked stream");
1607     return;
1608   }
1609 
1610   auto* writable =
1611       CreateCrossRealmTransformWritable(script_state, port, exception_state);
1612   if (exception_state.HadException()) {
1613     return;
1614   }
1615 
1616   auto promise =
1617       PipeTo(script_state, this, writable, MakeGarbageCollected<PipeOptions>());
1618   promise.MarkAsHandled();
1619 }
1620 
Deserialize(ScriptState * script_state,MessagePort * port,ExceptionState & exception_state)1621 ReadableStream* ReadableStream::Deserialize(ScriptState* script_state,
1622                                             MessagePort* port,
1623                                             ExceptionState& exception_state) {
1624   // We need to execute JavaScript to call "Then" on v8::Promises. We will not
1625   // run author code.
1626   v8::Isolate::AllowJavascriptExecutionScope allow_js(
1627       script_state->GetIsolate());
1628   auto* readable =
1629       CreateCrossRealmTransformReadable(script_state, port, exception_state);
1630   if (exception_state.HadException()) {
1631     return nullptr;
1632   }
1633   return readable;
1634 }
1635 
GetReaderNotForAuthorCode(ScriptState * script_state,ExceptionState & exception_state)1636 ReadableStreamDefaultReader* ReadableStream::GetReaderNotForAuthorCode(
1637     ScriptState* script_state,
1638     ExceptionState& exception_state) {
1639   return AcquireDefaultReader(script_state, this, false, exception_state);
1640 }
1641 
PipeTo(ScriptState * script_state,ReadableStream * readable,WritableStream * destination,PipeOptions * pipe_options)1642 ScriptPromise ReadableStream::PipeTo(ScriptState* script_state,
1643                                      ReadableStream* readable,
1644                                      WritableStream* destination,
1645                                      PipeOptions* pipe_options) {
1646   auto* engine = MakeGarbageCollected<PipeToEngine>(script_state, pipe_options);
1647   return engine->Start(readable, destination);
1648 }
1649 
GetStoredError(v8::Isolate * isolate) const1650 v8::Local<v8::Value> ReadableStream::GetStoredError(
1651     v8::Isolate* isolate) const {
1652   return stored_error_.NewLocal(isolate);
1653 }
1654 
Trace(Visitor * visitor)1655 void ReadableStream::Trace(Visitor* visitor) {
1656   visitor->Trace(readable_stream_controller_);
1657   visitor->Trace(reader_);
1658   visitor->Trace(stored_error_);
1659   ScriptWrappable::Trace(visitor);
1660 }
1661 
1662 //
1663 // Abstract Operations Used By Controllers
1664 //
1665 
AddReadRequest(ScriptState * script_state,ReadableStream * stream)1666 StreamPromiseResolver* ReadableStream::AddReadRequest(ScriptState* script_state,
1667                                                       ReadableStream* stream) {
1668   // https://streams.spec.whatwg.org/#readable-stream-add-read-request
1669   // 1. Assert: ! IsReadableStreamDefaultReader(stream.[[reader]]) is true.
1670   DCHECK(stream->reader_);
1671 
1672   // 2. Assert: stream.[[state]] is "readable".
1673   CHECK_EQ(stream->state_, kReadable);
1674 
1675   // 3. Let promise be a new promise.
1676   auto* promise = MakeGarbageCollected<StreamPromiseResolver>(script_state);
1677 
1678   // This implementation stores promises directly in |read_requests_| rather
1679   // than wrapping them in a Record.
1680   // 4. Let readRequest be Record {[[promise]]: promise}.
1681   // 5. Append readRequest as the last element of stream.[[reader]].
1682   //  [[readRequests]].
1683   stream->reader_->read_requests_.push_back(promise);
1684 
1685   // 6. Return promise.
1686   return promise;
1687 }
1688 
Cancel(ScriptState * script_state,ReadableStream * stream,v8::Local<v8::Value> reason)1689 v8::Local<v8::Promise> ReadableStream::Cancel(ScriptState* script_state,
1690                                               ReadableStream* stream,
1691                                               v8::Local<v8::Value> reason) {
1692   // https://streams.spec.whatwg.org/#readable-stream-cancel
1693   // 1. Set stream.[[disturbed]] to true.
1694   stream->is_disturbed_ = true;
1695 
1696   // 2. If stream.[[state]] is "closed", return a promise resolved with
1697   //    undefined.
1698   const auto state = stream->state_;
1699   if (state == kClosed) {
1700     return PromiseResolveWithUndefined(script_state);
1701   }
1702 
1703   // 3. If stream.[[state]] is "errored", return a promise rejected with stream.
1704   //    [[storedError]].
1705   if (state == kErrored) {
1706     return PromiseReject(script_state,
1707                          stream->GetStoredError(script_state->GetIsolate()));
1708   }
1709 
1710   // 4. Perform ! ReadableStreamClose(stream).
1711   Close(script_state, stream);
1712 
1713   // 5. Let sourceCancelPromise be ! stream.[[readableStreamController]].
1714   //    [[CancelSteps]](reason).
1715   v8::Local<v8::Promise> source_cancel_promise =
1716       stream->readable_stream_controller_->CancelSteps(script_state, reason);
1717 
1718   class ReturnUndefinedFunction final : public PromiseHandler {
1719    public:
1720     explicit ReturnUndefinedFunction(ScriptState* script_state)
1721         : PromiseHandler(script_state) {}
1722 
1723     // The method does nothing; the default value of undefined is returned to
1724     // JavaScript.
1725     void CallWithLocal(v8::Local<v8::Value>) override {}
1726   };
1727 
1728   // 6. Return the result of transforming sourceCancelPromise with a
1729   //    fulfillment handler that returns undefined.
1730   return StreamThenPromise(
1731       script_state->GetContext(), source_cancel_promise,
1732       MakeGarbageCollected<ReturnUndefinedFunction>(script_state));
1733 }
1734 
Close(ScriptState * script_state,ReadableStream * stream)1735 void ReadableStream::Close(ScriptState* script_state, ReadableStream* stream) {
1736   // https://streams.spec.whatwg.org/#readable-stream-close
1737   // 1. Assert: stream.[[state]] is "readable".
1738   CHECK_EQ(stream->state_, kReadable);
1739 
1740   // 2. Set stream.[[state]] to "closed".
1741   stream->state_ = kClosed;
1742 
1743   // 3. Let reader be stream.[[reader]].
1744   ReadableStreamReader* reader = stream->reader_;
1745 
1746   // 4. If reader is undefined, return.
1747   if (!reader) {
1748     return;
1749   }
1750 
1751   // TODO(ricea): Support BYOB readers.
1752   // 5. If ! IsReadableStreamDefaultReader(reader) is true,
1753   //   a. Repeat for each readRequest that is an element of reader.
1754   //      [[readRequests]],
1755   HeapDeque<Member<StreamPromiseResolver>> requests;
1756   requests.Swap(reader->read_requests_);
1757   for (StreamPromiseResolver* promise : requests) {
1758     //   i. Resolve readRequest.[[promise]] with !
1759     //      ReadableStreamCreateReadResult(undefined, true, reader.
1760     //      [[forAuthorCode]]).
1761     promise->Resolve(script_state,
1762                      CreateReadResult(script_state,
1763                                       v8::Undefined(script_state->GetIsolate()),
1764                                       true, reader->for_author_code_));
1765   }
1766 
1767   //   b. Set reader.[[readRequests]] to an empty List.
1768   //      This is not required since we've already called Swap().
1769 
1770   // 6. Resolve reader.[[closedPromise]] with undefined.
1771   reader->closed_promise_->ResolveWithUndefined(script_state);
1772 }
1773 
CreateReadResult(ScriptState * script_state,v8::Local<v8::Value> value,bool done,bool for_author_code)1774 v8::Local<v8::Value> ReadableStream::CreateReadResult(
1775     ScriptState* script_state,
1776     v8::Local<v8::Value> value,
1777     bool done,
1778     bool for_author_code) {
1779   // https://streams.spec.whatwg.org/#readable-stream-create-read-result
1780   auto* isolate = script_state->GetIsolate();
1781   auto context = script_state->GetContext();
1782   auto value_string = V8AtomicString(isolate, "value");
1783   auto done_string = V8AtomicString(isolate, "done");
1784   auto done_value = v8::Boolean::New(isolate, done);
1785   // 1. Let prototype be null.
1786   // 2. If forAuthorCode is true, set prototype to %ObjectPrototype%.
1787   // This implementation doesn't use a |prototype| variable, instead using
1788   // different code paths depending on the value of |for_author_code|.
1789   if (for_author_code) {
1790     // 4. Let obj be ObjectCreate(prototype).
1791     auto obj = v8::Object::New(isolate);
1792 
1793     // 5. Perform CreateDataProperty(obj, "value", value).
1794     obj->CreateDataProperty(context, value_string, value).Check();
1795 
1796     // 6. Perform CreateDataProperty(obj, "done", done).
1797     obj->CreateDataProperty(context, done_string, done_value).Check();
1798 
1799     // 7. Return obj.
1800     return obj;
1801   }
1802 
1803   // When |for_author_code| is false, we can perform all the steps in a single
1804   // call to V8.
1805 
1806   // 4. Let obj be ObjectCreate(prototype).
1807   // 5. Perform CreateDataProperty(obj, "value", value).
1808   // 6. Perform CreateDataProperty(obj, "done", done).
1809   // 7. Return obj.
1810   // TODO(ricea): Is it possible to use this optimised API in both cases?
1811   v8::Local<v8::Name> names[2] = {value_string, done_string};
1812   v8::Local<v8::Value> values[2] = {value, done_value};
1813 
1814   static_assert(base::size(names) == base::size(values),
1815                 "names and values arrays must be the same size");
1816   return v8::Object::New(isolate, v8::Null(isolate), names, values,
1817                          base::size(names));
1818 }
1819 
Error(ScriptState * script_state,ReadableStream * stream,v8::Local<v8::Value> e)1820 void ReadableStream::Error(ScriptState* script_state,
1821                            ReadableStream* stream,
1822                            v8::Local<v8::Value> e) {
1823   // https://streams.spec.whatwg.org/#readable-stream-error
1824   // 2. Assert: stream.[[state]] is "readable".
1825   CHECK_EQ(stream->state_, kReadable);
1826   auto* isolate = script_state->GetIsolate();
1827 
1828   // 3. Set stream.[[state]] to "errored".
1829   stream->state_ = kErrored;
1830 
1831   // 4. Set stream.[[storedError]] to e.
1832   stream->stored_error_.Set(isolate, e);
1833 
1834   // 5. Let reader be stream.[[reader]].
1835   ReadableStreamReader* reader = stream->reader_;
1836 
1837   // 6. If reader is undefined, return.
1838   if (!reader) {
1839     return;
1840   }
1841 
1842   // 7. If ! IsReadableStreamDefaultReader(reader) is true,
1843   // TODO(ricea): Support BYOB readers.
1844   //   a. Repeat for each readRequest that is an element of reader.
1845   //      [[readRequests]],
1846   for (StreamPromiseResolver* promise : reader->read_requests_) {
1847     //   i. Reject readRequest.[[promise]] with e.
1848     promise->Reject(script_state, e);
1849   }
1850 
1851   //   b. Set reader.[[readRequests]] to a new empty List.
1852   reader->read_requests_.clear();
1853 
1854   // 9. Reject reader.[[closedPromise]] with e.
1855   reader->closed_promise_->Reject(script_state, e);
1856 
1857   // 10. Set reader.[[closedPromise]].[[PromiseIsHandled]] to true.
1858   reader->closed_promise_->MarkAsHandled(isolate);
1859 }
1860 
FulfillReadRequest(ScriptState * script_state,ReadableStream * stream,v8::Local<v8::Value> chunk,bool done)1861 void ReadableStream::FulfillReadRequest(ScriptState* script_state,
1862                                         ReadableStream* stream,
1863                                         v8::Local<v8::Value> chunk,
1864                                         bool done) {
1865   // https://streams.spec.whatwg.org/#readable-stream-fulfill-read-request
1866   // 1. Let reader be stream.[[reader]].
1867   ReadableStreamReader* reader = stream->reader_;
1868 
1869   // 2. Let readRequest be the first element of reader.[[readRequests]].
1870   StreamPromiseResolver* read_request = reader->read_requests_.front();
1871 
1872   // 3. Remove readIntoRequest from reader.[[readIntoRequests]], shifting all
1873   //    other elements downward (so that the second becomes the first, and so
1874   //    on).
1875   reader->read_requests_.pop_front();
1876 
1877   // 4. Resolve readIntoRequest.[[promise]] with !
1878   //    ReadableStreamCreateReadResult(chunk, done, reader.[[forAuthorCode]]).
1879   read_request->Resolve(
1880       script_state, ReadableStream::CreateReadResult(script_state, chunk, done,
1881                                                      reader->for_author_code_));
1882 }
1883 
GetNumReadRequests(const ReadableStream * stream)1884 int ReadableStream::GetNumReadRequests(const ReadableStream* stream) {
1885   // https://streams.spec.whatwg.org/#readable-stream-get-num-read-requests
1886   // 1. Return the number of elements in stream.[[reader]].[[readRequests]].
1887   return stream->reader_->read_requests_.size();
1888 }
1889 
1890 //
1891 // TODO(ricea): Functions for transferable streams.
1892 //
1893 
GetReaderValidateOptions(ScriptState * script_state,ScriptValue options,ExceptionState & exception_state)1894 void ReadableStream::GetReaderValidateOptions(ScriptState* script_state,
1895                                               ScriptValue options,
1896                                               ExceptionState& exception_state) {
1897   // https://streams.spec.whatwg.org/#rs-get-reader
1898   // The unpacking of |options| is indicated as part of the signature of the
1899   // function in the standard.
1900   v8::TryCatch block(script_state->GetIsolate());
1901   v8::Local<v8::Value> mode;
1902   v8::Local<v8::String> mode_string;
1903   v8::Local<v8::Context> context = script_state->GetContext();
1904   if (options.V8Value()->IsUndefined()) {
1905     mode = v8::Undefined(script_state->GetIsolate());
1906   } else {
1907     v8::Local<v8::Object> v8_options;
1908     if (!options.V8Value()->ToObject(context).ToLocal(&v8_options)) {
1909       exception_state.RethrowV8Exception(block.Exception());
1910       return;
1911     }
1912     if (!v8_options->Get(context, V8String(script_state->GetIsolate(), "mode"))
1913              .ToLocal(&mode)) {
1914       exception_state.RethrowV8Exception(block.Exception());
1915       return;
1916     }
1917   }
1918 
1919   // 3. Set mode to ? ToString(mode).
1920   if (!mode->ToString(context).ToLocal(&mode_string)) {
1921     exception_state.RethrowV8Exception(block.Exception());
1922     return;
1923   }
1924 
1925   // 4. If mode is "byob", return ? AcquireReadableStreamBYOBReader(this, true).
1926   if (ToCoreString(mode_string) == "byob") {
1927     // TODO(ricea): Support BYOB readers.
1928     exception_state.ThrowTypeError("invalid mode");
1929     return;
1930   }
1931 
1932   if (!mode->IsUndefined()) {
1933     // 5. Throw a RangeError exception.
1934     exception_state.ThrowRangeError("invalid mode");
1935     return;
1936   }
1937 }
1938 
CallTeeAndReturnBranchArray(ScriptState * script_state,ReadableStream * readable,ExceptionState & exception_state)1939 ScriptValue ReadableStream::CallTeeAndReturnBranchArray(
1940     ScriptState* script_state,
1941     ReadableStream* readable,
1942     ExceptionState& exception_state) {
1943   // https://streams.spec.whatwg.org/#rs-tee
1944   v8::Isolate* isolate = script_state->GetIsolate();
1945   ReadableStream* branch1 = nullptr;
1946   ReadableStream* branch2 = nullptr;
1947 
1948   // 2. Let branches be ? ReadableStreamTee(this, false).
1949   readable->Tee(script_state, &branch1, &branch2, exception_state);
1950 
1951   if (!branch1 || !branch2)
1952     return ScriptValue();
1953 
1954   DCHECK(!exception_state.HadException());
1955 
1956   // 3. Return ! CreateArrayFromList(branches).
1957   v8::TryCatch block(isolate);
1958   v8::Local<v8::Context> context = script_state->GetContext();
1959   v8::Local<v8::Array> array = v8::Array::New(isolate, 2);
1960   v8::Local<v8::Object> global = context->Global();
1961 
1962   v8::Local<v8::Value> v8_branch1 = ToV8(branch1, global, isolate);
1963   if (v8_branch1.IsEmpty()) {
1964     exception_state.RethrowV8Exception(block.Exception());
1965     return ScriptValue();
1966   }
1967   v8::Local<v8::Value> v8_branch2 = ToV8(branch2, global, isolate);
1968   if (v8_branch1.IsEmpty()) {
1969     exception_state.RethrowV8Exception(block.Exception());
1970     return ScriptValue();
1971   }
1972   if (array->Set(context, V8String(isolate, "0"), v8_branch1).IsNothing()) {
1973     exception_state.RethrowV8Exception(block.Exception());
1974     return ScriptValue();
1975   }
1976   if (array->Set(context, V8String(isolate, "1"), v8_branch2).IsNothing()) {
1977     exception_state.RethrowV8Exception(block.Exception());
1978     return ScriptValue();
1979   }
1980   return ScriptValue(script_state->GetIsolate(), array);
1981 }
1982 
1983 }  // namespace blink
1984