1 // Copyright 2019 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "third_party/blink/renderer/core/streams/readable_stream.h"
6
7 #include "base/stl_util.h"
8 #include "third_party/blink/renderer/bindings/core/v8/script_function.h"
9 #include "third_party/blink/renderer/bindings/core/v8/v8_abort_signal.h"
10 #include "third_party/blink/renderer/bindings/core/v8/v8_iterator_result_value.h"
11 #include "third_party/blink/renderer/bindings/core/v8/v8_readable_stream.h"
12 #include "third_party/blink/renderer/bindings/core/v8/v8_throw_dom_exception.h"
13 #include "third_party/blink/renderer/bindings/core/v8/v8_writable_stream.h"
14 #include "third_party/blink/renderer/core/dom/abort_signal.h"
15 #include "third_party/blink/renderer/core/execution_context/execution_context.h"
16 #include "third_party/blink/renderer/core/frame/web_feature.h"
17 #include "third_party/blink/renderer/core/streams/miscellaneous_operations.h"
18 #include "third_party/blink/renderer/core/streams/promise_handler.h"
19 #include "third_party/blink/renderer/core/streams/readable_stream_default_controller.h"
20 #include "third_party/blink/renderer/core/streams/readable_stream_reader.h"
21 #include "third_party/blink/renderer/core/streams/stream_algorithms.h"
22 #include "third_party/blink/renderer/core/streams/stream_promise_resolver.h"
23 #include "third_party/blink/renderer/core/streams/transferable_streams.h"
24 #include "third_party/blink/renderer/core/streams/underlying_source_base.h"
25 #include "third_party/blink/renderer/core/streams/writable_stream.h"
26 #include "third_party/blink/renderer/core/streams/writable_stream_default_controller.h"
27 #include "third_party/blink/renderer/core/streams/writable_stream_default_writer.h"
28 #include "third_party/blink/renderer/platform/bindings/exception_code.h"
29 #include "third_party/blink/renderer/platform/bindings/exception_state.h"
30 #include "third_party/blink/renderer/platform/bindings/script_state.h"
31 #include "third_party/blink/renderer/platform/bindings/v8_binding.h"
32 #include "third_party/blink/renderer/platform/heap/garbage_collected.h"
33 #include "third_party/blink/renderer/platform/heap/heap_allocator.h"
34 #include "third_party/blink/renderer/platform/heap/persistent.h"
35 #include "third_party/blink/renderer/platform/heap/visitor.h"
36 #include "third_party/blink/renderer/platform/instrumentation/use_counter.h"
37 #include "third_party/blink/renderer/platform/wtf/assertions.h"
38 #include "third_party/blink/renderer/platform/wtf/deque.h"
39
40 namespace blink {
41
PipeOptions()42 ReadableStream::PipeOptions::PipeOptions()
43 : prevent_close_(false), prevent_abort_(false), prevent_cancel_(false) {}
44
PipeOptions(ScriptState * script_state,ScriptValue options,ExceptionState & exception_state)45 ReadableStream::PipeOptions::PipeOptions(ScriptState* script_state,
46 ScriptValue options,
47 ExceptionState& exception_state) {
48 auto* isolate = script_state->GetIsolate();
49 v8::TryCatch block(isolate);
50 v8::Local<v8::Value> options_value = options.V8Value();
51 v8::Local<v8::Object> options_object;
52 if (options_value->IsUndefined()) {
53 options_object = v8::Object::New(isolate);
54 } else if (!options_value->ToObject(script_state->GetContext())
55 .ToLocal(&options_object)) {
56 exception_state.RethrowV8Exception(block.Exception());
57 return;
58 }
59
60 // 4. Set preventClose to ! ToBoolean(preventClose), set preventAbort to !
61 // ToBoolean(preventAbort), and set preventCancel to !
62 // ToBoolean(preventCancel).
63 prevent_close_ =
64 GetBoolean(script_state, options_object, "preventClose", exception_state);
65 if (exception_state.HadException()) {
66 return;
67 }
68
69 prevent_abort_ =
70 GetBoolean(script_state, options_object, "preventAbort", exception_state);
71 if (exception_state.HadException()) {
72 return;
73 }
74
75 prevent_cancel_ = GetBoolean(script_state, options_object, "preventCancel",
76 exception_state);
77 if (exception_state.HadException()) {
78 return;
79 }
80
81 v8::Local<v8::Value> signal_value;
82 if (!options_object
83 ->Get(script_state->GetContext(), V8AtomicString(isolate, "signal"))
84 .ToLocal(&signal_value)) {
85 exception_state.RethrowV8Exception(block.Exception());
86 return;
87 }
88
89 // 5. If signal is not undefined, and signal is not an instance of the
90 // AbortSignal interface, throw a TypeError exception.
91 if (signal_value->IsUndefined())
92 return;
93
94 signal_ = V8AbortSignal::ToImplWithTypeCheck(isolate, signal_value);
95 if (!signal_) {
96 exception_state.ThrowTypeError(
97 "'signal' must be an AbortSignal object or undefined");
98 return;
99 }
100 }
101
Trace(Visitor * visitor)102 void ReadableStream::PipeOptions::Trace(Visitor* visitor) {
103 visitor->Trace(signal_);
104 }
105
GetBoolean(ScriptState * script_state,v8::Local<v8::Object> dictionary,const char * property_name,ExceptionState & exception_state)106 bool ReadableStream::PipeOptions::GetBoolean(ScriptState* script_state,
107 v8::Local<v8::Object> dictionary,
108 const char* property_name,
109 ExceptionState& exception_state) {
110 auto* isolate = script_state->GetIsolate();
111 v8::TryCatch block(isolate);
112 v8::Local<v8::Value> property_value;
113 if (!dictionary
114 ->Get(script_state->GetContext(),
115 V8AtomicString(isolate, property_name))
116 .ToLocal(&property_value)) {
117 exception_state.RethrowV8Exception(block.Exception());
118 return false;
119 }
120 return property_value->ToBoolean(isolate)->Value();
121 }
122
123 // PipeToEngine implements PipeTo(). All standard steps in this class come from
124 // https://streams.spec.whatwg.org/#readable-stream-pipe-to
125 //
126 // This implementation is simple but suboptimal because it uses V8 promises to
127 // drive its asynchronous state machine, allocating a lot of temporary V8
128 // objects as a result.
129 //
130 // TODO(ricea): Create internal versions of ReadableStreamDefaultReader::Read()
131 // and WritableStreamDefaultWriter::Write() to bypass promise creation and so
132 // reduce the number of allocations on the hot path.
133 class ReadableStream::PipeToEngine final
134 : public GarbageCollected<PipeToEngine> {
135 public:
PipeToEngine(ScriptState * script_state,PipeOptions * pipe_options)136 PipeToEngine(ScriptState* script_state, PipeOptions* pipe_options)
137 : script_state_(script_state), pipe_options_(pipe_options) {}
138
139 // This is the main entrypoint for ReadableStreamPipeTo().
Start(ReadableStream * readable,WritableStream * destination)140 ScriptPromise Start(ReadableStream* readable, WritableStream* destination) {
141 // 1. Assert: ! IsReadableStream(source) is true.
142 DCHECK(readable);
143
144 // 2. Assert: ! IsWritableStream(dest) is true.
145 DCHECK(destination);
146
147 // Not relevant to C++ implementation:
148 // 3. Assert: Type(preventClose) is Boolean, Type(preventAbort) is Boolean,
149 // and Type(preventCancel) is Boolean.
150
151 // TODO(ricea): Implement |signal|.
152 // 4. Assert: signal is undefined or signal is an instance of the
153 // AbortSignal interface.
154
155 // 5. Assert: ! IsReadableStreamLocked(source) is false.
156 DCHECK(!ReadableStream::IsLocked(readable));
157
158 // 6. Assert: ! IsWritableStreamLocked(dest) is false.
159 DCHECK(!WritableStream::IsLocked(destination));
160
161 auto* isolate = script_state_->GetIsolate();
162 ExceptionState exception_state(isolate, ExceptionState::kUnknownContext, "",
163 "");
164
165 // 7. If !
166 // IsReadableByteStreamController(source.[[readableStreamController]]) is
167 // true, let reader be either ! AcquireReadableStreamBYOBReader(source)
168 // or ! AcquireReadableStreamDefaultReader(source), at the user agent’s
169 // discretion.
170 // 8. Otherwise, let reader be ! AcquireReadableStreamDefaultReader(source).
171 reader_ = ReadableStream::AcquireDefaultReader(script_state_, readable,
172 false, exception_state);
173 DCHECK(!exception_state.HadException());
174
175 // 9. Let writer be ! AcquireWritableStreamDefaultWriter(dest).
176 writer_ = WritableStream::AcquireDefaultWriter(script_state_, destination,
177 exception_state);
178 DCHECK(!exception_state.HadException());
179
180 // 10. Let shuttingDown be false.
181 DCHECK(!is_shutting_down_);
182
183 // 11. Let promise be a new promise.
184 promise_ = MakeGarbageCollected<StreamPromiseResolver>(script_state_);
185
186 // 12. If signal is not undefined,
187 if (auto* signal = pipe_options_->Signal()) {
188 // b. If signal’s aborted flag is set, perform abortAlgorithm and
189 // return promise.
190 if (signal->aborted()) {
191 AbortAlgorithm();
192 return promise_->GetScriptPromise(script_state_);
193 }
194
195 // c. Add abortAlgorithm to signal.
196 signal->AddAlgorithm(
197 WTF::Bind(&PipeToEngine::AbortAlgorithm, WrapWeakPersistent(this)));
198 }
199
200 // 13. In parallel ...
201 // The rest of the algorithm is described in terms of a series of
202 // constraints rather than as explicit steps.
203 if (CheckInitialState()) {
204 // Need to detect closing and error when we are not reading. This
205 // corresponds to the following conditions from the standard:
206 // 1. Errors must be propagated forward: if source.[[state]] is or
207 // becomes "errored", ...
208 // and
209 // 3. Closing must be propagated forward: if source.[[state]] is or
210 // becomes "closed", ...
211 ThenPromise(reader_->ClosedPromise()->V8Promise(isolate),
212 &PipeToEngine::OnReaderClosed, &PipeToEngine::ReadableError);
213
214 // Need to detect error when we are not writing. This corresponds to this
215 // condition from the standard:
216 // 2. Errors must be propagated backward: if dest.[[state]] is or
217 // becomes "errored", ...
218 // We do not need to detect closure of the writable end of the pipe,
219 // because we have it locked and so it can only be closed by us.
220 ThenPromise(writer_->ClosedPromise()->V8Promise(isolate), nullptr,
221 &PipeToEngine::WritableError);
222
223 // Start the main read / write loop.
224 HandleNextEvent(Undefined());
225 }
226
227 // 14. Return promise.
228 return promise_->GetScriptPromise(script_state_);
229 }
230
Trace(Visitor * visitor)231 void Trace(Visitor* visitor) {
232 visitor->Trace(script_state_);
233 visitor->Trace(pipe_options_);
234 visitor->Trace(reader_);
235 visitor->Trace(writer_);
236 visitor->Trace(promise_);
237 visitor->Trace(last_write_);
238 visitor->Trace(shutdown_error_);
239 }
240
241 private:
242 // The implementation uses method pointers to maximise code reuse.
243
244 // |Action| represents an action that can be passed to the "Shutdown with an
245 // action" operation. Each Action is implemented as a method which delegates
246 // to some abstract operation, inferring the arguments from the state of
247 // |this|.
248 using Action = v8::Local<v8::Promise> (PipeToEngine::*)();
249
250 // This implementation uses ThenPromise() 7 times. Instead of creating a dozen
251 // separate subclasses of ScriptFunction, we use a single implementation and
252 // pass a method pointer at runtime to control the behaviour. Most
253 // PromiseReaction methods don't need to return a value, but because some do,
254 // the rest have to return undefined so that they can have the same method
255 // signature. Similarly, many of the methods ignore the argument that is
256 // passed to them.
257 using PromiseReaction =
258 v8::Local<v8::Value> (PipeToEngine::*)(v8::Local<v8::Value>);
259
260 class WrappedPromiseReaction final : public PromiseHandlerWithValue {
261 public:
WrappedPromiseReaction(ScriptState * script_state,PipeToEngine * instance,PromiseReaction method)262 WrappedPromiseReaction(ScriptState* script_state,
263 PipeToEngine* instance,
264 PromiseReaction method)
265 : PromiseHandlerWithValue(script_state),
266 instance_(instance),
267 method_(method) {}
268
CallWithLocal(v8::Local<v8::Value> value)269 v8::Local<v8::Value> CallWithLocal(v8::Local<v8::Value> value) override {
270 return (instance_->*method_)(value);
271 }
272
Trace(Visitor * visitor)273 void Trace(Visitor* visitor) override {
274 visitor->Trace(instance_);
275 ScriptFunction::Trace(visitor);
276 }
277
278 private:
279 Member<PipeToEngine> instance_;
280 PromiseReaction method_;
281 };
282
283 // Checks the state of the streams and executes the shutdown handlers if
284 // necessary. Returns true if piping can continue.
CheckInitialState()285 bool CheckInitialState() {
286 auto* isolate = script_state_->GetIsolate();
287 const auto state = Readable()->state_;
288
289 // Both streams can be errored or closed. To perform the right action the
290 // order of the checks must match the standard: "the following conditions
291 // must be applied in order." This method only checks the initial state;
292 // detection of state changes elsewhere is done through checking promise
293 // reactions.
294
295 // a. Errors must be propagated forward: if source.[[state]] is or
296 // becomes "errored",
297 if (state == kErrored) {
298 ReadableError(Readable()->GetStoredError(isolate));
299 return false;
300 }
301
302 // 2. Errors must be propagated backward: if dest.[[state]] is or becomes
303 // "errored",
304 if (Destination()->IsErrored()) {
305 WritableError(Destination()->GetStoredError(isolate));
306 return false;
307 }
308
309 // 3. Closing must be propagated forward: if source.[[state]] is or
310 // becomes "closed", then
311 if (state == kClosed) {
312 ReadableClosed();
313 return false;
314 }
315
316 // 4. Closing must be propagated backward: if !
317 // WritableStreamCloseQueuedOrInFlight(dest) is true or dest.[[state]]
318 // is "closed",
319 if (Destination()->IsClosingOrClosed()) {
320 WritableStartedClosed();
321 return false;
322 }
323
324 return true;
325 }
326
AbortAlgorithm()327 void AbortAlgorithm() {
328 // a. Let abortAlgorithm be the following steps:
329 // i. Let error be a new "AbortError" DOMException.
330 v8::Local<v8::Value> error = V8ThrowDOMException::CreateOrEmpty(
331 script_state_->GetIsolate(), DOMExceptionCode::kAbortError,
332 "Pipe aborted.");
333
334 // Steps ii. to iv. are implemented in AbortAlgorithmAction.
335
336 // v. Shutdown with an action consisting of getting a promise to wait for
337 // all of the actions in actions, and with error.
338 ShutdownWithAction(&PipeToEngine::AbortAlgorithmAction, error);
339 }
340
AbortAlgorithmAction()341 v8::Local<v8::Promise> AbortAlgorithmAction() {
342 v8::Local<v8::Value> error =
343 shutdown_error_.NewLocal(script_state_->GetIsolate());
344
345 // ii. Let actions be an empty ordered set.
346 HeapVector<ScriptPromise> actions;
347
348 // This method runs later than the equivalent steps in the standard. This
349 // means that it is safe to do the checks of the state of the destination
350 // and source synchronously, simplifying the logic.
351
352 // iii. If preventAbort is false, append the following action to actions:
353 // 1. If dest.[[state]] is "writable", return !
354 // WritableStreamAbort(dest, error).
355 // 2. Otherwise, return a promise resolved with undefined.
356 if (!pipe_options_->PreventAbort() && Destination()->IsWritable()) {
357 actions.push_back(ScriptPromise(
358 script_state_,
359 WritableStream::Abort(script_state_, Destination(), error)));
360 }
361
362 // iv. If preventCancel is false, append the following action action to
363 // actions:
364 // 1. If source.[[state]] is "readable", return !
365 // ReadableStreamCancel(source, error).
366 // 2. Otherwise, return a promise resolved with undefined.
367 if (!pipe_options_->PreventCancel() && IsReadable(Readable())) {
368 actions.push_back(ScriptPromise(
369 script_state_,
370 ReadableStream::Cancel(script_state_, Readable(), error)));
371 }
372
373 return ScriptPromise::All(script_state_, actions)
374 .V8Value()
375 .As<v8::Promise>();
376 }
377
378 // HandleNextEvent() has an unused argument and return value because it is a
379 // PromiseReaction. HandleNextEvent() and ReadFulfilled() call each other
380 // asynchronously in a loop until the pipe completes.
HandleNextEvent(v8::Local<v8::Value>)381 v8::Local<v8::Value> HandleNextEvent(v8::Local<v8::Value>) {
382 DCHECK(!is_reading_);
383 if (is_shutting_down_) {
384 return Undefined();
385 }
386
387 base::Optional<double> desired_size = writer_->GetDesiredSizeInternal();
388 if (!desired_size.has_value()) {
389 // This can happen if abort() is queued but not yet started when
390 // pipeTo() is called. In that case [[storedError]] is not set yet, and
391 // we need to wait until it is before we can cancel the pipe. Once
392 // [[storedError]] has been set, the rejection handler set on the writer
393 // closed promise above will detect it, so all we need to do here is
394 // nothing.
395 return Undefined();
396 }
397
398 if (desired_size.value() <= 0) {
399 // Need to wait for backpressure to go away.
400 ThenPromise(
401 writer_->ReadyPromise()->V8Promise(script_state_->GetIsolate()),
402 &PipeToEngine::HandleNextEvent, &PipeToEngine::WritableError);
403 return Undefined();
404 }
405
406 is_reading_ = true;
407 ThenPromise(ReadableStreamReader::Read(script_state_, reader_)
408 ->V8Promise(script_state_->GetIsolate()),
409 &PipeToEngine::ReadFulfilled, &PipeToEngine::ReadRejected);
410 return Undefined();
411 }
412
ReadFulfilled(v8::Local<v8::Value> result)413 v8::Local<v8::Value> ReadFulfilled(v8::Local<v8::Value> result) {
414 is_reading_ = false;
415 DCHECK(result->IsObject());
416 auto* isolate = script_state_->GetIsolate();
417 v8::Local<v8::Value> value;
418 bool done = false;
419 bool unpack_succeeded =
420 V8UnpackIteratorResult(script_state_, result.As<v8::Object>(), &done)
421 .ToLocal(&value);
422 DCHECK(unpack_succeeded);
423 if (done) {
424 ReadableClosed();
425 return Undefined();
426 }
427 const auto write =
428 WritableStreamDefaultWriter::Write(script_state_, writer_, value);
429 last_write_.Set(isolate, write);
430 ThenPromise(write, nullptr, &PipeToEngine::WritableError);
431 HandleNextEvent(Undefined());
432 return Undefined();
433 }
434
ReadRejected(v8::Local<v8::Value>)435 v8::Local<v8::Value> ReadRejected(v8::Local<v8::Value>) {
436 is_reading_ = false;
437 ReadableError(Readable()->GetStoredError(script_state_->GetIsolate()));
438 return Undefined();
439 }
440
441 // If read() is in progress, then wait for it to tell us that the stream is
442 // closed so that we write all the data before shutdown.
OnReaderClosed(v8::Local<v8::Value>)443 v8::Local<v8::Value> OnReaderClosed(v8::Local<v8::Value>) {
444 if (!is_reading_) {
445 ReadableClosed();
446 }
447 return Undefined();
448 }
449
450 // 1. Errors must be propagated forward: if source.[[state]] is or
451 // becomes "errored", then
ReadableError(v8::Local<v8::Value> error)452 v8::Local<v8::Value> ReadableError(v8::Local<v8::Value> error) {
453 // This function can be called during shutdown when the lock is released.
454 // Exit early in that case.
455 if (is_shutting_down_) {
456 return Undefined();
457 }
458
459 // a. If preventAbort is false, shutdown with an action of !
460 // WritableStreamAbort(dest, source.[[storedError]]) and with
461 // source.[[storedError]].
462 DCHECK(error->SameValue(
463 Readable()->GetStoredError(script_state_->GetIsolate())));
464 if (!pipe_options_->PreventAbort()) {
465 ShutdownWithAction(&PipeToEngine::WritableStreamAbortAction, error);
466 } else {
467 // b. Otherwise, shutdown with source.[[storedError]].
468 Shutdown(error);
469 }
470 return Undefined();
471 }
472
473 // 2. Errors must be propagated backward: if dest.[[state]] is or becomes
474 // "errored", then
WritableError(v8::Local<v8::Value> error)475 v8::Local<v8::Value> WritableError(v8::Local<v8::Value> error) {
476 // This function can be called during shutdown when the lock is released.
477 // Exit early in that case.
478 if (is_shutting_down_) {
479 return Undefined();
480 }
481
482 // a. If preventCancel is false, shutdown with an action of !
483 // ReadableStreamCancel(source, dest.[[storedError]]) and with
484 // dest.[[storedError]].
485 DCHECK(error->SameValue(
486 Destination()->GetStoredError(script_state_->GetIsolate())));
487 if (!pipe_options_->PreventCancel()) {
488 ShutdownWithAction(&PipeToEngine::ReadableStreamCancelAction, error);
489 } else {
490 // b. Otherwise, shutdown with dest.[[storedError]].
491 Shutdown(error);
492 }
493 return Undefined();
494 }
495
496 // 3. Closing must be propagated forward: if source.[[state]] is or
497 // becomes "closed", then
ReadableClosed()498 void ReadableClosed() {
499 // a. If preventClose is false, shutdown with an action of !
500 // WritableStreamDefaultWriterCloseWithErrorPropagation(writer).
501 if (!pipe_options_->PreventClose()) {
502 ShutdownWithAction(
503 &PipeToEngine::
504 WritableStreamDefaultWriterCloseWithErrorPropagationAction,
505 v8::MaybeLocal<v8::Value>());
506 } else {
507 // b. Otherwise, shutdown.
508 Shutdown(v8::MaybeLocal<v8::Value>());
509 }
510 }
511
512 // 4. Closing must be propagated backward: if !
513 // WritableStreamCloseQueuedOrInFlight(dest) is true or dest.[[state]] is
514 // "closed", then
WritableStartedClosed()515 void WritableStartedClosed() {
516 // a. Assert: no chunks have been read or written.
517 // This is trivially true because this method is only called from
518 // CheckInitialState().
519
520 // b. Let destClosed be a new TypeError.
521 const auto dest_closed = v8::Exception::TypeError(
522 V8String(script_state_->GetIsolate(), "Destination stream closed"));
523
524 // c. If preventCancel is false, shutdown with an action of !
525 // ReadableStreamCancel(source, destClosed) and with destClosed.
526 if (!pipe_options_->PreventCancel()) {
527 ShutdownWithAction(&PipeToEngine::ReadableStreamCancelAction,
528 dest_closed);
529 } else {
530 // d. Otherwise, shutdown with destClosed.
531 Shutdown(dest_closed);
532 }
533 }
534
535 // * Shutdown with an action: if any of the above requirements ask to shutdown
536 // with an action |action|, optionally with an error |originalError|, then:
ShutdownWithAction(Action action,v8::MaybeLocal<v8::Value> original_error)537 void ShutdownWithAction(Action action,
538 v8::MaybeLocal<v8::Value> original_error) {
539 // a. If shuttingDown is true, abort these substeps.
540 if (is_shutting_down_) {
541 return;
542 }
543
544 // b. Set shuttingDown to true.
545 is_shutting_down_ = true;
546
547 // Store the action in case we need to call it asynchronously. This is safe
548 // because the |is_shutting_down_| guard flag ensures that we can only reach
549 // this assignment once.
550 shutdown_action_ = action;
551
552 // Store |original_error| as |shutdown_error_| if it was supplied.
553 v8::Local<v8::Value> original_error_local;
554 if (original_error.ToLocal(&original_error_local)) {
555 shutdown_error_.Set(script_state_->GetIsolate(), original_error_local);
556 }
557 v8::Local<v8::Promise> p;
558
559 // c. If dest.[[state]] is "writable" and !
560 // WritableStreamCloseQueuedOrInFlight(dest) is false,
561 if (ShouldWriteQueuedChunks()) {
562 // i. If any chunks have been read but not yet written, write them to
563 // dest.
564 // ii. Wait until every chunk that has been read has been written
565 // (i.e. the corresponding promises have settled).
566 p = ThenPromise(WriteQueuedChunks(), &PipeToEngine::InvokeShutdownAction);
567 } else {
568 // d. Let p be the result of performing action.
569 p = InvokeShutdownAction();
570 }
571
572 // e. Upon fulfillment of p, finalize, passing along originalError if it
573 // was given.
574 // f. Upon rejection of p with reason newError, finalize with newError.
575 ThenPromise(p, &PipeToEngine::FinalizeWithOriginalErrorIfSet,
576 &PipeToEngine::FinalizeWithNewError);
577 }
578
579 // * Shutdown: if any of the above requirements or steps ask to shutdown,
580 // optionally with an error error, then:
Shutdown(v8::MaybeLocal<v8::Value> error_maybe)581 void Shutdown(v8::MaybeLocal<v8::Value> error_maybe) {
582 // a. If shuttingDown is true, abort these substeps.
583 if (is_shutting_down_) {
584 return;
585 }
586
587 // b. Set shuttingDown to true.
588 is_shutting_down_ = true;
589
590 // c. If dest.[[state]] is "writable" and !
591 // WritableStreamCloseQueuedOrInFlight(dest) is false,
592 if (ShouldWriteQueuedChunks()) {
593 // Need to stash the value of |error_maybe| since we are calling
594 // Finalize() asynchronously.
595 v8::Local<v8::Value> error;
596 if (error_maybe.ToLocal(&error)) {
597 shutdown_error_.Set(script_state_->GetIsolate(), error);
598 }
599
600 // i. If any chunks have been read but not yet written, write them to
601 // dest.
602 // ii. Wait until every chunk that has been read has been written
603 // (i.e. the corresponding promises have settled).
604 // d. Finalize, passing along error if it was given.
605 ThenPromise(WriteQueuedChunks(),
606 &PipeToEngine::FinalizeWithOriginalErrorIfSet);
607 } else {
608 // d. Finalize, passing along error if it was given.
609 Finalize(error_maybe);
610 }
611 }
612
613 // Calls Finalize(), using the stored shutdown error rather than the value
614 // that was passed.
FinalizeWithOriginalErrorIfSet(v8::Local<v8::Value>)615 v8::Local<v8::Value> FinalizeWithOriginalErrorIfSet(v8::Local<v8::Value>) {
616 v8::MaybeLocal<v8::Value> error_maybe;
617 if (!shutdown_error_.IsEmpty()) {
618 error_maybe = shutdown_error_.NewLocal(script_state_->GetIsolate());
619 }
620 Finalize(error_maybe);
621 return Undefined();
622 }
623
624 // Calls Finalize(), using the value that was passed as the error.
FinalizeWithNewError(v8::Local<v8::Value> new_error)625 v8::Local<v8::Value> FinalizeWithNewError(v8::Local<v8::Value> new_error) {
626 Finalize(new_error);
627 return Undefined();
628 }
629
630 // * Finalize: both forms of shutdown will eventually ask to finalize,
631 // optionally with an error error, which means to perform the following
632 // steps:
Finalize(v8::MaybeLocal<v8::Value> error_maybe)633 void Finalize(v8::MaybeLocal<v8::Value> error_maybe) {
634 // a. Perform ! WritableStreamDefaultWriterRelease(writer).
635 WritableStreamDefaultWriter::Release(script_state_, writer_);
636
637 // b. Perform ! ReadableStreamReaderGenericRelease(reader).
638 ReadableStreamReader::GenericRelease(script_state_, reader_);
639
640 // TODO(ricea): Implement signal.
641 // c. If signal is not undefined, remove abortAlgorithm from signal.
642
643 v8::Local<v8::Value> error;
644 if (error_maybe.ToLocal(&error)) {
645 // d. If error was given, reject promise with error.
646 promise_->Reject(script_state_, error);
647 } else {
648 // e. Otherwise, resolve promise with undefined.
649 promise_->ResolveWithUndefined(script_state_);
650 }
651 }
652
ShouldWriteQueuedChunks() const653 bool ShouldWriteQueuedChunks() const {
654 // "If dest.[[state]] is "writable" and !
655 // WritableStreamCloseQueuedOrInFlight(dest) is false"
656 return Destination()->IsWritable() &&
657 !WritableStream::CloseQueuedOrInFlight(Destination());
658 }
659
WriteQueuedChunks()660 v8::Local<v8::Promise> WriteQueuedChunks() {
661 if (!last_write_.IsEmpty()) {
662 // "Wait until every chunk that has been read has been written (i.e.
663 // the corresponding promises have settled)"
664 // This implies that we behave the same whether the promise fulfills or
665 // rejects. IgnoreErrors() will convert a rejection into a successful
666 // resolution.
667 return ThenPromise(last_write_.NewLocal(script_state_->GetIsolate()),
668 nullptr, &PipeToEngine::IgnoreErrors);
669 }
670 return PromiseResolveWithUndefined(script_state_);
671 }
672
IgnoreErrors(v8::Local<v8::Value>)673 v8::Local<v8::Value> IgnoreErrors(v8::Local<v8::Value>) {
674 return Undefined();
675 }
676
677 // InvokeShutdownAction(), version for calling directly.
InvokeShutdownAction()678 v8::Local<v8::Promise> InvokeShutdownAction() {
679 return (this->*shutdown_action_)();
680 }
681
682 // InvokeShutdownAction(), version for use as a PromiseReaction.
InvokeShutdownAction(v8::Local<v8::Value>)683 v8::Local<v8::Value> InvokeShutdownAction(v8::Local<v8::Value>) {
684 return InvokeShutdownAction();
685 }
686
ShutdownError() const687 v8::Local<v8::Value> ShutdownError() const {
688 DCHECK(!shutdown_error_.IsEmpty());
689 return shutdown_error_.NewLocal(script_state_->GetIsolate());
690 }
691
WritableStreamAbortAction()692 v8::Local<v8::Promise> WritableStreamAbortAction() {
693 return WritableStream::Abort(script_state_, Destination(), ShutdownError());
694 }
695
ReadableStreamCancelAction()696 v8::Local<v8::Promise> ReadableStreamCancelAction() {
697 return ReadableStream::Cancel(script_state_, Readable(), ShutdownError());
698 }
699
700 v8::Local<v8::Promise>
WritableStreamDefaultWriterCloseWithErrorPropagationAction()701 WritableStreamDefaultWriterCloseWithErrorPropagationAction() {
702 return WritableStreamDefaultWriter::CloseWithErrorPropagation(script_state_,
703 writer_);
704 }
705
706 // Reduces the visual noise when we are returning an undefined value.
Undefined()707 v8::Local<v8::Value> Undefined() {
708 return v8::Undefined(script_state_->GetIsolate());
709 }
710
Destination()711 WritableStream* Destination() { return writer_->OwnerWritableStream(); }
712
Destination() const713 const WritableStream* Destination() const {
714 return writer_->OwnerWritableStream();
715 }
716
Readable()717 ReadableStream* Readable() { return reader_->owner_readable_stream_; }
718
719 // Performs promise.then(on_fulfilled, on_rejected). It behaves like
720 // StreamPromiseThen(). Only the types are different.
ThenPromise(v8::Local<v8::Promise> promise,PromiseReaction on_fulfilled,PromiseReaction on_rejected=nullptr)721 v8::Local<v8::Promise> ThenPromise(v8::Local<v8::Promise> promise,
722 PromiseReaction on_fulfilled,
723 PromiseReaction on_rejected = nullptr) {
724 return StreamThenPromise(
725 script_state_->GetContext(), promise,
726 on_fulfilled ? MakeGarbageCollected<WrappedPromiseReaction>(
727 script_state_, this, on_fulfilled)
728 : nullptr,
729 on_rejected ? MakeGarbageCollected<WrappedPromiseReaction>(
730 script_state_, this, on_rejected)
731 : nullptr);
732 }
733
734 Member<ScriptState> script_state_;
735 Member<PipeOptions> pipe_options_;
736 Member<ReadableStreamReader> reader_;
737 Member<WritableStreamDefaultWriter> writer_;
738 Member<StreamPromiseResolver> promise_;
739 TraceWrapperV8Reference<v8::Promise> last_write_;
740 Action shutdown_action_;
741 TraceWrapperV8Reference<v8::Value> shutdown_error_;
742 bool is_shutting_down_ = false;
743 bool is_reading_ = false;
744
745 DISALLOW_COPY_AND_ASSIGN(PipeToEngine);
746 };
747
748 class ReadableStream::TeeEngine final : public GarbageCollected<TeeEngine> {
749 public:
750 TeeEngine() = default;
751
752 // Create the streams and start copying data.
753 void Start(ScriptState*, ReadableStream*, ExceptionState&);
754
755 // Branch1() and Branch2() are null until Start() is called.
Branch1() const756 ReadableStream* Branch1() const { return branch_[0]; }
Branch2() const757 ReadableStream* Branch2() const { return branch_[1]; }
758
Trace(Visitor * visitor)759 void Trace(Visitor* visitor) {
760 visitor->Trace(stream_);
761 visitor->Trace(reader_);
762 visitor->Trace(reason_[0]);
763 visitor->Trace(reason_[1]);
764 visitor->Trace(branch_[0]);
765 visitor->Trace(branch_[1]);
766 visitor->Trace(controller_[0]);
767 visitor->Trace(controller_[1]);
768 visitor->Trace(cancel_promise_);
769 }
770
771 private:
772 class PullAlgorithm;
773 class CancelAlgorithm;
774
775 Member<ReadableStream> stream_;
776 Member<ReadableStreamReader> reader_;
777 Member<StreamPromiseResolver> cancel_promise_;
778 bool closed_ = false;
779
780 // The standard contains a number of pairs of variables with one for each
781 // stream. These are implemented as arrays here. While they are 1-indexed in
782 // the standard, they are 0-indexed here; ie. "canceled_[0]" here corresponds
783 // to "canceled1" in the standard.
784 bool canceled_[2] = {false, false};
785 TraceWrapperV8Reference<v8::Value> reason_[2];
786 Member<ReadableStream> branch_[2];
787 Member<ReadableStreamDefaultController> controller_[2];
788
789 DISALLOW_COPY_AND_ASSIGN(TeeEngine);
790 };
791
792 class ReadableStream::TeeEngine::PullAlgorithm final : public StreamAlgorithm {
793 public:
PullAlgorithm(TeeEngine * engine)794 explicit PullAlgorithm(TeeEngine* engine) : engine_(engine) {}
795
Run(ScriptState * script_state,int,v8::Local<v8::Value>[])796 v8::Local<v8::Promise> Run(ScriptState* script_state,
797 int,
798 v8::Local<v8::Value>[]) override {
799 // https://streams.spec.whatwg.org/#readable-stream-tee
800 // 12. Let pullAlgorithm be the following steps:
801 // a. Return the result of transforming ! ReadableStreamDefaultReaderRead(
802 // reader) with a fulfillment handler which takes the argument result
803 // and performs the following steps:
804 return StreamThenPromise(
805 script_state->GetContext(),
806 ReadableStreamReader::Read(script_state, engine_->reader_)
807 ->V8Promise(script_state->GetIsolate()),
808 MakeGarbageCollected<ResolveFunction>(script_state, engine_));
809 }
810
Trace(Visitor * visitor)811 void Trace(Visitor* visitor) override {
812 visitor->Trace(engine_);
813 StreamAlgorithm::Trace(visitor);
814 }
815
816 private:
817 class ResolveFunction final : public PromiseHandler {
818 public:
ResolveFunction(ScriptState * script_state,TeeEngine * engine)819 ResolveFunction(ScriptState* script_state, TeeEngine* engine)
820 : PromiseHandler(script_state), engine_(engine) {}
821
CallWithLocal(v8::Local<v8::Value> result)822 void CallWithLocal(v8::Local<v8::Value> result) override {
823 // i. If closed is true, return.
824 if (engine_->closed_) {
825 return;
826 }
827
828 // ii. Assert: Type(result) is Object.
829 DCHECK(result->IsObject());
830
831 auto* script_state = GetScriptState();
832 auto* isolate = script_state->GetIsolate();
833
834 // iii. Let done be ! Get(result, "done").
835 // vi. Let value be ! Get(result, "value").
836 // The precise order of operations is not important here, because |result|
837 // is guaranteed to have own properties of "value" and "done" and so the
838 // "Get" operations cannot have side-effects.
839 v8::Local<v8::Value> value;
840 bool done = false;
841 bool unpack_succeeded =
842 V8UnpackIteratorResult(script_state, result.As<v8::Object>(), &done)
843 .ToLocal(&value);
844 CHECK(unpack_succeeded);
845
846 // vi. Assert: Type(done) is Boolean.
847 // v. If done is true,
848 if (done) {
849 // 1. If canceled1 is false,
850 // a. Perform ! ReadableStreamDefaultControllerClose(branch1.
851 // [[readableStreamController]]).
852 // 2. If canceled2 is false,
853 // b. Perform ! ReadableStreamDefaultControllerClose(branch2.
854 // [[readableStreamController]]).
855 for (int branch = 0; branch < 2; ++branch) {
856 if (!engine_->canceled_[branch] &&
857 ReadableStreamDefaultController::CanCloseOrEnqueue(
858 engine_->controller_[branch])) {
859 ReadableStreamDefaultController::Close(
860 script_state, engine_->controller_[branch]);
861 }
862 }
863 // 3. Set closed to true.
864 engine_->closed_ = true;
865
866 // 4. Return.
867 return;
868 }
869 ExceptionState exception_state(isolate, ExceptionState::kUnknownContext,
870 "", "");
871 // vii. Let value1 and value2 be value.
872 // viii. If canceled2 is false and cloneForBranch2 is true, set value2 to
873 // ? StructuredDeserialize(? StructuredSerialize(value2), the
874 // current Realm Record).
875 // TODO(ricea): Support cloneForBranch2
876
877 // ix. If canceled1 is false, perform ?
878 // ReadableStreamDefaultControllerEnqueue(branch1.
879 // [[readableStreamController]], value1).
880 // x. If canceled2 is false, perform ?
881 // ReadableStreamDefaultControllerEnqueue(branch2.
882 // [[readableStreamController]], value2).
883 for (int branch = 0; branch < 2; ++branch) {
884 if (!engine_->canceled_[branch] &&
885 ReadableStreamDefaultController::CanCloseOrEnqueue(
886 engine_->controller_[branch])) {
887 ReadableStreamDefaultController::Enqueue(script_state,
888 engine_->controller_[branch],
889 value, exception_state);
890 if (exception_state.HadException()) {
891 // Instead of returning a rejection, which is inconvenient here,
892 // call ControllerError(). The only difference this makes is that
893 // it happens synchronously, but that should not be observable.
894 ReadableStreamDefaultController::Error(
895 script_state, engine_->controller_[branch],
896 exception_state.GetException());
897 exception_state.ClearException();
898 return;
899 }
900 }
901 }
902 }
903
Trace(Visitor * visitor)904 void Trace(Visitor* visitor) override {
905 visitor->Trace(engine_);
906 PromiseHandler::Trace(visitor);
907 }
908
909 private:
910 Member<TeeEngine> engine_;
911 };
912
913 Member<TeeEngine> engine_;
914 };
915
916 class ReadableStream::TeeEngine::CancelAlgorithm final
917 : public StreamAlgorithm {
918 public:
CancelAlgorithm(TeeEngine * engine,int branch)919 CancelAlgorithm(TeeEngine* engine, int branch)
920 : engine_(engine), branch_(branch) {
921 DCHECK(branch == 0 || branch == 1);
922 }
923
Run(ScriptState * script_state,int argc,v8::Local<v8::Value> argv[])924 v8::Local<v8::Promise> Run(ScriptState* script_state,
925 int argc,
926 v8::Local<v8::Value> argv[]) override {
927 // https://streams.spec.whatwg.org/#readable-stream-tee
928 // This implements both cancel1Algorithm and cancel2Algorithm as they are
929 // identical except for the index they operate on. Standard comments are
930 // from cancel1Algorithm.
931 // 13. Let cancel1Algorithm be the following steps, taking a reason
932 // argument:
933 auto* isolate = script_state->GetIsolate();
934
935 // a. Set canceled1 to true.
936 engine_->canceled_[branch_] = true;
937 DCHECK_EQ(argc, 1);
938
939 // b. Set reason1 to reason.
940 engine_->reason_[branch_].Set(isolate, argv[0]);
941
942 const int other_branch = 1 - branch_;
943
944 // c. If canceled2 is true,
945 if (engine_->canceled_[other_branch]) {
946 // i. Let compositeReason be ! CreateArrayFromList(« reason1, reason2 »).
947 v8::Local<v8::Value> reason[] = {engine_->reason_[0].NewLocal(isolate),
948 engine_->reason_[1].NewLocal(isolate)};
949 v8::Local<v8::Value> composite_reason =
950 v8::Array::New(script_state->GetIsolate(), reason, 2);
951
952 // ii. Let cancelResult be ! ReadableStreamCancel(stream,
953 // compositeReason).
954 auto cancel_result = ReadableStream::Cancel(
955 script_state, engine_->stream_, composite_reason);
956
957 // iii. Resolve cancelPromise with cancelResult.
958 engine_->cancel_promise_->Resolve(script_state, cancel_result);
959 }
960 return engine_->cancel_promise_->V8Promise(isolate);
961 }
962
Trace(Visitor * visitor)963 void Trace(Visitor* visitor) override {
964 visitor->Trace(engine_);
965 StreamAlgorithm::Trace(visitor);
966 }
967
968 private:
969 Member<TeeEngine> engine_;
970 const int branch_;
971 };
972
Start(ScriptState * script_state,ReadableStream * stream,ExceptionState & exception_state)973 void ReadableStream::TeeEngine::Start(ScriptState* script_state,
974 ReadableStream* stream,
975 ExceptionState& exception_state) {
976 // https://streams.spec.whatwg.org/#readable-stream-tee
977 // 1. Assert: ! IsReadableStream(stream) is true.
978 DCHECK(stream);
979
980 // TODO(ricea): 2. Assert: Type(cloneForBranch2) is Boolean.
981
982 stream_ = stream;
983
984 // 3. Let reader be ? AcquireReadableStreamDefaultReader(stream).
985 reader_ = ReadableStream::AcquireDefaultReader(script_state, stream, false,
986 exception_state);
987 if (exception_state.HadException()) {
988 return;
989 }
990
991 // These steps are performed by the constructor:
992 // 4. Let closed be false.
993 DCHECK(!closed_);
994
995 // 5. Let canceled1 be false.
996 DCHECK(!canceled_[0]);
997
998 // 6. Let canceled2 be false.
999 DCHECK(!canceled_[1]);
1000
1001 // 7. Let reason1 be undefined.
1002 DCHECK(reason_[0].IsEmpty());
1003
1004 // 8. Let reason2 be undefined.
1005 DCHECK(reason_[1].IsEmpty());
1006
1007 // 9. Let branch1 be undefined.
1008 DCHECK(!branch_[0]);
1009
1010 // 10. Let branch2 be undefined.
1011 DCHECK(!branch_[1]);
1012
1013 // 11. Let cancelPromise be a new promise.
1014 cancel_promise_ = MakeGarbageCollected<StreamPromiseResolver>(script_state);
1015
1016 // 12. Let pullAlgorithm be the following steps:
1017 // (steps are defined in PullAlgorithm::Run()).
1018 auto* pull_algorithm = MakeGarbageCollected<PullAlgorithm>(this);
1019
1020 // 13. Let cancel1Algorithm be the following steps, taking a reason argument:
1021 // (see CancelAlgorithm::Run()).
1022 auto* cancel1_algorithm = MakeGarbageCollected<CancelAlgorithm>(this, 0);
1023
1024 // 14. Let cancel2Algorithm be the following steps, taking a reason argument:
1025 // (both algorithms share a single implementation).
1026 auto* cancel2_algorithm = MakeGarbageCollected<CancelAlgorithm>(this, 1);
1027
1028 // 15. Let startAlgorithm be an algorithm that returns undefined.
1029 auto* start_algorithm = CreateTrivialStartAlgorithm();
1030
1031 auto* size_algorithm = CreateDefaultSizeAlgorithm();
1032
1033 // 16. Set branch1 to ! CreateReadableStream(startAlgorithm, pullAlgorithm,
1034 // cancel1Algorithm).
1035 branch_[0] = ReadableStream::Create(script_state, start_algorithm,
1036 pull_algorithm, cancel1_algorithm, 1.0,
1037 size_algorithm, exception_state);
1038 if (exception_state.HadException()) {
1039 return;
1040 }
1041
1042 // 17. Set branch2 to ! CreateReadableStream(startAlgorithm, pullAlgorithm,
1043 // cancel2Algorithm).
1044 branch_[1] = ReadableStream::Create(script_state, start_algorithm,
1045 pull_algorithm, cancel2_algorithm, 1.0,
1046 size_algorithm, exception_state);
1047 if (exception_state.HadException()) {
1048 return;
1049 }
1050
1051 for (int branch = 0; branch < 2; ++branch) {
1052 controller_[branch] = branch_[branch]->readable_stream_controller_;
1053 }
1054
1055 class RejectFunction final : public PromiseHandler {
1056 public:
1057 RejectFunction(ScriptState* script_state, TeeEngine* engine)
1058 : PromiseHandler(script_state), engine_(engine) {}
1059
1060 void CallWithLocal(v8::Local<v8::Value> r) override {
1061 // 18. Upon rejection of reader.[[closedPromise]] with reason r,
1062 // a. Perform ! ReadableStreamDefaultControllerError(branch1.
1063 // [[readableStreamController]], r).
1064 ReadableStreamDefaultController::Error(GetScriptState(),
1065 engine_->controller_[0], r);
1066
1067 // b. Perform ! ReadableStreamDefaultControllerError(branch2.
1068 // [[readableStreamController]], r).
1069 ReadableStreamDefaultController::Error(GetScriptState(),
1070 engine_->controller_[1], r);
1071 }
1072
1073 void Trace(Visitor* visitor) override {
1074 visitor->Trace(engine_);
1075 PromiseHandler::Trace(visitor);
1076 }
1077
1078 private:
1079 Member<TeeEngine> engine_;
1080 };
1081
1082 // 18. Upon rejection of reader.[[closedPromise]] with reason r,
1083 StreamThenPromise(
1084 script_state->GetContext(),
1085 reader_->closed_promise_->V8Promise(script_state->GetIsolate()), nullptr,
1086 MakeGarbageCollected<RejectFunction>(script_state, this));
1087
1088 // Step "19. Return « branch1, branch2 »."
1089 // is performed by the caller.
1090 }
1091
Create(ScriptState * script_state,ExceptionState & exception_state)1092 ReadableStream* ReadableStream::Create(ScriptState* script_state,
1093 ExceptionState& exception_state) {
1094 return Create(script_state,
1095 ScriptValue(script_state->GetIsolate(),
1096 v8::Undefined(script_state->GetIsolate())),
1097 ScriptValue(script_state->GetIsolate(),
1098 v8::Undefined(script_state->GetIsolate())),
1099 exception_state);
1100 }
1101
Create(ScriptState * script_state,ScriptValue underlying_source,ExceptionState & exception_state)1102 ReadableStream* ReadableStream::Create(ScriptState* script_state,
1103 ScriptValue underlying_source,
1104 ExceptionState& exception_state) {
1105 return Create(script_state, underlying_source,
1106 ScriptValue(script_state->GetIsolate(),
1107 v8::Undefined(script_state->GetIsolate())),
1108 exception_state);
1109 }
1110
Create(ScriptState * script_state,ScriptValue underlying_source,ScriptValue strategy,ExceptionState & exception_state)1111 ReadableStream* ReadableStream::Create(ScriptState* script_state,
1112 ScriptValue underlying_source,
1113 ScriptValue strategy,
1114 ExceptionState& exception_state) {
1115 auto* stream = MakeGarbageCollected<ReadableStream>();
1116 stream->InitInternal(script_state, underlying_source, strategy, false,
1117 exception_state);
1118 if (exception_state.HadException()) {
1119 return nullptr;
1120 }
1121
1122 return stream;
1123 }
1124
CreateWithCountQueueingStrategy(ScriptState * script_state,UnderlyingSourceBase * underlying_source,size_t high_water_mark)1125 ReadableStream* ReadableStream::CreateWithCountQueueingStrategy(
1126 ScriptState* script_state,
1127 UnderlyingSourceBase* underlying_source,
1128 size_t high_water_mark) {
1129 auto* isolate = script_state->GetIsolate();
1130
1131 // It's safer to use a workalike rather than a real CountQueuingStrategy
1132 // object. We use the default "size" function as it is implemented in C++ and
1133 // so much faster than calling into JavaScript. Since the create object has a
1134 // null prototype, there is no danger of us finding some other "size" function
1135 // via the prototype chain.
1136 v8::Local<v8::Name> high_water_mark_string =
1137 V8AtomicString(isolate, "highWaterMark");
1138 v8::Local<v8::Value> high_water_mark_value =
1139 v8::Number::New(isolate, high_water_mark);
1140
1141 auto strategy_object =
1142 v8::Object::New(isolate, v8::Null(isolate), &high_water_mark_string,
1143 &high_water_mark_value, 1);
1144
1145 ExceptionState exception_state(script_state->GetIsolate(),
1146 ExceptionState::kConstructionContext,
1147 "ReadableStream");
1148
1149 v8::Local<v8::Value> underlying_source_v8 =
1150 ToV8(underlying_source, script_state);
1151
1152 auto* stream = MakeGarbageCollected<ReadableStream>();
1153 stream->InitInternal(
1154 script_state,
1155 ScriptValue(script_state->GetIsolate(), underlying_source_v8),
1156 ScriptValue(script_state->GetIsolate(), strategy_object), true,
1157 exception_state);
1158
1159 if (exception_state.HadException()) {
1160 exception_state.ClearException();
1161 DLOG(WARNING)
1162 << "Ignoring an exception in CreateWithCountQueuingStrategy().";
1163 }
1164
1165 return stream;
1166 }
1167
Create(ScriptState * script_state,StreamStartAlgorithm * start_algorithm,StreamAlgorithm * pull_algorithm,StreamAlgorithm * cancel_algorithm,double high_water_mark,StrategySizeAlgorithm * size_algorithm,ExceptionState & exception_state)1168 ReadableStream* ReadableStream::Create(ScriptState* script_state,
1169 StreamStartAlgorithm* start_algorithm,
1170 StreamAlgorithm* pull_algorithm,
1171 StreamAlgorithm* cancel_algorithm,
1172 double high_water_mark,
1173 StrategySizeAlgorithm* size_algorithm,
1174 ExceptionState& exception_state) {
1175 // https://streams.spec.whatwg.org/#create-readable-stream
1176 // All arguments are compulsory in this implementation, so the first two steps
1177 // are skipped:
1178 // 1. If highWaterMark was not passed, set it to 1.
1179 // 2. If sizeAlgorithm was not passed, set it to an algorithm that returns 1.
1180
1181 // 3. Assert: ! IsNonNegativeNumber(highWaterMark) is true.
1182 DCHECK_GE(high_water_mark, 0);
1183
1184 // 4. Let stream be ObjectCreate(the original value of ReadableStream's
1185 // prototype property).
1186 auto* stream = MakeGarbageCollected<ReadableStream>();
1187
1188 // 5. Perform ! InitializeReadableStream(stream).
1189 Initialize(stream);
1190
1191 // 6. Let controller be ObjectCreate(the original value of
1192 // ReadableStreamDefaultController's prototype property).
1193 auto* controller = MakeGarbageCollected<ReadableStreamDefaultController>();
1194
1195 // 7. Perform ? SetUpReadableStreamDefaultController(stream, controller,
1196 // startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark,
1197 // sizeAlgorithm).
1198 ReadableStreamDefaultController::SetUp(
1199 script_state, stream, controller, start_algorithm, pull_algorithm,
1200 cancel_algorithm, high_water_mark, size_algorithm, exception_state);
1201 if (exception_state.HadException()) {
1202 return nullptr;
1203 }
1204
1205 // 8. Return stream.
1206 return stream;
1207 }
1208
// Construction does no work beyond default member initialisation; streams are
// set up afterwards (see InitInternal() and the Create() overloads).
ReadableStream::ReadableStream() = default;

ReadableStream::~ReadableStream() = default;
1212
locked(ScriptState * script_state,ExceptionState & exception_state) const1213 bool ReadableStream::locked(ScriptState* script_state,
1214 ExceptionState& exception_state) const {
1215 // https://streams.spec.whatwg.org/#rs-locked
1216 // 2. Return ! IsReadableStreamLocked(this).
1217 return IsLocked(this);
1218 }
1219
cancel(ScriptState * script_state,ExceptionState & exception_state)1220 ScriptPromise ReadableStream::cancel(ScriptState* script_state,
1221 ExceptionState& exception_state) {
1222 return cancel(script_state,
1223 ScriptValue(script_state->GetIsolate(),
1224 v8::Undefined(script_state->GetIsolate())),
1225 exception_state);
1226 }
1227
cancel(ScriptState * script_state,ScriptValue reason,ExceptionState & exception_state)1228 ScriptPromise ReadableStream::cancel(ScriptState* script_state,
1229 ScriptValue reason,
1230 ExceptionState& exception_state) {
1231 // https://streams.spec.whatwg.org/#rs-cancel
1232 // 2. If ! IsReadableStreamLocked(this) is true, return a promise rejected
1233 // with a TypeError exception.
1234 if (IsLocked(this)) {
1235 exception_state.ThrowTypeError("Cannot cancel a locked stream");
1236 return ScriptPromise();
1237 }
1238
1239 // 3. Return ! ReadableStreamCancel(this, reason).
1240 v8::Local<v8::Promise> result = Cancel(script_state, this, reason.V8Value());
1241 return ScriptPromise(script_state, result);
1242 }
1243
getReader(ScriptState * script_state,ExceptionState & exception_state)1244 ReadableStreamDefaultReader* ReadableStream::getReader(
1245 ScriptState* script_state,
1246 ExceptionState& exception_state) {
1247 // https://streams.spec.whatwg.org/#rs-get-reader
1248 // 2. If mode is undefined, return ? AcquireReadableStreamDefaultReader(this,
1249 // true).
1250 return AcquireDefaultReader(script_state, this, true, exception_state);
1251 }
1252
getReader(ScriptState * script_state,ScriptValue options,ExceptionState & exception_state)1253 ReadableStreamDefaultReader* ReadableStream::getReader(
1254 ScriptState* script_state,
1255 ScriptValue options,
1256 ExceptionState& exception_state) {
1257 // https://streams.spec.whatwg.org/#rs-get-reader
1258 // Since we don't support byob readers, the only thing
1259 // GetReaderValidateOptions() needs to do is throw an exception if
1260 // |options.mode| is invalid.
1261 GetReaderValidateOptions(script_state, options, exception_state);
1262 if (exception_state.HadException()) {
1263 return nullptr;
1264 }
1265
1266 return getReader(script_state, exception_state);
1267 }
1268
pipeThrough(ScriptState * script_state,ScriptValue transform_stream,ExceptionState & exception_state)1269 ScriptValue ReadableStream::pipeThrough(ScriptState* script_state,
1270 ScriptValue transform_stream,
1271 ExceptionState& exception_state) {
1272 return pipeThrough(script_state, transform_stream,
1273 ScriptValue(script_state->GetIsolate(),
1274 v8::Undefined(script_state->GetIsolate())),
1275 exception_state);
1276 }
1277
1278 // https://streams.spec.whatwg.org/#rs-pipe-through
pipeThrough(ScriptState * script_state,ScriptValue transform_stream,ScriptValue options,ExceptionState & exception_state)1279 ScriptValue ReadableStream::pipeThrough(ScriptState* script_state,
1280 ScriptValue transform_stream,
1281 ScriptValue options,
1282 ExceptionState& exception_state) {
1283 // https://streams.spec.whatwg.org/#rs-pipe-through
1284 // The first part of this function implements the unpacking of the {readable,
1285 // writable} argument to the method.
1286 v8::Local<v8::Value> pair_value = transform_stream.V8Value();
1287 v8::Local<v8::Context> context = script_state->GetContext();
1288
1289 constexpr char kWritableIsNotWritableStream[] =
1290 "parameter 1's 'writable' property is not a WritableStream.";
1291 constexpr char kReadableIsNotReadableStream[] =
1292 "parameter 1's 'readable' property is not a ReadableStream.";
1293 constexpr char kWritableIsLocked[] = "parameter 1's 'writable' is locked.";
1294
1295 v8::Local<v8::Object> pair;
1296 if (!pair_value->ToObject(context).ToLocal(&pair)) {
1297 exception_state.ThrowTypeError(kWritableIsNotWritableStream);
1298 return ScriptValue();
1299 }
1300
1301 v8::Isolate* isolate = script_state->GetIsolate();
1302 v8::Local<v8::Value> writable, readable;
1303 {
1304 v8::TryCatch block(isolate);
1305 if (!pair->Get(context, V8String(isolate, "writable")).ToLocal(&writable)) {
1306 exception_state.RethrowV8Exception(block.Exception());
1307 return ScriptValue();
1308 }
1309 DCHECK(!block.HasCaught());
1310
1311 if (!pair->Get(context, V8String(isolate, "readable")).ToLocal(&readable)) {
1312 exception_state.RethrowV8Exception(block.Exception());
1313 return ScriptValue();
1314 }
1315 DCHECK(!block.HasCaught());
1316 }
1317
1318 // 2. If ! IsWritableStream(_writable_) is *false*, throw a *TypeError*
1319 // exception.
1320 WritableStream* writable_stream =
1321 V8WritableStream::ToImplWithTypeCheck(isolate, writable);
1322 if (!writable_stream) {
1323 exception_state.ThrowTypeError(kWritableIsNotWritableStream);
1324 return ScriptValue();
1325 }
1326
1327 // 3. If ! IsReadableStream(_readable_) is *false*, throw a *TypeError*
1328 // exception.
1329 if (!V8ReadableStream::HasInstance(readable, isolate)) {
1330 exception_state.ThrowTypeError(kReadableIsNotReadableStream);
1331 return ScriptValue();
1332 }
1333
1334 // 4. Set preventClose to ! ToBoolean(preventClose), set preventAbort to !
1335 // ToBoolean(preventAbort), and set preventCancel to !
1336 // ToBoolean(preventCancel).
1337
1338 // 5. If signal is not undefined, and signal is not an instance of the
1339 // AbortSignal interface, throw a TypeError exception.
1340 auto* pipe_options =
1341 MakeGarbageCollected<PipeOptions>(script_state, options, exception_state);
1342 if (exception_state.HadException()) {
1343 return ScriptValue();
1344 }
1345
1346 // 6. If ! IsReadableStreamLocked(*this*) is *true*, throw a *TypeError*
1347 // exception.
1348 if (IsLocked(this)) {
1349 exception_state.ThrowTypeError("Cannot pipe a locked stream");
1350 return ScriptValue();
1351 }
1352
1353 // 7. If ! IsWritableStreamLocked(_writable_) is *true*, throw a *TypeError*
1354 // exception.
1355 if (WritableStream::IsLocked(writable_stream)) {
1356 exception_state.ThrowTypeError(kWritableIsLocked);
1357 return ScriptValue();
1358 }
1359
1360 // 8. Let _promise_ be ! ReadableStreamPipeTo(*this*, _writable_,
1361 // _preventClose_, _preventAbort_, _preventCancel_,
1362 // _signal_).
1363
1364 ScriptPromise promise =
1365 PipeTo(script_state, this, writable_stream, pipe_options);
1366
1367 // 9. Set _promise_.[[PromiseIsHandled]] to *true*.
1368 promise.MarkAsHandled();
1369
1370 // 10. Return _readable_.
1371 return ScriptValue(script_state->GetIsolate(), readable);
1372 }
1373
pipeTo(ScriptState * script_state,ScriptValue destination,ExceptionState & exception_state)1374 ScriptPromise ReadableStream::pipeTo(ScriptState* script_state,
1375 ScriptValue destination,
1376 ExceptionState& exception_state) {
1377 return pipeTo(script_state, destination,
1378 ScriptValue(script_state->GetIsolate(),
1379 v8::Undefined(script_state->GetIsolate())),
1380 exception_state);
1381 }
1382
pipeTo(ScriptState * script_state,ScriptValue destination_value,ScriptValue options,ExceptionState & exception_state)1383 ScriptPromise ReadableStream::pipeTo(ScriptState* script_state,
1384 ScriptValue destination_value,
1385 ScriptValue options,
1386 ExceptionState& exception_state) {
1387 // https://streams.spec.whatwg.org/#rs-pipe-to
1388 // 2. If ! IsWritableStream(dest) is false, return a promise rejected with a
1389 // TypeError exception.
1390 // TODO(ricea): Do this in the IDL instead.
1391 WritableStream* destination = V8WritableStream::ToImplWithTypeCheck(
1392 script_state->GetIsolate(), destination_value.V8Value());
1393
1394 if (!destination) {
1395 exception_state.ThrowTypeError("Illegal invocation");
1396 return ScriptPromise();
1397 }
1398
1399 // 3. Set preventClose to ! ToBoolean(preventClose), set preventAbort to !
1400 // ToBoolean(preventAbort), and set preventCancel to !
1401 // ToBoolean(preventCancel).
1402 // 4. If signal is not undefined, and signal is not an instance of the
1403 // AbortSignal interface, return a promise rejected with a TypeError
1404 // exception.
1405 auto* pipe_options =
1406 MakeGarbageCollected<PipeOptions>(script_state, options, exception_state);
1407 if (exception_state.HadException()) {
1408 return ScriptPromise();
1409 }
1410
1411 // 5. If ! IsReadableStreamLocked(this) is true, return a promise rejected
1412 // with a TypeError exception.
1413 if (IsLocked(this)) {
1414 exception_state.ThrowTypeError("Cannot pipe a locked stream");
1415 return ScriptPromise();
1416 }
1417
1418 // 6. If ! IsWritableStreamLocked(dest) is true, return a promise rejected
1419 // with a TypeError exception.
1420 if (WritableStream::IsLocked(destination)) {
1421 exception_state.ThrowTypeError("Cannot pipe to a locked stream");
1422 return ScriptPromise();
1423 }
1424
1425 return PipeTo(script_state, this, destination, pipe_options);
1426 }
1427
tee(ScriptState * script_state,ExceptionState & exception_state)1428 ScriptValue ReadableStream::tee(ScriptState* script_state,
1429 ExceptionState& exception_state) {
1430 return CallTeeAndReturnBranchArray(script_state, this, exception_state);
1431 }
1432
1433 // Unlike in the standard, this is defined as a separate method from the
1434 // constructor. This prevents problems when garbage collection happens
1435 // re-entrantly during construction.
InitInternal(ScriptState * script_state,ScriptValue raw_underlying_source,ScriptValue raw_strategy,bool created_by_ua,ExceptionState & exception_state)1436 void ReadableStream::InitInternal(ScriptState* script_state,
1437 ScriptValue raw_underlying_source,
1438 ScriptValue raw_strategy,
1439 bool created_by_ua,
1440 ExceptionState& exception_state) {
1441 if (!created_by_ua) {
1442 // TODO(ricea): Move this to IDL once blink::ReadableStreamOperations is
1443 // no longer using the public constructor.
1444 UseCounter::Count(ExecutionContext::From(script_state),
1445 WebFeature::kReadableStreamConstructor);
1446 }
1447
1448 // https://streams.spec.whatwg.org/#rs-constructor
1449 // 1. Perform ! InitializeReadableStream(this).
1450 Initialize(this);
1451
1452 // The next part of this constructor corresponds to the object conversions
1453 // that are implicit in the definition in the standard.
1454 DCHECK(!raw_underlying_source.IsEmpty());
1455 DCHECK(!raw_strategy.IsEmpty());
1456
1457 auto context = script_state->GetContext();
1458 auto* isolate = script_state->GetIsolate();
1459
1460 v8::Local<v8::Object> underlying_source;
1461 ScriptValueToObject(script_state, raw_underlying_source, &underlying_source,
1462 exception_state);
1463 if (exception_state.HadException()) {
1464 return;
1465 }
1466
1467 // 2. Let size be ? GetV(strategy, "size").
1468 // 3. Let highWaterMark be ? GetV(strategy, "highWaterMark").
1469 StrategyUnpacker strategy_unpacker(script_state, raw_strategy,
1470 exception_state);
1471 if (exception_state.HadException()) {
1472 return;
1473 }
1474
1475 // 4. Let type be ? GetV(underlyingSource, "type").
1476 v8::TryCatch try_catch(isolate);
1477 v8::Local<v8::Value> type;
1478 if (!underlying_source->Get(context, V8AtomicString(isolate, "type"))
1479 .ToLocal(&type)) {
1480 exception_state.RethrowV8Exception(try_catch.Exception());
1481 return;
1482 }
1483
1484 if (!type->IsUndefined()) {
1485 // 5. Let typeString be ? ToString(type).
1486 v8::Local<v8::String> type_string;
1487 if (!type->ToString(context).ToLocal(&type_string)) {
1488 exception_state.RethrowV8Exception(try_catch.Exception());
1489 return;
1490 }
1491
1492 // 6. If typeString is "bytes",
1493 if (type_string == V8AtomicString(isolate, "bytes")) {
1494 // TODO(ricea): Implement bytes type.
1495 exception_state.ThrowRangeError("bytes type is not yet implemented");
1496 return;
1497 }
1498
1499 // 8. Otherwise, throw a RangeError exception.
1500 exception_state.ThrowRangeError("Invalid type is specified");
1501 return;
1502 }
1503
1504 // 7. Otherwise, if type is undefined,
1505 // a. Let sizeAlgorithm be ? MakeSizeAlgorithmFromSizeFunction(size).
1506 auto* size_algorithm =
1507 strategy_unpacker.MakeSizeAlgorithm(script_state, exception_state);
1508 if (exception_state.HadException()) {
1509 return;
1510 }
1511 DCHECK(size_algorithm);
1512
1513 // b. If highWaterMark is undefined, let highWaterMark be 1.
1514 // c. Set highWaterMark to ? ValidateAndNormalizeHighWaterMark(
1515 // highWaterMark).
1516 double high_water_mark =
1517 strategy_unpacker.GetHighWaterMark(script_state, 1, exception_state);
1518 if (exception_state.HadException()) {
1519 return;
1520 }
1521
1522 // 4. Perform ? SetUpReadableStreamDefaultControllerFromUnderlyingSource
1523 // (this, underlyingSource, highWaterMark, sizeAlgorithm).
1524 ReadableStreamDefaultController::SetUpFromUnderlyingSource(
1525 script_state, this, underlying_source, high_water_mark, size_algorithm,
1526 exception_state);
1527 }
1528
1529 //
1530 // Readable stream abstract operations
1531 //
AcquireDefaultReader(ScriptState * script_state,ReadableStream * stream,bool for_author_code,ExceptionState & exception_state)1532 ReadableStreamReader* ReadableStream::AcquireDefaultReader(
1533 ScriptState* script_state,
1534 ReadableStream* stream,
1535 bool for_author_code,
1536 ExceptionState& exception_state) {
1537 // https://streams.spec.whatwg.org/#acquire-readable-stream-reader
1538 // for_author_code is compulsory in this implementation
1539 // 1. If forAuthorCode was not passed, set it to false.
1540
1541 // 2. Let reader be ? Construct(ReadableStreamDefaultReader, « stream »).
1542 auto* reader = MakeGarbageCollected<ReadableStreamReader>(
1543 script_state, stream, exception_state);
1544 if (exception_state.HadException()) {
1545 return nullptr;
1546 }
1547
1548 // 3. Set reader.[[forAuthorCode]] to forAuthorCode.
1549 reader->for_author_code_ = for_author_code;
1550
1551 // 4. Return reader.
1552 return reader;
1553 }
1554
Initialize(ReadableStream * stream)1555 void ReadableStream::Initialize(ReadableStream* stream) {
1556 // Fields are initialised by the constructor, so we only check that they were
1557 // initialised correctly.
1558 // https://streams.spec.whatwg.org/#initialize-readable-stream
1559 // 1. Set stream.[[state]] to "readable".
1560 CHECK_EQ(stream->state_, kReadable);
1561 // 2. Set stream.[[reader]] and stream.[[storedError]] to undefined.
1562 DCHECK(!stream->reader_);
1563 DCHECK(stream->stored_error_.IsEmpty());
1564 // 3. Set stream.[[disturbed]] to false.
1565 DCHECK(!stream->is_disturbed_);
1566 }
1567
1568 // TODO(domenic): cloneForBranch2 argument from spec not supported yet
Tee(ScriptState * script_state,ReadableStream ** branch1,ReadableStream ** branch2,ExceptionState & exception_state)1569 void ReadableStream::Tee(ScriptState* script_state,
1570 ReadableStream** branch1,
1571 ReadableStream** branch2,
1572 ExceptionState& exception_state) {
1573 auto* engine = MakeGarbageCollected<TeeEngine>();
1574 engine->Start(script_state, this, exception_state);
1575 if (exception_state.HadException()) {
1576 return;
1577 }
1578
1579 // Instead of returning a List like ReadableStreamTee in the standard, the
1580 // branches are returned via output parameters.
1581 *branch1 = engine->Branch1();
1582 *branch2 = engine->Branch2();
1583 }
1584
LockAndDisturb(ScriptState * script_state,ExceptionState & exception_state)1585 void ReadableStream::LockAndDisturb(ScriptState* script_state,
1586 ExceptionState& exception_state) {
1587 ScriptState::Scope scope(script_state);
1588
1589 if (reader_) {
1590 return;
1591 }
1592
1593 ReadableStreamReader* reader =
1594 AcquireDefaultReader(script_state, this, false, exception_state);
1595 if (!reader) {
1596 return;
1597 }
1598
1599 is_disturbed_ = true;
1600 }
1601
Serialize(ScriptState * script_state,MessagePort * port,ExceptionState & exception_state)1602 void ReadableStream::Serialize(ScriptState* script_state,
1603 MessagePort* port,
1604 ExceptionState& exception_state) {
1605 if (IsLocked(this)) {
1606 exception_state.ThrowTypeError("Cannot transfer a locked stream");
1607 return;
1608 }
1609
1610 auto* writable =
1611 CreateCrossRealmTransformWritable(script_state, port, exception_state);
1612 if (exception_state.HadException()) {
1613 return;
1614 }
1615
1616 auto promise =
1617 PipeTo(script_state, this, writable, MakeGarbageCollected<PipeOptions>());
1618 promise.MarkAsHandled();
1619 }
1620
Deserialize(ScriptState * script_state,MessagePort * port,ExceptionState & exception_state)1621 ReadableStream* ReadableStream::Deserialize(ScriptState* script_state,
1622 MessagePort* port,
1623 ExceptionState& exception_state) {
1624 // We need to execute JavaScript to call "Then" on v8::Promises. We will not
1625 // run author code.
1626 v8::Isolate::AllowJavascriptExecutionScope allow_js(
1627 script_state->GetIsolate());
1628 auto* readable =
1629 CreateCrossRealmTransformReadable(script_state, port, exception_state);
1630 if (exception_state.HadException()) {
1631 return nullptr;
1632 }
1633 return readable;
1634 }
1635
GetReaderNotForAuthorCode(ScriptState * script_state,ExceptionState & exception_state)1636 ReadableStreamDefaultReader* ReadableStream::GetReaderNotForAuthorCode(
1637 ScriptState* script_state,
1638 ExceptionState& exception_state) {
1639 return AcquireDefaultReader(script_state, this, false, exception_state);
1640 }
1641
PipeTo(ScriptState * script_state,ReadableStream * readable,WritableStream * destination,PipeOptions * pipe_options)1642 ScriptPromise ReadableStream::PipeTo(ScriptState* script_state,
1643 ReadableStream* readable,
1644 WritableStream* destination,
1645 PipeOptions* pipe_options) {
1646 auto* engine = MakeGarbageCollected<PipeToEngine>(script_state, pipe_options);
1647 return engine->Start(readable, destination);
1648 }
1649
GetStoredError(v8::Isolate * isolate) const1650 v8::Local<v8::Value> ReadableStream::GetStoredError(
1651 v8::Isolate* isolate) const {
1652 return stored_error_.NewLocal(isolate);
1653 }
1654
Trace(Visitor * visitor)1655 void ReadableStream::Trace(Visitor* visitor) {
1656 visitor->Trace(readable_stream_controller_);
1657 visitor->Trace(reader_);
1658 visitor->Trace(stored_error_);
1659 ScriptWrappable::Trace(visitor);
1660 }
1661
1662 //
1663 // Abstract Operations Used By Controllers
1664 //
1665
AddReadRequest(ScriptState * script_state,ReadableStream * stream)1666 StreamPromiseResolver* ReadableStream::AddReadRequest(ScriptState* script_state,
1667 ReadableStream* stream) {
1668 // https://streams.spec.whatwg.org/#readable-stream-add-read-request
1669 // 1. Assert: ! IsReadableStreamDefaultReader(stream.[[reader]]) is true.
1670 DCHECK(stream->reader_);
1671
1672 // 2. Assert: stream.[[state]] is "readable".
1673 CHECK_EQ(stream->state_, kReadable);
1674
1675 // 3. Let promise be a new promise.
1676 auto* promise = MakeGarbageCollected<StreamPromiseResolver>(script_state);
1677
1678 // This implementation stores promises directly in |read_requests_| rather
1679 // than wrapping them in a Record.
1680 // 4. Let readRequest be Record {[[promise]]: promise}.
1681 // 5. Append readRequest as the last element of stream.[[reader]].
1682 // [[readRequests]].
1683 stream->reader_->read_requests_.push_back(promise);
1684
1685 // 6. Return promise.
1686 return promise;
1687 }
1688
Cancel(ScriptState * script_state,ReadableStream * stream,v8::Local<v8::Value> reason)1689 v8::Local<v8::Promise> ReadableStream::Cancel(ScriptState* script_state,
1690 ReadableStream* stream,
1691 v8::Local<v8::Value> reason) {
1692 // https://streams.spec.whatwg.org/#readable-stream-cancel
1693 // 1. Set stream.[[disturbed]] to true.
1694 stream->is_disturbed_ = true;
1695
1696 // 2. If stream.[[state]] is "closed", return a promise resolved with
1697 // undefined.
1698 const auto state = stream->state_;
1699 if (state == kClosed) {
1700 return PromiseResolveWithUndefined(script_state);
1701 }
1702
1703 // 3. If stream.[[state]] is "errored", return a promise rejected with stream.
1704 // [[storedError]].
1705 if (state == kErrored) {
1706 return PromiseReject(script_state,
1707 stream->GetStoredError(script_state->GetIsolate()));
1708 }
1709
1710 // 4. Perform ! ReadableStreamClose(stream).
1711 Close(script_state, stream);
1712
1713 // 5. Let sourceCancelPromise be ! stream.[[readableStreamController]].
1714 // [[CancelSteps]](reason).
1715 v8::Local<v8::Promise> source_cancel_promise =
1716 stream->readable_stream_controller_->CancelSteps(script_state, reason);
1717
1718 class ReturnUndefinedFunction final : public PromiseHandler {
1719 public:
1720 explicit ReturnUndefinedFunction(ScriptState* script_state)
1721 : PromiseHandler(script_state) {}
1722
1723 // The method does nothing; the default value of undefined is returned to
1724 // JavaScript.
1725 void CallWithLocal(v8::Local<v8::Value>) override {}
1726 };
1727
1728 // 6. Return the result of transforming sourceCancelPromise with a
1729 // fulfillment handler that returns undefined.
1730 return StreamThenPromise(
1731 script_state->GetContext(), source_cancel_promise,
1732 MakeGarbageCollected<ReturnUndefinedFunction>(script_state));
1733 }
1734
Close(ScriptState * script_state,ReadableStream * stream)1735 void ReadableStream::Close(ScriptState* script_state, ReadableStream* stream) {
1736 // https://streams.spec.whatwg.org/#readable-stream-close
1737 // 1. Assert: stream.[[state]] is "readable".
1738 CHECK_EQ(stream->state_, kReadable);
1739
1740 // 2. Set stream.[[state]] to "closed".
1741 stream->state_ = kClosed;
1742
1743 // 3. Let reader be stream.[[reader]].
1744 ReadableStreamReader* reader = stream->reader_;
1745
1746 // 4. If reader is undefined, return.
1747 if (!reader) {
1748 return;
1749 }
1750
1751 // TODO(ricea): Support BYOB readers.
1752 // 5. If ! IsReadableStreamDefaultReader(reader) is true,
1753 // a. Repeat for each readRequest that is an element of reader.
1754 // [[readRequests]],
1755 HeapDeque<Member<StreamPromiseResolver>> requests;
1756 requests.Swap(reader->read_requests_);
1757 for (StreamPromiseResolver* promise : requests) {
1758 // i. Resolve readRequest.[[promise]] with !
1759 // ReadableStreamCreateReadResult(undefined, true, reader.
1760 // [[forAuthorCode]]).
1761 promise->Resolve(script_state,
1762 CreateReadResult(script_state,
1763 v8::Undefined(script_state->GetIsolate()),
1764 true, reader->for_author_code_));
1765 }
1766
1767 // b. Set reader.[[readRequests]] to an empty List.
1768 // This is not required since we've already called Swap().
1769
1770 // 6. Resolve reader.[[closedPromise]] with undefined.
1771 reader->closed_promise_->ResolveWithUndefined(script_state);
1772 }
1773
CreateReadResult(ScriptState * script_state,v8::Local<v8::Value> value,bool done,bool for_author_code)1774 v8::Local<v8::Value> ReadableStream::CreateReadResult(
1775 ScriptState* script_state,
1776 v8::Local<v8::Value> value,
1777 bool done,
1778 bool for_author_code) {
1779 // https://streams.spec.whatwg.org/#readable-stream-create-read-result
1780 auto* isolate = script_state->GetIsolate();
1781 auto context = script_state->GetContext();
1782 auto value_string = V8AtomicString(isolate, "value");
1783 auto done_string = V8AtomicString(isolate, "done");
1784 auto done_value = v8::Boolean::New(isolate, done);
1785 // 1. Let prototype be null.
1786 // 2. If forAuthorCode is true, set prototype to %ObjectPrototype%.
1787 // This implementation doesn't use a |prototype| variable, instead using
1788 // different code paths depending on the value of |for_author_code|.
1789 if (for_author_code) {
1790 // 4. Let obj be ObjectCreate(prototype).
1791 auto obj = v8::Object::New(isolate);
1792
1793 // 5. Perform CreateDataProperty(obj, "value", value).
1794 obj->CreateDataProperty(context, value_string, value).Check();
1795
1796 // 6. Perform CreateDataProperty(obj, "done", done).
1797 obj->CreateDataProperty(context, done_string, done_value).Check();
1798
1799 // 7. Return obj.
1800 return obj;
1801 }
1802
1803 // When |for_author_code| is false, we can perform all the steps in a single
1804 // call to V8.
1805
1806 // 4. Let obj be ObjectCreate(prototype).
1807 // 5. Perform CreateDataProperty(obj, "value", value).
1808 // 6. Perform CreateDataProperty(obj, "done", done).
1809 // 7. Return obj.
1810 // TODO(ricea): Is it possible to use this optimised API in both cases?
1811 v8::Local<v8::Name> names[2] = {value_string, done_string};
1812 v8::Local<v8::Value> values[2] = {value, done_value};
1813
1814 static_assert(base::size(names) == base::size(values),
1815 "names and values arrays must be the same size");
1816 return v8::Object::New(isolate, v8::Null(isolate), names, values,
1817 base::size(names));
1818 }
1819
Error(ScriptState * script_state,ReadableStream * stream,v8::Local<v8::Value> e)1820 void ReadableStream::Error(ScriptState* script_state,
1821 ReadableStream* stream,
1822 v8::Local<v8::Value> e) {
1823 // https://streams.spec.whatwg.org/#readable-stream-error
1824 // 2. Assert: stream.[[state]] is "readable".
1825 CHECK_EQ(stream->state_, kReadable);
1826 auto* isolate = script_state->GetIsolate();
1827
1828 // 3. Set stream.[[state]] to "errored".
1829 stream->state_ = kErrored;
1830
1831 // 4. Set stream.[[storedError]] to e.
1832 stream->stored_error_.Set(isolate, e);
1833
1834 // 5. Let reader be stream.[[reader]].
1835 ReadableStreamReader* reader = stream->reader_;
1836
1837 // 6. If reader is undefined, return.
1838 if (!reader) {
1839 return;
1840 }
1841
1842 // 7. If ! IsReadableStreamDefaultReader(reader) is true,
1843 // TODO(ricea): Support BYOB readers.
1844 // a. Repeat for each readRequest that is an element of reader.
1845 // [[readRequests]],
1846 for (StreamPromiseResolver* promise : reader->read_requests_) {
1847 // i. Reject readRequest.[[promise]] with e.
1848 promise->Reject(script_state, e);
1849 }
1850
1851 // b. Set reader.[[readRequests]] to a new empty List.
1852 reader->read_requests_.clear();
1853
1854 // 9. Reject reader.[[closedPromise]] with e.
1855 reader->closed_promise_->Reject(script_state, e);
1856
1857 // 10. Set reader.[[closedPromise]].[[PromiseIsHandled]] to true.
1858 reader->closed_promise_->MarkAsHandled(isolate);
1859 }
1860
FulfillReadRequest(ScriptState * script_state,ReadableStream * stream,v8::Local<v8::Value> chunk,bool done)1861 void ReadableStream::FulfillReadRequest(ScriptState* script_state,
1862 ReadableStream* stream,
1863 v8::Local<v8::Value> chunk,
1864 bool done) {
1865 // https://streams.spec.whatwg.org/#readable-stream-fulfill-read-request
1866 // 1. Let reader be stream.[[reader]].
1867 ReadableStreamReader* reader = stream->reader_;
1868
1869 // 2. Let readRequest be the first element of reader.[[readRequests]].
1870 StreamPromiseResolver* read_request = reader->read_requests_.front();
1871
1872 // 3. Remove readIntoRequest from reader.[[readIntoRequests]], shifting all
1873 // other elements downward (so that the second becomes the first, and so
1874 // on).
1875 reader->read_requests_.pop_front();
1876
1877 // 4. Resolve readIntoRequest.[[promise]] with !
1878 // ReadableStreamCreateReadResult(chunk, done, reader.[[forAuthorCode]]).
1879 read_request->Resolve(
1880 script_state, ReadableStream::CreateReadResult(script_state, chunk, done,
1881 reader->for_author_code_));
1882 }
1883
GetNumReadRequests(const ReadableStream * stream)1884 int ReadableStream::GetNumReadRequests(const ReadableStream* stream) {
1885 // https://streams.spec.whatwg.org/#readable-stream-get-num-read-requests
1886 // 1. Return the number of elements in stream.[[reader]].[[readRequests]].
1887 return stream->reader_->read_requests_.size();
1888 }
1889
1890 //
1891 // TODO(ricea): Functions for transferable streams.
1892 //
1893
GetReaderValidateOptions(ScriptState * script_state,ScriptValue options,ExceptionState & exception_state)1894 void ReadableStream::GetReaderValidateOptions(ScriptState* script_state,
1895 ScriptValue options,
1896 ExceptionState& exception_state) {
1897 // https://streams.spec.whatwg.org/#rs-get-reader
1898 // The unpacking of |options| is indicated as part of the signature of the
1899 // function in the standard.
1900 v8::TryCatch block(script_state->GetIsolate());
1901 v8::Local<v8::Value> mode;
1902 v8::Local<v8::String> mode_string;
1903 v8::Local<v8::Context> context = script_state->GetContext();
1904 if (options.V8Value()->IsUndefined()) {
1905 mode = v8::Undefined(script_state->GetIsolate());
1906 } else {
1907 v8::Local<v8::Object> v8_options;
1908 if (!options.V8Value()->ToObject(context).ToLocal(&v8_options)) {
1909 exception_state.RethrowV8Exception(block.Exception());
1910 return;
1911 }
1912 if (!v8_options->Get(context, V8String(script_state->GetIsolate(), "mode"))
1913 .ToLocal(&mode)) {
1914 exception_state.RethrowV8Exception(block.Exception());
1915 return;
1916 }
1917 }
1918
1919 // 3. Set mode to ? ToString(mode).
1920 if (!mode->ToString(context).ToLocal(&mode_string)) {
1921 exception_state.RethrowV8Exception(block.Exception());
1922 return;
1923 }
1924
1925 // 4. If mode is "byob", return ? AcquireReadableStreamBYOBReader(this, true).
1926 if (ToCoreString(mode_string) == "byob") {
1927 // TODO(ricea): Support BYOB readers.
1928 exception_state.ThrowTypeError("invalid mode");
1929 return;
1930 }
1931
1932 if (!mode->IsUndefined()) {
1933 // 5. Throw a RangeError exception.
1934 exception_state.ThrowRangeError("invalid mode");
1935 return;
1936 }
1937 }
1938
CallTeeAndReturnBranchArray(ScriptState * script_state,ReadableStream * readable,ExceptionState & exception_state)1939 ScriptValue ReadableStream::CallTeeAndReturnBranchArray(
1940 ScriptState* script_state,
1941 ReadableStream* readable,
1942 ExceptionState& exception_state) {
1943 // https://streams.spec.whatwg.org/#rs-tee
1944 v8::Isolate* isolate = script_state->GetIsolate();
1945 ReadableStream* branch1 = nullptr;
1946 ReadableStream* branch2 = nullptr;
1947
1948 // 2. Let branches be ? ReadableStreamTee(this, false).
1949 readable->Tee(script_state, &branch1, &branch2, exception_state);
1950
1951 if (!branch1 || !branch2)
1952 return ScriptValue();
1953
1954 DCHECK(!exception_state.HadException());
1955
1956 // 3. Return ! CreateArrayFromList(branches).
1957 v8::TryCatch block(isolate);
1958 v8::Local<v8::Context> context = script_state->GetContext();
1959 v8::Local<v8::Array> array = v8::Array::New(isolate, 2);
1960 v8::Local<v8::Object> global = context->Global();
1961
1962 v8::Local<v8::Value> v8_branch1 = ToV8(branch1, global, isolate);
1963 if (v8_branch1.IsEmpty()) {
1964 exception_state.RethrowV8Exception(block.Exception());
1965 return ScriptValue();
1966 }
1967 v8::Local<v8::Value> v8_branch2 = ToV8(branch2, global, isolate);
1968 if (v8_branch1.IsEmpty()) {
1969 exception_state.RethrowV8Exception(block.Exception());
1970 return ScriptValue();
1971 }
1972 if (array->Set(context, V8String(isolate, "0"), v8_branch1).IsNothing()) {
1973 exception_state.RethrowV8Exception(block.Exception());
1974 return ScriptValue();
1975 }
1976 if (array->Set(context, V8String(isolate, "1"), v8_branch2).IsNothing()) {
1977 exception_state.RethrowV8Exception(block.Exception());
1978 return ScriptValue();
1979 }
1980 return ScriptValue(script_state->GetIsolate(), array);
1981 }
1982
1983 } // namespace blink
1984