1 // Copyright (c) 2019 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "net/third_party/quiche/src/quic/quartc/quartc_multiplexer.h"
6 
7 #include <cstdint>
8 #include <utility>
9 
10 #include "net/third_party/quiche/src/quic/core/quic_buffer_allocator.h"
11 #include "net/third_party/quiche/src/quic/core/quic_data_writer.h"
12 #include "net/third_party/quiche/src/quic/platform/api/quic_mem_slice.h"
13 #include "net/third_party/quiche/src/quic/platform/api/quic_mem_slice_span.h"
14 #include "net/third_party/quiche/src/common/platform/api/quiche_string_piece.h"
15 
16 namespace quic {
17 
QuartcSendChannel(QuartcMultiplexer * multiplexer,uint64_t id,QuicBufferAllocator * allocator,Delegate * delegate)18 QuartcSendChannel::QuartcSendChannel(QuartcMultiplexer* multiplexer,
19                                      uint64_t id,
20                                      QuicBufferAllocator* allocator,
21                                      Delegate* delegate)
22     : multiplexer_(multiplexer),
23       id_(id),
24       encoded_length_(QuicDataWriter::GetVarInt62Len(id_)),
25       allocator_(allocator),
26       delegate_(delegate) {}
27 
CreateOutgoingBidirectionalStream()28 QuartcStream* QuartcSendChannel::CreateOutgoingBidirectionalStream() {
29   if (!session_) {
30     QUIC_LOG(DFATAL) << "Session is not ready to write yet; channel_id=" << id_;
31     return nullptr;
32   }
33   QuicMemSlice id_slice = EncodeChannelId();
34 
35   QuartcStream* stream = session_->CreateOutgoingBidirectionalStream();
36   QuicConsumedData consumed =
37       stream->WriteMemSlices(QuicMemSliceSpan(&id_slice), /*fin=*/false);
38   DCHECK_EQ(consumed.bytes_consumed, encoded_length_);
39   return stream;
40 }
41 
SendOrQueueMessage(QuicMemSliceSpan message,int64_t datagram_id)42 bool QuartcSendChannel::SendOrQueueMessage(QuicMemSliceSpan message,
43                                            int64_t datagram_id) {
44   if (!session_) {
45     QUIC_LOG(DFATAL) << "Session is not ready to write yet; channel_id=" << id_
46                      << "datagram size=" << message.total_length();
47     return false;
48   }
49   QuicMemSliceStorage storage(nullptr, 0, nullptr, 0);  // Empty storage.
50   storage.Append(EncodeChannelId());
51 
52   message.ConsumeAll(
53       [&storage](QuicMemSlice slice) { storage.Append(std::move(slice)); });
54 
55   // Allocate a unique datagram id so that notifications can be routed back to
56   // the right send channel.
57   int64_t unique_datagram_id = multiplexer_->AllocateDatagramId(this);
58   multiplexer_to_user_datagram_ids_[unique_datagram_id] = datagram_id;
59 
60   return session_->SendOrQueueMessage(storage.ToSpan(), unique_datagram_id);
61 }
62 
OnMessageSent(int64_t datagram_id)63 void QuartcSendChannel::OnMessageSent(int64_t datagram_id) {
64   // Map back to the caller-chosen |datagram_id|.
65   datagram_id = multiplexer_to_user_datagram_ids_[datagram_id];
66   delegate_->OnMessageSent(datagram_id);
67 }
68 
OnMessageAcked(int64_t datagram_id,QuicTime receive_timestamp)69 void QuartcSendChannel::OnMessageAcked(int64_t datagram_id,
70                                        QuicTime receive_timestamp) {
71   // Map back to the caller-chosen |datagram_id|.
72   auto it = multiplexer_to_user_datagram_ids_.find(datagram_id);
73   if (it == multiplexer_to_user_datagram_ids_.end()) {
74     QUIC_LOG(DFATAL) << "Datagram acked/lost multiple times; datagram_id="
75                      << datagram_id;
76     return;
77   }
78   delegate_->OnMessageAcked(it->second, receive_timestamp);
79   multiplexer_to_user_datagram_ids_.erase(it);
80 }
81 
OnMessageLost(int64_t datagram_id)82 void QuartcSendChannel::OnMessageLost(int64_t datagram_id) {
83   // Map back to the caller-chosen |datagram_id|.
84   auto it = multiplexer_to_user_datagram_ids_.find(datagram_id);
85   if (it == multiplexer_to_user_datagram_ids_.end()) {
86     QUIC_LOG(DFATAL) << "Datagram acked/lost multiple times; datagram_id="
87                      << datagram_id;
88     return;
89   }
90   delegate_->OnMessageLost(it->second);
91   multiplexer_to_user_datagram_ids_.erase(it);
92 }
93 
OnSessionCreated(QuartcSession * session)94 void QuartcSendChannel::OnSessionCreated(QuartcSession* session) {
95   session_ = session;
96 }
97 
EncodeChannelId()98 QuicMemSlice QuartcSendChannel::EncodeChannelId() {
99   QuicUniqueBufferPtr buffer = MakeUniqueBuffer(allocator_, encoded_length_);
100   QuicDataWriter writer(encoded_length_, buffer.get());
101   writer.WriteVarInt62(id_);
102   return QuicMemSlice(std::move(buffer), encoded_length_);
103 }
104 
QuartcMultiplexer(QuicBufferAllocator * allocator,QuartcSessionEventDelegate * session_delegate,QuartcReceiveChannel * default_receive_channel)105 QuartcMultiplexer::QuartcMultiplexer(
106     QuicBufferAllocator* allocator,
107     QuartcSessionEventDelegate* session_delegate,
108     QuartcReceiveChannel* default_receive_channel)
109     : allocator_(allocator),
110       session_delegate_(session_delegate),
111       default_receive_channel_(default_receive_channel) {
112   CHECK_NE(session_delegate_, nullptr);
113   CHECK_NE(default_receive_channel_, nullptr);
114 }
115 
CreateSendChannel(uint64_t channel_id,QuartcSendChannel::Delegate * delegate)116 QuartcSendChannel* QuartcMultiplexer::CreateSendChannel(
117     uint64_t channel_id,
118     QuartcSendChannel::Delegate* delegate) {
119   send_channels_.push_back(std::make_unique<QuartcSendChannel>(
120       this, channel_id, allocator_, delegate));
121   if (session_) {
122     send_channels_.back()->OnSessionCreated(session_);
123   }
124   return send_channels_.back().get();
125 }
126 
RegisterReceiveChannel(uint64_t channel_id,QuartcReceiveChannel * channel)127 void QuartcMultiplexer::RegisterReceiveChannel(uint64_t channel_id,
128                                                QuartcReceiveChannel* channel) {
129   if (channel == nullptr) {
130     receive_channels_.erase(channel_id);
131     return;
132   }
133   auto& registered_channel = receive_channels_[channel_id];
134   if (registered_channel) {
135     QUIC_LOG(DFATAL) << "Attempted to overwrite existing channel_id="
136                      << channel_id;
137     return;
138   }
139   registered_channel = channel;
140 }
141 
AllocateDatagramId(QuartcSendChannel * channel)142 int64_t QuartcMultiplexer::AllocateDatagramId(QuartcSendChannel* channel) {
143   send_channels_by_datagram_id_[next_datagram_id_] = channel;
144   return next_datagram_id_++;
145 }
146 
OnSessionCreated(QuartcSession * session)147 void QuartcMultiplexer::OnSessionCreated(QuartcSession* session) {
148   for (auto& channel : send_channels_) {
149     channel->OnSessionCreated(session);
150   }
151   session_ = session;
152   session_delegate_->OnSessionCreated(session);
153 }
154 
OnCryptoHandshakeComplete()155 void QuartcMultiplexer::OnCryptoHandshakeComplete() {
156   session_delegate_->OnCryptoHandshakeComplete();
157 }
158 
OnConnectionWritable()159 void QuartcMultiplexer::OnConnectionWritable() {
160   session_delegate_->OnConnectionWritable();
161 }
162 
OnIncomingStream(QuartcStream * stream)163 void QuartcMultiplexer::OnIncomingStream(QuartcStream* stream) {
164   stream->SetDelegate(this);
165 }
166 
OnCongestionControlChange(QuicBandwidth bandwidth_estimate,QuicBandwidth pacing_rate,QuicTime::Delta latest_rtt)167 void QuartcMultiplexer::OnCongestionControlChange(
168     QuicBandwidth bandwidth_estimate,
169     QuicBandwidth pacing_rate,
170     QuicTime::Delta latest_rtt) {
171   session_delegate_->OnCongestionControlChange(bandwidth_estimate, pacing_rate,
172                                                latest_rtt);
173 }
174 
OnConnectionClosed(const QuicConnectionCloseFrame & frame,ConnectionCloseSource source)175 void QuartcMultiplexer::OnConnectionClosed(
176     const QuicConnectionCloseFrame& frame,
177     ConnectionCloseSource source) {
178   session_delegate_->OnConnectionClosed(frame, source);
179 }
180 
OnMessageReceived(quiche::QuicheStringPiece message)181 void QuartcMultiplexer::OnMessageReceived(quiche::QuicheStringPiece message) {
182   QuicDataReader reader(message);
183   QuicVariableLengthIntegerLength channel_id_length =
184       reader.PeekVarInt62Length();
185 
186   uint64_t channel_id;
187   if (!reader.ReadVarInt62(&channel_id)) {
188     QUIC_LOG(DFATAL) << "Received message without properly encoded channel id";
189     return;
190   }
191 
192   QuartcReceiveChannel* channel = default_receive_channel_;
193   auto it = receive_channels_.find(channel_id);
194   if (it != receive_channels_.end()) {
195     channel = it->second;
196   }
197 
198   channel->OnMessageReceived(channel_id, message.substr(channel_id_length));
199 }
200 
OnMessageSent(int64_t datagram_id)201 void QuartcMultiplexer::OnMessageSent(int64_t datagram_id) {
202   auto it = send_channels_by_datagram_id_.find(datagram_id);
203   if (it == send_channels_by_datagram_id_.end()) {
204     return;
205   }
206   it->second->OnMessageSent(datagram_id);
207 }
208 
OnMessageAcked(int64_t datagram_id,QuicTime receive_timestamp)209 void QuartcMultiplexer::OnMessageAcked(int64_t datagram_id,
210                                        QuicTime receive_timestamp) {
211   auto it = send_channels_by_datagram_id_.find(datagram_id);
212   if (it == send_channels_by_datagram_id_.end()) {
213     return;
214   }
215   it->second->OnMessageAcked(datagram_id, receive_timestamp);
216   send_channels_by_datagram_id_.erase(it);
217 }
218 
OnMessageLost(int64_t datagram_id)219 void QuartcMultiplexer::OnMessageLost(int64_t datagram_id) {
220   auto it = send_channels_by_datagram_id_.find(datagram_id);
221   if (it == send_channels_by_datagram_id_.end()) {
222     return;
223   }
224   it->second->OnMessageLost(datagram_id);
225   send_channels_by_datagram_id_.erase(it);
226 }
227 
OnReceived(QuartcStream * stream,iovec * iov,size_t iov_length,bool)228 size_t QuartcMultiplexer::OnReceived(QuartcStream* stream,
229                                      iovec* iov,
230                                      size_t iov_length,
231                                      bool /*fin*/) {
232   if (iov == nullptr || iov_length <= 0) {
233     return 0;
234   }
235 
236   QuicDataReader reader(static_cast<char*>(iov[0].iov_base), iov[0].iov_len);
237   QuicVariableLengthIntegerLength channel_id_length =
238       reader.PeekVarInt62Length();
239 
240   uint64_t channel_id;
241   if (reader.BytesRemaining() >= channel_id_length) {
242     // Fast path, have enough data to read immediately.
243     if (!reader.ReadVarInt62(&channel_id)) {
244       return 0;
245     }
246   } else {
247     // Slow path, need to coalesce multiple iovecs.
248     std::string data;
249     for (size_t i = 0; i < iov_length; ++i) {
250       data += std::string(static_cast<char*>(iov[i].iov_base), iov[i].iov_len);
251     }
252     QuicDataReader combined_reader(data);
253     if (!combined_reader.ReadVarInt62(&channel_id)) {
254       return 0;
255     }
256   }
257 
258   QuartcReceiveChannel* channel = default_receive_channel_;
259   auto it = receive_channels_.find(channel_id);
260   if (it != receive_channels_.end()) {
261     channel = it->second;
262   }
263   channel->OnIncomingStream(channel_id, stream);
264   return channel_id_length;
265 }
266 
OnClose(QuartcStream *)267 void QuartcMultiplexer::OnClose(QuartcStream* /*stream*/) {}
268 
OnBufferChanged(QuartcStream *)269 void QuartcMultiplexer::OnBufferChanged(QuartcStream* /*stream*/) {}
270 
271 }  // namespace quic
272