1 /*
2  *  Copyright 2004 The WebRTC Project Authors. All rights reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 #include "rtc_base/physical_socket_server.h"
11 
12 #if defined(_MSC_VER) && _MSC_VER < 1300
13 #pragma warning(disable : 4786)
14 #endif
15 
16 #ifdef MEMORY_SANITIZER
17 #include <sanitizer/msan_interface.h>
18 #endif
19 
20 #if defined(WEBRTC_POSIX)
21 #include <fcntl.h>
22 #include <string.h>
23 #if defined(WEBRTC_USE_EPOLL)
24 // "poll" will be used to wait for the signal dispatcher.
25 #include <poll.h>
26 #endif
27 #include <sys/ioctl.h>
28 #include <sys/select.h>
29 #include <sys/time.h>
30 #include <unistd.h>
31 #endif
32 
33 #if defined(WEBRTC_WIN)
34 #include <windows.h>
35 #include <winsock2.h>
36 #include <ws2tcpip.h>
37 #undef SetPort
38 #endif
39 
40 #include <errno.h>
41 
42 #include <algorithm>
43 #include <map>
44 
45 #include "rtc_base/arraysize.h"
46 #include "rtc_base/byte_order.h"
47 #include "rtc_base/checks.h"
48 #include "rtc_base/logging.h"
49 #include "rtc_base/network_monitor.h"
50 #include "rtc_base/null_socket_server.h"
51 #include "rtc_base/time_utils.h"
52 
53 #if defined(WEBRTC_LINUX) && !defined(WEBRTC_BSD)
54 #include <linux/sockios.h>
55 #endif
56 
57 #if defined(WEBRTC_WIN)
58 #define LAST_SYSTEM_ERROR (::GetLastError())
59 #elif defined(__native_client__) && __native_client__
60 #define LAST_SYSTEM_ERROR (0)
61 #elif defined(WEBRTC_POSIX)
62 #define LAST_SYSTEM_ERROR (errno)
63 #endif  // WEBRTC_WIN
64 
65 #if defined(WEBRTC_POSIX)
66 #include <netinet/tcp.h>  // for TCP_NODELAY
67 #define IP_MTU 14  // Until this is integrated from linux/in.h to netinet/in.h
68 typedef void* SockOptArg;
69 
70 #endif  // WEBRTC_POSIX
71 
72 #if defined(WEBRTC_POSIX) && !defined(WEBRTC_MAC) && !defined(WEBRTC_BSD) && !defined(__native_client__)
73 
GetSocketRecvTimestamp(int socket)74 int64_t GetSocketRecvTimestamp(int socket) {
75   struct timeval tv_ioctl;
76   int ret = ioctl(socket, SIOCGSTAMP, &tv_ioctl);
77   if (ret != 0)
78     return -1;
79   int64_t timestamp =
80       rtc::kNumMicrosecsPerSec * static_cast<int64_t>(tv_ioctl.tv_sec) +
81       static_cast<int64_t>(tv_ioctl.tv_usec);
82   return timestamp;
83 }
84 
85 #else
86 
GetSocketRecvTimestamp(int socket)87 int64_t GetSocketRecvTimestamp(int socket) {
88   return -1;
89 }
90 #endif
91 
92 #if defined(WEBRTC_WIN)
93 typedef char* SockOptArg;
94 #endif
95 
96 #if defined(WEBRTC_USE_EPOLL)
97 // POLLRDHUP / EPOLLRDHUP are only defined starting with Linux 2.6.17.
98 #if !defined(POLLRDHUP)
99 #define POLLRDHUP 0x2000
100 #endif
101 #if !defined(EPOLLRDHUP)
102 #define EPOLLRDHUP 0x2000
103 #endif
104 #endif
105 
106 namespace {
107 class ScopedSetTrue {
108  public:
ScopedSetTrue(bool * value)109   ScopedSetTrue(bool* value) : value_(value) {
110     RTC_DCHECK(!*value_);
111     *value_ = true;
112   }
~ScopedSetTrue()113   ~ScopedSetTrue() { *value_ = false; }
114 
115  private:
116   bool* value_;
117 };
118 }  // namespace
119 
120 namespace rtc {
121 
CreateDefault()122 std::unique_ptr<SocketServer> SocketServer::CreateDefault() {
123 #if defined(__native_client__)
124   return std::unique_ptr<SocketServer>(new rtc::NullSocketServer);
125 #else
126   return std::unique_ptr<SocketServer>(new rtc::PhysicalSocketServer);
127 #endif
128 }
129 
PhysicalSocket(PhysicalSocketServer * ss,SOCKET s)130 PhysicalSocket::PhysicalSocket(PhysicalSocketServer* ss, SOCKET s)
131     : ss_(ss),
132       s_(s),
133       error_(0),
134       state_((s == INVALID_SOCKET) ? CS_CLOSED : CS_CONNECTED),
135       resolver_(nullptr) {
136   if (s_ != INVALID_SOCKET) {
137     SetEnabledEvents(DE_READ | DE_WRITE);
138 
139     int type = SOCK_STREAM;
140     socklen_t len = sizeof(type);
141     const int res =
142         getsockopt(s_, SOL_SOCKET, SO_TYPE, (SockOptArg)&type, &len);
143     RTC_DCHECK_EQ(0, res);
144     udp_ = (SOCK_DGRAM == type);
145   }
146 }
147 
~PhysicalSocket()148 PhysicalSocket::~PhysicalSocket() {
149   Close();
150 }
151 
Create(int family,int type)152 bool PhysicalSocket::Create(int family, int type) {
153   Close();
154   s_ = ::socket(family, type, 0);
155   udp_ = (SOCK_DGRAM == type);
156   family_ = family;
157   UpdateLastError();
158   if (udp_) {
159     SetEnabledEvents(DE_READ | DE_WRITE);
160   }
161   return s_ != INVALID_SOCKET;
162 }
163 
GetLocalAddress() const164 SocketAddress PhysicalSocket::GetLocalAddress() const {
165   sockaddr_storage addr_storage = {};
166   socklen_t addrlen = sizeof(addr_storage);
167   sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage);
168   int result = ::getsockname(s_, addr, &addrlen);
169   SocketAddress address;
170   if (result >= 0) {
171     SocketAddressFromSockAddrStorage(addr_storage, &address);
172   } else {
173     RTC_LOG(LS_WARNING) << "GetLocalAddress: unable to get local addr, socket="
174                         << s_;
175   }
176   return address;
177 }
178 
GetRemoteAddress() const179 SocketAddress PhysicalSocket::GetRemoteAddress() const {
180   sockaddr_storage addr_storage = {};
181   socklen_t addrlen = sizeof(addr_storage);
182   sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage);
183   int result = ::getpeername(s_, addr, &addrlen);
184   SocketAddress address;
185   if (result >= 0) {
186     SocketAddressFromSockAddrStorage(addr_storage, &address);
187   } else {
188     RTC_LOG(LS_WARNING)
189         << "GetRemoteAddress: unable to get remote addr, socket=" << s_;
190   }
191   return address;
192 }
193 
Bind(const SocketAddress & bind_addr)194 int PhysicalSocket::Bind(const SocketAddress& bind_addr) {
195   SocketAddress copied_bind_addr = bind_addr;
196   // If a network binder is available, use it to bind a socket to an interface
197   // instead of bind(), since this is more reliable on an OS with a weak host
198   // model.
199   if (ss_->network_binder() && !bind_addr.IsAnyIP()) {
200     NetworkBindingResult result =
201         ss_->network_binder()->BindSocketToNetwork(s_, bind_addr.ipaddr());
202     if (result == NetworkBindingResult::SUCCESS) {
203       // Since the network binder handled binding the socket to the desired
204       // network interface, we don't need to (and shouldn't) include an IP in
205       // the bind() call; bind() just needs to assign a port.
206       copied_bind_addr.SetIP(GetAnyIP(copied_bind_addr.ipaddr().family()));
207     } else if (result == NetworkBindingResult::NOT_IMPLEMENTED) {
208       RTC_LOG(LS_INFO) << "Can't bind socket to network because "
209                           "network binding is not implemented for this OS.";
210     } else {
211       if (bind_addr.IsLoopbackIP()) {
212         // If we couldn't bind to a loopback IP (which should only happen in
213         // test scenarios), continue on. This may be expected behavior.
214         RTC_LOG(LS_VERBOSE) << "Binding socket to loopback address"
215                             << " failed; result: " << static_cast<int>(result);
216       } else {
217         RTC_LOG(LS_WARNING) << "Binding socket to network address"
218                             << " failed; result: " << static_cast<int>(result);
219         // If a network binding was attempted and failed, we should stop here
220         // and not try to use the socket. Otherwise, we may end up sending
221         // packets with an invalid source address.
222         // See: https://bugs.chromium.org/p/webrtc/issues/detail?id=7026
223         return -1;
224       }
225     }
226   }
227   sockaddr_storage addr_storage;
228   size_t len = copied_bind_addr.ToSockAddrStorage(&addr_storage);
229   sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage);
230   int err = ::bind(s_, addr, static_cast<int>(len));
231   UpdateLastError();
232 #if !defined(NDEBUG)
233   if (0 == err) {
234     dbg_addr_ = "Bound @ ";
235     dbg_addr_.append(GetLocalAddress().ToString());
236   }
237 #endif
238   return err;
239 }
240 
Connect(const SocketAddress & addr)241 int PhysicalSocket::Connect(const SocketAddress& addr) {
242   // TODO(pthatcher): Implicit creation is required to reconnect...
243   // ...but should we make it more explicit?
244   if (state_ != CS_CLOSED) {
245     SetError(EALREADY);
246     return SOCKET_ERROR;
247   }
248   if (addr.IsUnresolvedIP()) {
249     RTC_LOG(LS_VERBOSE) << "Resolving addr in PhysicalSocket::Connect";
250     resolver_ = new AsyncResolver();
251     resolver_->SignalDone.connect(this, &PhysicalSocket::OnResolveResult);
252     resolver_->Start(addr);
253     state_ = CS_CONNECTING;
254     return 0;
255   }
256 
257   return DoConnect(addr);
258 }
259 
DoConnect(const SocketAddress & connect_addr)260 int PhysicalSocket::DoConnect(const SocketAddress& connect_addr) {
261   if ((s_ == INVALID_SOCKET) && !Create(connect_addr.family(), SOCK_STREAM)) {
262     return SOCKET_ERROR;
263   }
264   sockaddr_storage addr_storage;
265   size_t len = connect_addr.ToSockAddrStorage(&addr_storage);
266   sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage);
267   int err = ::connect(s_, addr, static_cast<int>(len));
268   UpdateLastError();
269   uint8_t events = DE_READ | DE_WRITE;
270   if (err == 0) {
271     state_ = CS_CONNECTED;
272   } else if (IsBlockingError(GetError())) {
273     state_ = CS_CONNECTING;
274     events |= DE_CONNECT;
275   } else {
276     return SOCKET_ERROR;
277   }
278 
279   EnableEvents(events);
280   return 0;
281 }
282 
GetError() const283 int PhysicalSocket::GetError() const {
284   CritScope cs(&crit_);
285   return error_;
286 }
287 
SetError(int error)288 void PhysicalSocket::SetError(int error) {
289   CritScope cs(&crit_);
290   error_ = error;
291 }
292 
GetState() const293 AsyncSocket::ConnState PhysicalSocket::GetState() const {
294   return state_;
295 }
296 
GetOption(Option opt,int * value)297 int PhysicalSocket::GetOption(Option opt, int* value) {
298   int slevel;
299   int sopt;
300   if (TranslateOption(opt, &slevel, &sopt) == -1)
301     return -1;
302   socklen_t optlen = sizeof(*value);
303   int ret = ::getsockopt(s_, slevel, sopt, (SockOptArg)value, &optlen);
304   if (ret == -1) {
305     return -1;
306   }
307   if (opt == OPT_DONTFRAGMENT) {
308 #if defined(WEBRTC_LINUX) && !defined(WEBRTC_ANDROID) && !defined(WEBRTC_BSD)
309     *value = (*value != IP_PMTUDISC_DONT) ? 1 : 0;
310 #endif
311   } else if (opt == OPT_DSCP) {
312 #if defined(WEBRTC_POSIX)
313     // unshift DSCP value to get six most significant bits of IP DiffServ field
314     *value >>= 2;
315 #endif
316   }
317   return ret;
318 }
319 
SetOption(Option opt,int value)320 int PhysicalSocket::SetOption(Option opt, int value) {
321   int slevel;
322   int sopt;
323   if (TranslateOption(opt, &slevel, &sopt) == -1)
324     return -1;
325   if (opt == OPT_DONTFRAGMENT) {
326 #if defined(WEBRTC_LINUX) && !defined(WEBRTC_ANDROID) && !defined(WEBRTC_BSD)
327     value = (value) ? IP_PMTUDISC_DO : IP_PMTUDISC_DONT;
328 #endif
329   } else if (opt == OPT_DSCP) {
330 #if defined(WEBRTC_POSIX)
331     // shift DSCP value to fit six most significant bits of IP DiffServ field
332     value <<= 2;
333 #endif
334   }
335 #if defined(WEBRTC_POSIX)
336   if (sopt == IPV6_TCLASS) {
337     // Set the IPv4 option in all cases to support dual-stack sockets.
338     ::setsockopt(s_, IPPROTO_IP, IP_TOS, (SockOptArg)&value, sizeof(value));
339   }
340 #endif
341   return ::setsockopt(s_, slevel, sopt, (SockOptArg)&value, sizeof(value));
342 }
343 
Send(const void * pv,size_t cb)344 int PhysicalSocket::Send(const void* pv, size_t cb) {
345   int sent = DoSend(
346       s_, reinterpret_cast<const char*>(pv), static_cast<int>(cb),
347 #if defined(WEBRTC_LINUX) && !defined(WEBRTC_ANDROID)
348       // Suppress SIGPIPE. Without this, attempting to send on a socket whose
349       // other end is closed will result in a SIGPIPE signal being raised to
350       // our process, which by default will terminate the process, which we
351       // don't want. By specifying this flag, we'll just get the error EPIPE
352       // instead and can handle the error gracefully.
353       MSG_NOSIGNAL
354 #else
355       0
356 #endif
357   );
358   UpdateLastError();
359   MaybeRemapSendError();
360   // We have seen minidumps where this may be false.
361   RTC_DCHECK(sent <= static_cast<int>(cb));
362   if ((sent > 0 && sent < static_cast<int>(cb)) ||
363       (sent < 0 && IsBlockingError(GetError()))) {
364     EnableEvents(DE_WRITE);
365   }
366   return sent;
367 }
368 
SendTo(const void * buffer,size_t length,const SocketAddress & addr)369 int PhysicalSocket::SendTo(const void* buffer,
370                            size_t length,
371                            const SocketAddress& addr) {
372   sockaddr_storage saddr;
373   size_t len = addr.ToSockAddrStorage(&saddr);
374   int sent =
375       DoSendTo(s_, static_cast<const char*>(buffer), static_cast<int>(length),
376 #if defined(WEBRTC_LINUX) && !defined(WEBRTC_ANDROID)
377                // Suppress SIGPIPE. See above for explanation.
378                MSG_NOSIGNAL,
379 #else
380                0,
381 #endif
382                reinterpret_cast<sockaddr*>(&saddr), static_cast<int>(len));
383   UpdateLastError();
384   MaybeRemapSendError();
385   // We have seen minidumps where this may be false.
386   RTC_DCHECK(sent <= static_cast<int>(length));
387   if ((sent > 0 && sent < static_cast<int>(length)) ||
388       (sent < 0 && IsBlockingError(GetError()))) {
389     EnableEvents(DE_WRITE);
390   }
391   return sent;
392 }
393 
Recv(void * buffer,size_t length,int64_t * timestamp)394 int PhysicalSocket::Recv(void* buffer, size_t length, int64_t* timestamp) {
395   int received =
396       ::recv(s_, static_cast<char*>(buffer), static_cast<int>(length), 0);
397   if ((received == 0) && (length != 0)) {
398     // Note: on graceful shutdown, recv can return 0.  In this case, we
399     // pretend it is blocking, and then signal close, so that simplifying
400     // assumptions can be made about Recv.
401     RTC_LOG(LS_WARNING) << "EOF from socket; deferring close event";
402     // Must turn this back on so that the select() loop will notice the close
403     // event.
404     EnableEvents(DE_READ);
405     SetError(EWOULDBLOCK);
406     return SOCKET_ERROR;
407   }
408   if (timestamp) {
409     *timestamp = GetSocketRecvTimestamp(s_);
410   }
411   UpdateLastError();
412   int error = GetError();
413   bool success = (received >= 0) || IsBlockingError(error);
414   if (udp_ || success) {
415     EnableEvents(DE_READ);
416   }
417   if (!success) {
418     RTC_LOG_F(LS_VERBOSE) << "Error = " << error;
419   }
420   return received;
421 }
422 
RecvFrom(void * buffer,size_t length,SocketAddress * out_addr,int64_t * timestamp)423 int PhysicalSocket::RecvFrom(void* buffer,
424                              size_t length,
425                              SocketAddress* out_addr,
426                              int64_t* timestamp) {
427   sockaddr_storage addr_storage;
428   socklen_t addr_len = sizeof(addr_storage);
429   sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage);
430   int received = ::recvfrom(s_, static_cast<char*>(buffer),
431                             static_cast<int>(length), 0, addr, &addr_len);
432   if (timestamp) {
433     *timestamp = GetSocketRecvTimestamp(s_);
434   }
435   UpdateLastError();
436   if ((received >= 0) && (out_addr != nullptr))
437     SocketAddressFromSockAddrStorage(addr_storage, out_addr);
438   int error = GetError();
439   bool success = (received >= 0) || IsBlockingError(error);
440   if (udp_ || success) {
441     EnableEvents(DE_READ);
442   }
443   if (!success) {
444     RTC_LOG_F(LS_VERBOSE) << "Error = " << error;
445   }
446   return received;
447 }
448 
Listen(int backlog)449 int PhysicalSocket::Listen(int backlog) {
450   int err = ::listen(s_, backlog);
451   UpdateLastError();
452   if (err == 0) {
453     state_ = CS_CONNECTING;
454     EnableEvents(DE_ACCEPT);
455 #if !defined(NDEBUG)
456     dbg_addr_ = "Listening @ ";
457     dbg_addr_.append(GetLocalAddress().ToString());
458 #endif
459   }
460   return err;
461 }
462 
Accept(SocketAddress * out_addr)463 AsyncSocket* PhysicalSocket::Accept(SocketAddress* out_addr) {
464   // Always re-subscribe DE_ACCEPT to make sure new incoming connections will
465   // trigger an event even if DoAccept returns an error here.
466   EnableEvents(DE_ACCEPT);
467   sockaddr_storage addr_storage;
468   socklen_t addr_len = sizeof(addr_storage);
469   sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage);
470   SOCKET s = DoAccept(s_, addr, &addr_len);
471   UpdateLastError();
472   if (s == INVALID_SOCKET)
473     return nullptr;
474   if (out_addr != nullptr)
475     SocketAddressFromSockAddrStorage(addr_storage, out_addr);
476   return ss_->WrapSocket(s);
477 }
478 
Close()479 int PhysicalSocket::Close() {
480   if (s_ == INVALID_SOCKET)
481     return 0;
482   int err = ::closesocket(s_);
483   UpdateLastError();
484   s_ = INVALID_SOCKET;
485   state_ = CS_CLOSED;
486   SetEnabledEvents(0);
487   if (resolver_) {
488     resolver_->Destroy(false);
489     resolver_ = nullptr;
490   }
491   return err;
492 }
493 
DoAccept(SOCKET socket,sockaddr * addr,socklen_t * addrlen)494 SOCKET PhysicalSocket::DoAccept(SOCKET socket,
495                                 sockaddr* addr,
496                                 socklen_t* addrlen) {
497   return ::accept(socket, addr, addrlen);
498 }
499 
DoSend(SOCKET socket,const char * buf,int len,int flags)500 int PhysicalSocket::DoSend(SOCKET socket, const char* buf, int len, int flags) {
501   return ::send(socket, buf, len, flags);
502 }
503 
DoSendTo(SOCKET socket,const char * buf,int len,int flags,const struct sockaddr * dest_addr,socklen_t addrlen)504 int PhysicalSocket::DoSendTo(SOCKET socket,
505                              const char* buf,
506                              int len,
507                              int flags,
508                              const struct sockaddr* dest_addr,
509                              socklen_t addrlen) {
510   return ::sendto(socket, buf, len, flags, dest_addr, addrlen);
511 }
512 
OnResolveResult(AsyncResolverInterface * resolver)513 void PhysicalSocket::OnResolveResult(AsyncResolverInterface* resolver) {
514   if (resolver != resolver_) {
515     return;
516   }
517 
518   int error = resolver_->GetError();
519   if (error == 0) {
520     error = DoConnect(resolver_->address());
521   } else {
522     Close();
523   }
524 
525   if (error) {
526     SetError(error);
527     SignalCloseEvent(this, error);
528   }
529 }
530 
UpdateLastError()531 void PhysicalSocket::UpdateLastError() {
532   SetError(LAST_SYSTEM_ERROR);
533 }
534 
MaybeRemapSendError()535 void PhysicalSocket::MaybeRemapSendError() {
536 #if defined(WEBRTC_MAC)
537   // https://developer.apple.com/library/mac/documentation/Darwin/
538   // Reference/ManPages/man2/sendto.2.html
539   // ENOBUFS - The output queue for a network interface is full.
540   // This generally indicates that the interface has stopped sending,
541   // but may be caused by transient congestion.
542   if (GetError() == ENOBUFS) {
543     SetError(EWOULDBLOCK);
544   }
545 #endif
546 }
547 
SetEnabledEvents(uint8_t events)548 void PhysicalSocket::SetEnabledEvents(uint8_t events) {
549   enabled_events_ = events;
550 }
551 
EnableEvents(uint8_t events)552 void PhysicalSocket::EnableEvents(uint8_t events) {
553   enabled_events_ |= events;
554 }
555 
DisableEvents(uint8_t events)556 void PhysicalSocket::DisableEvents(uint8_t events) {
557   enabled_events_ &= ~events;
558 }
559 
TranslateOption(Option opt,int * slevel,int * sopt)560 int PhysicalSocket::TranslateOption(Option opt, int* slevel, int* sopt) {
561   switch (opt) {
562     case OPT_DONTFRAGMENT:
563 #if defined(WEBRTC_WIN)
564       *slevel = IPPROTO_IP;
565       *sopt = IP_DONTFRAGMENT;
566       break;
567 #elif defined(WEBRTC_MAC) || defined(WEBRTC_BSD) || defined(__native_client__)
568       RTC_LOG(LS_WARNING) << "Socket::OPT_DONTFRAGMENT not supported.";
569       return -1;
570 #elif defined(WEBRTC_POSIX)
571       *slevel = IPPROTO_IP;
572       *sopt = IP_MTU_DISCOVER;
573       break;
574 #endif
575     case OPT_RCVBUF:
576       *slevel = SOL_SOCKET;
577       *sopt = SO_RCVBUF;
578       break;
579     case OPT_SNDBUF:
580       *slevel = SOL_SOCKET;
581       *sopt = SO_SNDBUF;
582       break;
583     case OPT_NODELAY:
584       *slevel = IPPROTO_TCP;
585       *sopt = TCP_NODELAY;
586       break;
587     case OPT_DSCP:
588 #if defined(WEBRTC_POSIX)
589       if (family_ == AF_INET6) {
590         *slevel = IPPROTO_IPV6;
591         *sopt = IPV6_TCLASS;
592       } else {
593         *slevel = IPPROTO_IP;
594         *sopt = IP_TOS;
595       }
596       break;
597 #else
598       RTC_LOG(LS_WARNING) << "Socket::OPT_DSCP not supported.";
599       return -1;
600 #endif
601     case OPT_RTP_SENDTIME_EXTN_ID:
602       return -1;  // No logging is necessary as this not a OS socket option.
603     default:
604       RTC_NOTREACHED();
605       return -1;
606   }
607   return 0;
608 }
609 
SocketDispatcher(PhysicalSocketServer * ss)610 SocketDispatcher::SocketDispatcher(PhysicalSocketServer* ss)
611 #if defined(WEBRTC_WIN)
612     : PhysicalSocket(ss),
613       id_(0),
614       signal_close_(false)
615 #else
616     : PhysicalSocket(ss)
617 #endif
618 {
619 }
620 
SocketDispatcher(SOCKET s,PhysicalSocketServer * ss)621 SocketDispatcher::SocketDispatcher(SOCKET s, PhysicalSocketServer* ss)
622 #if defined(WEBRTC_WIN)
623     : PhysicalSocket(ss, s),
624       id_(0),
625       signal_close_(false)
626 #else
627     : PhysicalSocket(ss, s)
628 #endif
629 {
630 }
631 
~SocketDispatcher()632 SocketDispatcher::~SocketDispatcher() {
633   Close();
634 }
635 
Initialize()636 bool SocketDispatcher::Initialize() {
637   RTC_DCHECK(s_ != INVALID_SOCKET);
638 // Must be a non-blocking
639 #if defined(WEBRTC_WIN)
640   u_long argp = 1;
641   ioctlsocket(s_, FIONBIO, &argp);
642 #elif defined(WEBRTC_POSIX)
643   fcntl(s_, F_SETFL, fcntl(s_, F_GETFL, 0) | O_NONBLOCK);
644 #endif
645 #if defined(WEBRTC_IOS)
646   // iOS may kill sockets when the app is moved to the background
647   // (specifically, if the app doesn't use the "voip" UIBackgroundMode). When
648   // we attempt to write to such a socket, SIGPIPE will be raised, which by
649   // default will terminate the process, which we don't want. By specifying
650   // this socket option, SIGPIPE will be disabled for the socket.
651   int value = 1;
652   ::setsockopt(s_, SOL_SOCKET, SO_NOSIGPIPE, &value, sizeof(value));
653 #endif
654   ss_->Add(this);
655   return true;
656 }
657 
Create(int type)658 bool SocketDispatcher::Create(int type) {
659   return Create(AF_INET, type);
660 }
661 
Create(int family,int type)662 bool SocketDispatcher::Create(int family, int type) {
663   // Change the socket to be non-blocking.
664   if (!PhysicalSocket::Create(family, type))
665     return false;
666 
667   if (!Initialize())
668     return false;
669 
670 #if defined(WEBRTC_WIN)
671   do {
672     id_ = ++next_id_;
673   } while (id_ == 0);
674 #endif
675   return true;
676 }
677 
678 #if defined(WEBRTC_WIN)
679 
GetWSAEvent()680 WSAEVENT SocketDispatcher::GetWSAEvent() {
681   return WSA_INVALID_EVENT;
682 }
683 
GetSocket()684 SOCKET SocketDispatcher::GetSocket() {
685   return s_;
686 }
687 
CheckSignalClose()688 bool SocketDispatcher::CheckSignalClose() {
689   if (!signal_close_)
690     return false;
691 
692   char ch;
693   if (recv(s_, &ch, 1, MSG_PEEK) > 0)
694     return false;
695 
696   state_ = CS_CLOSED;
697   signal_close_ = false;
698   SignalCloseEvent(this, signal_err_);
699   return true;
700 }
701 
702 int SocketDispatcher::next_id_ = 0;
703 
704 #elif defined(WEBRTC_POSIX)
705 
GetDescriptor()706 int SocketDispatcher::GetDescriptor() {
707   return s_;
708 }
709 
IsDescriptorClosed()710 bool SocketDispatcher::IsDescriptorClosed() {
711   if (udp_) {
712     // The MSG_PEEK trick doesn't work for UDP, since (at least in some
713     // circumstances) it requires reading an entire UDP packet, which would be
714     // bad for performance here. So, just check whether |s_| has been closed,
715     // which should be sufficient.
716     return s_ == INVALID_SOCKET;
717   }
718   // We don't have a reliable way of distinguishing end-of-stream
719   // from readability.  So test on each readable call.  Is this
720   // inefficient?  Probably.
721   char ch;
722   ssize_t res = ::recv(s_, &ch, 1, MSG_PEEK);
723   if (res > 0) {
724     // Data available, so not closed.
725     return false;
726   } else if (res == 0) {
727     // EOF, so closed.
728     return true;
729   } else {  // error
730     switch (errno) {
731       // Returned if we've already closed s_.
732       case EBADF:
733       // Returned during ungraceful peer shutdown.
734       case ECONNRESET:
735         return true;
736       // The normal blocking error; don't log anything.
737       case EWOULDBLOCK:
738       // Interrupted system call.
739       case EINTR:
740         return false;
741       default:
742         // Assume that all other errors are just blocking errors, meaning the
743         // connection is still good but we just can't read from it right now.
744         // This should only happen when connecting (and at most once), because
745         // in all other cases this function is only called if the file
746         // descriptor is already known to be in the readable state. However,
747         // it's not necessary a problem if we spuriously interpret a
748         // "connection lost"-type error as a blocking error, because typically
749         // the next recv() will get EOF, so we'll still eventually notice that
750         // the socket is closed.
751         RTC_LOG_ERR(LS_WARNING) << "Assuming benign blocking error";
752         return false;
753     }
754   }
755 }
756 
757 #endif  // WEBRTC_POSIX
758 
GetRequestedEvents()759 uint32_t SocketDispatcher::GetRequestedEvents() {
760   return enabled_events();
761 }
762 
OnPreEvent(uint32_t ff)763 void SocketDispatcher::OnPreEvent(uint32_t ff) {
764   if ((ff & DE_CONNECT) != 0)
765     state_ = CS_CONNECTED;
766 
767 #if defined(WEBRTC_WIN)
768 // We set CS_CLOSED from CheckSignalClose.
769 #elif defined(WEBRTC_POSIX)
770   if ((ff & DE_CLOSE) != 0)
771     state_ = CS_CLOSED;
772 #endif
773 }
774 
775 #if defined(WEBRTC_WIN)
776 
OnEvent(uint32_t ff,int err)777 void SocketDispatcher::OnEvent(uint32_t ff, int err) {
778   int cache_id = id_;
779   // Make sure we deliver connect/accept first. Otherwise, consumers may see
780   // something like a READ followed by a CONNECT, which would be odd.
781   if (((ff & DE_CONNECT) != 0) && (id_ == cache_id)) {
782     if (ff != DE_CONNECT)
783       RTC_LOG(LS_VERBOSE) << "Signalled with DE_CONNECT: " << ff;
784     DisableEvents(DE_CONNECT);
785 #if !defined(NDEBUG)
786     dbg_addr_ = "Connected @ ";
787     dbg_addr_.append(GetRemoteAddress().ToString());
788 #endif
789     SignalConnectEvent(this);
790   }
791   if (((ff & DE_ACCEPT) != 0) && (id_ == cache_id)) {
792     DisableEvents(DE_ACCEPT);
793     SignalReadEvent(this);
794   }
795   if ((ff & DE_READ) != 0) {
796     DisableEvents(DE_READ);
797     SignalReadEvent(this);
798   }
799   if (((ff & DE_WRITE) != 0) && (id_ == cache_id)) {
800     DisableEvents(DE_WRITE);
801     SignalWriteEvent(this);
802   }
803   if (((ff & DE_CLOSE) != 0) && (id_ == cache_id)) {
804     signal_close_ = true;
805     signal_err_ = err;
806   }
807 }
808 
809 #elif defined(WEBRTC_POSIX)
810 
OnEvent(uint32_t ff,int err)811 void SocketDispatcher::OnEvent(uint32_t ff, int err) {
812 #if defined(WEBRTC_USE_EPOLL)
813   // Remember currently enabled events so we can combine multiple changes
814   // into one update call later.
815   // The signal handlers might re-enable events disabled here, so we can't
816   // keep a list of events to disable at the end of the method. This list
817   // would not be updated with the events enabled by the signal handlers.
818   StartBatchedEventUpdates();
819 #endif
820   // Make sure we deliver connect/accept first. Otherwise, consumers may see
821   // something like a READ followed by a CONNECT, which would be odd.
822   if ((ff & DE_CONNECT) != 0) {
823     DisableEvents(DE_CONNECT);
824     SignalConnectEvent(this);
825   }
826   if ((ff & DE_ACCEPT) != 0) {
827     DisableEvents(DE_ACCEPT);
828     SignalReadEvent(this);
829   }
830   if ((ff & DE_READ) != 0) {
831     DisableEvents(DE_READ);
832     SignalReadEvent(this);
833   }
834   if ((ff & DE_WRITE) != 0) {
835     DisableEvents(DE_WRITE);
836     SignalWriteEvent(this);
837   }
838   if ((ff & DE_CLOSE) != 0) {
839     // The socket is now dead to us, so stop checking it.
840     SetEnabledEvents(0);
841     SignalCloseEvent(this, err);
842   }
843 #if defined(WEBRTC_USE_EPOLL)
844   FinishBatchedEventUpdates();
845 #endif
846 }
847 
848 #endif  // WEBRTC_POSIX
849 
850 #if defined(WEBRTC_USE_EPOLL)
851 
GetEpollEvents(uint32_t ff)852 inline static int GetEpollEvents(uint32_t ff) {
853   int events = 0;
854   if (ff & (DE_READ | DE_ACCEPT)) {
855     events |= EPOLLIN;
856   }
857   if (ff & (DE_WRITE | DE_CONNECT)) {
858     events |= EPOLLOUT;
859   }
860   return events;
861 }
862 
StartBatchedEventUpdates()863 void SocketDispatcher::StartBatchedEventUpdates() {
864   RTC_DCHECK_EQ(saved_enabled_events_, -1);
865   saved_enabled_events_ = enabled_events();
866 }
867 
FinishBatchedEventUpdates()868 void SocketDispatcher::FinishBatchedEventUpdates() {
869   RTC_DCHECK_NE(saved_enabled_events_, -1);
870   uint8_t old_events = static_cast<uint8_t>(saved_enabled_events_);
871   saved_enabled_events_ = -1;
872   MaybeUpdateDispatcher(old_events);
873 }
874 
MaybeUpdateDispatcher(uint8_t old_events)875 void SocketDispatcher::MaybeUpdateDispatcher(uint8_t old_events) {
876   if (GetEpollEvents(enabled_events()) != GetEpollEvents(old_events) &&
877       saved_enabled_events_ == -1) {
878     ss_->Update(this);
879   }
880 }
881 
SetEnabledEvents(uint8_t events)882 void SocketDispatcher::SetEnabledEvents(uint8_t events) {
883   uint8_t old_events = enabled_events();
884   PhysicalSocket::SetEnabledEvents(events);
885   MaybeUpdateDispatcher(old_events);
886 }
887 
EnableEvents(uint8_t events)888 void SocketDispatcher::EnableEvents(uint8_t events) {
889   uint8_t old_events = enabled_events();
890   PhysicalSocket::EnableEvents(events);
891   MaybeUpdateDispatcher(old_events);
892 }
893 
DisableEvents(uint8_t events)894 void SocketDispatcher::DisableEvents(uint8_t events) {
895   uint8_t old_events = enabled_events();
896   PhysicalSocket::DisableEvents(events);
897   MaybeUpdateDispatcher(old_events);
898 }
899 
900 #endif  // WEBRTC_USE_EPOLL
901 
Close()902 int SocketDispatcher::Close() {
903   if (s_ == INVALID_SOCKET)
904     return 0;
905 
906 #if defined(WEBRTC_WIN)
907   id_ = 0;
908   signal_close_ = false;
909 #endif
910 #if defined(WEBRTC_USE_EPOLL)
911   // If we're batching events, the socket can be closed and reopened
912   // during the batch. Set saved_enabled_events_ to 0 here so the new
913   // socket, if any, has the correct old events bitfield
914   if (saved_enabled_events_ != -1) {
915     saved_enabled_events_ = 0;
916   }
917 #endif
918   ss_->Remove(this);
919   return PhysicalSocket::Close();
920 }
921 
922 #if defined(WEBRTC_POSIX)
923 class EventDispatcher : public Dispatcher {
924  public:
EventDispatcher(PhysicalSocketServer * ss)925   EventDispatcher(PhysicalSocketServer* ss) : ss_(ss), fSignaled_(false) {
926     if (pipe(afd_) < 0)
927       RTC_LOG(LERROR) << "pipe failed";
928     ss_->Add(this);
929   }
930 
~EventDispatcher()931   ~EventDispatcher() override {
932     ss_->Remove(this);
933     close(afd_[0]);
934     close(afd_[1]);
935   }
936 
Signal()937   virtual void Signal() {
938     CritScope cs(&crit_);
939     if (!fSignaled_) {
940       const uint8_t b[1] = {0};
941       const ssize_t res = write(afd_[1], b, sizeof(b));
942       RTC_DCHECK_EQ(1, res);
943       fSignaled_ = true;
944     }
945   }
946 
GetRequestedEvents()947   uint32_t GetRequestedEvents() override { return DE_READ; }
948 
OnPreEvent(uint32_t ff)949   void OnPreEvent(uint32_t ff) override {
950     // It is not possible to perfectly emulate an auto-resetting event with
951     // pipes.  This simulates it by resetting before the event is handled.
952 
953     CritScope cs(&crit_);
954     if (fSignaled_) {
955       uint8_t b[4];  // Allow for reading more than 1 byte, but expect 1.
956       const ssize_t res = read(afd_[0], b, sizeof(b));
957       RTC_DCHECK_EQ(1, res);
958       fSignaled_ = false;
959     }
960   }
961 
OnEvent(uint32_t ff,int err)962   void OnEvent(uint32_t ff, int err) override { RTC_NOTREACHED(); }
963 
GetDescriptor()964   int GetDescriptor() override { return afd_[0]; }
965 
IsDescriptorClosed()966   bool IsDescriptorClosed() override { return false; }
967 
968  private:
969   PhysicalSocketServer* ss_;
970   int afd_[2];
971   bool fSignaled_;
972   RecursiveCriticalSection crit_;
973 };
974 
975 #endif  // WEBRTC_POSIX
976 
977 #if defined(WEBRTC_WIN)
FlagsToEvents(uint32_t events)978 static uint32_t FlagsToEvents(uint32_t events) {
979   uint32_t ffFD = FD_CLOSE;
980   if (events & DE_READ)
981     ffFD |= FD_READ;
982   if (events & DE_WRITE)
983     ffFD |= FD_WRITE;
984   if (events & DE_CONNECT)
985     ffFD |= FD_CONNECT;
986   if (events & DE_ACCEPT)
987     ffFD |= FD_ACCEPT;
988   return ffFD;
989 }
990 
991 class EventDispatcher : public Dispatcher {
992  public:
EventDispatcher(PhysicalSocketServer * ss)993   EventDispatcher(PhysicalSocketServer* ss) : ss_(ss) {
994     hev_ = WSACreateEvent();
995     if (hev_) {
996       ss_->Add(this);
997     }
998   }
999 
~EventDispatcher()1000   ~EventDispatcher() override {
1001     if (hev_ != nullptr) {
1002       ss_->Remove(this);
1003       WSACloseEvent(hev_);
1004       hev_ = nullptr;
1005     }
1006   }
1007 
Signal()1008   virtual void Signal() {
1009     if (hev_ != nullptr)
1010       WSASetEvent(hev_);
1011   }
1012 
GetRequestedEvents()1013   uint32_t GetRequestedEvents() override { return 0; }
1014 
OnPreEvent(uint32_t ff)1015   void OnPreEvent(uint32_t ff) override { WSAResetEvent(hev_); }
1016 
OnEvent(uint32_t ff,int err)1017   void OnEvent(uint32_t ff, int err) override {}
1018 
GetWSAEvent()1019   WSAEVENT GetWSAEvent() override { return hev_; }
1020 
GetSocket()1021   SOCKET GetSocket() override { return INVALID_SOCKET; }
1022 
CheckSignalClose()1023   bool CheckSignalClose() override { return false; }
1024 
1025  private:
1026   PhysicalSocketServer* ss_;
1027   WSAEVENT hev_;
1028 };
1029 #endif  // WEBRTC_WIN
1030 
1031 // Sets the value of a boolean value to false when signaled.
1032 class Signaler : public EventDispatcher {
1033  public:
Signaler(PhysicalSocketServer * ss,bool * pf)1034   Signaler(PhysicalSocketServer* ss, bool* pf) : EventDispatcher(ss), pf_(pf) {}
~Signaler()1035   ~Signaler() override {}
1036 
OnEvent(uint32_t ff,int err)1037   void OnEvent(uint32_t ff, int err) override {
1038     if (pf_)
1039       *pf_ = false;
1040   }
1041 
1042  private:
1043   bool* pf_;
1044 };
1045 
PhysicalSocketServer()1046 PhysicalSocketServer::PhysicalSocketServer()
1047     :
1048 #if defined(WEBRTC_USE_EPOLL)
1049       // Since Linux 2.6.8, the size argument is ignored, but must be greater
1050       // than zero. Before that the size served as hint to the kernel for the
1051       // amount of space to initially allocate in internal data structures.
1052       epoll_fd_(epoll_create(FD_SETSIZE)),
1053 #endif
1054 #if defined(WEBRTC_WIN)
1055       socket_ev_(WSACreateEvent()),
1056 #endif
1057       fWait_(false) {
1058 #if defined(WEBRTC_USE_EPOLL)
1059   if (epoll_fd_ == -1) {
1060     // Not an error, will fall back to "select" below.
1061     RTC_LOG_E(LS_WARNING, EN, errno) << "epoll_create";
1062     // Note that -1 == INVALID_SOCKET, the alias used by later checks.
1063   }
1064 #endif
1065   signal_wakeup_ = new Signaler(this, &fWait_);
1066 }
1067 
~PhysicalSocketServer()1068 PhysicalSocketServer::~PhysicalSocketServer() {
1069 #if defined(WEBRTC_WIN)
1070   WSACloseEvent(socket_ev_);
1071 #endif
1072   delete signal_wakeup_;
1073 #if defined(WEBRTC_USE_EPOLL)
1074   if (epoll_fd_ != INVALID_SOCKET) {
1075     close(epoll_fd_);
1076   }
1077 #endif
1078   RTC_DCHECK(dispatcher_by_key_.empty());
1079   RTC_DCHECK(key_by_dispatcher_.empty());
1080 }
1081 
WakeUp()1082 void PhysicalSocketServer::WakeUp() {
1083   signal_wakeup_->Signal();
1084 }
1085 
CreateSocket(int family,int type)1086 Socket* PhysicalSocketServer::CreateSocket(int family, int type) {
1087   PhysicalSocket* socket = new PhysicalSocket(this);
1088   if (socket->Create(family, type)) {
1089     return socket;
1090   } else {
1091     delete socket;
1092     return nullptr;
1093   }
1094 }
1095 
CreateAsyncSocket(int family,int type)1096 AsyncSocket* PhysicalSocketServer::CreateAsyncSocket(int family, int type) {
1097   SocketDispatcher* dispatcher = new SocketDispatcher(this);
1098   if (dispatcher->Create(family, type)) {
1099     return dispatcher;
1100   } else {
1101     delete dispatcher;
1102     return nullptr;
1103   }
1104 }
1105 
WrapSocket(SOCKET s)1106 AsyncSocket* PhysicalSocketServer::WrapSocket(SOCKET s) {
1107   SocketDispatcher* dispatcher = new SocketDispatcher(s, this);
1108   if (dispatcher->Initialize()) {
1109     return dispatcher;
1110   } else {
1111     delete dispatcher;
1112     return nullptr;
1113   }
1114 }
1115 
Add(Dispatcher * pdispatcher)1116 void PhysicalSocketServer::Add(Dispatcher* pdispatcher) {
1117   CritScope cs(&crit_);
1118   if (key_by_dispatcher_.count(pdispatcher)) {
1119     RTC_LOG(LS_WARNING)
1120         << "PhysicalSocketServer asked to add a duplicate dispatcher.";
1121     return;
1122   }
1123   uint64_t key = next_dispatcher_key_++;
1124   dispatcher_by_key_.emplace(key, pdispatcher);
1125   key_by_dispatcher_.emplace(pdispatcher, key);
1126 #if defined(WEBRTC_USE_EPOLL)
1127   if (epoll_fd_ != INVALID_SOCKET) {
1128     AddEpoll(pdispatcher, key);
1129   }
1130 #endif  // WEBRTC_USE_EPOLL
1131 }
1132 
Remove(Dispatcher * pdispatcher)1133 void PhysicalSocketServer::Remove(Dispatcher* pdispatcher) {
1134   CritScope cs(&crit_);
1135   if (!key_by_dispatcher_.count(pdispatcher)) {
1136     RTC_LOG(LS_WARNING)
1137         << "PhysicalSocketServer asked to remove a unknown "
1138            "dispatcher, potentially from a duplicate call to Add.";
1139     return;
1140   }
1141   uint64_t key = key_by_dispatcher_.at(pdispatcher);
1142   key_by_dispatcher_.erase(pdispatcher);
1143   dispatcher_by_key_.erase(key);
1144 #if defined(WEBRTC_USE_EPOLL)
1145   if (epoll_fd_ != INVALID_SOCKET) {
1146     RemoveEpoll(pdispatcher);
1147   }
1148 #endif  // WEBRTC_USE_EPOLL
1149 }
1150 
Update(Dispatcher * pdispatcher)1151 void PhysicalSocketServer::Update(Dispatcher* pdispatcher) {
1152 #if defined(WEBRTC_USE_EPOLL)
1153   if (epoll_fd_ == INVALID_SOCKET) {
1154     return;
1155   }
1156 
1157   // Don't update dispatchers that haven't yet been added.
1158   CritScope cs(&crit_);
1159   if (!key_by_dispatcher_.count(pdispatcher)) {
1160     return;
1161   }
1162 
1163   UpdateEpoll(pdispatcher, key_by_dispatcher_.at(pdispatcher));
1164 #endif
1165 }
1166 
1167 #if defined(WEBRTC_POSIX)
1168 
Wait(int cmsWait,bool process_io)1169 bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) {
1170   // We don't support reentrant waiting.
1171   RTC_DCHECK(!waiting_);
1172   ScopedSetTrue s(&waiting_);
1173 #if defined(WEBRTC_USE_EPOLL)
1174   // We don't keep a dedicated "epoll" descriptor containing only the non-IO
1175   // (i.e. signaling) dispatcher, so "poll" will be used instead of the default
1176   // "select" to support sockets larger than FD_SETSIZE.
1177   if (!process_io) {
1178     return WaitPoll(cmsWait, signal_wakeup_);
1179   } else if (epoll_fd_ != INVALID_SOCKET) {
1180     return WaitEpoll(cmsWait);
1181   }
1182 #endif
1183   return WaitSelect(cmsWait, process_io);
1184 }
1185 
ProcessEvents(Dispatcher * dispatcher,bool readable,bool writable,bool check_error)1186 static void ProcessEvents(Dispatcher* dispatcher,
1187                           bool readable,
1188                           bool writable,
1189                           bool check_error) {
1190   int errcode = 0;
1191   // TODO(pthatcher): Should we set errcode if getsockopt fails?
1192   if (check_error) {
1193     socklen_t len = sizeof(errcode);
1194     ::getsockopt(dispatcher->GetDescriptor(), SOL_SOCKET, SO_ERROR, &errcode,
1195                  &len);
1196   }
1197 
1198   // Most often the socket is writable or readable or both, so make a single
1199   // virtual call to get requested events
1200   const uint32_t requested_events = dispatcher->GetRequestedEvents();
1201   uint32_t ff = 0;
1202 
1203   // Check readable descriptors. If we're waiting on an accept, signal
1204   // that. Otherwise we're waiting for data, check to see if we're
1205   // readable or really closed.
1206   // TODO(pthatcher): Only peek at TCP descriptors.
1207   if (readable) {
1208     if (requested_events & DE_ACCEPT) {
1209       ff |= DE_ACCEPT;
1210     } else if (errcode || dispatcher->IsDescriptorClosed()) {
1211       ff |= DE_CLOSE;
1212     } else {
1213       ff |= DE_READ;
1214     }
1215   }
1216 
1217   // Check writable descriptors. If we're waiting on a connect, detect
1218   // success versus failure by the reaped error code.
1219   if (writable) {
1220     if (requested_events & DE_CONNECT) {
1221       if (!errcode) {
1222         ff |= DE_CONNECT;
1223       } else {
1224         ff |= DE_CLOSE;
1225       }
1226     } else {
1227       ff |= DE_WRITE;
1228     }
1229   }
1230 
1231   // Tell the descriptor about the event.
1232   if (ff != 0) {
1233     dispatcher->OnPreEvent(ff);
1234     dispatcher->OnEvent(ff, errcode);
1235   }
1236 }
1237 
WaitSelect(int cmsWait,bool process_io)1238 bool PhysicalSocketServer::WaitSelect(int cmsWait, bool process_io) {
1239   // Calculate timing information
1240 
1241   struct timeval* ptvWait = nullptr;
1242   struct timeval tvWait;
1243   int64_t stop_us;
1244   if (cmsWait != kForever) {
1245     // Calculate wait timeval
1246     tvWait.tv_sec = cmsWait / 1000;
1247     tvWait.tv_usec = (cmsWait % 1000) * 1000;
1248     ptvWait = &tvWait;
1249 
1250     // Calculate when to return
1251     stop_us = rtc::TimeMicros() + cmsWait * 1000;
1252   }
1253 
1254 
1255   fd_set fdsRead;
1256   fd_set fdsWrite;
1257 // Explicitly unpoison these FDs on MemorySanitizer which doesn't handle the
1258 // inline assembly in FD_ZERO.
1259 // http://crbug.com/344505
1260 #ifdef MEMORY_SANITIZER
1261   __msan_unpoison(&fdsRead, sizeof(fdsRead));
1262   __msan_unpoison(&fdsWrite, sizeof(fdsWrite));
1263 #endif
1264 
1265   fWait_ = true;
1266 
1267   while (fWait_) {
1268     // Zero all fd_sets. Although select() zeros the descriptors not signaled,
1269     // we may need to do this for dispatchers that were deleted while
1270     // iterating.
1271     FD_ZERO(&fdsRead);
1272     FD_ZERO(&fdsWrite);
1273     int fdmax = -1;
1274     {
1275       CritScope cr(&crit_);
1276       current_dispatcher_keys_.clear();
1277       for (auto const& kv : dispatcher_by_key_) {
1278         uint64_t key = kv.first;
1279         Dispatcher* pdispatcher = kv.second;
1280         // Query dispatchers for read and write wait state
1281         if (!process_io && (pdispatcher != signal_wakeup_))
1282           continue;
1283         current_dispatcher_keys_.push_back(key);
1284         int fd = pdispatcher->GetDescriptor();
1285         // "select"ing a file descriptor that is equal to or larger than
1286         // FD_SETSIZE will result in undefined behavior.
1287         RTC_DCHECK_LT(fd, FD_SETSIZE);
1288         if (fd > fdmax)
1289           fdmax = fd;
1290 
1291         uint32_t ff = pdispatcher->GetRequestedEvents();
1292         if (ff & (DE_READ | DE_ACCEPT))
1293           FD_SET(fd, &fdsRead);
1294         if (ff & (DE_WRITE | DE_CONNECT))
1295           FD_SET(fd, &fdsWrite);
1296       }
1297     }
1298 
1299     // Wait then call handlers as appropriate
1300     // < 0 means error
1301     // 0 means timeout
1302     // > 0 means count of descriptors ready
1303     int n = select(fdmax + 1, &fdsRead, &fdsWrite, nullptr, ptvWait);
1304 
1305     // If error, return error.
1306     if (n < 0) {
1307       if (errno != EINTR) {
1308         RTC_LOG_E(LS_ERROR, EN, errno) << "select";
1309         return false;
1310       }
1311       // Else ignore the error and keep going. If this EINTR was for one of the
1312       // signals managed by this PhysicalSocketServer, the
1313       // PosixSignalDeliveryDispatcher will be in the signaled state in the next
1314       // iteration.
1315     } else if (n == 0) {
1316       // If timeout, return success
1317       return true;
1318     } else {
1319       // We have signaled descriptors
1320       CritScope cr(&crit_);
1321       // Iterate only on the dispatchers whose sockets were passed into
1322       // WSAEventSelect; this avoids the ABA problem (a socket being
1323       // destroyed and a new one created with the same file descriptor).
1324       for (uint64_t key : current_dispatcher_keys_) {
1325         if (!dispatcher_by_key_.count(key))
1326           continue;
1327         Dispatcher* pdispatcher = dispatcher_by_key_.at(key);
1328 
1329         int fd = pdispatcher->GetDescriptor();
1330 
1331         bool readable = FD_ISSET(fd, &fdsRead);
1332         if (readable) {
1333           FD_CLR(fd, &fdsRead);
1334         }
1335 
1336         bool writable = FD_ISSET(fd, &fdsWrite);
1337         if (writable) {
1338           FD_CLR(fd, &fdsWrite);
1339         }
1340 
1341         // The error code can be signaled through reads or writes.
1342         ProcessEvents(pdispatcher, readable, writable, readable || writable);
1343       }
1344     }
1345 
1346     // Recalc the time remaining to wait. Doing it here means it doesn't get
1347     // calced twice the first time through the loop
1348     if (ptvWait) {
1349       ptvWait->tv_sec = 0;
1350       ptvWait->tv_usec = 0;
1351       int64_t time_left_us = stop_us - rtc::TimeMicros();
1352       if (time_left_us > 0) {
1353         ptvWait->tv_sec = time_left_us / rtc::kNumMicrosecsPerSec;
1354         ptvWait->tv_usec = time_left_us % rtc::kNumMicrosecsPerSec;
1355       }
1356     }
1357   }
1358 
1359   return true;
1360 }
1361 
1362 #if defined(WEBRTC_USE_EPOLL)
1363 
AddEpoll(Dispatcher * pdispatcher,uint64_t key)1364 void PhysicalSocketServer::AddEpoll(Dispatcher* pdispatcher, uint64_t key) {
1365   RTC_DCHECK(epoll_fd_ != INVALID_SOCKET);
1366   int fd = pdispatcher->GetDescriptor();
1367   RTC_DCHECK(fd != INVALID_SOCKET);
1368   if (fd == INVALID_SOCKET) {
1369     return;
1370   }
1371 
1372   struct epoll_event event = {0};
1373   event.events = GetEpollEvents(pdispatcher->GetRequestedEvents());
1374   event.data.u64 = key;
1375   int err = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &event);
1376   RTC_DCHECK_EQ(err, 0);
1377   if (err == -1) {
1378     RTC_LOG_E(LS_ERROR, EN, errno) << "epoll_ctl EPOLL_CTL_ADD";
1379   }
1380 }
1381 
RemoveEpoll(Dispatcher * pdispatcher)1382 void PhysicalSocketServer::RemoveEpoll(Dispatcher* pdispatcher) {
1383   RTC_DCHECK(epoll_fd_ != INVALID_SOCKET);
1384   int fd = pdispatcher->GetDescriptor();
1385   RTC_DCHECK(fd != INVALID_SOCKET);
1386   if (fd == INVALID_SOCKET) {
1387     return;
1388   }
1389 
1390   struct epoll_event event = {0};
1391   int err = epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, &event);
1392   RTC_DCHECK(err == 0 || errno == ENOENT);
1393   if (err == -1) {
1394     if (errno == ENOENT) {
1395       // Socket has already been closed.
1396       RTC_LOG_E(LS_VERBOSE, EN, errno) << "epoll_ctl EPOLL_CTL_DEL";
1397     } else {
1398       RTC_LOG_E(LS_ERROR, EN, errno) << "epoll_ctl EPOLL_CTL_DEL";
1399     }
1400   }
1401 }
1402 
UpdateEpoll(Dispatcher * pdispatcher,uint64_t key)1403 void PhysicalSocketServer::UpdateEpoll(Dispatcher* pdispatcher, uint64_t key) {
1404   RTC_DCHECK(epoll_fd_ != INVALID_SOCKET);
1405   int fd = pdispatcher->GetDescriptor();
1406   RTC_DCHECK(fd != INVALID_SOCKET);
1407   if (fd == INVALID_SOCKET) {
1408     return;
1409   }
1410 
1411   struct epoll_event event = {0};
1412   event.events = GetEpollEvents(pdispatcher->GetRequestedEvents());
1413   event.data.u64 = key;
1414   int err = epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, fd, &event);
1415   RTC_DCHECK_EQ(err, 0);
1416   if (err == -1) {
1417     RTC_LOG_E(LS_ERROR, EN, errno) << "epoll_ctl EPOLL_CTL_MOD";
1418   }
1419 }
1420 
WaitEpoll(int cmsWait)1421 bool PhysicalSocketServer::WaitEpoll(int cmsWait) {
1422   RTC_DCHECK(epoll_fd_ != INVALID_SOCKET);
1423   int64_t tvWait = -1;
1424   int64_t tvStop = -1;
1425   if (cmsWait != kForever) {
1426     tvWait = cmsWait;
1427     tvStop = TimeAfter(cmsWait);
1428   }
1429 
1430   fWait_ = true;
1431   while (fWait_) {
1432     // Wait then call handlers as appropriate
1433     // < 0 means error
1434     // 0 means timeout
1435     // > 0 means count of descriptors ready
1436     int n = epoll_wait(epoll_fd_, epoll_events_.data(), epoll_events_.size(),
1437                        static_cast<int>(tvWait));
1438     if (n < 0) {
1439       if (errno != EINTR) {
1440         RTC_LOG_E(LS_ERROR, EN, errno) << "epoll";
1441         return false;
1442       }
1443       // Else ignore the error and keep going. If this EINTR was for one of the
1444       // signals managed by this PhysicalSocketServer, the
1445       // PosixSignalDeliveryDispatcher will be in the signaled state in the next
1446       // iteration.
1447     } else if (n == 0) {
1448       // If timeout, return success
1449       return true;
1450     } else {
1451       // We have signaled descriptors
1452       CritScope cr(&crit_);
1453       for (int i = 0; i < n; ++i) {
1454         const epoll_event& event = epoll_events_[i];
1455         uint64_t key = event.data.u64;
1456         if (!dispatcher_by_key_.count(key)) {
1457           // The dispatcher for this socket no longer exists.
1458           continue;
1459         }
1460         Dispatcher* pdispatcher = dispatcher_by_key_.at(key);
1461 
1462         bool readable = (event.events & (EPOLLIN | EPOLLPRI));
1463         bool writable = (event.events & EPOLLOUT);
1464         bool check_error = (event.events & (EPOLLRDHUP | EPOLLERR | EPOLLHUP));
1465 
1466         ProcessEvents(pdispatcher, readable, writable, check_error);
1467       }
1468     }
1469 
1470     if (cmsWait != kForever) {
1471       tvWait = TimeDiff(tvStop, TimeMillis());
1472       if (tvWait <= 0) {
1473         // Return success on timeout.
1474         return true;
1475       }
1476     }
1477   }
1478 
1479   return true;
1480 }
1481 
WaitPoll(int cmsWait,Dispatcher * dispatcher)1482 bool PhysicalSocketServer::WaitPoll(int cmsWait, Dispatcher* dispatcher) {
1483   RTC_DCHECK(dispatcher);
1484   int64_t tvWait = -1;
1485   int64_t tvStop = -1;
1486   if (cmsWait != kForever) {
1487     tvWait = cmsWait;
1488     tvStop = TimeAfter(cmsWait);
1489   }
1490 
1491   fWait_ = true;
1492 
1493   struct pollfd fds = {0};
1494   int fd = dispatcher->GetDescriptor();
1495   fds.fd = fd;
1496 
1497   while (fWait_) {
1498     uint32_t ff = dispatcher->GetRequestedEvents();
1499     fds.events = 0;
1500     if (ff & (DE_READ | DE_ACCEPT)) {
1501       fds.events |= POLLIN;
1502     }
1503     if (ff & (DE_WRITE | DE_CONNECT)) {
1504       fds.events |= POLLOUT;
1505     }
1506     fds.revents = 0;
1507 
1508     // Wait then call handlers as appropriate
1509     // < 0 means error
1510     // 0 means timeout
1511     // > 0 means count of descriptors ready
1512     int n = poll(&fds, 1, static_cast<int>(tvWait));
1513     if (n < 0) {
1514       if (errno != EINTR) {
1515         RTC_LOG_E(LS_ERROR, EN, errno) << "poll";
1516         return false;
1517       }
1518       // Else ignore the error and keep going. If this EINTR was for one of the
1519       // signals managed by this PhysicalSocketServer, the
1520       // PosixSignalDeliveryDispatcher will be in the signaled state in the next
1521       // iteration.
1522     } else if (n == 0) {
1523       // If timeout, return success
1524       return true;
1525     } else {
1526       // We have signaled descriptors (should only be the passed dispatcher).
1527       RTC_DCHECK_EQ(n, 1);
1528       RTC_DCHECK_EQ(fds.fd, fd);
1529 
1530       bool readable = (fds.revents & (POLLIN | POLLPRI));
1531       bool writable = (fds.revents & POLLOUT);
1532       bool check_error = (fds.revents & (POLLRDHUP | POLLERR | POLLHUP));
1533 
1534       ProcessEvents(dispatcher, readable, writable, check_error);
1535     }
1536 
1537     if (cmsWait != kForever) {
1538       tvWait = TimeDiff(tvStop, TimeMillis());
1539       if (tvWait < 0) {
1540         // Return success on timeout.
1541         return true;
1542       }
1543     }
1544   }
1545 
1546   return true;
1547 }
1548 
1549 #endif  // WEBRTC_USE_EPOLL
1550 
1551 #endif  // WEBRTC_POSIX
1552 
1553 #if defined(WEBRTC_WIN)
Wait(int cmsWait,bool process_io)1554 bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) {
1555   // We don't support reentrant waiting.
1556   RTC_DCHECK(!waiting_);
1557   ScopedSetTrue s(&waiting_);
1558 
1559   int64_t cmsTotal = cmsWait;
1560   int64_t cmsElapsed = 0;
1561   int64_t msStart = Time();
1562 
1563   fWait_ = true;
1564   while (fWait_) {
1565     std::vector<WSAEVENT> events;
1566     std::vector<uint64_t> event_owners;
1567 
1568     events.push_back(socket_ev_);
1569 
1570     {
1571       CritScope cr(&crit_);
1572       // Get a snapshot of all current dispatchers; this is used to avoid the
1573       // ABA problem (see later comment) and avoids the dispatcher_by_key_
1574       // iterator being invalidated by calling CheckSignalClose, which may
1575       // remove the dispatcher from the list.
1576       current_dispatcher_keys_.clear();
1577       for (auto const& kv : dispatcher_by_key_) {
1578         current_dispatcher_keys_.push_back(kv.first);
1579       }
1580       for (uint64_t key : current_dispatcher_keys_) {
1581         if (!dispatcher_by_key_.count(key)) {
1582           continue;
1583         }
1584         Dispatcher* disp = dispatcher_by_key_.at(key);
1585         if (!disp)
1586           continue;
1587         if (!process_io && (disp != signal_wakeup_))
1588           continue;
1589         SOCKET s = disp->GetSocket();
1590         if (disp->CheckSignalClose()) {
1591           // We just signalled close, don't poll this socket.
1592         } else if (s != INVALID_SOCKET) {
1593           WSAEventSelect(s, events[0],
1594                          FlagsToEvents(disp->GetRequestedEvents()));
1595         } else {
1596           events.push_back(disp->GetWSAEvent());
1597           event_owners.push_back(key);
1598         }
1599       }
1600     }
1601 
1602     // Which is shorter, the delay wait or the asked wait?
1603 
1604     int64_t cmsNext;
1605     if (cmsWait == kForever) {
1606       cmsNext = cmsWait;
1607     } else {
1608       cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed);
1609     }
1610 
1611     // Wait for one of the events to signal
1612     DWORD dw =
1613         WSAWaitForMultipleEvents(static_cast<DWORD>(events.size()), &events[0],
1614                                  false, static_cast<DWORD>(cmsNext), false);
1615 
1616     if (dw == WSA_WAIT_FAILED) {
1617       // Failed?
1618       // TODO(pthatcher): need a better strategy than this!
1619       WSAGetLastError();
1620       RTC_NOTREACHED();
1621       return false;
1622     } else if (dw == WSA_WAIT_TIMEOUT) {
1623       // Timeout?
1624       return true;
1625     } else {
1626       // Figure out which one it is and call it
1627       CritScope cr(&crit_);
1628       int index = dw - WSA_WAIT_EVENT_0;
1629       if (index > 0) {
1630         --index;  // The first event is the socket event
1631         uint64_t key = event_owners[index];
1632         if (!dispatcher_by_key_.count(key)) {
1633           // The dispatcher could have been removed while waiting for events.
1634           continue;
1635         }
1636         Dispatcher* disp = dispatcher_by_key_.at(key);
1637         disp->OnPreEvent(0);
1638         disp->OnEvent(0, 0);
1639       } else if (process_io) {
1640         // Iterate only on the dispatchers whose sockets were passed into
1641         // WSAEventSelect; this avoids the ABA problem (a socket being
1642         // destroyed and a new one created with the same SOCKET handle).
1643         for (uint64_t key : current_dispatcher_keys_) {
1644           if (!dispatcher_by_key_.count(key)) {
1645             continue;
1646           }
1647           Dispatcher* disp = dispatcher_by_key_.at(key);
1648           SOCKET s = disp->GetSocket();
1649           if (s == INVALID_SOCKET)
1650             continue;
1651 
1652           WSANETWORKEVENTS wsaEvents;
1653           int err = WSAEnumNetworkEvents(s, events[0], &wsaEvents);
1654           if (err == 0) {
1655             {
1656               if ((wsaEvents.lNetworkEvents & FD_READ) &&
1657                   wsaEvents.iErrorCode[FD_READ_BIT] != 0) {
1658                 RTC_LOG(WARNING)
1659                     << "PhysicalSocketServer got FD_READ_BIT error "
1660                     << wsaEvents.iErrorCode[FD_READ_BIT];
1661               }
1662               if ((wsaEvents.lNetworkEvents & FD_WRITE) &&
1663                   wsaEvents.iErrorCode[FD_WRITE_BIT] != 0) {
1664                 RTC_LOG(WARNING)
1665                     << "PhysicalSocketServer got FD_WRITE_BIT error "
1666                     << wsaEvents.iErrorCode[FD_WRITE_BIT];
1667               }
1668               if ((wsaEvents.lNetworkEvents & FD_CONNECT) &&
1669                   wsaEvents.iErrorCode[FD_CONNECT_BIT] != 0) {
1670                 RTC_LOG(WARNING)
1671                     << "PhysicalSocketServer got FD_CONNECT_BIT error "
1672                     << wsaEvents.iErrorCode[FD_CONNECT_BIT];
1673               }
1674               if ((wsaEvents.lNetworkEvents & FD_ACCEPT) &&
1675                   wsaEvents.iErrorCode[FD_ACCEPT_BIT] != 0) {
1676                 RTC_LOG(WARNING)
1677                     << "PhysicalSocketServer got FD_ACCEPT_BIT error "
1678                     << wsaEvents.iErrorCode[FD_ACCEPT_BIT];
1679               }
1680               if ((wsaEvents.lNetworkEvents & FD_CLOSE) &&
1681                   wsaEvents.iErrorCode[FD_CLOSE_BIT] != 0) {
1682                 RTC_LOG(WARNING)
1683                     << "PhysicalSocketServer got FD_CLOSE_BIT error "
1684                     << wsaEvents.iErrorCode[FD_CLOSE_BIT];
1685               }
1686             }
1687             uint32_t ff = 0;
1688             int errcode = 0;
1689             if (wsaEvents.lNetworkEvents & FD_READ)
1690               ff |= DE_READ;
1691             if (wsaEvents.lNetworkEvents & FD_WRITE)
1692               ff |= DE_WRITE;
1693             if (wsaEvents.lNetworkEvents & FD_CONNECT) {
1694               if (wsaEvents.iErrorCode[FD_CONNECT_BIT] == 0) {
1695                 ff |= DE_CONNECT;
1696               } else {
1697                 ff |= DE_CLOSE;
1698                 errcode = wsaEvents.iErrorCode[FD_CONNECT_BIT];
1699               }
1700             }
1701             if (wsaEvents.lNetworkEvents & FD_ACCEPT)
1702               ff |= DE_ACCEPT;
1703             if (wsaEvents.lNetworkEvents & FD_CLOSE) {
1704               ff |= DE_CLOSE;
1705               errcode = wsaEvents.iErrorCode[FD_CLOSE_BIT];
1706             }
1707             if (ff != 0) {
1708               disp->OnPreEvent(ff);
1709               disp->OnEvent(ff, errcode);
1710             }
1711           }
1712         }
1713       }
1714 
1715       // Reset the network event until new activity occurs
1716       WSAResetEvent(socket_ev_);
1717     }
1718 
1719     // Break?
1720     if (!fWait_)
1721       break;
1722     cmsElapsed = TimeSince(msStart);
1723     if ((cmsWait != kForever) && (cmsElapsed >= cmsWait)) {
1724       break;
1725     }
1726   }
1727 
1728   // Done
1729   return true;
1730 }
1731 #endif  // WEBRTC_WIN
1732 
1733 }  // namespace rtc
1734