1 // Copyright 2019 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "services/network/quic_transport.h"
6 
7 #include "base/auto_reset.h"
8 #include "base/bind.h"
9 #include "base/threading/sequenced_task_runner_handle.h"
10 #include "net/base/io_buffer.h"
11 #include "net/quic/platform/impl/quic_mem_slice_impl.h"
12 #include "net/third_party/quiche/src/quic/core/quic_session.h"
13 #include "net/third_party/quiche/src/quic/core/quic_types.h"
14 #include "net/third_party/quiche/src/quic/platform/api/quic_mem_slice.h"
15 #include "net/third_party/quiche/src/quic/platform/api/quic_mem_slice_span.h"
16 #include "net/third_party/quiche/src/quic/quic_transport/quic_transport_stream.h"
17 #include "services/network/network_context.h"
18 #include "services/network/public/mojom/quic_transport.mojom.h"
19 
20 namespace network {
21 
22 class QuicTransport::Stream final {
23  public:
24   class StreamVisitor final : public quic::QuicTransportStream::Visitor {
25    public:
StreamVisitor(Stream * stream)26     explicit StreamVisitor(Stream* stream)
27         : stream_(stream->weak_factory_.GetWeakPtr()) {}
~StreamVisitor()28     ~StreamVisitor() override {
29       if (stream_) {
30         if (stream_->incoming_) {
31           stream_->writable_watcher_.Cancel();
32           stream_->writable_.reset();
33           stream_->transport_->client_->OnIncomingStreamClosed(
34               stream_->id_,
35               /*fin_received=*/false);
36           stream_->incoming_ = nullptr;
37         }
38         if (stream_->outgoing_) {
39           stream_->readable_watcher_.Cancel();
40           stream_->readable_.reset();
41           stream_->outgoing_ = nullptr;
42         }
43         stream_->MayDisposeLater();
44       }
45     }
46 
47     // Visitor implementation:
OnCanRead()48     void OnCanRead() override {
49       base::SequencedTaskRunnerHandle::Get()->PostTask(
50           FROM_HERE, base::BindOnce(&Stream::Receive, stream_));
51     }
OnFinRead()52     void OnFinRead() override {
53       if (stream_) {
54         stream_->OnFinRead();
55       }
56     }
OnCanWrite()57     void OnCanWrite() override {
58       base::SequencedTaskRunnerHandle::Get()->PostTask(
59           FROM_HERE, base::BindOnce(&Stream::Send, stream_));
60     }
61 
62    private:
63     const base::WeakPtr<Stream> stream_;
64   };
65 
66   // Bidirectional
Stream(QuicTransport * transport,quic::QuicTransportStream * stream,mojo::ScopedDataPipeConsumerHandle readable,mojo::ScopedDataPipeProducerHandle writable)67   Stream(QuicTransport* transport,
68          quic::QuicTransportStream* stream,
69          mojo::ScopedDataPipeConsumerHandle readable,
70          mojo::ScopedDataPipeProducerHandle writable)
71       : transport_(transport),
72         id_(stream->id()),
73         outgoing_(stream),
74         incoming_(stream),
75         readable_(std::move(readable)),
76         writable_(std::move(writable)),
77         readable_watcher_(FROM_HERE, ArmingPolicy::AUTOMATIC),
78         writable_watcher_(FROM_HERE, ArmingPolicy::AUTOMATIC) {
79     DCHECK(outgoing_);
80     DCHECK(incoming_);
81     DCHECK(readable_);
82     DCHECK(writable_);
83     Init();
84   }
85 
86   // Unidirectional: outgoing
Stream(QuicTransport * transport,quic::QuicTransportStream * outgoing,mojo::ScopedDataPipeConsumerHandle readable)87   Stream(QuicTransport* transport,
88          quic::QuicTransportStream* outgoing,
89          mojo::ScopedDataPipeConsumerHandle readable)
90       : transport_(transport),
91         id_(outgoing->id()),
92         outgoing_(outgoing),
93         readable_(std::move(readable)),
94         readable_watcher_(FROM_HERE, ArmingPolicy::AUTOMATIC),
95         writable_watcher_(FROM_HERE, ArmingPolicy::AUTOMATIC) {
96     DCHECK(outgoing_);
97     DCHECK(readable_);
98     Init();
99   }
100 
101   // Unidirectional: incoming
Stream(QuicTransport * transport,quic::QuicTransportStream * incoming,mojo::ScopedDataPipeProducerHandle writable)102   Stream(QuicTransport* transport,
103          quic::QuicTransportStream* incoming,
104          mojo::ScopedDataPipeProducerHandle writable)
105       : transport_(transport),
106         id_(incoming->id()),
107         incoming_(incoming),
108         writable_(std::move(writable)),
109         readable_watcher_(FROM_HERE, ArmingPolicy::AUTOMATIC),
110         writable_watcher_(FROM_HERE, ArmingPolicy::AUTOMATIC) {
111     DCHECK(incoming_);
112     DCHECK(writable_);
113     Init();
114   }
115 
NotifyFinFromClient()116   void NotifyFinFromClient() {
117     has_received_fin_from_client_ = true;
118     MaySendFin();
119   }
120 
~Stream()121   ~Stream() { transport_->transport_->session()->CloseStream(id_); }
122 
123  private:
124   using ArmingPolicy = mojo::SimpleWatcher::ArmingPolicy;
125 
Init()126   void Init() {
127     if (outgoing_) {
128       DCHECK(readable_);
129       outgoing_->set_visitor(std::make_unique<StreamVisitor>(this));
130       readable_watcher_.Watch(
131           readable_.get(),
132           MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
133           MOJO_TRIGGER_CONDITION_SIGNALS_SATISFIED,
134           base::BindRepeating(&Stream::OnReadable, base::Unretained(this)));
135     }
136 
137     if (incoming_) {
138       DCHECK(writable_);
139       if (incoming_ != outgoing_) {
140         incoming_->set_visitor(std::make_unique<StreamVisitor>(this));
141       }
142       writable_watcher_.Watch(
143           writable_.get(), MOJO_HANDLE_SIGNAL_WRITABLE,
144           MOJO_TRIGGER_CONDITION_SIGNALS_SATISFIED,
145           base::BindRepeating(&Stream::OnWritable, base::Unretained(this)));
146     }
147   }
148 
OnReadable(MojoResult result,const mojo::HandleSignalsState & state)149   void OnReadable(MojoResult result, const mojo::HandleSignalsState& state) {
150     DCHECK_EQ(result, MOJO_RESULT_OK);
151     Send();
152   }
153 
Send()154   void Send() {
155     MaySendFin();
156     while (outgoing_ && outgoing_->CanWrite()) {
157       const void* data = nullptr;
158       uint32_t available = 0;
159       MojoResult result = readable_->BeginReadData(
160           &data, &available, MOJO_BEGIN_READ_DATA_FLAG_NONE);
161       if (result == MOJO_RESULT_SHOULD_WAIT) {
162         return;
163       }
164       if (result == MOJO_RESULT_FAILED_PRECONDITION) {
165         has_seen_end_of_pipe_for_readable_ = true;
166         MaySendFin();
167         return;
168       }
169       DCHECK_EQ(result, MOJO_RESULT_OK);
170 
171       bool send_result = outgoing_->Write(quiche::QuicheStringPiece(
172           reinterpret_cast<const char*>(data), available));
173       if (!send_result) {
174         // TODO(yhirano): Handle this failure.
175         readable_->EndReadData(0);
176         return;
177       }
178       readable_->EndReadData(available);
179     }
180   }
181 
OnWritable(MojoResult result,const mojo::HandleSignalsState & state)182   void OnWritable(MojoResult result, const mojo::HandleSignalsState& state) {
183     DCHECK_EQ(result, MOJO_RESULT_OK);
184     Receive();
185   }
186 
MaySendFin()187   void MaySendFin() {
188     if (!outgoing_) {
189       return;
190     }
191     if (!has_seen_end_of_pipe_for_readable_ || !has_received_fin_from_client_) {
192       return;
193     }
194     if (outgoing_->SendFin()) {
195       outgoing_ = nullptr;
196       readable_watcher_.Cancel();
197       readable_.reset();
198       MayDisposeLater();
199     }
200     // Otherwise, retry in Send().
201   }
202 
Receive()203   void Receive() {
204     while (incoming_ && incoming_->ReadableBytes() > 0) {
205       void* buffer = nullptr;
206       uint32_t available = 0;
207       base::AutoReset<bool> auto_reset(&in_two_phase_write_, true);
208       MojoResult result = writable_->BeginWriteData(
209           &buffer, &available, MOJO_BEGIN_WRITE_DATA_FLAG_NONE);
210       if (result == MOJO_RESULT_SHOULD_WAIT) {
211         return;
212       }
213       if (result == MOJO_RESULT_FAILED_PRECONDITION) {
214         // The client doesn't want further data.
215         writable_watcher_.Cancel();
216         writable_.reset();
217         incoming_ = nullptr;
218         MayDisposeLater();
219         return;
220       }
221       DCHECK_EQ(result, MOJO_RESULT_OK);
222 
223       const size_t num_read_bytes =
224           incoming_->Read(reinterpret_cast<char*>(buffer), available);
225       writable_->EndWriteData(num_read_bytes);
226       if (!incoming_) {
227         // |incoming_| can be null here, because OnFinRead can be called in
228         // QuicTransportStream::Read.
229         writable_watcher_.Cancel();
230         writable_.reset();
231         MayDisposeLater();
232         return;
233       }
234     }
235   }
236 
OnFinRead()237   void OnFinRead() {
238     incoming_ = nullptr;
239     transport_->client_->OnIncomingStreamClosed(id_, /*fin_received=*/true);
240     if (in_two_phase_write_) {
241       return;
242     }
243     writable_watcher_.Cancel();
244     writable_.reset();
245   }
246 
Dispose()247   void Dispose() {
248     transport_->streams_.erase(id_);
249     // Deletes |this|.
250   }
MayDisposeLater()251   void MayDisposeLater() {
252     if (outgoing_ || incoming_) {
253       return;
254     }
255 
256     base::SequencedTaskRunnerHandle::Get()->PostTask(
257         FROM_HERE,
258         base::BindOnce(&Stream::Dispose, weak_factory_.GetWeakPtr()));
259   }
260 
261   QuicTransport* const transport_;  // outlives |this|.
262   const uint32_t id_;
263   // |outgoing_| and |incoming_| point to the same stream when this is a
264   // bidirectional stream. They are owned by |transport_| (via
265   // quic::QuicSession), and the properties will be null-set when the streams
266   // are gone (via StreamVisitor).
267   quic::QuicTransportStream* outgoing_ = nullptr;
268   quic::QuicTransportStream* incoming_ = nullptr;
269   mojo::ScopedDataPipeConsumerHandle readable_;  // for |outgoing|
270   mojo::ScopedDataPipeProducerHandle writable_;  // for |incoming|
271 
272   mojo::SimpleWatcher readable_watcher_;
273   mojo::SimpleWatcher writable_watcher_;
274 
275   bool in_two_phase_write_ = false;
276   bool has_seen_end_of_pipe_for_readable_ = false;
277   bool has_received_fin_from_client_ = false;
278 
279   // This must be the last member.
280   base::WeakPtrFactory<Stream> weak_factory_{this};
281 };
282 
QuicTransport(const GURL & url,const url::Origin & origin,const net::NetworkIsolationKey & key,NetworkContext * context,mojo::PendingRemote<mojom::QuicTransportHandshakeClient> handshake_client)283 QuicTransport::QuicTransport(
284     const GURL& url,
285     const url::Origin& origin,
286     const net::NetworkIsolationKey& key,
287     NetworkContext* context,
288     mojo::PendingRemote<mojom::QuicTransportHandshakeClient> handshake_client)
289     : transport_(std::make_unique<net::QuicTransportClient>(
290           url,
291           origin,
292           this,
293           key,
294           context->url_request_context())),
295       context_(context),
296       receiver_(this),
297       handshake_client_(std::move(handshake_client)) {
298   handshake_client_.set_disconnect_handler(
299       base::BindOnce(&QuicTransport::Dispose, base::Unretained(this)));
300 
301   transport_->Connect();
302 }
303 
304 QuicTransport::~QuicTransport() = default;
305 
SendDatagram(base::span<const uint8_t> data,base::OnceCallback<void (bool)> callback)306 void QuicTransport::SendDatagram(base::span<const uint8_t> data,
307                                  base::OnceCallback<void(bool)> callback) {
308   DCHECK(!torn_down_);
309 
310   auto buffer = base::MakeRefCounted<net::IOBuffer>(data.size());
311   memcpy(buffer->data(), data.data(), data.size());
312   quic::QuicMemSlice slice(
313       quic::QuicMemSliceImpl(std::move(buffer), data.size()));
314   const quic::MessageStatus status =
315       transport_->session()->datagram_queue()->SendOrQueueDatagram(
316           std::move(slice));
317   std::move(callback).Run(status == quic::MESSAGE_STATUS_SUCCESS);
318 }
319 
CreateStream(mojo::ScopedDataPipeConsumerHandle readable,mojo::ScopedDataPipeProducerHandle writable,base::OnceCallback<void (bool,uint32_t)> callback)320 void QuicTransport::CreateStream(
321     mojo::ScopedDataPipeConsumerHandle readable,
322     mojo::ScopedDataPipeProducerHandle writable,
323     base::OnceCallback<void(bool, uint32_t)> callback) {
324   // |readable| is non-nullable, |writable| is nullable.
325   DCHECK(readable);
326 
327   if (handshake_client_) {
328     // Invalid request.
329     std::move(callback).Run(false, 0);
330     return;
331   }
332 
333   quic::QuicTransportClientSession* const session = transport_->session();
334 
335   if (writable) {
336     // Bidirectional
337     if (!session->CanOpenNextOutgoingBidirectionalStream()) {
338       // TODO(crbug.com/104236): Instead of rejecting the creation request, we
339       // should wait in this case.
340       std::move(callback).Run(false, 0);
341       return;
342     }
343     quic::QuicTransportStream* const stream =
344         session->OpenOutgoingBidirectionalStream();
345     DCHECK(stream);
346     streams_.insert(std::make_pair(
347         stream->id(),
348         std::make_unique<Stream>(this, stream, std::move(readable),
349                                  std::move(writable))));
350     std::move(callback).Run(true, stream->id());
351     return;
352   }
353 
354   // Unidirectional
355   if (!session->CanOpenNextOutgoingUnidirectionalStream()) {
356     // TODO(crbug.com/104236): Instead of rejecting the creation request, we
357     // should wait in this case.
358     std::move(callback).Run(false, 0);
359     return;
360   }
361 
362   quic::QuicTransportStream* const stream =
363       session->OpenOutgoingUnidirectionalStream();
364   DCHECK(stream);
365   streams_.insert(std::make_pair(
366       stream->id(),
367       std::make_unique<Stream>(this, stream, std::move(readable))));
368   std::move(callback).Run(true, stream->id());
369 }
370 
AcceptBidirectionalStream(BidirectionalStreamAcceptanceCallback acceptance)371 void QuicTransport::AcceptBidirectionalStream(
372     BidirectionalStreamAcceptanceCallback acceptance) {
373   bidirectional_stream_acceptances_.push(std::move(acceptance));
374 
375   OnIncomingBidirectionalStreamAvailable();
376 }
377 
AcceptUnidirectionalStream(UnidirectionalStreamAcceptanceCallback acceptance)378 void QuicTransport::AcceptUnidirectionalStream(
379     UnidirectionalStreamAcceptanceCallback acceptance) {
380   unidirectional_stream_acceptances_.push(std::move(acceptance));
381 
382   OnIncomingUnidirectionalStreamAvailable();
383 }
384 
SendFin(uint32_t stream)385 void QuicTransport::SendFin(uint32_t stream) {
386   auto it = streams_.find(stream);
387   if (it == streams_.end()) {
388     return;
389   }
390   it->second->NotifyFinFromClient();
391 }
392 
OnConnected()393 void QuicTransport::OnConnected() {
394   if (torn_down_) {
395     return;
396   }
397 
398   DCHECK(handshake_client_);
399 
400   handshake_client_->OnConnectionEstablished(
401       receiver_.BindNewPipeAndPassRemote(),
402       client_.BindNewPipeAndPassReceiver());
403 
404   handshake_client_.reset();
405   client_.set_disconnect_handler(
406       base::BindOnce(&QuicTransport::Dispose, base::Unretained(this)));
407 }
408 
OnConnectionFailed()409 void QuicTransport::OnConnectionFailed() {
410   if (torn_down_) {
411     return;
412   }
413 
414   DCHECK(handshake_client_);
415 
416   handshake_client_->OnHandshakeFailed();
417 
418   TearDown();
419 }
420 
OnClosed()421 void QuicTransport::OnClosed() {
422   if (torn_down_) {
423     return;
424   }
425 
426   DCHECK(!handshake_client_);
427 
428   TearDown();
429 }
430 
OnError()431 void QuicTransport::OnError() {
432   if (torn_down_) {
433     return;
434   }
435 
436   DCHECK(!handshake_client_);
437 
438   TearDown();
439 }
440 
OnIncomingBidirectionalStreamAvailable()441 void QuicTransport::OnIncomingBidirectionalStreamAvailable() {
442   DCHECK(!handshake_client_);
443   DCHECK(client_);
444 
445   while (!bidirectional_stream_acceptances_.empty()) {
446     quic::QuicTransportStream* const stream =
447         transport_->session()->AcceptIncomingBidirectionalStream();
448     if (!stream) {
449       return;
450     }
451     auto acceptance = std::move(bidirectional_stream_acceptances_.front());
452     bidirectional_stream_acceptances_.pop();
453 
454     mojo::ScopedDataPipeConsumerHandle readable_for_outgoing;
455     mojo::ScopedDataPipeProducerHandle writable_for_outgoing;
456     mojo::ScopedDataPipeConsumerHandle readable_for_incoming;
457     mojo::ScopedDataPipeProducerHandle writable_for_incoming;
458     const MojoCreateDataPipeOptions options = {
459         sizeof(options), MOJO_CREATE_DATA_PIPE_FLAG_NONE, 1, 256 * 1024};
460     if (mojo::CreateDataPipe(&options, &writable_for_outgoing,
461                              &readable_for_outgoing) != MOJO_RESULT_OK) {
462       transport_->session()->CloseStream(stream->id());
463       // TODO(yhirano): Error the entire connection.
464       return;
465     }
466     if (mojo::CreateDataPipe(&options, &writable_for_incoming,
467                              &readable_for_incoming) != MOJO_RESULT_OK) {
468       transport_->session()->CloseStream(stream->id());
469       // TODO(yhirano): Error the entire connection.
470       return;
471     }
472 
473     streams_.insert(std::make_pair(
474         stream->id(),
475         std::make_unique<Stream>(this, stream, std::move(readable_for_outgoing),
476                                  std::move(writable_for_incoming))));
477     std::move(acceptance)
478         .Run(stream->id(), std::move(readable_for_incoming),
479              std::move(writable_for_outgoing));
480   }
481 }
482 
OnIncomingUnidirectionalStreamAvailable()483 void QuicTransport::OnIncomingUnidirectionalStreamAvailable() {
484   DCHECK(!handshake_client_);
485   DCHECK(client_);
486 
487   while (!unidirectional_stream_acceptances_.empty()) {
488     quic::QuicTransportStream* const stream =
489         transport_->session()->AcceptIncomingUnidirectionalStream();
490 
491     if (!stream) {
492       return;
493     }
494     auto acceptance = std::move(unidirectional_stream_acceptances_.front());
495     unidirectional_stream_acceptances_.pop();
496 
497     mojo::ScopedDataPipeConsumerHandle readable_for_incoming;
498     mojo::ScopedDataPipeProducerHandle writable_for_incoming;
499     const MojoCreateDataPipeOptions options = {
500         sizeof(options), MOJO_CREATE_DATA_PIPE_FLAG_NONE, 1, 256 * 1024};
501     if (mojo::CreateDataPipe(&options, &writable_for_incoming,
502                              &readable_for_incoming) != MOJO_RESULT_OK) {
503       transport_->session()->CloseStream(stream->id());
504       // TODO(yhirano): Error the entire connection.
505       return;
506     }
507 
508     streams_.insert(std::make_pair(
509         stream->id(), std::make_unique<Stream>(
510                           this, stream, std::move(writable_for_incoming))));
511     std::move(acceptance).Run(stream->id(), std::move(readable_for_incoming));
512   }
513 }
514 
OnDatagramReceived(base::StringPiece datagram)515 void QuicTransport::OnDatagramReceived(base::StringPiece datagram) {
516   if (torn_down_) {
517     return;
518   }
519 
520   client_->OnDatagramReceived(base::make_span(
521       reinterpret_cast<const uint8_t*>(datagram.data()), datagram.size()));
522 }
523 
OnCanCreateNewOutgoingBidirectionalStream()524 void QuicTransport::OnCanCreateNewOutgoingBidirectionalStream() {
525   // TODO(yhirano): Implement this.
526 }
527 
OnCanCreateNewOutgoingUnidirectionalStream()528 void QuicTransport::OnCanCreateNewOutgoingUnidirectionalStream() {
529   // TODO(yhirano): Implement this.
530 }
531 
TearDown()532 void QuicTransport::TearDown() {
533   torn_down_ = true;
534   receiver_.reset();
535   handshake_client_.reset();
536   client_.reset();
537 
538   base::SequencedTaskRunnerHandle::Get()->PostTask(
539       FROM_HERE,
540       base::BindOnce(&QuicTransport::Dispose, weak_factory_.GetWeakPtr()));
541 }
542 
Dispose()543 void QuicTransport::Dispose() {
544   receiver_.reset();
545 
546   context_->Remove(this);
547   // |this| is deleted.
548 }
549 
550 }  // namespace network
551