// Copyright 2020 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
4 
5 #ifndef THIRD_PARTY_BLINK_RENDERER_MODULES_WEBTRANSPORT_INCOMING_STREAM_H_
6 #define THIRD_PARTY_BLINK_RENDERER_MODULES_WEBTRANSPORT_INCOMING_STREAM_H_
7 
8 #include <stdint.h>
9 
10 #include "base/callback.h"
11 #include "base/logging.h"
12 #include "base/optional.h"
13 #include "base/util/type_safety/strong_alias.h"
14 #include "mojo/public/cpp/system/data_pipe.h"
15 #include "mojo/public/cpp/system/simple_watcher.h"
16 #include "third_party/blink/renderer/bindings/core/v8/script_promise.h"
17 #include "third_party/blink/renderer/bindings/core/v8/script_promise_resolver.h"
18 #include "third_party/blink/renderer/bindings/core/v8/script_value.h"
19 #include "third_party/blink/renderer/modules/modules_export.h"
20 #include "third_party/blink/renderer/platform/heap/thread_state.h"
21 
22 namespace blink {
23 
// Forward declarations for types that this header only uses by pointer or as
// a Member<> template argument.
class ReadableStream;
class ReadableStreamDefaultControllerWithScriptScope;
class ScriptState;
class StreamAbortInfo;
class Visitor;
29 
// Implementation of the IncomingStream mixin from the standard:
// https://wicg.github.io/web-transport/#incoming-stream. ReceiveStream and
// BidirectionalStream delegate to this to implement the functionality.
33 class MODULES_EXPORT IncomingStream final
34     : public GarbageCollected<IncomingStream> {
35   USING_PRE_FINALIZER(IncomingStream, Dispose);
36 
37  public:
38   enum class State {
39     kOpen,
40     kAborted,
41     kClosed,
42   };
43 
44   IncomingStream(ScriptState*,
45                  base::OnceClosure on_abort,
46                  mojo::ScopedDataPipeConsumerHandle);
47   ~IncomingStream();
48 
49   // Init() must be called before the stream is used.
50   void Init();
51 
52   // Methods from the IncomingStream IDL:
53   // https://wicg.github.io/web-transport/#incoming-stream
Readable()54   ReadableStream* Readable() const {
55     DVLOG(1) << "IncomingStream::readable() called";
56 
57     return readable_;
58   }
59 
ReadingAborted()60   ScriptPromise ReadingAborted() const { return reading_aborted_; }
61 
62   void AbortReading(StreamAbortInfo*);
63 
64   // Called from QuicTransport via a WebTransportStream class. May execute
65   // JavaScript.
66   void OnIncomingStreamClosed(bool fin_received);
67 
68   // Called via QuicTransport via a WebTransportStream class. Expects a
69   // JavaScript scope to have been entered.
70   void Reset();
71 
72   // Called from QuicTransport rather than using
73   // ExecutionContextLifecycleObserver to ensure correct destruction order.
74   // Does not execute JavaScript.
75   void ContextDestroyed();
76 
GetState()77   State GetState() const { return state_; }
78 
79   void Trace(Visitor*) const;
80 
81  private:
82   class UnderlyingSource;
83 
84   using IsLocalAbort = util::StrongAlias<class IsLocalAbortTag, bool>;
85 
86   // Called when |data_pipe_| becomes readable or errored.
87   void OnHandleReady(MojoResult, const mojo::HandleSignalsState&);
88 
89   // Called when |data_pipe_| is closed.
90   void OnPeerClosed(MojoResult, const mojo::HandleSignalsState&);
91 
92   // Rejects any unfinished read() calls and resets |data_pipe_|.
93   void HandlePipeClosed();
94 
95   // Handles a remote close appropriately for the value of |fin_received_|.
96   void ProcessClose();
97 
98   // Reads all the data currently in the pipe and enqueues it. If no data is
99   // currently available, triggers the |read_watcher_| and enqueues when data
100   // becomes available.
101   void ReadFromPipeAndEnqueue();
102 
103   // Copies a sequence of bytes into an ArrayBuffer and enqueues it.
104   void EnqueueBytes(const void* source, uint32_t byte_length);
105 
106   // Creates a DOMException indicating that the stream has been aborted.
107   // If IsLocalAbort it true it will indicate a locally-initiated abort,
108   // otherwise it will indicate a server--initiated abort.
109   ScriptValue CreateAbortException(IsLocalAbort);
110 
111   // Closes |readable_|, resolves |reading_aborted_| and resets |data_pipe_|.
112   void CloseAbortAndReset();
113 
114   // Errors |readable_|, resolves |reading_aborted_| and resets |data_pipe_|.
115   // |exception| will be set as the error on |readable_|.
116   void ErrorStreamAbortAndReset(ScriptValue exception);
117 
118   // Resolves the |reading_aborted_| promise and resets the |data_pipe_|.
119   void AbortAndReset();
120 
121   // Resets |data_pipe_| and clears the watchers.
122   // If the pipe is open it will be closed as a side-effect.
123   void ResetPipe();
124 
125   // Prepares the object for destruction.
126   void Dispose();
127 
128   const Member<ScriptState> script_state_;
129 
130   base::OnceClosure on_abort_;
131 
132   mojo::ScopedDataPipeConsumerHandle data_pipe_;
133 
134   // Only armed when we need to read something.
135   mojo::SimpleWatcher read_watcher_;
136 
137   // Always armed to detect close.
138   mojo::SimpleWatcher close_watcher_;
139 
140   Member<ReadableStream> readable_;
141   Member<ReadableStreamDefaultControllerWithScriptScope> controller_;
142 
143   // Promise returned by the |readingAborted| attribute.
144   ScriptPromise reading_aborted_;
145   Member<ScriptPromiseResolver> reading_aborted_resolver_;
146 
147   State state_ = State::kOpen;
148 
149   // This is set when OnIncomingStreamClosed() is called.
150   base::Optional<bool> fin_received_;
151 
152   // True when |data_pipe_| has been detected to be closed. The close is not
153   // processed until |fin_received_| is also set.
154   bool is_pipe_closed_ = false;
155 
156   // Indicates if we are currently performing a two-phase read from the pipe and
157   // so can't start another read.
158   bool in_two_phase_read_ = false;
159 
160   // Indicates if we need to perform another read after the current one
161   // completes.
162   bool read_pending_ = false;
163 };
164 
165 }  // namespace blink
166 
167 #endif  // THIRD_PARTY_BLINK_RENDERER_MODULES_WEBTRANSPORT_INCOMING_STREAM_H_
168