1 /*
2  *  Copyright 2004 The WebRTC Project Authors. All rights reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 #include "rtc_base/physical_socket_server.h"
11 
12 #if defined(_MSC_VER) && _MSC_VER < 1300
13 #pragma warning(disable : 4786)
14 #endif
15 
16 #ifdef MEMORY_SANITIZER
17 #include <sanitizer/msan_interface.h>
18 #endif
19 
20 #if defined(WEBRTC_POSIX)
21 #include <fcntl.h>
22 #include <string.h>
23 #if defined(WEBRTC_USE_EPOLL)
24 // "poll" will be used to wait for the signal dispatcher.
25 #include <poll.h>
26 #endif
27 #include <sys/ioctl.h>
28 #include <sys/select.h>
29 #include <sys/time.h>
30 #include <unistd.h>
31 #endif
32 
33 #if defined(WEBRTC_WIN)
34 #include <windows.h>
35 #include <winsock2.h>
36 #include <ws2tcpip.h>
37 #undef SetPort
38 #endif
39 
40 #include <errno.h>
41 
42 #include <algorithm>
43 #include <map>
44 
45 #include "rtc_base/arraysize.h"
46 #include "rtc_base/byte_order.h"
47 #include "rtc_base/checks.h"
48 #include "rtc_base/logging.h"
49 #include "rtc_base/network_monitor.h"
50 #include "rtc_base/null_socket_server.h"
51 #include "rtc_base/time_utils.h"
52 
53 #if defined(WEBRTC_LINUX)
54 #include <linux/sockios.h>
55 #endif
56 
57 #if defined(WEBRTC_WIN)
58 #define LAST_SYSTEM_ERROR (::GetLastError())
59 #elif defined(__native_client__) && __native_client__
60 #define LAST_SYSTEM_ERROR (0)
61 #elif defined(WEBRTC_POSIX)
62 #define LAST_SYSTEM_ERROR (errno)
63 #endif  // WEBRTC_WIN
64 
65 #if defined(WEBRTC_POSIX)
66 #include <netinet/tcp.h>  // for TCP_NODELAY
67 #define IP_MTU 14  // Until this is integrated from linux/in.h to netinet/in.h
68 typedef void* SockOptArg;
69 
70 #endif  // WEBRTC_POSIX
71 
72 #if defined(WEBRTC_POSIX) && !defined(WEBRTC_MAC) && !defined(WEBRTC_BSD) && !defined(__native_client__)
73 #if defined(WEBRTC_LINUX)
74 #include <linux/sockios.h>
75 #endif
76 
GetSocketRecvTimestamp(int socket)77 int64_t GetSocketRecvTimestamp(int socket) {
78   struct timeval tv_ioctl;
79   int ret = ioctl(socket, SIOCGSTAMP, &tv_ioctl);
80   if (ret != 0)
81     return -1;
82   int64_t timestamp =
83       rtc::kNumMicrosecsPerSec * static_cast<int64_t>(tv_ioctl.tv_sec) +
84       static_cast<int64_t>(tv_ioctl.tv_usec);
85   return timestamp;
86 }
87 
88 #else
89 
GetSocketRecvTimestamp(int socket)90 int64_t GetSocketRecvTimestamp(int socket) {
91   return -1;
92 }
93 #endif
94 
95 #if defined(WEBRTC_WIN)
96 typedef char* SockOptArg;
97 #endif
98 
99 #if defined(WEBRTC_USE_EPOLL)
100 // POLLRDHUP / EPOLLRDHUP are only defined starting with Linux 2.6.17.
101 #if !defined(POLLRDHUP)
102 #define POLLRDHUP 0x2000
103 #endif
104 #if !defined(EPOLLRDHUP)
105 #define EPOLLRDHUP 0x2000
106 #endif
107 #endif
108 
109 namespace {
110 class ScopedSetTrue {
111  public:
ScopedSetTrue(bool * value)112   ScopedSetTrue(bool* value) : value_(value) {
113     RTC_DCHECK(!*value_);
114     *value_ = true;
115   }
~ScopedSetTrue()116   ~ScopedSetTrue() { *value_ = false; }
117 
118  private:
119   bool* value_;
120 };
121 }  // namespace
122 
123 namespace rtc {
124 
CreateDefault()125 std::unique_ptr<SocketServer> SocketServer::CreateDefault() {
126 #if defined(__native_client__)
127   return std::unique_ptr<SocketServer>(new rtc::NullSocketServer);
128 #else
129   return std::unique_ptr<SocketServer>(new rtc::PhysicalSocketServer);
130 #endif
131 }
132 
PhysicalSocket(PhysicalSocketServer * ss,SOCKET s)133 PhysicalSocket::PhysicalSocket(PhysicalSocketServer* ss, SOCKET s)
134     : ss_(ss),
135       s_(s),
136       error_(0),
137       state_((s == INVALID_SOCKET) ? CS_CLOSED : CS_CONNECTED),
138       resolver_(nullptr) {
139   if (s_ != INVALID_SOCKET) {
140     SetEnabledEvents(DE_READ | DE_WRITE);
141 
142     int type = SOCK_STREAM;
143     socklen_t len = sizeof(type);
144     const int res =
145         getsockopt(s_, SOL_SOCKET, SO_TYPE, (SockOptArg)&type, &len);
146     RTC_DCHECK_EQ(0, res);
147     udp_ = (SOCK_DGRAM == type);
148   }
149 }
150 
~PhysicalSocket()151 PhysicalSocket::~PhysicalSocket() {
152   Close();
153 }
154 
Create(int family,int type)155 bool PhysicalSocket::Create(int family, int type) {
156   Close();
157   s_ = ::socket(family, type, 0);
158   udp_ = (SOCK_DGRAM == type);
159   family_ = family;
160   UpdateLastError();
161   if (udp_) {
162     SetEnabledEvents(DE_READ | DE_WRITE);
163   }
164   return s_ != INVALID_SOCKET;
165 }
166 
GetLocalAddress() const167 SocketAddress PhysicalSocket::GetLocalAddress() const {
168   sockaddr_storage addr_storage = {};
169   socklen_t addrlen = sizeof(addr_storage);
170   sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage);
171   int result = ::getsockname(s_, addr, &addrlen);
172   SocketAddress address;
173   if (result >= 0) {
174     SocketAddressFromSockAddrStorage(addr_storage, &address);
175   } else {
176     RTC_LOG(LS_WARNING) << "GetLocalAddress: unable to get local addr, socket="
177                         << s_;
178   }
179   return address;
180 }
181 
GetRemoteAddress() const182 SocketAddress PhysicalSocket::GetRemoteAddress() const {
183   sockaddr_storage addr_storage = {};
184   socklen_t addrlen = sizeof(addr_storage);
185   sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage);
186   int result = ::getpeername(s_, addr, &addrlen);
187   SocketAddress address;
188   if (result >= 0) {
189     SocketAddressFromSockAddrStorage(addr_storage, &address);
190   } else {
191     RTC_LOG(LS_WARNING)
192         << "GetRemoteAddress: unable to get remote addr, socket=" << s_;
193   }
194   return address;
195 }
196 
Bind(const SocketAddress & bind_addr)197 int PhysicalSocket::Bind(const SocketAddress& bind_addr) {
198   SocketAddress copied_bind_addr = bind_addr;
199   // If a network binder is available, use it to bind a socket to an interface
200   // instead of bind(), since this is more reliable on an OS with a weak host
201   // model.
202   if (ss_->network_binder() && !bind_addr.IsAnyIP()) {
203     NetworkBindingResult result =
204         ss_->network_binder()->BindSocketToNetwork(s_, bind_addr.ipaddr());
205     if (result == NetworkBindingResult::SUCCESS) {
206       // Since the network binder handled binding the socket to the desired
207       // network interface, we don't need to (and shouldn't) include an IP in
208       // the bind() call; bind() just needs to assign a port.
209       copied_bind_addr.SetIP(GetAnyIP(copied_bind_addr.ipaddr().family()));
210     } else if (result == NetworkBindingResult::NOT_IMPLEMENTED) {
211       RTC_LOG(LS_INFO) << "Can't bind socket to network because "
212                           "network binding is not implemented for this OS.";
213     } else {
214       if (bind_addr.IsLoopbackIP()) {
215         // If we couldn't bind to a loopback IP (which should only happen in
216         // test scenarios), continue on. This may be expected behavior.
217         RTC_LOG(LS_VERBOSE) << "Binding socket to loopback address"
218                             << " failed; result: " << static_cast<int>(result);
219       } else {
220         RTC_LOG(LS_WARNING) << "Binding socket to network address"
221                             << " failed; result: " << static_cast<int>(result);
222         // If a network binding was attempted and failed, we should stop here
223         // and not try to use the socket. Otherwise, we may end up sending
224         // packets with an invalid source address.
225         // See: https://bugs.chromium.org/p/webrtc/issues/detail?id=7026
226         return -1;
227       }
228     }
229   }
230   sockaddr_storage addr_storage;
231   size_t len = copied_bind_addr.ToSockAddrStorage(&addr_storage);
232   sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage);
233   int err = ::bind(s_, addr, static_cast<int>(len));
234   UpdateLastError();
235 #if !defined(NDEBUG)
236   if (0 == err) {
237     dbg_addr_ = "Bound @ ";
238     dbg_addr_.append(GetLocalAddress().ToString());
239   }
240 #endif
241   return err;
242 }
243 
Connect(const SocketAddress & addr)244 int PhysicalSocket::Connect(const SocketAddress& addr) {
245   // TODO(pthatcher): Implicit creation is required to reconnect...
246   // ...but should we make it more explicit?
247   if (state_ != CS_CLOSED) {
248     SetError(EALREADY);
249     return SOCKET_ERROR;
250   }
251   if (addr.IsUnresolvedIP()) {
252     RTC_LOG(LS_VERBOSE) << "Resolving addr in PhysicalSocket::Connect";
253     resolver_ = new AsyncResolver();
254     resolver_->SignalDone.connect(this, &PhysicalSocket::OnResolveResult);
255     resolver_->Start(addr);
256     state_ = CS_CONNECTING;
257     return 0;
258   }
259 
260   return DoConnect(addr);
261 }
262 
DoConnect(const SocketAddress & connect_addr)263 int PhysicalSocket::DoConnect(const SocketAddress& connect_addr) {
264   if ((s_ == INVALID_SOCKET) && !Create(connect_addr.family(), SOCK_STREAM)) {
265     return SOCKET_ERROR;
266   }
267   sockaddr_storage addr_storage;
268   size_t len = connect_addr.ToSockAddrStorage(&addr_storage);
269   sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage);
270   int err = ::connect(s_, addr, static_cast<int>(len));
271   UpdateLastError();
272   uint8_t events = DE_READ | DE_WRITE;
273   if (err == 0) {
274     state_ = CS_CONNECTED;
275   } else if (IsBlockingError(GetError())) {
276     state_ = CS_CONNECTING;
277     events |= DE_CONNECT;
278   } else {
279     return SOCKET_ERROR;
280   }
281 
282   EnableEvents(events);
283   return 0;
284 }
285 
GetError() const286 int PhysicalSocket::GetError() const {
287   CritScope cs(&crit_);
288   return error_;
289 }
290 
SetError(int error)291 void PhysicalSocket::SetError(int error) {
292   CritScope cs(&crit_);
293   error_ = error;
294 }
295 
GetState() const296 AsyncSocket::ConnState PhysicalSocket::GetState() const {
297   return state_;
298 }
299 
GetOption(Option opt,int * value)300 int PhysicalSocket::GetOption(Option opt, int* value) {
301   int slevel;
302   int sopt;
303   if (TranslateOption(opt, &slevel, &sopt) == -1)
304     return -1;
305   socklen_t optlen = sizeof(*value);
306   int ret = ::getsockopt(s_, slevel, sopt, (SockOptArg)value, &optlen);
307   if (ret == -1) {
308     return -1;
309   }
310   if (opt == OPT_DONTFRAGMENT) {
311 #if defined(WEBRTC_LINUX) && !defined(WEBRTC_ANDROID)
312     *value = (*value != IP_PMTUDISC_DONT) ? 1 : 0;
313 #endif
314   } else if (opt == OPT_DSCP) {
315 #if defined(WEBRTC_POSIX)
316     // unshift DSCP value to get six most significant bits of IP DiffServ field
317     *value >>= 2;
318 #endif
319   }
320   return ret;
321 }
322 
SetOption(Option opt,int value)323 int PhysicalSocket::SetOption(Option opt, int value) {
324   int slevel;
325   int sopt;
326   if (TranslateOption(opt, &slevel, &sopt) == -1)
327     return -1;
328   if (opt == OPT_DONTFRAGMENT) {
329 #if defined(WEBRTC_LINUX) && !defined(WEBRTC_ANDROID)
330     value = (value) ? IP_PMTUDISC_DO : IP_PMTUDISC_DONT;
331 #endif
332   } else if (opt == OPT_DSCP) {
333 #if defined(WEBRTC_POSIX)
334     // shift DSCP value to fit six most significant bits of IP DiffServ field
335     value <<= 2;
336 #endif
337   }
338 #if defined(WEBRTC_POSIX)
339   if (sopt == IPV6_TCLASS) {
340     // Set the IPv4 option in all cases to support dual-stack sockets.
341     ::setsockopt(s_, IPPROTO_IP, IP_TOS, (SockOptArg)&value, sizeof(value));
342   }
343 #endif
344   return ::setsockopt(s_, slevel, sopt, (SockOptArg)&value, sizeof(value));
345 }
346 
Send(const void * pv,size_t cb)347 int PhysicalSocket::Send(const void* pv, size_t cb) {
348   int sent = DoSend(
349       s_, reinterpret_cast<const char*>(pv), static_cast<int>(cb),
350 #if defined(WEBRTC_LINUX) && !defined(WEBRTC_ANDROID)
351       // Suppress SIGPIPE. Without this, attempting to send on a socket whose
352       // other end is closed will result in a SIGPIPE signal being raised to
353       // our process, which by default will terminate the process, which we
354       // don't want. By specifying this flag, we'll just get the error EPIPE
355       // instead and can handle the error gracefully.
356       MSG_NOSIGNAL
357 #else
358       0
359 #endif
360   );
361   UpdateLastError();
362   MaybeRemapSendError();
363   // We have seen minidumps where this may be false.
364   RTC_DCHECK(sent <= static_cast<int>(cb));
365   if ((sent > 0 && sent < static_cast<int>(cb)) ||
366       (sent < 0 && IsBlockingError(GetError()))) {
367     EnableEvents(DE_WRITE);
368   }
369   return sent;
370 }
371 
SendTo(const void * buffer,size_t length,const SocketAddress & addr)372 int PhysicalSocket::SendTo(const void* buffer,
373                            size_t length,
374                            const SocketAddress& addr) {
375   sockaddr_storage saddr;
376   size_t len = addr.ToSockAddrStorage(&saddr);
377   int sent =
378       DoSendTo(s_, static_cast<const char*>(buffer), static_cast<int>(length),
379 #if defined(WEBRTC_LINUX) && !defined(WEBRTC_ANDROID)
380                // Suppress SIGPIPE. See above for explanation.
381                MSG_NOSIGNAL,
382 #else
383                0,
384 #endif
385                reinterpret_cast<sockaddr*>(&saddr), static_cast<int>(len));
386   UpdateLastError();
387   MaybeRemapSendError();
388   // We have seen minidumps where this may be false.
389   RTC_DCHECK(sent <= static_cast<int>(length));
390   if ((sent > 0 && sent < static_cast<int>(length)) ||
391       (sent < 0 && IsBlockingError(GetError()))) {
392     EnableEvents(DE_WRITE);
393   }
394   return sent;
395 }
396 
Recv(void * buffer,size_t length,int64_t * timestamp)397 int PhysicalSocket::Recv(void* buffer, size_t length, int64_t* timestamp) {
398   int received =
399       ::recv(s_, static_cast<char*>(buffer), static_cast<int>(length), 0);
400   if ((received == 0) && (length != 0)) {
401     // Note: on graceful shutdown, recv can return 0.  In this case, we
402     // pretend it is blocking, and then signal close, so that simplifying
403     // assumptions can be made about Recv.
404     RTC_LOG(LS_WARNING) << "EOF from socket; deferring close event";
405     // Must turn this back on so that the select() loop will notice the close
406     // event.
407     EnableEvents(DE_READ);
408     SetError(EWOULDBLOCK);
409     return SOCKET_ERROR;
410   }
411   if (timestamp) {
412     *timestamp = GetSocketRecvTimestamp(s_);
413   }
414   UpdateLastError();
415   int error = GetError();
416   bool success = (received >= 0) || IsBlockingError(error);
417   if (udp_ || success) {
418     EnableEvents(DE_READ);
419   }
420   if (!success) {
421     RTC_LOG_F(LS_VERBOSE) << "Error = " << error;
422   }
423   return received;
424 }
425 
RecvFrom(void * buffer,size_t length,SocketAddress * out_addr,int64_t * timestamp)426 int PhysicalSocket::RecvFrom(void* buffer,
427                              size_t length,
428                              SocketAddress* out_addr,
429                              int64_t* timestamp) {
430   sockaddr_storage addr_storage;
431   socklen_t addr_len = sizeof(addr_storage);
432   sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage);
433   int received = ::recvfrom(s_, static_cast<char*>(buffer),
434                             static_cast<int>(length), 0, addr, &addr_len);
435   if (timestamp) {
436     *timestamp = GetSocketRecvTimestamp(s_);
437   }
438   UpdateLastError();
439   if ((received >= 0) && (out_addr != nullptr))
440     SocketAddressFromSockAddrStorage(addr_storage, out_addr);
441   int error = GetError();
442   bool success = (received >= 0) || IsBlockingError(error);
443   if (udp_ || success) {
444     EnableEvents(DE_READ);
445   }
446   if (!success) {
447     RTC_LOG_F(LS_VERBOSE) << "Error = " << error;
448   }
449   return received;
450 }
451 
Listen(int backlog)452 int PhysicalSocket::Listen(int backlog) {
453   int err = ::listen(s_, backlog);
454   UpdateLastError();
455   if (err == 0) {
456     state_ = CS_CONNECTING;
457     EnableEvents(DE_ACCEPT);
458 #if !defined(NDEBUG)
459     dbg_addr_ = "Listening @ ";
460     dbg_addr_.append(GetLocalAddress().ToString());
461 #endif
462   }
463   return err;
464 }
465 
Accept(SocketAddress * out_addr)466 AsyncSocket* PhysicalSocket::Accept(SocketAddress* out_addr) {
467   // Always re-subscribe DE_ACCEPT to make sure new incoming connections will
468   // trigger an event even if DoAccept returns an error here.
469   EnableEvents(DE_ACCEPT);
470   sockaddr_storage addr_storage;
471   socklen_t addr_len = sizeof(addr_storage);
472   sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage);
473   SOCKET s = DoAccept(s_, addr, &addr_len);
474   UpdateLastError();
475   if (s == INVALID_SOCKET)
476     return nullptr;
477   if (out_addr != nullptr)
478     SocketAddressFromSockAddrStorage(addr_storage, out_addr);
479   return ss_->WrapSocket(s);
480 }
481 
Close()482 int PhysicalSocket::Close() {
483   if (s_ == INVALID_SOCKET)
484     return 0;
485   int err = ::closesocket(s_);
486   UpdateLastError();
487   s_ = INVALID_SOCKET;
488   state_ = CS_CLOSED;
489   SetEnabledEvents(0);
490   if (resolver_) {
491     resolver_->Destroy(false);
492     resolver_ = nullptr;
493   }
494   return err;
495 }
496 
DoAccept(SOCKET socket,sockaddr * addr,socklen_t * addrlen)497 SOCKET PhysicalSocket::DoAccept(SOCKET socket,
498                                 sockaddr* addr,
499                                 socklen_t* addrlen) {
500   return ::accept(socket, addr, addrlen);
501 }
502 
DoSend(SOCKET socket,const char * buf,int len,int flags)503 int PhysicalSocket::DoSend(SOCKET socket, const char* buf, int len, int flags) {
504   return ::send(socket, buf, len, flags);
505 }
506 
DoSendTo(SOCKET socket,const char * buf,int len,int flags,const struct sockaddr * dest_addr,socklen_t addrlen)507 int PhysicalSocket::DoSendTo(SOCKET socket,
508                              const char* buf,
509                              int len,
510                              int flags,
511                              const struct sockaddr* dest_addr,
512                              socklen_t addrlen) {
513   return ::sendto(socket, buf, len, flags, dest_addr, addrlen);
514 }
515 
OnResolveResult(AsyncResolverInterface * resolver)516 void PhysicalSocket::OnResolveResult(AsyncResolverInterface* resolver) {
517   if (resolver != resolver_) {
518     return;
519   }
520 
521   int error = resolver_->GetError();
522   if (error == 0) {
523     error = DoConnect(resolver_->address());
524   } else {
525     Close();
526   }
527 
528   if (error) {
529     SetError(error);
530     SignalCloseEvent(this, error);
531   }
532 }
533 
UpdateLastError()534 void PhysicalSocket::UpdateLastError() {
535   SetError(LAST_SYSTEM_ERROR);
536 }
537 
MaybeRemapSendError()538 void PhysicalSocket::MaybeRemapSendError() {
539 #if defined(WEBRTC_MAC)
540   // https://developer.apple.com/library/mac/documentation/Darwin/
541   // Reference/ManPages/man2/sendto.2.html
542   // ENOBUFS - The output queue for a network interface is full.
543   // This generally indicates that the interface has stopped sending,
544   // but may be caused by transient congestion.
545   if (GetError() == ENOBUFS) {
546     SetError(EWOULDBLOCK);
547   }
548 #endif
549 }
550 
SetEnabledEvents(uint8_t events)551 void PhysicalSocket::SetEnabledEvents(uint8_t events) {
552   enabled_events_ = events;
553 }
554 
EnableEvents(uint8_t events)555 void PhysicalSocket::EnableEvents(uint8_t events) {
556   enabled_events_ |= events;
557 }
558 
DisableEvents(uint8_t events)559 void PhysicalSocket::DisableEvents(uint8_t events) {
560   enabled_events_ &= ~events;
561 }
562 
TranslateOption(Option opt,int * slevel,int * sopt)563 int PhysicalSocket::TranslateOption(Option opt, int* slevel, int* sopt) {
564   switch (opt) {
565     case OPT_DONTFRAGMENT:
566 #if defined(WEBRTC_WIN)
567       *slevel = IPPROTO_IP;
568       *sopt = IP_DONTFRAGMENT;
569       break;
570 #elif defined(WEBRTC_MAC) || defined(WEBRTC_BSD) || defined(__native_client__)
571       RTC_LOG(LS_WARNING) << "Socket::OPT_DONTFRAGMENT not supported.";
572       return -1;
573 #elif defined(WEBRTC_POSIX)
574       *slevel = IPPROTO_IP;
575       *sopt = IP_MTU_DISCOVER;
576       break;
577 #endif
578     case OPT_RCVBUF:
579       *slevel = SOL_SOCKET;
580       *sopt = SO_RCVBUF;
581       break;
582     case OPT_SNDBUF:
583       *slevel = SOL_SOCKET;
584       *sopt = SO_SNDBUF;
585       break;
586     case OPT_NODELAY:
587       *slevel = IPPROTO_TCP;
588       *sopt = TCP_NODELAY;
589       break;
590     case OPT_DSCP:
591 #if defined(WEBRTC_POSIX)
592       if (family_ == AF_INET6) {
593         *slevel = IPPROTO_IPV6;
594         *sopt = IPV6_TCLASS;
595       } else {
596         *slevel = IPPROTO_IP;
597         *sopt = IP_TOS;
598       }
599       break;
600 #else
601       RTC_LOG(LS_WARNING) << "Socket::OPT_DSCP not supported.";
602       return -1;
603 #endif
604     case OPT_RTP_SENDTIME_EXTN_ID:
605       return -1;  // No logging is necessary as this not a OS socket option.
606     default:
607       RTC_NOTREACHED();
608       return -1;
609   }
610   return 0;
611 }
612 
SocketDispatcher(PhysicalSocketServer * ss)613 SocketDispatcher::SocketDispatcher(PhysicalSocketServer* ss)
614 #if defined(WEBRTC_WIN)
615     : PhysicalSocket(ss),
616       id_(0),
617       signal_close_(false)
618 #else
619     : PhysicalSocket(ss)
620 #endif
621 {
622 }
623 
SocketDispatcher(SOCKET s,PhysicalSocketServer * ss)624 SocketDispatcher::SocketDispatcher(SOCKET s, PhysicalSocketServer* ss)
625 #if defined(WEBRTC_WIN)
626     : PhysicalSocket(ss, s),
627       id_(0),
628       signal_close_(false)
629 #else
630     : PhysicalSocket(ss, s)
631 #endif
632 {
633 }
634 
~SocketDispatcher()635 SocketDispatcher::~SocketDispatcher() {
636   Close();
637 }
638 
Initialize()639 bool SocketDispatcher::Initialize() {
640   RTC_DCHECK(s_ != INVALID_SOCKET);
641 // Must be a non-blocking
642 #if defined(WEBRTC_WIN)
643   u_long argp = 1;
644   ioctlsocket(s_, FIONBIO, &argp);
645 #elif defined(WEBRTC_POSIX)
646   fcntl(s_, F_SETFL, fcntl(s_, F_GETFL, 0) | O_NONBLOCK);
647 #endif
648 #if defined(WEBRTC_IOS)
649   // iOS may kill sockets when the app is moved to the background
650   // (specifically, if the app doesn't use the "voip" UIBackgroundMode). When
651   // we attempt to write to such a socket, SIGPIPE will be raised, which by
652   // default will terminate the process, which we don't want. By specifying
653   // this socket option, SIGPIPE will be disabled for the socket.
654   int value = 1;
655   ::setsockopt(s_, SOL_SOCKET, SO_NOSIGPIPE, &value, sizeof(value));
656 #endif
657   ss_->Add(this);
658   return true;
659 }
660 
Create(int type)661 bool SocketDispatcher::Create(int type) {
662   return Create(AF_INET, type);
663 }
664 
Create(int family,int type)665 bool SocketDispatcher::Create(int family, int type) {
666   // Change the socket to be non-blocking.
667   if (!PhysicalSocket::Create(family, type))
668     return false;
669 
670   if (!Initialize())
671     return false;
672 
673 #if defined(WEBRTC_WIN)
674   do {
675     id_ = ++next_id_;
676   } while (id_ == 0);
677 #endif
678   return true;
679 }
680 
681 #if defined(WEBRTC_WIN)
682 
GetWSAEvent()683 WSAEVENT SocketDispatcher::GetWSAEvent() {
684   return WSA_INVALID_EVENT;
685 }
686 
GetSocket()687 SOCKET SocketDispatcher::GetSocket() {
688   return s_;
689 }
690 
CheckSignalClose()691 bool SocketDispatcher::CheckSignalClose() {
692   if (!signal_close_)
693     return false;
694 
695   char ch;
696   if (recv(s_, &ch, 1, MSG_PEEK) > 0)
697     return false;
698 
699   state_ = CS_CLOSED;
700   signal_close_ = false;
701   SignalCloseEvent(this, signal_err_);
702   return true;
703 }
704 
705 int SocketDispatcher::next_id_ = 0;
706 
707 #elif defined(WEBRTC_POSIX)
708 
GetDescriptor()709 int SocketDispatcher::GetDescriptor() {
710   return s_;
711 }
712 
IsDescriptorClosed()713 bool SocketDispatcher::IsDescriptorClosed() {
714   if (udp_) {
715     // The MSG_PEEK trick doesn't work for UDP, since (at least in some
716     // circumstances) it requires reading an entire UDP packet, which would be
717     // bad for performance here. So, just check whether |s_| has been closed,
718     // which should be sufficient.
719     return s_ == INVALID_SOCKET;
720   }
721   // We don't have a reliable way of distinguishing end-of-stream
722   // from readability.  So test on each readable call.  Is this
723   // inefficient?  Probably.
724   char ch;
725   ssize_t res = ::recv(s_, &ch, 1, MSG_PEEK);
726   if (res > 0) {
727     // Data available, so not closed.
728     return false;
729   } else if (res == 0) {
730     // EOF, so closed.
731     return true;
732   } else {  // error
733     switch (errno) {
734       // Returned if we've already closed s_.
735       case EBADF:
736       // Returned during ungraceful peer shutdown.
737       case ECONNRESET:
738         return true;
739       // The normal blocking error; don't log anything.
740       case EWOULDBLOCK:
741       // Interrupted system call.
742       case EINTR:
743         return false;
744       default:
745         // Assume that all other errors are just blocking errors, meaning the
746         // connection is still good but we just can't read from it right now.
747         // This should only happen when connecting (and at most once), because
748         // in all other cases this function is only called if the file
749         // descriptor is already known to be in the readable state. However,
750         // it's not necessary a problem if we spuriously interpret a
751         // "connection lost"-type error as a blocking error, because typically
752         // the next recv() will get EOF, so we'll still eventually notice that
753         // the socket is closed.
754         RTC_LOG_ERR(LS_WARNING) << "Assuming benign blocking error";
755         return false;
756     }
757   }
758 }
759 
760 #endif  // WEBRTC_POSIX
761 
GetRequestedEvents()762 uint32_t SocketDispatcher::GetRequestedEvents() {
763   return enabled_events();
764 }
765 
OnPreEvent(uint32_t ff)766 void SocketDispatcher::OnPreEvent(uint32_t ff) {
767   if ((ff & DE_CONNECT) != 0)
768     state_ = CS_CONNECTED;
769 
770 #if defined(WEBRTC_WIN)
771 // We set CS_CLOSED from CheckSignalClose.
772 #elif defined(WEBRTC_POSIX)
773   if ((ff & DE_CLOSE) != 0)
774     state_ = CS_CLOSED;
775 #endif
776 }
777 
778 #if defined(WEBRTC_WIN)
779 
OnEvent(uint32_t ff,int err)780 void SocketDispatcher::OnEvent(uint32_t ff, int err) {
781   int cache_id = id_;
782   // Make sure we deliver connect/accept first. Otherwise, consumers may see
783   // something like a READ followed by a CONNECT, which would be odd.
784   if (((ff & DE_CONNECT) != 0) && (id_ == cache_id)) {
785     if (ff != DE_CONNECT)
786       RTC_LOG(LS_VERBOSE) << "Signalled with DE_CONNECT: " << ff;
787     DisableEvents(DE_CONNECT);
788 #if !defined(NDEBUG)
789     dbg_addr_ = "Connected @ ";
790     dbg_addr_.append(GetRemoteAddress().ToString());
791 #endif
792     SignalConnectEvent(this);
793   }
794   if (((ff & DE_ACCEPT) != 0) && (id_ == cache_id)) {
795     DisableEvents(DE_ACCEPT);
796     SignalReadEvent(this);
797   }
798   if ((ff & DE_READ) != 0) {
799     DisableEvents(DE_READ);
800     SignalReadEvent(this);
801   }
802   if (((ff & DE_WRITE) != 0) && (id_ == cache_id)) {
803     DisableEvents(DE_WRITE);
804     SignalWriteEvent(this);
805   }
806   if (((ff & DE_CLOSE) != 0) && (id_ == cache_id)) {
807     signal_close_ = true;
808     signal_err_ = err;
809   }
810 }
811 
812 #elif defined(WEBRTC_POSIX)
813 
OnEvent(uint32_t ff,int err)814 void SocketDispatcher::OnEvent(uint32_t ff, int err) {
815 #if defined(WEBRTC_USE_EPOLL)
816   // Remember currently enabled events so we can combine multiple changes
817   // into one update call later.
818   // The signal handlers might re-enable events disabled here, so we can't
819   // keep a list of events to disable at the end of the method. This list
820   // would not be updated with the events enabled by the signal handlers.
821   StartBatchedEventUpdates();
822 #endif
823   // Make sure we deliver connect/accept first. Otherwise, consumers may see
824   // something like a READ followed by a CONNECT, which would be odd.
825   if ((ff & DE_CONNECT) != 0) {
826     DisableEvents(DE_CONNECT);
827     SignalConnectEvent(this);
828   }
829   if ((ff & DE_ACCEPT) != 0) {
830     DisableEvents(DE_ACCEPT);
831     SignalReadEvent(this);
832   }
833   if ((ff & DE_READ) != 0) {
834     DisableEvents(DE_READ);
835     SignalReadEvent(this);
836   }
837   if ((ff & DE_WRITE) != 0) {
838     DisableEvents(DE_WRITE);
839     SignalWriteEvent(this);
840   }
841   if ((ff & DE_CLOSE) != 0) {
842     // The socket is now dead to us, so stop checking it.
843     SetEnabledEvents(0);
844     SignalCloseEvent(this, err);
845   }
846 #if defined(WEBRTC_USE_EPOLL)
847   FinishBatchedEventUpdates();
848 #endif
849 }
850 
851 #endif  // WEBRTC_POSIX
852 
853 #if defined(WEBRTC_USE_EPOLL)
854 
GetEpollEvents(uint32_t ff)855 inline static int GetEpollEvents(uint32_t ff) {
856   int events = 0;
857   if (ff & (DE_READ | DE_ACCEPT)) {
858     events |= EPOLLIN;
859   }
860   if (ff & (DE_WRITE | DE_CONNECT)) {
861     events |= EPOLLOUT;
862   }
863   return events;
864 }
865 
StartBatchedEventUpdates()866 void SocketDispatcher::StartBatchedEventUpdates() {
867   RTC_DCHECK_EQ(saved_enabled_events_, -1);
868   saved_enabled_events_ = enabled_events();
869 }
870 
FinishBatchedEventUpdates()871 void SocketDispatcher::FinishBatchedEventUpdates() {
872   RTC_DCHECK_NE(saved_enabled_events_, -1);
873   uint8_t old_events = static_cast<uint8_t>(saved_enabled_events_);
874   saved_enabled_events_ = -1;
875   MaybeUpdateDispatcher(old_events);
876 }
877 
MaybeUpdateDispatcher(uint8_t old_events)878 void SocketDispatcher::MaybeUpdateDispatcher(uint8_t old_events) {
879   if (GetEpollEvents(enabled_events()) != GetEpollEvents(old_events) &&
880       saved_enabled_events_ == -1) {
881     ss_->Update(this);
882   }
883 }
884 
SetEnabledEvents(uint8_t events)885 void SocketDispatcher::SetEnabledEvents(uint8_t events) {
886   uint8_t old_events = enabled_events();
887   PhysicalSocket::SetEnabledEvents(events);
888   MaybeUpdateDispatcher(old_events);
889 }
890 
EnableEvents(uint8_t events)891 void SocketDispatcher::EnableEvents(uint8_t events) {
892   uint8_t old_events = enabled_events();
893   PhysicalSocket::EnableEvents(events);
894   MaybeUpdateDispatcher(old_events);
895 }
896 
DisableEvents(uint8_t events)897 void SocketDispatcher::DisableEvents(uint8_t events) {
898   uint8_t old_events = enabled_events();
899   PhysicalSocket::DisableEvents(events);
900   MaybeUpdateDispatcher(old_events);
901 }
902 
903 #endif  // WEBRTC_USE_EPOLL
904 
Close()905 int SocketDispatcher::Close() {
906   if (s_ == INVALID_SOCKET)
907     return 0;
908 
909 #if defined(WEBRTC_WIN)
910   id_ = 0;
911   signal_close_ = false;
912 #endif
913 #if defined(WEBRTC_USE_EPOLL)
914   // If we're batching events, the socket can be closed and reopened
915   // during the batch. Set saved_enabled_events_ to 0 here so the new
916   // socket, if any, has the correct old events bitfield
917   if (saved_enabled_events_ != -1) {
918     saved_enabled_events_ = 0;
919   }
920 #endif
921   ss_->Remove(this);
922   return PhysicalSocket::Close();
923 }
924 
925 #if defined(WEBRTC_POSIX)
926 class EventDispatcher : public Dispatcher {
927  public:
EventDispatcher(PhysicalSocketServer * ss)928   EventDispatcher(PhysicalSocketServer* ss) : ss_(ss), fSignaled_(false) {
929     if (pipe(afd_) < 0)
930       RTC_LOG(LERROR) << "pipe failed";
931     ss_->Add(this);
932   }
933 
~EventDispatcher()934   ~EventDispatcher() override {
935     ss_->Remove(this);
936     close(afd_[0]);
937     close(afd_[1]);
938   }
939 
Signal()940   virtual void Signal() {
941     CritScope cs(&crit_);
942     if (!fSignaled_) {
943       const uint8_t b[1] = {0};
944       const ssize_t res = write(afd_[1], b, sizeof(b));
945       RTC_DCHECK_EQ(1, res);
946       fSignaled_ = true;
947     }
948   }
949 
GetRequestedEvents()950   uint32_t GetRequestedEvents() override { return DE_READ; }
951 
OnPreEvent(uint32_t ff)952   void OnPreEvent(uint32_t ff) override {
953     // It is not possible to perfectly emulate an auto-resetting event with
954     // pipes.  This simulates it by resetting before the event is handled.
955 
956     CritScope cs(&crit_);
957     if (fSignaled_) {
958       uint8_t b[4];  // Allow for reading more than 1 byte, but expect 1.
959       const ssize_t res = read(afd_[0], b, sizeof(b));
960       RTC_DCHECK_EQ(1, res);
961       fSignaled_ = false;
962     }
963   }
964 
OnEvent(uint32_t ff,int err)965   void OnEvent(uint32_t ff, int err) override { RTC_NOTREACHED(); }
966 
GetDescriptor()967   int GetDescriptor() override { return afd_[0]; }
968 
IsDescriptorClosed()969   bool IsDescriptorClosed() override { return false; }
970 
971  private:
972   PhysicalSocketServer* ss_;
973   int afd_[2];
974   bool fSignaled_;
975   RecursiveCriticalSection crit_;
976 };
977 
978 #endif  // WEBRTC_POSIX
979 
980 #if defined(WEBRTC_WIN)
FlagsToEvents(uint32_t events)981 static uint32_t FlagsToEvents(uint32_t events) {
982   uint32_t ffFD = FD_CLOSE;
983   if (events & DE_READ)
984     ffFD |= FD_READ;
985   if (events & DE_WRITE)
986     ffFD |= FD_WRITE;
987   if (events & DE_CONNECT)
988     ffFD |= FD_CONNECT;
989   if (events & DE_ACCEPT)
990     ffFD |= FD_ACCEPT;
991   return ffFD;
992 }
993 
994 class EventDispatcher : public Dispatcher {
995  public:
EventDispatcher(PhysicalSocketServer * ss)996   EventDispatcher(PhysicalSocketServer* ss) : ss_(ss) {
997     hev_ = WSACreateEvent();
998     if (hev_) {
999       ss_->Add(this);
1000     }
1001   }
1002 
~EventDispatcher()1003   ~EventDispatcher() override {
1004     if (hev_ != nullptr) {
1005       ss_->Remove(this);
1006       WSACloseEvent(hev_);
1007       hev_ = nullptr;
1008     }
1009   }
1010 
Signal()1011   virtual void Signal() {
1012     if (hev_ != nullptr)
1013       WSASetEvent(hev_);
1014   }
1015 
GetRequestedEvents()1016   uint32_t GetRequestedEvents() override { return 0; }
1017 
OnPreEvent(uint32_t ff)1018   void OnPreEvent(uint32_t ff) override { WSAResetEvent(hev_); }
1019 
OnEvent(uint32_t ff,int err)1020   void OnEvent(uint32_t ff, int err) override {}
1021 
GetWSAEvent()1022   WSAEVENT GetWSAEvent() override { return hev_; }
1023 
GetSocket()1024   SOCKET GetSocket() override { return INVALID_SOCKET; }
1025 
CheckSignalClose()1026   bool CheckSignalClose() override { return false; }
1027 
1028  private:
1029   PhysicalSocketServer* ss_;
1030   WSAEVENT hev_;
1031 };
1032 #endif  // WEBRTC_WIN
1033 
1034 // Sets the value of a boolean value to false when signaled.
1035 class Signaler : public EventDispatcher {
1036  public:
Signaler(PhysicalSocketServer * ss,bool * pf)1037   Signaler(PhysicalSocketServer* ss, bool* pf) : EventDispatcher(ss), pf_(pf) {}
~Signaler()1038   ~Signaler() override {}
1039 
OnEvent(uint32_t ff,int err)1040   void OnEvent(uint32_t ff, int err) override {
1041     if (pf_)
1042       *pf_ = false;
1043   }
1044 
1045  private:
1046   bool* pf_;
1047 };
1048 
PhysicalSocketServer()1049 PhysicalSocketServer::PhysicalSocketServer()
1050     :
1051 #if defined(WEBRTC_USE_EPOLL)
1052       // Since Linux 2.6.8, the size argument is ignored, but must be greater
1053       // than zero. Before that the size served as hint to the kernel for the
1054       // amount of space to initially allocate in internal data structures.
1055       epoll_fd_(epoll_create(FD_SETSIZE)),
1056 #endif
1057 #if defined(WEBRTC_WIN)
1058       socket_ev_(WSACreateEvent()),
1059 #endif
1060       fWait_(false) {
1061 #if defined(WEBRTC_USE_EPOLL)
1062   if (epoll_fd_ == -1) {
1063     // Not an error, will fall back to "select" below.
1064     RTC_LOG_E(LS_WARNING, EN, errno) << "epoll_create";
1065     // Note that -1 == INVALID_SOCKET, the alias used by later checks.
1066   }
1067 #endif
1068   signal_wakeup_ = new Signaler(this, &fWait_);
1069 }
1070 
~PhysicalSocketServer()1071 PhysicalSocketServer::~PhysicalSocketServer() {
1072 #if defined(WEBRTC_WIN)
1073   WSACloseEvent(socket_ev_);
1074 #endif
1075   delete signal_wakeup_;
1076 #if defined(WEBRTC_USE_EPOLL)
1077   if (epoll_fd_ != INVALID_SOCKET) {
1078     close(epoll_fd_);
1079   }
1080 #endif
1081   RTC_DCHECK(dispatcher_by_key_.empty());
1082   RTC_DCHECK(key_by_dispatcher_.empty());
1083 }
1084 
WakeUp()1085 void PhysicalSocketServer::WakeUp() {
1086   signal_wakeup_->Signal();
1087 }
1088 
CreateSocket(int family,int type)1089 Socket* PhysicalSocketServer::CreateSocket(int family, int type) {
1090   PhysicalSocket* socket = new PhysicalSocket(this);
1091   if (socket->Create(family, type)) {
1092     return socket;
1093   } else {
1094     delete socket;
1095     return nullptr;
1096   }
1097 }
1098 
CreateAsyncSocket(int family,int type)1099 AsyncSocket* PhysicalSocketServer::CreateAsyncSocket(int family, int type) {
1100   SocketDispatcher* dispatcher = new SocketDispatcher(this);
1101   if (dispatcher->Create(family, type)) {
1102     return dispatcher;
1103   } else {
1104     delete dispatcher;
1105     return nullptr;
1106   }
1107 }
1108 
WrapSocket(SOCKET s)1109 AsyncSocket* PhysicalSocketServer::WrapSocket(SOCKET s) {
1110   SocketDispatcher* dispatcher = new SocketDispatcher(s, this);
1111   if (dispatcher->Initialize()) {
1112     return dispatcher;
1113   } else {
1114     delete dispatcher;
1115     return nullptr;
1116   }
1117 }
1118 
Add(Dispatcher * pdispatcher)1119 void PhysicalSocketServer::Add(Dispatcher* pdispatcher) {
1120   CritScope cs(&crit_);
1121   if (key_by_dispatcher_.count(pdispatcher)) {
1122     RTC_LOG(LS_WARNING)
1123         << "PhysicalSocketServer asked to add a duplicate dispatcher.";
1124     return;
1125   }
1126   uint64_t key = next_dispatcher_key_++;
1127   dispatcher_by_key_.emplace(key, pdispatcher);
1128   key_by_dispatcher_.emplace(pdispatcher, key);
1129 #if defined(WEBRTC_USE_EPOLL)
1130   if (epoll_fd_ != INVALID_SOCKET) {
1131     AddEpoll(pdispatcher, key);
1132   }
1133 #endif  // WEBRTC_USE_EPOLL
1134 }
1135 
Remove(Dispatcher * pdispatcher)1136 void PhysicalSocketServer::Remove(Dispatcher* pdispatcher) {
1137   CritScope cs(&crit_);
1138   if (!key_by_dispatcher_.count(pdispatcher)) {
1139     RTC_LOG(LS_WARNING)
1140         << "PhysicalSocketServer asked to remove a unknown "
1141            "dispatcher, potentially from a duplicate call to Add.";
1142     return;
1143   }
1144   uint64_t key = key_by_dispatcher_.at(pdispatcher);
1145   key_by_dispatcher_.erase(pdispatcher);
1146   dispatcher_by_key_.erase(key);
1147 #if defined(WEBRTC_USE_EPOLL)
1148   if (epoll_fd_ != INVALID_SOCKET) {
1149     RemoveEpoll(pdispatcher);
1150   }
1151 #endif  // WEBRTC_USE_EPOLL
1152 }
1153 
Update(Dispatcher * pdispatcher)1154 void PhysicalSocketServer::Update(Dispatcher* pdispatcher) {
1155 #if defined(WEBRTC_USE_EPOLL)
1156   if (epoll_fd_ == INVALID_SOCKET) {
1157     return;
1158   }
1159 
1160   // Don't update dispatchers that haven't yet been added.
1161   CritScope cs(&crit_);
1162   if (!key_by_dispatcher_.count(pdispatcher)) {
1163     return;
1164   }
1165 
1166   UpdateEpoll(pdispatcher, key_by_dispatcher_.at(pdispatcher));
1167 #endif
1168 }
1169 
1170 #if defined(WEBRTC_POSIX)
1171 
Wait(int cmsWait,bool process_io)1172 bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) {
1173   // We don't support reentrant waiting.
1174   RTC_DCHECK(!waiting_);
1175   ScopedSetTrue s(&waiting_);
1176 #if defined(WEBRTC_USE_EPOLL)
1177   // We don't keep a dedicated "epoll" descriptor containing only the non-IO
1178   // (i.e. signaling) dispatcher, so "poll" will be used instead of the default
1179   // "select" to support sockets larger than FD_SETSIZE.
1180   if (!process_io) {
1181     return WaitPoll(cmsWait, signal_wakeup_);
1182   } else if (epoll_fd_ != INVALID_SOCKET) {
1183     return WaitEpoll(cmsWait);
1184   }
1185 #endif
1186   return WaitSelect(cmsWait, process_io);
1187 }
1188 
ProcessEvents(Dispatcher * dispatcher,bool readable,bool writable,bool check_error)1189 static void ProcessEvents(Dispatcher* dispatcher,
1190                           bool readable,
1191                           bool writable,
1192                           bool check_error) {
1193   int errcode = 0;
1194   // TODO(pthatcher): Should we set errcode if getsockopt fails?
1195   if (check_error) {
1196     socklen_t len = sizeof(errcode);
1197     ::getsockopt(dispatcher->GetDescriptor(), SOL_SOCKET, SO_ERROR, &errcode,
1198                  &len);
1199   }
1200 
1201   // Most often the socket is writable or readable or both, so make a single
1202   // virtual call to get requested events
1203   const uint32_t requested_events = dispatcher->GetRequestedEvents();
1204   uint32_t ff = 0;
1205 
1206   // Check readable descriptors. If we're waiting on an accept, signal
1207   // that. Otherwise we're waiting for data, check to see if we're
1208   // readable or really closed.
1209   // TODO(pthatcher): Only peek at TCP descriptors.
1210   if (readable) {
1211     if (requested_events & DE_ACCEPT) {
1212       ff |= DE_ACCEPT;
1213     } else if (errcode || dispatcher->IsDescriptorClosed()) {
1214       ff |= DE_CLOSE;
1215     } else {
1216       ff |= DE_READ;
1217     }
1218   }
1219 
1220   // Check writable descriptors. If we're waiting on a connect, detect
1221   // success versus failure by the reaped error code.
1222   if (writable) {
1223     if (requested_events & DE_CONNECT) {
1224       if (!errcode) {
1225         ff |= DE_CONNECT;
1226       } else {
1227         ff |= DE_CLOSE;
1228       }
1229     } else {
1230       ff |= DE_WRITE;
1231     }
1232   }
1233 
1234   // Tell the descriptor about the event.
1235   if (ff != 0) {
1236     dispatcher->OnPreEvent(ff);
1237     dispatcher->OnEvent(ff, errcode);
1238   }
1239 }
1240 
WaitSelect(int cmsWait,bool process_io)1241 bool PhysicalSocketServer::WaitSelect(int cmsWait, bool process_io) {
1242   // Calculate timing information
1243 
1244   struct timeval* ptvWait = nullptr;
1245   struct timeval tvWait;
1246   int64_t stop_us;
1247   if (cmsWait != kForever) {
1248     // Calculate wait timeval
1249     tvWait.tv_sec = cmsWait / 1000;
1250     tvWait.tv_usec = (cmsWait % 1000) * 1000;
1251     ptvWait = &tvWait;
1252 
1253     // Calculate when to return
1254     stop_us = rtc::TimeMicros() + cmsWait * 1000;
1255   }
1256 
1257 
1258   fd_set fdsRead;
1259   fd_set fdsWrite;
1260 // Explicitly unpoison these FDs on MemorySanitizer which doesn't handle the
1261 // inline assembly in FD_ZERO.
1262 // http://crbug.com/344505
1263 #ifdef MEMORY_SANITIZER
1264   __msan_unpoison(&fdsRead, sizeof(fdsRead));
1265   __msan_unpoison(&fdsWrite, sizeof(fdsWrite));
1266 #endif
1267 
1268   fWait_ = true;
1269 
1270   while (fWait_) {
1271     // Zero all fd_sets. Although select() zeros the descriptors not signaled,
1272     // we may need to do this for dispatchers that were deleted while
1273     // iterating.
1274     FD_ZERO(&fdsRead);
1275     FD_ZERO(&fdsWrite);
1276     int fdmax = -1;
1277     {
1278       CritScope cr(&crit_);
1279       current_dispatcher_keys_.clear();
1280       for (auto const& kv : dispatcher_by_key_) {
1281         uint64_t key = kv.first;
1282         Dispatcher* pdispatcher = kv.second;
1283         // Query dispatchers for read and write wait state
1284         if (!process_io && (pdispatcher != signal_wakeup_))
1285           continue;
1286         current_dispatcher_keys_.push_back(key);
1287         int fd = pdispatcher->GetDescriptor();
1288         // "select"ing a file descriptor that is equal to or larger than
1289         // FD_SETSIZE will result in undefined behavior.
1290         RTC_CHECK_LT(fd, FD_SETSIZE);
1291         if (fd > fdmax)
1292           fdmax = fd;
1293 
1294         uint32_t ff = pdispatcher->GetRequestedEvents();
1295         if (ff & (DE_READ | DE_ACCEPT))
1296           FD_SET(fd, &fdsRead);
1297         if (ff & (DE_WRITE | DE_CONNECT))
1298           FD_SET(fd, &fdsWrite);
1299       }
1300     }
1301 
1302     // Wait then call handlers as appropriate
1303     // < 0 means error
1304     // 0 means timeout
1305     // > 0 means count of descriptors ready
1306     int n = select(fdmax + 1, &fdsRead, &fdsWrite, nullptr, ptvWait);
1307 
1308     // If error, return error.
1309     if (n < 0) {
1310       if (errno != EINTR) {
1311         RTC_LOG_E(LS_ERROR, EN, errno) << "select";
1312         return false;
1313       }
1314       // Else ignore the error and keep going. If this EINTR was for one of the
1315       // signals managed by this PhysicalSocketServer, the
1316       // PosixSignalDeliveryDispatcher will be in the signaled state in the next
1317       // iteration.
1318     } else if (n == 0) {
1319       // If timeout, return success
1320       return true;
1321     } else {
1322       // We have signaled descriptors
1323       CritScope cr(&crit_);
1324       // Iterate only on the dispatchers whose sockets were passed into
1325       // WSAEventSelect; this avoids the ABA problem (a socket being
1326       // destroyed and a new one created with the same file descriptor).
1327       for (uint64_t key : current_dispatcher_keys_) {
1328         if (!dispatcher_by_key_.count(key))
1329           continue;
1330         Dispatcher* pdispatcher = dispatcher_by_key_.at(key);
1331 
1332         int fd = pdispatcher->GetDescriptor();
1333 
1334         bool readable = FD_ISSET(fd, &fdsRead);
1335         if (readable) {
1336           FD_CLR(fd, &fdsRead);
1337         }
1338 
1339         bool writable = FD_ISSET(fd, &fdsWrite);
1340         if (writable) {
1341           FD_CLR(fd, &fdsWrite);
1342         }
1343 
1344         // The error code can be signaled through reads or writes.
1345         ProcessEvents(pdispatcher, readable, writable, readable || writable);
1346       }
1347     }
1348 
1349     // Recalc the time remaining to wait. Doing it here means it doesn't get
1350     // calced twice the first time through the loop
1351     if (ptvWait) {
1352       ptvWait->tv_sec = 0;
1353       ptvWait->tv_usec = 0;
1354       int64_t time_left_us = stop_us - rtc::TimeMicros();
1355       if (time_left_us > 0) {
1356         ptvWait->tv_sec = time_left_us / rtc::kNumMicrosecsPerSec;
1357         ptvWait->tv_usec = time_left_us % rtc::kNumMicrosecsPerSec;
1358       }
1359     }
1360   }
1361 
1362   return true;
1363 }
1364 
1365 #if defined(WEBRTC_USE_EPOLL)
1366 
AddEpoll(Dispatcher * pdispatcher,uint64_t key)1367 void PhysicalSocketServer::AddEpoll(Dispatcher* pdispatcher, uint64_t key) {
1368   RTC_DCHECK(epoll_fd_ != INVALID_SOCKET);
1369   int fd = pdispatcher->GetDescriptor();
1370   RTC_DCHECK(fd != INVALID_SOCKET);
1371   if (fd == INVALID_SOCKET) {
1372     return;
1373   }
1374 
1375   struct epoll_event event = {0};
1376   event.events = GetEpollEvents(pdispatcher->GetRequestedEvents());
1377   event.data.u64 = key;
1378   int err = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &event);
1379   RTC_DCHECK_EQ(err, 0);
1380   if (err == -1) {
1381     RTC_LOG_E(LS_ERROR, EN, errno) << "epoll_ctl EPOLL_CTL_ADD";
1382   }
1383 }
1384 
RemoveEpoll(Dispatcher * pdispatcher)1385 void PhysicalSocketServer::RemoveEpoll(Dispatcher* pdispatcher) {
1386   RTC_DCHECK(epoll_fd_ != INVALID_SOCKET);
1387   int fd = pdispatcher->GetDescriptor();
1388   RTC_DCHECK(fd != INVALID_SOCKET);
1389   if (fd == INVALID_SOCKET) {
1390     return;
1391   }
1392 
1393   struct epoll_event event = {0};
1394   int err = epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, &event);
1395   RTC_DCHECK(err == 0 || errno == ENOENT);
1396   if (err == -1) {
1397     if (errno == ENOENT) {
1398       // Socket has already been closed.
1399       RTC_LOG_E(LS_VERBOSE, EN, errno) << "epoll_ctl EPOLL_CTL_DEL";
1400     } else {
1401       RTC_LOG_E(LS_ERROR, EN, errno) << "epoll_ctl EPOLL_CTL_DEL";
1402     }
1403   }
1404 }
1405 
UpdateEpoll(Dispatcher * pdispatcher,uint64_t key)1406 void PhysicalSocketServer::UpdateEpoll(Dispatcher* pdispatcher, uint64_t key) {
1407   RTC_DCHECK(epoll_fd_ != INVALID_SOCKET);
1408   int fd = pdispatcher->GetDescriptor();
1409   RTC_DCHECK(fd != INVALID_SOCKET);
1410   if (fd == INVALID_SOCKET) {
1411     return;
1412   }
1413 
1414   struct epoll_event event = {0};
1415   event.events = GetEpollEvents(pdispatcher->GetRequestedEvents());
1416   event.data.u64 = key;
1417   int err = epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, fd, &event);
1418   RTC_DCHECK_EQ(err, 0);
1419   if (err == -1) {
1420     RTC_LOG_E(LS_ERROR, EN, errno) << "epoll_ctl EPOLL_CTL_MOD";
1421   }
1422 }
1423 
WaitEpoll(int cmsWait)1424 bool PhysicalSocketServer::WaitEpoll(int cmsWait) {
1425   RTC_DCHECK(epoll_fd_ != INVALID_SOCKET);
1426   int64_t tvWait = -1;
1427   int64_t tvStop = -1;
1428   if (cmsWait != kForever) {
1429     tvWait = cmsWait;
1430     tvStop = TimeAfter(cmsWait);
1431   }
1432 
1433   fWait_ = true;
1434   while (fWait_) {
1435     // Wait then call handlers as appropriate
1436     // < 0 means error
1437     // 0 means timeout
1438     // > 0 means count of descriptors ready
1439     int n = epoll_wait(epoll_fd_, epoll_events_.data(), epoll_events_.size(),
1440                        static_cast<int>(tvWait));
1441     if (n < 0) {
1442       if (errno != EINTR) {
1443         RTC_LOG_E(LS_ERROR, EN, errno) << "epoll";
1444         return false;
1445       }
1446       // Else ignore the error and keep going. If this EINTR was for one of the
1447       // signals managed by this PhysicalSocketServer, the
1448       // PosixSignalDeliveryDispatcher will be in the signaled state in the next
1449       // iteration.
1450     } else if (n == 0) {
1451       // If timeout, return success
1452       return true;
1453     } else {
1454       // We have signaled descriptors
1455       CritScope cr(&crit_);
1456       for (int i = 0; i < n; ++i) {
1457         const epoll_event& event = epoll_events_[i];
1458         uint64_t key = event.data.u64;
1459         if (!dispatcher_by_key_.count(key)) {
1460           // The dispatcher for this socket no longer exists.
1461           continue;
1462         }
1463         Dispatcher* pdispatcher = dispatcher_by_key_.at(key);
1464 
1465         bool readable = (event.events & (EPOLLIN | EPOLLPRI));
1466         bool writable = (event.events & EPOLLOUT);
1467         bool check_error = (event.events & (EPOLLRDHUP | EPOLLERR | EPOLLHUP));
1468 
1469         ProcessEvents(pdispatcher, readable, writable, check_error);
1470       }
1471     }
1472 
1473     if (cmsWait != kForever) {
1474       tvWait = TimeDiff(tvStop, TimeMillis());
1475       if (tvWait <= 0) {
1476         // Return success on timeout.
1477         return true;
1478       }
1479     }
1480   }
1481 
1482   return true;
1483 }
1484 
WaitPoll(int cmsWait,Dispatcher * dispatcher)1485 bool PhysicalSocketServer::WaitPoll(int cmsWait, Dispatcher* dispatcher) {
1486   RTC_DCHECK(dispatcher);
1487   int64_t tvWait = -1;
1488   int64_t tvStop = -1;
1489   if (cmsWait != kForever) {
1490     tvWait = cmsWait;
1491     tvStop = TimeAfter(cmsWait);
1492   }
1493 
1494   fWait_ = true;
1495 
1496   struct pollfd fds = {0};
1497   int fd = dispatcher->GetDescriptor();
1498   fds.fd = fd;
1499 
1500   while (fWait_) {
1501     uint32_t ff = dispatcher->GetRequestedEvents();
1502     fds.events = 0;
1503     if (ff & (DE_READ | DE_ACCEPT)) {
1504       fds.events |= POLLIN;
1505     }
1506     if (ff & (DE_WRITE | DE_CONNECT)) {
1507       fds.events |= POLLOUT;
1508     }
1509     fds.revents = 0;
1510 
1511     // Wait then call handlers as appropriate
1512     // < 0 means error
1513     // 0 means timeout
1514     // > 0 means count of descriptors ready
1515     int n = poll(&fds, 1, static_cast<int>(tvWait));
1516     if (n < 0) {
1517       if (errno != EINTR) {
1518         RTC_LOG_E(LS_ERROR, EN, errno) << "poll";
1519         return false;
1520       }
1521       // Else ignore the error and keep going. If this EINTR was for one of the
1522       // signals managed by this PhysicalSocketServer, the
1523       // PosixSignalDeliveryDispatcher will be in the signaled state in the next
1524       // iteration.
1525     } else if (n == 0) {
1526       // If timeout, return success
1527       return true;
1528     } else {
1529       // We have signaled descriptors (should only be the passed dispatcher).
1530       RTC_DCHECK_EQ(n, 1);
1531       RTC_DCHECK_EQ(fds.fd, fd);
1532 
1533       bool readable = (fds.revents & (POLLIN | POLLPRI));
1534       bool writable = (fds.revents & POLLOUT);
1535       bool check_error = (fds.revents & (POLLRDHUP | POLLERR | POLLHUP));
1536 
1537       ProcessEvents(dispatcher, readable, writable, check_error);
1538     }
1539 
1540     if (cmsWait != kForever) {
1541       tvWait = TimeDiff(tvStop, TimeMillis());
1542       if (tvWait < 0) {
1543         // Return success on timeout.
1544         return true;
1545       }
1546     }
1547   }
1548 
1549   return true;
1550 }
1551 
1552 #endif  // WEBRTC_USE_EPOLL
1553 
1554 #endif  // WEBRTC_POSIX
1555 
1556 #if defined(WEBRTC_WIN)
Wait(int cmsWait,bool process_io)1557 bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) {
1558   // We don't support reentrant waiting.
1559   RTC_DCHECK(!waiting_);
1560   ScopedSetTrue s(&waiting_);
1561 
1562   int64_t cmsTotal = cmsWait;
1563   int64_t cmsElapsed = 0;
1564   int64_t msStart = Time();
1565 
1566   fWait_ = true;
1567   while (fWait_) {
1568     std::vector<WSAEVENT> events;
1569     std::vector<uint64_t> event_owners;
1570 
1571     events.push_back(socket_ev_);
1572 
1573     {
1574       CritScope cr(&crit_);
1575       // Get a snapshot of all current dispatchers; this is used to avoid the
1576       // ABA problem (see later comment) and avoids the dispatcher_by_key_
1577       // iterator being invalidated by calling CheckSignalClose, which may
1578       // remove the dispatcher from the list.
1579       current_dispatcher_keys_.clear();
1580       for (auto const& kv : dispatcher_by_key_) {
1581         current_dispatcher_keys_.push_back(kv.first);
1582       }
1583       for (uint64_t key : current_dispatcher_keys_) {
1584         if (!dispatcher_by_key_.count(key)) {
1585           continue;
1586         }
1587         Dispatcher* disp = dispatcher_by_key_.at(key);
1588         if (!disp)
1589           continue;
1590         if (!process_io && (disp != signal_wakeup_))
1591           continue;
1592         SOCKET s = disp->GetSocket();
1593         if (disp->CheckSignalClose()) {
1594           // We just signalled close, don't poll this socket.
1595         } else if (s != INVALID_SOCKET) {
1596           WSAEventSelect(s, events[0],
1597                          FlagsToEvents(disp->GetRequestedEvents()));
1598         } else {
1599           events.push_back(disp->GetWSAEvent());
1600           event_owners.push_back(key);
1601         }
1602       }
1603     }
1604 
1605     // Which is shorter, the delay wait or the asked wait?
1606 
1607     int64_t cmsNext;
1608     if (cmsWait == kForever) {
1609       cmsNext = cmsWait;
1610     } else {
1611       cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed);
1612     }
1613 
1614     // Wait for one of the events to signal
1615     DWORD dw =
1616         WSAWaitForMultipleEvents(static_cast<DWORD>(events.size()), &events[0],
1617                                  false, static_cast<DWORD>(cmsNext), false);
1618 
1619     if (dw == WSA_WAIT_FAILED) {
1620       // Failed?
1621       // TODO(pthatcher): need a better strategy than this!
1622       WSAGetLastError();
1623       RTC_NOTREACHED();
1624       return false;
1625     } else if (dw == WSA_WAIT_TIMEOUT) {
1626       // Timeout?
1627       return true;
1628     } else {
1629       // Figure out which one it is and call it
1630       CritScope cr(&crit_);
1631       int index = dw - WSA_WAIT_EVENT_0;
1632       if (index > 0) {
1633         --index;  // The first event is the socket event
1634         uint64_t key = event_owners[index];
1635         if (!dispatcher_by_key_.count(key)) {
1636           // The dispatcher could have been removed while waiting for events.
1637           continue;
1638         }
1639         Dispatcher* disp = dispatcher_by_key_.at(key);
1640         disp->OnPreEvent(0);
1641         disp->OnEvent(0, 0);
1642       } else if (process_io) {
1643         // Iterate only on the dispatchers whose sockets were passed into
1644         // WSAEventSelect; this avoids the ABA problem (a socket being
1645         // destroyed and a new one created with the same SOCKET handle).
1646         for (uint64_t key : current_dispatcher_keys_) {
1647           if (!dispatcher_by_key_.count(key)) {
1648             continue;
1649           }
1650           Dispatcher* disp = dispatcher_by_key_.at(key);
1651           SOCKET s = disp->GetSocket();
1652           if (s == INVALID_SOCKET)
1653             continue;
1654 
1655           WSANETWORKEVENTS wsaEvents;
1656           int err = WSAEnumNetworkEvents(s, events[0], &wsaEvents);
1657           if (err == 0) {
1658             {
1659               if ((wsaEvents.lNetworkEvents & FD_READ) &&
1660                   wsaEvents.iErrorCode[FD_READ_BIT] != 0) {
1661                 RTC_LOG(WARNING)
1662                     << "PhysicalSocketServer got FD_READ_BIT error "
1663                     << wsaEvents.iErrorCode[FD_READ_BIT];
1664               }
1665               if ((wsaEvents.lNetworkEvents & FD_WRITE) &&
1666                   wsaEvents.iErrorCode[FD_WRITE_BIT] != 0) {
1667                 RTC_LOG(WARNING)
1668                     << "PhysicalSocketServer got FD_WRITE_BIT error "
1669                     << wsaEvents.iErrorCode[FD_WRITE_BIT];
1670               }
1671               if ((wsaEvents.lNetworkEvents & FD_CONNECT) &&
1672                   wsaEvents.iErrorCode[FD_CONNECT_BIT] != 0) {
1673                 RTC_LOG(WARNING)
1674                     << "PhysicalSocketServer got FD_CONNECT_BIT error "
1675                     << wsaEvents.iErrorCode[FD_CONNECT_BIT];
1676               }
1677               if ((wsaEvents.lNetworkEvents & FD_ACCEPT) &&
1678                   wsaEvents.iErrorCode[FD_ACCEPT_BIT] != 0) {
1679                 RTC_LOG(WARNING)
1680                     << "PhysicalSocketServer got FD_ACCEPT_BIT error "
1681                     << wsaEvents.iErrorCode[FD_ACCEPT_BIT];
1682               }
1683               if ((wsaEvents.lNetworkEvents & FD_CLOSE) &&
1684                   wsaEvents.iErrorCode[FD_CLOSE_BIT] != 0) {
1685                 RTC_LOG(WARNING)
1686                     << "PhysicalSocketServer got FD_CLOSE_BIT error "
1687                     << wsaEvents.iErrorCode[FD_CLOSE_BIT];
1688               }
1689             }
1690             uint32_t ff = 0;
1691             int errcode = 0;
1692             if (wsaEvents.lNetworkEvents & FD_READ)
1693               ff |= DE_READ;
1694             if (wsaEvents.lNetworkEvents & FD_WRITE)
1695               ff |= DE_WRITE;
1696             if (wsaEvents.lNetworkEvents & FD_CONNECT) {
1697               if (wsaEvents.iErrorCode[FD_CONNECT_BIT] == 0) {
1698                 ff |= DE_CONNECT;
1699               } else {
1700                 ff |= DE_CLOSE;
1701                 errcode = wsaEvents.iErrorCode[FD_CONNECT_BIT];
1702               }
1703             }
1704             if (wsaEvents.lNetworkEvents & FD_ACCEPT)
1705               ff |= DE_ACCEPT;
1706             if (wsaEvents.lNetworkEvents & FD_CLOSE) {
1707               ff |= DE_CLOSE;
1708               errcode = wsaEvents.iErrorCode[FD_CLOSE_BIT];
1709             }
1710             if (ff != 0) {
1711               disp->OnPreEvent(ff);
1712               disp->OnEvent(ff, errcode);
1713             }
1714           }
1715         }
1716       }
1717 
1718       // Reset the network event until new activity occurs
1719       WSAResetEvent(socket_ev_);
1720     }
1721 
1722     // Break?
1723     if (!fWait_)
1724       break;
1725     cmsElapsed = TimeSince(msStart);
1726     if ((cmsWait != kForever) && (cmsElapsed >= cmsWait)) {
1727       break;
1728     }
1729   }
1730 
1731   // Done
1732   return true;
1733 }
1734 #endif  // WEBRTC_WIN
1735 
1736 }  // namespace rtc
1737