1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "remoting/protocol/pseudotcp_adapter.h"
6 
7 #include <stddef.h>
8 
9 #include <utility>
10 
11 #include "base/bind.h"
12 #include "base/compiler_specific.h"
13 #include "base/logging.h"
14 #include "base/macros.h"
15 #include "base/time/time.h"
16 #include "base/timer/timer.h"
17 #include "net/base/address_list.h"
18 #include "net/base/completion_once_callback.h"
19 #include "net/base/io_buffer.h"
20 #include "net/base/net_errors.h"
21 #include "net/traffic_annotation/network_traffic_annotation.h"
22 #include "remoting/protocol/p2p_datagram_socket.h"
23 
24 using cricket::PseudoTcp;
25 
namespace {

// Size of the buffer used when reading datagrams from the underlying socket.
// It matches the maximum size of a packet.
constexpr int kReadBufferSize = 65536;

// MTU value passed to PseudoTcp::NotifyMTU() when the Core is constructed.
constexpr uint16_t kDefaultMtu = 1280;

}  // namespace
30 
31 namespace remoting {
32 namespace protocol {
33 
34 class PseudoTcpAdapter::Core : public cricket::IPseudoTcpNotify,
35                                public base::RefCounted<Core> {
36  public:
37   explicit Core(std::unique_ptr<P2PDatagramSocket> socket);
38 
39   // Functions used to implement net::StreamSocket.
40   int Read(const scoped_refptr<net::IOBuffer>& buffer,
41            int buffer_size,
42            net::CompletionOnceCallback callback);
43   int Write(const scoped_refptr<net::IOBuffer>& buffer,
44             int buffer_size,
45             net::CompletionOnceCallback callback,
46             const net::NetworkTrafficAnnotationTag& traffic_annotation);
47   net::CompletionOnceCallback Connect(net::CompletionOnceCallback callback);
48 
49   // cricket::IPseudoTcpNotify interface.
50   // These notifications are triggered from NotifyPacket.
51   void OnTcpOpen(cricket::PseudoTcp* tcp) override;
52   void OnTcpReadable(cricket::PseudoTcp* tcp) override;
53   void OnTcpWriteable(cricket::PseudoTcp* tcp) override;
54   // This is triggered by NotifyClock or NotifyPacket.
55   void OnTcpClosed(cricket::PseudoTcp* tcp, uint32_t error) override;
56   // This is triggered by NotifyClock, NotifyPacket, Recv and Send.
57   WriteResult TcpWritePacket(cricket::PseudoTcp* tcp,
58                              const char* buffer,
59                              size_t len) override;
60 
61   void SetAckDelay(int delay_ms);
62   void SetNoDelay(bool no_delay);
63   void SetReceiveBufferSize(int32_t size);
64   void SetSendBufferSize(int32_t size);
65   void SetWriteWaitsForSend(bool write_waits_for_send);
66 
67   void DeleteSocket();
68 
69  private:
70   friend class base::RefCounted<Core>;
71   ~Core() override;
72 
73   // These are invoked by the underlying Socket, and may trigger callbacks.
74   // They hold a reference to |this| while running, to protect from deletion.
75   void OnRead(int result);
76   void OnWritten(int result);
77 
78   // These may trigger callbacks, so the holder must hold a reference on
79   // the stack while calling them.
80   void DoReadFromSocket();
81   void HandleReadResults(int result);
82   void HandleTcpClock();
83 
84   // Checks if current write has completed in the write-waits-for-send
85   // mode.
86   void CheckWriteComplete();
87 
88   // This re-sets |timer| without triggering callbacks.
89   void AdjustClock();
90 
91   net::CompletionOnceCallback connect_callback_;
92   net::CompletionOnceCallback read_callback_;
93   net::CompletionOnceCallback write_callback_;
94 
95   cricket::PseudoTcp pseudo_tcp_;
96   std::unique_ptr<P2PDatagramSocket> socket_;
97 
98   scoped_refptr<net::IOBuffer> read_buffer_;
99   int read_buffer_size_;
100   scoped_refptr<net::IOBuffer> write_buffer_;
101   int write_buffer_size_;
102 
103   // Whether we need to wait for data to be sent before completing write.
104   bool write_waits_for_send_;
105 
106   // Set to true in the write-waits-for-send mode when we've
107   // successfully writtend data to the send buffer and waiting for the
108   // data to be sent to the remote end.
109   bool waiting_write_position_;
110 
111   // Number of the bytes written by the last write stored while we wait
112   // for the data to be sent (i.e. when waiting_write_position_ = true).
113   int last_write_result_;
114 
115   bool socket_write_pending_;
116   scoped_refptr<net::IOBuffer> socket_read_buffer_;
117 
118   base::OneShotTimer timer_;
119 
120   DISALLOW_COPY_AND_ASSIGN(Core);
121 };
122 
Core(std::unique_ptr<P2PDatagramSocket> socket)123 PseudoTcpAdapter::Core::Core(std::unique_ptr<P2PDatagramSocket> socket)
124     : pseudo_tcp_(this, 0),
125       socket_(std::move(socket)),
126       write_waits_for_send_(false),
127       waiting_write_position_(false),
128       socket_write_pending_(false) {
129   // Doesn't trigger callbacks.
130   pseudo_tcp_.NotifyMTU(kDefaultMtu);
131 }
132 
133 PseudoTcpAdapter::Core::~Core() = default;
134 
Read(const scoped_refptr<net::IOBuffer> & buffer,int buffer_size,net::CompletionOnceCallback callback)135 int PseudoTcpAdapter::Core::Read(const scoped_refptr<net::IOBuffer>& buffer,
136                                  int buffer_size,
137                                  net::CompletionOnceCallback callback) {
138   DCHECK(read_callback_.is_null());
139 
140   // Reference the Core in case a callback deletes the adapter.
141   scoped_refptr<Core> core(this);
142 
143   int result = pseudo_tcp_.Recv(buffer->data(), buffer_size);
144   if (result < 0) {
145     result = net::MapSystemError(pseudo_tcp_.GetError());
146     DCHECK(result < 0);
147   }
148 
149   if (result == net::ERR_IO_PENDING) {
150     read_buffer_ = buffer;
151     read_buffer_size_ = buffer_size;
152     read_callback_ = std::move(callback);
153   }
154 
155   AdjustClock();
156 
157   return result;
158 }
159 
Write(const scoped_refptr<net::IOBuffer> & buffer,int buffer_size,net::CompletionOnceCallback callback,const net::NetworkTrafficAnnotationTag &)160 int PseudoTcpAdapter::Core::Write(
161     const scoped_refptr<net::IOBuffer>& buffer,
162     int buffer_size,
163     net::CompletionOnceCallback callback,
164     const net::NetworkTrafficAnnotationTag& /*traffic_annotation*/) {
165   DCHECK(write_callback_.is_null());
166 
167   // Reference the Core in case a callback deletes the adapter.
168   scoped_refptr<Core> core(this);
169 
170   int result = pseudo_tcp_.Send(buffer->data(), buffer_size);
171   if (result < 0) {
172     result = net::MapSystemError(pseudo_tcp_.GetError());
173     DCHECK(result < 0);
174   }
175 
176   AdjustClock();
177 
178   if (result == net::ERR_IO_PENDING) {
179     write_buffer_ = buffer;
180     write_buffer_size_ = buffer_size;
181     write_callback_ = std::move(callback);
182     return result;
183   }
184 
185   if (result < 0)
186     return result;
187 
188   // Need to wait until the data is sent to the peer when
189   // send-confirmation mode is enabled.
190   if (write_waits_for_send_ && pseudo_tcp_.GetBytesBufferedNotSent() > 0) {
191     DCHECK(!waiting_write_position_);
192     waiting_write_position_ = true;
193     last_write_result_ = result;
194     write_buffer_ = buffer;
195     write_buffer_size_ = buffer_size;
196     write_callback_ = std::move(callback);
197     return net::ERR_IO_PENDING;
198   }
199 
200   return result;
201 }
202 
Connect(net::CompletionOnceCallback callback)203 net::CompletionOnceCallback PseudoTcpAdapter::Core::Connect(
204     net::CompletionOnceCallback callback) {
205   DCHECK_EQ(pseudo_tcp_.State(), cricket::PseudoTcp::TCP_LISTEN);
206 
207   // Reference the Core in case a callback deletes the adapter.
208   scoped_refptr<Core> core(this);
209 
210   // Start the connection attempt.
211   int result = pseudo_tcp_.Connect();
212   if (result < 0)
213     return callback;
214 
215   AdjustClock();
216 
217   connect_callback_ = std::move(callback);
218   DoReadFromSocket();
219 
220   return {};
221 }
222 
OnTcpOpen(PseudoTcp * tcp)223 void PseudoTcpAdapter::Core::OnTcpOpen(PseudoTcp* tcp) {
224   DCHECK(tcp == &pseudo_tcp_);
225 
226   if (connect_callback_)
227     std::move(connect_callback_).Run(net::OK);
228 
229   OnTcpReadable(tcp);
230   OnTcpWriteable(tcp);
231 }
232 
OnTcpReadable(PseudoTcp * tcp)233 void PseudoTcpAdapter::Core::OnTcpReadable(PseudoTcp* tcp) {
234   DCHECK_EQ(tcp, &pseudo_tcp_);
235   if (read_callback_.is_null())
236     return;
237 
238   int result = pseudo_tcp_.Recv(read_buffer_->data(), read_buffer_size_);
239   if (result < 0) {
240     result = net::MapSystemError(pseudo_tcp_.GetError());
241     DCHECK(result < 0);
242     if (result == net::ERR_IO_PENDING)
243       return;
244   }
245 
246   AdjustClock();
247 
248   read_buffer_.reset();
249   std::move(read_callback_).Run(result);
250 }
251 
OnTcpWriteable(PseudoTcp * tcp)252 void PseudoTcpAdapter::Core::OnTcpWriteable(PseudoTcp* tcp) {
253   DCHECK_EQ(tcp, &pseudo_tcp_);
254   if (write_callback_.is_null())
255     return;
256 
257   if (waiting_write_position_) {
258     CheckWriteComplete();
259     return;
260   }
261 
262   int result = pseudo_tcp_.Send(write_buffer_->data(), write_buffer_size_);
263   if (result < 0) {
264     result = net::MapSystemError(pseudo_tcp_.GetError());
265     DCHECK(result < 0);
266     if (result == net::ERR_IO_PENDING)
267       return;
268   }
269 
270   AdjustClock();
271 
272   if (write_waits_for_send_ && pseudo_tcp_.GetBytesBufferedNotSent() > 0) {
273     DCHECK(!waiting_write_position_);
274     waiting_write_position_ = true;
275     last_write_result_ = result;
276     return;
277   }
278 
279   write_buffer_.reset();
280   std::move(write_callback_).Run(result);
281 }
282 
OnTcpClosed(PseudoTcp * tcp,uint32_t error)283 void PseudoTcpAdapter::Core::OnTcpClosed(PseudoTcp* tcp, uint32_t error) {
284   DCHECK_EQ(tcp, &pseudo_tcp_);
285 
286   if (!connect_callback_.is_null()) {
287     std::move(connect_callback_).Run(net::MapSystemError(error));
288   }
289 
290   if (!read_callback_.is_null()) {
291     std::move(read_callback_).Run(net::MapSystemError(error));
292   }
293 
294   if (!write_callback_.is_null()) {
295     std::move(write_callback_).Run(net::MapSystemError(error));
296   }
297 }
298 
SetAckDelay(int delay_ms)299 void PseudoTcpAdapter::Core::SetAckDelay(int delay_ms) {
300   pseudo_tcp_.SetOption(cricket::PseudoTcp::OPT_ACKDELAY, delay_ms);
301 }
302 
SetNoDelay(bool no_delay)303 void PseudoTcpAdapter::Core::SetNoDelay(bool no_delay) {
304   pseudo_tcp_.SetOption(cricket::PseudoTcp::OPT_NODELAY, no_delay ? 1 : 0);
305 }
306 
SetReceiveBufferSize(int32_t size)307 void PseudoTcpAdapter::Core::SetReceiveBufferSize(int32_t size) {
308   pseudo_tcp_.SetOption(cricket::PseudoTcp::OPT_RCVBUF, size);
309 }
310 
SetSendBufferSize(int32_t size)311 void PseudoTcpAdapter::Core::SetSendBufferSize(int32_t size) {
312   pseudo_tcp_.SetOption(cricket::PseudoTcp::OPT_SNDBUF, size);
313 }
314 
SetWriteWaitsForSend(bool write_waits_for_send)315 void PseudoTcpAdapter::Core::SetWriteWaitsForSend(bool write_waits_for_send) {
316   write_waits_for_send_ = write_waits_for_send;
317 }
318 
DeleteSocket()319 void PseudoTcpAdapter::Core::DeleteSocket() {
320   // Don't dispatch outstanding callbacks when the socket is deleted.
321   read_callback_.Reset();
322   read_buffer_.reset();
323   write_callback_.Reset();
324   write_buffer_.reset();
325   connect_callback_.Reset();
326 
327   socket_.reset();
328 }
329 
TcpWritePacket(PseudoTcp * tcp,const char * buffer,size_t len)330 cricket::IPseudoTcpNotify::WriteResult PseudoTcpAdapter::Core::TcpWritePacket(
331     PseudoTcp* tcp,
332     const char* buffer,
333     size_t len) {
334   DCHECK_EQ(tcp, &pseudo_tcp_);
335 
336   // If we already have a write pending, we behave like a congested network,
337   // returning success for the write, but dropping the packet.  PseudoTcp will
338   // back-off and retransmit, adjusting for the perceived congestion.
339   if (socket_write_pending_)
340     return IPseudoTcpNotify::WR_SUCCESS;
341 
342   scoped_refptr<net::IOBuffer> write_buffer =
343       base::MakeRefCounted<net::IOBuffer>(len);
344   memcpy(write_buffer->data(), buffer, len);
345 
346   // Our underlying socket is datagram-oriented, which means it should either
347   // send exactly as many bytes as we requested, or fail.
348   int result;
349   if (socket_) {
350     result =
351         socket_->Send(write_buffer.get(), len,
352                       base::BindRepeating(&PseudoTcpAdapter::Core::OnWritten,
353                                           base::Unretained(this)));
354   } else {
355     result = net::ERR_CONNECTION_CLOSED;
356   }
357   if (result == net::ERR_IO_PENDING) {
358     socket_write_pending_ = true;
359     return IPseudoTcpNotify::WR_SUCCESS;
360   } else if (result == net::ERR_MSG_TOO_BIG) {
361     return IPseudoTcpNotify::WR_TOO_LARGE;
362   } else if (result < 0) {
363     return IPseudoTcpNotify::WR_FAIL;
364   } else {
365     return IPseudoTcpNotify::WR_SUCCESS;
366   }
367 }
368 
DoReadFromSocket()369 void PseudoTcpAdapter::Core::DoReadFromSocket() {
370   if (!socket_read_buffer_.get())
371     socket_read_buffer_ = base::MakeRefCounted<net::IOBuffer>(kReadBufferSize);
372 
373   int result = 1;
374   while (socket_ && result > 0) {
375     result = socket_->Recv(socket_read_buffer_.get(), kReadBufferSize,
376                            base::BindRepeating(&PseudoTcpAdapter::Core::OnRead,
377                                                base::Unretained(this)));
378     if (result != net::ERR_IO_PENDING)
379       HandleReadResults(result);
380   }
381 }
382 
HandleReadResults(int result)383 void PseudoTcpAdapter::Core::HandleReadResults(int result) {
384   if (result <= 0) {
385     LOG(ERROR) << "Read returned " << result;
386     return;
387   }
388 
389   // TODO(wez): Disconnect on failure of NotifyPacket?
390   pseudo_tcp_.NotifyPacket(socket_read_buffer_->data(), result);
391   AdjustClock();
392 
393   CheckWriteComplete();
394 }
395 
OnRead(int result)396 void PseudoTcpAdapter::Core::OnRead(int result) {
397   // Reference the Core in case a callback deletes the adapter.
398   scoped_refptr<Core> core(this);
399 
400   HandleReadResults(result);
401   if (result >= 0)
402     DoReadFromSocket();
403 }
404 
OnWritten(int result)405 void PseudoTcpAdapter::Core::OnWritten(int result) {
406   // Reference the Core in case a callback deletes the adapter.
407   scoped_refptr<Core> core(this);
408 
409   socket_write_pending_ = false;
410   if (result < 0) {
411     LOG(WARNING) << "Write failed. Error code: " << result;
412   }
413 }
414 
AdjustClock()415 void PseudoTcpAdapter::Core::AdjustClock() {
416   long timeout = 0;
417   if (pseudo_tcp_.GetNextClock(PseudoTcp::Now(), timeout)) {
418     timer_.Stop();
419     timer_.Start(FROM_HERE,
420                  base::TimeDelta::FromMilliseconds(std::max(timeout, 0L)), this,
421                  &PseudoTcpAdapter::Core::HandleTcpClock);
422   }
423 }
424 
HandleTcpClock()425 void PseudoTcpAdapter::Core::HandleTcpClock() {
426   // Reference the Core in case a callback deletes the adapter.
427   scoped_refptr<Core> core(this);
428 
429   pseudo_tcp_.NotifyClock(PseudoTcp::Now());
430   AdjustClock();
431 
432   CheckWriteComplete();
433 }
434 
CheckWriteComplete()435 void PseudoTcpAdapter::Core::CheckWriteComplete() {
436   if (!write_callback_.is_null() && waiting_write_position_) {
437     if (pseudo_tcp_.GetBytesBufferedNotSent() == 0) {
438       waiting_write_position_ = false;
439 
440       write_buffer_.reset();
441       std::move(write_callback_).Run(last_write_result_);
442     }
443   }
444 }
445 
446 // Public interface implementation.
447 
PseudoTcpAdapter(std::unique_ptr<P2PDatagramSocket> socket)448 PseudoTcpAdapter::PseudoTcpAdapter(std::unique_ptr<P2PDatagramSocket> socket)
449     : core_(new Core(std::move(socket))) {}
450 
~PseudoTcpAdapter()451 PseudoTcpAdapter::~PseudoTcpAdapter() {
452   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
453   // Make sure that the underlying socket is destroyed before PseudoTcp.
454   core_->DeleteSocket();
455 }
456 
Read(const scoped_refptr<net::IOBuffer> & buffer,int buffer_size,net::CompletionOnceCallback callback)457 int PseudoTcpAdapter::Read(const scoped_refptr<net::IOBuffer>& buffer,
458                            int buffer_size,
459                            net::CompletionOnceCallback callback) {
460   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
461   return core_->Read(buffer, buffer_size, std::move(callback));
462 }
463 
Write(const scoped_refptr<net::IOBuffer> & buffer,int buffer_size,net::CompletionOnceCallback callback,const net::NetworkTrafficAnnotationTag & traffic_annotation)464 int PseudoTcpAdapter::Write(
465     const scoped_refptr<net::IOBuffer>& buffer,
466     int buffer_size,
467     net::CompletionOnceCallback callback,
468     const net::NetworkTrafficAnnotationTag& traffic_annotation) {
469   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
470   return core_->Write(buffer, buffer_size, std::move(callback),
471                       traffic_annotation);
472 }
473 
SetReceiveBufferSize(int32_t size)474 int PseudoTcpAdapter::SetReceiveBufferSize(int32_t size) {
475   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
476   core_->SetReceiveBufferSize(size);
477   return net::OK;
478 }
479 
SetSendBufferSize(int32_t size)480 int PseudoTcpAdapter::SetSendBufferSize(int32_t size) {
481   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
482   core_->SetSendBufferSize(size);
483   return net::OK;
484 }
485 
Connect(net::CompletionOnceCallback callback)486 net::CompletionOnceCallback PseudoTcpAdapter::Connect(
487     net::CompletionOnceCallback callback) {
488   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
489   return core_->Connect(std::move(callback));
490 }
491 
SetAckDelay(int delay_ms)492 void PseudoTcpAdapter::SetAckDelay(int delay_ms) {
493   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
494   core_->SetAckDelay(delay_ms);
495 }
496 
SetNoDelay(bool no_delay)497 void PseudoTcpAdapter::SetNoDelay(bool no_delay) {
498   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
499   core_->SetNoDelay(no_delay);
500 }
501 
SetWriteWaitsForSend(bool write_waits_for_send)502 void PseudoTcpAdapter::SetWriteWaitsForSend(bool write_waits_for_send) {
503   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
504   core_->SetWriteWaitsForSend(write_waits_for_send);
505 }
506 
507 }  // namespace protocol
508 }  // namespace remoting
509