// Copyright 2019 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
4
5 #include "services/network/quic_transport.h"
6
7 #include "base/auto_reset.h"
8 #include "base/bind.h"
9 #include "base/threading/sequenced_task_runner_handle.h"
10 #include "net/base/io_buffer.h"
11 #include "net/quic/platform/impl/quic_mem_slice_impl.h"
12 #include "net/third_party/quiche/src/quic/core/quic_session.h"
13 #include "net/third_party/quiche/src/quic/core/quic_types.h"
14 #include "net/third_party/quiche/src/quic/platform/api/quic_mem_slice.h"
15 #include "net/third_party/quiche/src/quic/platform/api/quic_mem_slice_span.h"
16 #include "net/third_party/quiche/src/quic/quic_transport/quic_transport_stream.h"
17 #include "services/network/network_context.h"
18 #include "services/network/public/mojom/quic_transport.mojom.h"
19
20 namespace network {
21
22 class QuicTransport::Stream final {
23 public:
24 class StreamVisitor final : public quic::QuicTransportStream::Visitor {
25 public:
StreamVisitor(Stream * stream)26 explicit StreamVisitor(Stream* stream)
27 : stream_(stream->weak_factory_.GetWeakPtr()) {}
~StreamVisitor()28 ~StreamVisitor() override {
29 if (stream_) {
30 if (stream_->incoming_) {
31 stream_->writable_watcher_.Cancel();
32 stream_->writable_.reset();
33 stream_->transport_->client_->OnIncomingStreamClosed(
34 stream_->id_,
35 /*fin_received=*/false);
36 stream_->incoming_ = nullptr;
37 }
38 if (stream_->outgoing_) {
39 stream_->readable_watcher_.Cancel();
40 stream_->readable_.reset();
41 stream_->outgoing_ = nullptr;
42 }
43 stream_->MayDisposeLater();
44 }
45 }
46
47 // Visitor implementation:
OnCanRead()48 void OnCanRead() override {
49 base::SequencedTaskRunnerHandle::Get()->PostTask(
50 FROM_HERE, base::BindOnce(&Stream::Receive, stream_));
51 }
OnFinRead()52 void OnFinRead() override {
53 if (stream_) {
54 stream_->OnFinRead();
55 }
56 }
OnCanWrite()57 void OnCanWrite() override {
58 base::SequencedTaskRunnerHandle::Get()->PostTask(
59 FROM_HERE, base::BindOnce(&Stream::Send, stream_));
60 }
61
62 private:
63 const base::WeakPtr<Stream> stream_;
64 };
65
66 // Bidirectional
Stream(QuicTransport * transport,quic::QuicTransportStream * stream,mojo::ScopedDataPipeConsumerHandle readable,mojo::ScopedDataPipeProducerHandle writable)67 Stream(QuicTransport* transport,
68 quic::QuicTransportStream* stream,
69 mojo::ScopedDataPipeConsumerHandle readable,
70 mojo::ScopedDataPipeProducerHandle writable)
71 : transport_(transport),
72 id_(stream->id()),
73 outgoing_(stream),
74 incoming_(stream),
75 readable_(std::move(readable)),
76 writable_(std::move(writable)),
77 readable_watcher_(FROM_HERE, ArmingPolicy::AUTOMATIC),
78 writable_watcher_(FROM_HERE, ArmingPolicy::AUTOMATIC) {
79 DCHECK(outgoing_);
80 DCHECK(incoming_);
81 DCHECK(readable_);
82 DCHECK(writable_);
83 Init();
84 }
85
86 // Unidirectional: outgoing
Stream(QuicTransport * transport,quic::QuicTransportStream * outgoing,mojo::ScopedDataPipeConsumerHandle readable)87 Stream(QuicTransport* transport,
88 quic::QuicTransportStream* outgoing,
89 mojo::ScopedDataPipeConsumerHandle readable)
90 : transport_(transport),
91 id_(outgoing->id()),
92 outgoing_(outgoing),
93 readable_(std::move(readable)),
94 readable_watcher_(FROM_HERE, ArmingPolicy::AUTOMATIC),
95 writable_watcher_(FROM_HERE, ArmingPolicy::AUTOMATIC) {
96 DCHECK(outgoing_);
97 DCHECK(readable_);
98 Init();
99 }
100
101 // Unidirectional: incoming
Stream(QuicTransport * transport,quic::QuicTransportStream * incoming,mojo::ScopedDataPipeProducerHandle writable)102 Stream(QuicTransport* transport,
103 quic::QuicTransportStream* incoming,
104 mojo::ScopedDataPipeProducerHandle writable)
105 : transport_(transport),
106 id_(incoming->id()),
107 incoming_(incoming),
108 writable_(std::move(writable)),
109 readable_watcher_(FROM_HERE, ArmingPolicy::AUTOMATIC),
110 writable_watcher_(FROM_HERE, ArmingPolicy::AUTOMATIC) {
111 DCHECK(incoming_);
112 DCHECK(writable_);
113 Init();
114 }
115
NotifyFinFromClient()116 void NotifyFinFromClient() {
117 has_received_fin_from_client_ = true;
118 MaySendFin();
119 }
120
~Stream()121 ~Stream() { transport_->transport_->session()->CloseStream(id_); }
122
123 private:
124 using ArmingPolicy = mojo::SimpleWatcher::ArmingPolicy;
125
Init()126 void Init() {
127 if (outgoing_) {
128 DCHECK(readable_);
129 outgoing_->set_visitor(std::make_unique<StreamVisitor>(this));
130 readable_watcher_.Watch(
131 readable_.get(),
132 MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
133 MOJO_TRIGGER_CONDITION_SIGNALS_SATISFIED,
134 base::BindRepeating(&Stream::OnReadable, base::Unretained(this)));
135 }
136
137 if (incoming_) {
138 DCHECK(writable_);
139 if (incoming_ != outgoing_) {
140 incoming_->set_visitor(std::make_unique<StreamVisitor>(this));
141 }
142 writable_watcher_.Watch(
143 writable_.get(), MOJO_HANDLE_SIGNAL_WRITABLE,
144 MOJO_TRIGGER_CONDITION_SIGNALS_SATISFIED,
145 base::BindRepeating(&Stream::OnWritable, base::Unretained(this)));
146 }
147 }
148
OnReadable(MojoResult result,const mojo::HandleSignalsState & state)149 void OnReadable(MojoResult result, const mojo::HandleSignalsState& state) {
150 DCHECK_EQ(result, MOJO_RESULT_OK);
151 Send();
152 }
153
Send()154 void Send() {
155 MaySendFin();
156 while (outgoing_ && outgoing_->CanWrite()) {
157 const void* data = nullptr;
158 uint32_t available = 0;
159 MojoResult result = readable_->BeginReadData(
160 &data, &available, MOJO_BEGIN_READ_DATA_FLAG_NONE);
161 if (result == MOJO_RESULT_SHOULD_WAIT) {
162 return;
163 }
164 if (result == MOJO_RESULT_FAILED_PRECONDITION) {
165 has_seen_end_of_pipe_for_readable_ = true;
166 MaySendFin();
167 return;
168 }
169 DCHECK_EQ(result, MOJO_RESULT_OK);
170
171 bool send_result = outgoing_->Write(quiche::QuicheStringPiece(
172 reinterpret_cast<const char*>(data), available));
173 if (!send_result) {
174 // TODO(yhirano): Handle this failure.
175 readable_->EndReadData(0);
176 return;
177 }
178 readable_->EndReadData(available);
179 }
180 }
181
OnWritable(MojoResult result,const mojo::HandleSignalsState & state)182 void OnWritable(MojoResult result, const mojo::HandleSignalsState& state) {
183 DCHECK_EQ(result, MOJO_RESULT_OK);
184 Receive();
185 }
186
MaySendFin()187 void MaySendFin() {
188 if (!outgoing_) {
189 return;
190 }
191 if (!has_seen_end_of_pipe_for_readable_ || !has_received_fin_from_client_) {
192 return;
193 }
194 if (outgoing_->SendFin()) {
195 outgoing_ = nullptr;
196 readable_watcher_.Cancel();
197 readable_.reset();
198 MayDisposeLater();
199 }
200 // Otherwise, retry in Send().
201 }
202
Receive()203 void Receive() {
204 while (incoming_ && incoming_->ReadableBytes() > 0) {
205 void* buffer = nullptr;
206 uint32_t available = 0;
207 base::AutoReset<bool> auto_reset(&in_two_phase_write_, true);
208 MojoResult result = writable_->BeginWriteData(
209 &buffer, &available, MOJO_BEGIN_WRITE_DATA_FLAG_NONE);
210 if (result == MOJO_RESULT_SHOULD_WAIT) {
211 return;
212 }
213 if (result == MOJO_RESULT_FAILED_PRECONDITION) {
214 // The client doesn't want further data.
215 writable_watcher_.Cancel();
216 writable_.reset();
217 incoming_ = nullptr;
218 MayDisposeLater();
219 return;
220 }
221 DCHECK_EQ(result, MOJO_RESULT_OK);
222
223 const size_t num_read_bytes =
224 incoming_->Read(reinterpret_cast<char*>(buffer), available);
225 writable_->EndWriteData(num_read_bytes);
226 if (!incoming_) {
227 // |incoming_| can be null here, because OnFinRead can be called in
228 // QuicTransportStream::Read.
229 writable_watcher_.Cancel();
230 writable_.reset();
231 MayDisposeLater();
232 return;
233 }
234 }
235 }
236
OnFinRead()237 void OnFinRead() {
238 incoming_ = nullptr;
239 transport_->client_->OnIncomingStreamClosed(id_, /*fin_received=*/true);
240 if (in_two_phase_write_) {
241 return;
242 }
243 writable_watcher_.Cancel();
244 writable_.reset();
245 }
246
Dispose()247 void Dispose() {
248 transport_->streams_.erase(id_);
249 // Deletes |this|.
250 }
MayDisposeLater()251 void MayDisposeLater() {
252 if (outgoing_ || incoming_) {
253 return;
254 }
255
256 base::SequencedTaskRunnerHandle::Get()->PostTask(
257 FROM_HERE,
258 base::BindOnce(&Stream::Dispose, weak_factory_.GetWeakPtr()));
259 }
260
261 QuicTransport* const transport_; // outlives |this|.
262 const uint32_t id_;
263 // |outgoing_| and |incoming_| point to the same stream when this is a
264 // bidirectional stream. They are owned by |transport_| (via
265 // quic::QuicSession), and the properties will be null-set when the streams
266 // are gone (via StreamVisitor).
267 quic::QuicTransportStream* outgoing_ = nullptr;
268 quic::QuicTransportStream* incoming_ = nullptr;
269 mojo::ScopedDataPipeConsumerHandle readable_; // for |outgoing|
270 mojo::ScopedDataPipeProducerHandle writable_; // for |incoming|
271
272 mojo::SimpleWatcher readable_watcher_;
273 mojo::SimpleWatcher writable_watcher_;
274
275 bool in_two_phase_write_ = false;
276 bool has_seen_end_of_pipe_for_readable_ = false;
277 bool has_received_fin_from_client_ = false;
278
279 // This must be the last member.
280 base::WeakPtrFactory<Stream> weak_factory_{this};
281 };
282
QuicTransport(const GURL & url,const url::Origin & origin,const net::NetworkIsolationKey & key,NetworkContext * context,mojo::PendingRemote<mojom::QuicTransportHandshakeClient> handshake_client)283 QuicTransport::QuicTransport(
284 const GURL& url,
285 const url::Origin& origin,
286 const net::NetworkIsolationKey& key,
287 NetworkContext* context,
288 mojo::PendingRemote<mojom::QuicTransportHandshakeClient> handshake_client)
289 : transport_(std::make_unique<net::QuicTransportClient>(
290 url,
291 origin,
292 this,
293 key,
294 context->url_request_context())),
295 context_(context),
296 receiver_(this),
297 handshake_client_(std::move(handshake_client)) {
298 handshake_client_.set_disconnect_handler(
299 base::BindOnce(&QuicTransport::Dispose, base::Unretained(this)));
300
301 transport_->Connect();
302 }
303
304 QuicTransport::~QuicTransport() = default;
305
SendDatagram(base::span<const uint8_t> data,base::OnceCallback<void (bool)> callback)306 void QuicTransport::SendDatagram(base::span<const uint8_t> data,
307 base::OnceCallback<void(bool)> callback) {
308 DCHECK(!torn_down_);
309
310 auto buffer = base::MakeRefCounted<net::IOBuffer>(data.size());
311 memcpy(buffer->data(), data.data(), data.size());
312 quic::QuicMemSlice slice(
313 quic::QuicMemSliceImpl(std::move(buffer), data.size()));
314 const quic::MessageStatus status =
315 transport_->session()->datagram_queue()->SendOrQueueDatagram(
316 std::move(slice));
317 std::move(callback).Run(status == quic::MESSAGE_STATUS_SUCCESS);
318 }
319
CreateStream(mojo::ScopedDataPipeConsumerHandle readable,mojo::ScopedDataPipeProducerHandle writable,base::OnceCallback<void (bool,uint32_t)> callback)320 void QuicTransport::CreateStream(
321 mojo::ScopedDataPipeConsumerHandle readable,
322 mojo::ScopedDataPipeProducerHandle writable,
323 base::OnceCallback<void(bool, uint32_t)> callback) {
324 // |readable| is non-nullable, |writable| is nullable.
325 DCHECK(readable);
326
327 if (handshake_client_) {
328 // Invalid request.
329 std::move(callback).Run(false, 0);
330 return;
331 }
332
333 quic::QuicTransportClientSession* const session = transport_->session();
334
335 if (writable) {
336 // Bidirectional
337 if (!session->CanOpenNextOutgoingBidirectionalStream()) {
338 // TODO(crbug.com/104236): Instead of rejecting the creation request, we
339 // should wait in this case.
340 std::move(callback).Run(false, 0);
341 return;
342 }
343 quic::QuicTransportStream* const stream =
344 session->OpenOutgoingBidirectionalStream();
345 DCHECK(stream);
346 streams_.insert(std::make_pair(
347 stream->id(),
348 std::make_unique<Stream>(this, stream, std::move(readable),
349 std::move(writable))));
350 std::move(callback).Run(true, stream->id());
351 return;
352 }
353
354 // Unidirectional
355 if (!session->CanOpenNextOutgoingUnidirectionalStream()) {
356 // TODO(crbug.com/104236): Instead of rejecting the creation request, we
357 // should wait in this case.
358 std::move(callback).Run(false, 0);
359 return;
360 }
361
362 quic::QuicTransportStream* const stream =
363 session->OpenOutgoingUnidirectionalStream();
364 DCHECK(stream);
365 streams_.insert(std::make_pair(
366 stream->id(),
367 std::make_unique<Stream>(this, stream, std::move(readable))));
368 std::move(callback).Run(true, stream->id());
369 }
370
AcceptBidirectionalStream(BidirectionalStreamAcceptanceCallback acceptance)371 void QuicTransport::AcceptBidirectionalStream(
372 BidirectionalStreamAcceptanceCallback acceptance) {
373 bidirectional_stream_acceptances_.push(std::move(acceptance));
374
375 OnIncomingBidirectionalStreamAvailable();
376 }
377
AcceptUnidirectionalStream(UnidirectionalStreamAcceptanceCallback acceptance)378 void QuicTransport::AcceptUnidirectionalStream(
379 UnidirectionalStreamAcceptanceCallback acceptance) {
380 unidirectional_stream_acceptances_.push(std::move(acceptance));
381
382 OnIncomingUnidirectionalStreamAvailable();
383 }
384
SendFin(uint32_t stream)385 void QuicTransport::SendFin(uint32_t stream) {
386 auto it = streams_.find(stream);
387 if (it == streams_.end()) {
388 return;
389 }
390 it->second->NotifyFinFromClient();
391 }
392
OnConnected()393 void QuicTransport::OnConnected() {
394 if (torn_down_) {
395 return;
396 }
397
398 DCHECK(handshake_client_);
399
400 handshake_client_->OnConnectionEstablished(
401 receiver_.BindNewPipeAndPassRemote(),
402 client_.BindNewPipeAndPassReceiver());
403
404 handshake_client_.reset();
405 client_.set_disconnect_handler(
406 base::BindOnce(&QuicTransport::Dispose, base::Unretained(this)));
407 }
408
OnConnectionFailed()409 void QuicTransport::OnConnectionFailed() {
410 if (torn_down_) {
411 return;
412 }
413
414 DCHECK(handshake_client_);
415
416 handshake_client_->OnHandshakeFailed();
417
418 TearDown();
419 }
420
OnClosed()421 void QuicTransport::OnClosed() {
422 if (torn_down_) {
423 return;
424 }
425
426 DCHECK(!handshake_client_);
427
428 TearDown();
429 }
430
OnError()431 void QuicTransport::OnError() {
432 if (torn_down_) {
433 return;
434 }
435
436 DCHECK(!handshake_client_);
437
438 TearDown();
439 }
440
OnIncomingBidirectionalStreamAvailable()441 void QuicTransport::OnIncomingBidirectionalStreamAvailable() {
442 DCHECK(!handshake_client_);
443 DCHECK(client_);
444
445 while (!bidirectional_stream_acceptances_.empty()) {
446 quic::QuicTransportStream* const stream =
447 transport_->session()->AcceptIncomingBidirectionalStream();
448 if (!stream) {
449 return;
450 }
451 auto acceptance = std::move(bidirectional_stream_acceptances_.front());
452 bidirectional_stream_acceptances_.pop();
453
454 mojo::ScopedDataPipeConsumerHandle readable_for_outgoing;
455 mojo::ScopedDataPipeProducerHandle writable_for_outgoing;
456 mojo::ScopedDataPipeConsumerHandle readable_for_incoming;
457 mojo::ScopedDataPipeProducerHandle writable_for_incoming;
458 const MojoCreateDataPipeOptions options = {
459 sizeof(options), MOJO_CREATE_DATA_PIPE_FLAG_NONE, 1, 256 * 1024};
460 if (mojo::CreateDataPipe(&options, &writable_for_outgoing,
461 &readable_for_outgoing) != MOJO_RESULT_OK) {
462 transport_->session()->CloseStream(stream->id());
463 // TODO(yhirano): Error the entire connection.
464 return;
465 }
466 if (mojo::CreateDataPipe(&options, &writable_for_incoming,
467 &readable_for_incoming) != MOJO_RESULT_OK) {
468 transport_->session()->CloseStream(stream->id());
469 // TODO(yhirano): Error the entire connection.
470 return;
471 }
472
473 streams_.insert(std::make_pair(
474 stream->id(),
475 std::make_unique<Stream>(this, stream, std::move(readable_for_outgoing),
476 std::move(writable_for_incoming))));
477 std::move(acceptance)
478 .Run(stream->id(), std::move(readable_for_incoming),
479 std::move(writable_for_outgoing));
480 }
481 }
482
OnIncomingUnidirectionalStreamAvailable()483 void QuicTransport::OnIncomingUnidirectionalStreamAvailable() {
484 DCHECK(!handshake_client_);
485 DCHECK(client_);
486
487 while (!unidirectional_stream_acceptances_.empty()) {
488 quic::QuicTransportStream* const stream =
489 transport_->session()->AcceptIncomingUnidirectionalStream();
490
491 if (!stream) {
492 return;
493 }
494 auto acceptance = std::move(unidirectional_stream_acceptances_.front());
495 unidirectional_stream_acceptances_.pop();
496
497 mojo::ScopedDataPipeConsumerHandle readable_for_incoming;
498 mojo::ScopedDataPipeProducerHandle writable_for_incoming;
499 const MojoCreateDataPipeOptions options = {
500 sizeof(options), MOJO_CREATE_DATA_PIPE_FLAG_NONE, 1, 256 * 1024};
501 if (mojo::CreateDataPipe(&options, &writable_for_incoming,
502 &readable_for_incoming) != MOJO_RESULT_OK) {
503 transport_->session()->CloseStream(stream->id());
504 // TODO(yhirano): Error the entire connection.
505 return;
506 }
507
508 streams_.insert(std::make_pair(
509 stream->id(), std::make_unique<Stream>(
510 this, stream, std::move(writable_for_incoming))));
511 std::move(acceptance).Run(stream->id(), std::move(readable_for_incoming));
512 }
513 }
514
OnDatagramReceived(base::StringPiece datagram)515 void QuicTransport::OnDatagramReceived(base::StringPiece datagram) {
516 if (torn_down_) {
517 return;
518 }
519
520 client_->OnDatagramReceived(base::make_span(
521 reinterpret_cast<const uint8_t*>(datagram.data()), datagram.size()));
522 }
523
OnCanCreateNewOutgoingBidirectionalStream()524 void QuicTransport::OnCanCreateNewOutgoingBidirectionalStream() {
525 // TODO(yhirano): Implement this.
526 }
527
OnCanCreateNewOutgoingUnidirectionalStream()528 void QuicTransport::OnCanCreateNewOutgoingUnidirectionalStream() {
529 // TODO(yhirano): Implement this.
530 }
531
TearDown()532 void QuicTransport::TearDown() {
533 torn_down_ = true;
534 receiver_.reset();
535 handshake_client_.reset();
536 client_.reset();
537
538 base::SequencedTaskRunnerHandle::Get()->PostTask(
539 FROM_HERE,
540 base::BindOnce(&QuicTransport::Dispose, weak_factory_.GetWeakPtr()));
541 }
542
Dispose()543 void QuicTransport::Dispose() {
544 receiver_.reset();
545
546 context_->Remove(this);
547 // |this| is deleted.
548 }
549
550 } // namespace network
551