1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "remoting/protocol/pseudotcp_adapter.h"
6
7 #include <stddef.h>
8
9 #include <utility>
10
11 #include "base/bind.h"
12 #include "base/compiler_specific.h"
13 #include "base/logging.h"
14 #include "base/macros.h"
15 #include "base/time/time.h"
16 #include "base/timer/timer.h"
17 #include "net/base/address_list.h"
18 #include "net/base/completion_once_callback.h"
19 #include "net/base/io_buffer.h"
20 #include "net/base/net_errors.h"
21 #include "net/traffic_annotation/network_traffic_annotation.h"
22 #include "remoting/protocol/p2p_datagram_socket.h"
23
24 using cricket::PseudoTcp;
25
26 namespace {
27 const int kReadBufferSize = 65536; // Maximum size of a packet.
28 const uint16_t kDefaultMtu = 1280;
29 } // namespace
30
31 namespace remoting {
32 namespace protocol {
33
34 class PseudoTcpAdapter::Core : public cricket::IPseudoTcpNotify,
35 public base::RefCounted<Core> {
36 public:
37 explicit Core(std::unique_ptr<P2PDatagramSocket> socket);
38
39 // Functions used to implement net::StreamSocket.
40 int Read(const scoped_refptr<net::IOBuffer>& buffer,
41 int buffer_size,
42 net::CompletionOnceCallback callback);
43 int Write(const scoped_refptr<net::IOBuffer>& buffer,
44 int buffer_size,
45 net::CompletionOnceCallback callback,
46 const net::NetworkTrafficAnnotationTag& traffic_annotation);
47 net::CompletionOnceCallback Connect(net::CompletionOnceCallback callback);
48
49 // cricket::IPseudoTcpNotify interface.
50 // These notifications are triggered from NotifyPacket.
51 void OnTcpOpen(cricket::PseudoTcp* tcp) override;
52 void OnTcpReadable(cricket::PseudoTcp* tcp) override;
53 void OnTcpWriteable(cricket::PseudoTcp* tcp) override;
54 // This is triggered by NotifyClock or NotifyPacket.
55 void OnTcpClosed(cricket::PseudoTcp* tcp, uint32_t error) override;
56 // This is triggered by NotifyClock, NotifyPacket, Recv and Send.
57 WriteResult TcpWritePacket(cricket::PseudoTcp* tcp,
58 const char* buffer,
59 size_t len) override;
60
61 void SetAckDelay(int delay_ms);
62 void SetNoDelay(bool no_delay);
63 void SetReceiveBufferSize(int32_t size);
64 void SetSendBufferSize(int32_t size);
65 void SetWriteWaitsForSend(bool write_waits_for_send);
66
67 void DeleteSocket();
68
69 private:
70 friend class base::RefCounted<Core>;
71 ~Core() override;
72
73 // These are invoked by the underlying Socket, and may trigger callbacks.
74 // They hold a reference to |this| while running, to protect from deletion.
75 void OnRead(int result);
76 void OnWritten(int result);
77
78 // These may trigger callbacks, so the holder must hold a reference on
79 // the stack while calling them.
80 void DoReadFromSocket();
81 void HandleReadResults(int result);
82 void HandleTcpClock();
83
84 // Checks if current write has completed in the write-waits-for-send
85 // mode.
86 void CheckWriteComplete();
87
88 // This re-sets |timer| without triggering callbacks.
89 void AdjustClock();
90
91 net::CompletionOnceCallback connect_callback_;
92 net::CompletionOnceCallback read_callback_;
93 net::CompletionOnceCallback write_callback_;
94
95 cricket::PseudoTcp pseudo_tcp_;
96 std::unique_ptr<P2PDatagramSocket> socket_;
97
98 scoped_refptr<net::IOBuffer> read_buffer_;
99 int read_buffer_size_;
100 scoped_refptr<net::IOBuffer> write_buffer_;
101 int write_buffer_size_;
102
103 // Whether we need to wait for data to be sent before completing write.
104 bool write_waits_for_send_;
105
106 // Set to true in the write-waits-for-send mode when we've
107 // successfully writtend data to the send buffer and waiting for the
108 // data to be sent to the remote end.
109 bool waiting_write_position_;
110
111 // Number of the bytes written by the last write stored while we wait
112 // for the data to be sent (i.e. when waiting_write_position_ = true).
113 int last_write_result_;
114
115 bool socket_write_pending_;
116 scoped_refptr<net::IOBuffer> socket_read_buffer_;
117
118 base::OneShotTimer timer_;
119
120 DISALLOW_COPY_AND_ASSIGN(Core);
121 };
122
Core(std::unique_ptr<P2PDatagramSocket> socket)123 PseudoTcpAdapter::Core::Core(std::unique_ptr<P2PDatagramSocket> socket)
124 : pseudo_tcp_(this, 0),
125 socket_(std::move(socket)),
126 write_waits_for_send_(false),
127 waiting_write_position_(false),
128 socket_write_pending_(false) {
129 // Doesn't trigger callbacks.
130 pseudo_tcp_.NotifyMTU(kDefaultMtu);
131 }
132
133 PseudoTcpAdapter::Core::~Core() = default;
134
Read(const scoped_refptr<net::IOBuffer> & buffer,int buffer_size,net::CompletionOnceCallback callback)135 int PseudoTcpAdapter::Core::Read(const scoped_refptr<net::IOBuffer>& buffer,
136 int buffer_size,
137 net::CompletionOnceCallback callback) {
138 DCHECK(read_callback_.is_null());
139
140 // Reference the Core in case a callback deletes the adapter.
141 scoped_refptr<Core> core(this);
142
143 int result = pseudo_tcp_.Recv(buffer->data(), buffer_size);
144 if (result < 0) {
145 result = net::MapSystemError(pseudo_tcp_.GetError());
146 DCHECK(result < 0);
147 }
148
149 if (result == net::ERR_IO_PENDING) {
150 read_buffer_ = buffer;
151 read_buffer_size_ = buffer_size;
152 read_callback_ = std::move(callback);
153 }
154
155 AdjustClock();
156
157 return result;
158 }
159
Write(const scoped_refptr<net::IOBuffer> & buffer,int buffer_size,net::CompletionOnceCallback callback,const net::NetworkTrafficAnnotationTag &)160 int PseudoTcpAdapter::Core::Write(
161 const scoped_refptr<net::IOBuffer>& buffer,
162 int buffer_size,
163 net::CompletionOnceCallback callback,
164 const net::NetworkTrafficAnnotationTag& /*traffic_annotation*/) {
165 DCHECK(write_callback_.is_null());
166
167 // Reference the Core in case a callback deletes the adapter.
168 scoped_refptr<Core> core(this);
169
170 int result = pseudo_tcp_.Send(buffer->data(), buffer_size);
171 if (result < 0) {
172 result = net::MapSystemError(pseudo_tcp_.GetError());
173 DCHECK(result < 0);
174 }
175
176 AdjustClock();
177
178 if (result == net::ERR_IO_PENDING) {
179 write_buffer_ = buffer;
180 write_buffer_size_ = buffer_size;
181 write_callback_ = std::move(callback);
182 return result;
183 }
184
185 if (result < 0)
186 return result;
187
188 // Need to wait until the data is sent to the peer when
189 // send-confirmation mode is enabled.
190 if (write_waits_for_send_ && pseudo_tcp_.GetBytesBufferedNotSent() > 0) {
191 DCHECK(!waiting_write_position_);
192 waiting_write_position_ = true;
193 last_write_result_ = result;
194 write_buffer_ = buffer;
195 write_buffer_size_ = buffer_size;
196 write_callback_ = std::move(callback);
197 return net::ERR_IO_PENDING;
198 }
199
200 return result;
201 }
202
Connect(net::CompletionOnceCallback callback)203 net::CompletionOnceCallback PseudoTcpAdapter::Core::Connect(
204 net::CompletionOnceCallback callback) {
205 DCHECK_EQ(pseudo_tcp_.State(), cricket::PseudoTcp::TCP_LISTEN);
206
207 // Reference the Core in case a callback deletes the adapter.
208 scoped_refptr<Core> core(this);
209
210 // Start the connection attempt.
211 int result = pseudo_tcp_.Connect();
212 if (result < 0)
213 return callback;
214
215 AdjustClock();
216
217 connect_callback_ = std::move(callback);
218 DoReadFromSocket();
219
220 return {};
221 }
222
OnTcpOpen(PseudoTcp * tcp)223 void PseudoTcpAdapter::Core::OnTcpOpen(PseudoTcp* tcp) {
224 DCHECK(tcp == &pseudo_tcp_);
225
226 if (connect_callback_)
227 std::move(connect_callback_).Run(net::OK);
228
229 OnTcpReadable(tcp);
230 OnTcpWriteable(tcp);
231 }
232
OnTcpReadable(PseudoTcp * tcp)233 void PseudoTcpAdapter::Core::OnTcpReadable(PseudoTcp* tcp) {
234 DCHECK_EQ(tcp, &pseudo_tcp_);
235 if (read_callback_.is_null())
236 return;
237
238 int result = pseudo_tcp_.Recv(read_buffer_->data(), read_buffer_size_);
239 if (result < 0) {
240 result = net::MapSystemError(pseudo_tcp_.GetError());
241 DCHECK(result < 0);
242 if (result == net::ERR_IO_PENDING)
243 return;
244 }
245
246 AdjustClock();
247
248 read_buffer_.reset();
249 std::move(read_callback_).Run(result);
250 }
251
OnTcpWriteable(PseudoTcp * tcp)252 void PseudoTcpAdapter::Core::OnTcpWriteable(PseudoTcp* tcp) {
253 DCHECK_EQ(tcp, &pseudo_tcp_);
254 if (write_callback_.is_null())
255 return;
256
257 if (waiting_write_position_) {
258 CheckWriteComplete();
259 return;
260 }
261
262 int result = pseudo_tcp_.Send(write_buffer_->data(), write_buffer_size_);
263 if (result < 0) {
264 result = net::MapSystemError(pseudo_tcp_.GetError());
265 DCHECK(result < 0);
266 if (result == net::ERR_IO_PENDING)
267 return;
268 }
269
270 AdjustClock();
271
272 if (write_waits_for_send_ && pseudo_tcp_.GetBytesBufferedNotSent() > 0) {
273 DCHECK(!waiting_write_position_);
274 waiting_write_position_ = true;
275 last_write_result_ = result;
276 return;
277 }
278
279 write_buffer_.reset();
280 std::move(write_callback_).Run(result);
281 }
282
OnTcpClosed(PseudoTcp * tcp,uint32_t error)283 void PseudoTcpAdapter::Core::OnTcpClosed(PseudoTcp* tcp, uint32_t error) {
284 DCHECK_EQ(tcp, &pseudo_tcp_);
285
286 if (!connect_callback_.is_null()) {
287 std::move(connect_callback_).Run(net::MapSystemError(error));
288 }
289
290 if (!read_callback_.is_null()) {
291 std::move(read_callback_).Run(net::MapSystemError(error));
292 }
293
294 if (!write_callback_.is_null()) {
295 std::move(write_callback_).Run(net::MapSystemError(error));
296 }
297 }
298
SetAckDelay(int delay_ms)299 void PseudoTcpAdapter::Core::SetAckDelay(int delay_ms) {
300 pseudo_tcp_.SetOption(cricket::PseudoTcp::OPT_ACKDELAY, delay_ms);
301 }
302
SetNoDelay(bool no_delay)303 void PseudoTcpAdapter::Core::SetNoDelay(bool no_delay) {
304 pseudo_tcp_.SetOption(cricket::PseudoTcp::OPT_NODELAY, no_delay ? 1 : 0);
305 }
306
SetReceiveBufferSize(int32_t size)307 void PseudoTcpAdapter::Core::SetReceiveBufferSize(int32_t size) {
308 pseudo_tcp_.SetOption(cricket::PseudoTcp::OPT_RCVBUF, size);
309 }
310
SetSendBufferSize(int32_t size)311 void PseudoTcpAdapter::Core::SetSendBufferSize(int32_t size) {
312 pseudo_tcp_.SetOption(cricket::PseudoTcp::OPT_SNDBUF, size);
313 }
314
SetWriteWaitsForSend(bool write_waits_for_send)315 void PseudoTcpAdapter::Core::SetWriteWaitsForSend(bool write_waits_for_send) {
316 write_waits_for_send_ = write_waits_for_send;
317 }
318
DeleteSocket()319 void PseudoTcpAdapter::Core::DeleteSocket() {
320 // Don't dispatch outstanding callbacks when the socket is deleted.
321 read_callback_.Reset();
322 read_buffer_.reset();
323 write_callback_.Reset();
324 write_buffer_.reset();
325 connect_callback_.Reset();
326
327 socket_.reset();
328 }
329
TcpWritePacket(PseudoTcp * tcp,const char * buffer,size_t len)330 cricket::IPseudoTcpNotify::WriteResult PseudoTcpAdapter::Core::TcpWritePacket(
331 PseudoTcp* tcp,
332 const char* buffer,
333 size_t len) {
334 DCHECK_EQ(tcp, &pseudo_tcp_);
335
336 // If we already have a write pending, we behave like a congested network,
337 // returning success for the write, but dropping the packet. PseudoTcp will
338 // back-off and retransmit, adjusting for the perceived congestion.
339 if (socket_write_pending_)
340 return IPseudoTcpNotify::WR_SUCCESS;
341
342 scoped_refptr<net::IOBuffer> write_buffer =
343 base::MakeRefCounted<net::IOBuffer>(len);
344 memcpy(write_buffer->data(), buffer, len);
345
346 // Our underlying socket is datagram-oriented, which means it should either
347 // send exactly as many bytes as we requested, or fail.
348 int result;
349 if (socket_) {
350 result =
351 socket_->Send(write_buffer.get(), len,
352 base::BindRepeating(&PseudoTcpAdapter::Core::OnWritten,
353 base::Unretained(this)));
354 } else {
355 result = net::ERR_CONNECTION_CLOSED;
356 }
357 if (result == net::ERR_IO_PENDING) {
358 socket_write_pending_ = true;
359 return IPseudoTcpNotify::WR_SUCCESS;
360 } else if (result == net::ERR_MSG_TOO_BIG) {
361 return IPseudoTcpNotify::WR_TOO_LARGE;
362 } else if (result < 0) {
363 return IPseudoTcpNotify::WR_FAIL;
364 } else {
365 return IPseudoTcpNotify::WR_SUCCESS;
366 }
367 }
368
DoReadFromSocket()369 void PseudoTcpAdapter::Core::DoReadFromSocket() {
370 if (!socket_read_buffer_.get())
371 socket_read_buffer_ = base::MakeRefCounted<net::IOBuffer>(kReadBufferSize);
372
373 int result = 1;
374 while (socket_ && result > 0) {
375 result = socket_->Recv(socket_read_buffer_.get(), kReadBufferSize,
376 base::BindRepeating(&PseudoTcpAdapter::Core::OnRead,
377 base::Unretained(this)));
378 if (result != net::ERR_IO_PENDING)
379 HandleReadResults(result);
380 }
381 }
382
HandleReadResults(int result)383 void PseudoTcpAdapter::Core::HandleReadResults(int result) {
384 if (result <= 0) {
385 LOG(ERROR) << "Read returned " << result;
386 return;
387 }
388
389 // TODO(wez): Disconnect on failure of NotifyPacket?
390 pseudo_tcp_.NotifyPacket(socket_read_buffer_->data(), result);
391 AdjustClock();
392
393 CheckWriteComplete();
394 }
395
OnRead(int result)396 void PseudoTcpAdapter::Core::OnRead(int result) {
397 // Reference the Core in case a callback deletes the adapter.
398 scoped_refptr<Core> core(this);
399
400 HandleReadResults(result);
401 if (result >= 0)
402 DoReadFromSocket();
403 }
404
OnWritten(int result)405 void PseudoTcpAdapter::Core::OnWritten(int result) {
406 // Reference the Core in case a callback deletes the adapter.
407 scoped_refptr<Core> core(this);
408
409 socket_write_pending_ = false;
410 if (result < 0) {
411 LOG(WARNING) << "Write failed. Error code: " << result;
412 }
413 }
414
AdjustClock()415 void PseudoTcpAdapter::Core::AdjustClock() {
416 long timeout = 0;
417 if (pseudo_tcp_.GetNextClock(PseudoTcp::Now(), timeout)) {
418 timer_.Stop();
419 timer_.Start(FROM_HERE,
420 base::TimeDelta::FromMilliseconds(std::max(timeout, 0L)), this,
421 &PseudoTcpAdapter::Core::HandleTcpClock);
422 }
423 }
424
HandleTcpClock()425 void PseudoTcpAdapter::Core::HandleTcpClock() {
426 // Reference the Core in case a callback deletes the adapter.
427 scoped_refptr<Core> core(this);
428
429 pseudo_tcp_.NotifyClock(PseudoTcp::Now());
430 AdjustClock();
431
432 CheckWriteComplete();
433 }
434
CheckWriteComplete()435 void PseudoTcpAdapter::Core::CheckWriteComplete() {
436 if (!write_callback_.is_null() && waiting_write_position_) {
437 if (pseudo_tcp_.GetBytesBufferedNotSent() == 0) {
438 waiting_write_position_ = false;
439
440 write_buffer_.reset();
441 std::move(write_callback_).Run(last_write_result_);
442 }
443 }
444 }
445
446 // Public interface implementation.
447
PseudoTcpAdapter(std::unique_ptr<P2PDatagramSocket> socket)448 PseudoTcpAdapter::PseudoTcpAdapter(std::unique_ptr<P2PDatagramSocket> socket)
449 : core_(new Core(std::move(socket))) {}
450
~PseudoTcpAdapter()451 PseudoTcpAdapter::~PseudoTcpAdapter() {
452 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
453 // Make sure that the underlying socket is destroyed before PseudoTcp.
454 core_->DeleteSocket();
455 }
456
Read(const scoped_refptr<net::IOBuffer> & buffer,int buffer_size,net::CompletionOnceCallback callback)457 int PseudoTcpAdapter::Read(const scoped_refptr<net::IOBuffer>& buffer,
458 int buffer_size,
459 net::CompletionOnceCallback callback) {
460 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
461 return core_->Read(buffer, buffer_size, std::move(callback));
462 }
463
Write(const scoped_refptr<net::IOBuffer> & buffer,int buffer_size,net::CompletionOnceCallback callback,const net::NetworkTrafficAnnotationTag & traffic_annotation)464 int PseudoTcpAdapter::Write(
465 const scoped_refptr<net::IOBuffer>& buffer,
466 int buffer_size,
467 net::CompletionOnceCallback callback,
468 const net::NetworkTrafficAnnotationTag& traffic_annotation) {
469 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
470 return core_->Write(buffer, buffer_size, std::move(callback),
471 traffic_annotation);
472 }
473
SetReceiveBufferSize(int32_t size)474 int PseudoTcpAdapter::SetReceiveBufferSize(int32_t size) {
475 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
476 core_->SetReceiveBufferSize(size);
477 return net::OK;
478 }
479
SetSendBufferSize(int32_t size)480 int PseudoTcpAdapter::SetSendBufferSize(int32_t size) {
481 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
482 core_->SetSendBufferSize(size);
483 return net::OK;
484 }
485
Connect(net::CompletionOnceCallback callback)486 net::CompletionOnceCallback PseudoTcpAdapter::Connect(
487 net::CompletionOnceCallback callback) {
488 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
489 return core_->Connect(std::move(callback));
490 }
491
SetAckDelay(int delay_ms)492 void PseudoTcpAdapter::SetAckDelay(int delay_ms) {
493 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
494 core_->SetAckDelay(delay_ms);
495 }
496
SetNoDelay(bool no_delay)497 void PseudoTcpAdapter::SetNoDelay(bool no_delay) {
498 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
499 core_->SetNoDelay(no_delay);
500 }
501
SetWriteWaitsForSend(bool write_waits_for_send)502 void PseudoTcpAdapter::SetWriteWaitsForSend(bool write_waits_for_send) {
503 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
504 core_->SetWriteWaitsForSend(write_waits_for_send);
505 }
506
507 } // namespace protocol
508 } // namespace remoting
509