1 /*
2  *  Copyright 2004 The WebRTC Project Authors. All rights reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 
11 #if defined(_MSC_VER) && _MSC_VER < 1300
12 #pragma warning(disable:4786)
13 #endif
14 
15 #include <assert.h>
16 
17 #ifdef MEMORY_SANITIZER
18 #include <sanitizer/msan_interface.h>
19 #endif
20 
21 #if defined(WEBRTC_POSIX)
22 #include <string.h>
23 #include <errno.h>
24 #include <fcntl.h>
25 #include <sys/time.h>
26 #include <sys/select.h>
27 #include <unistd.h>
28 #include <signal.h>
29 #endif
30 
31 #if defined(WEBRTC_WIN)
32 #define WIN32_LEAN_AND_MEAN
33 #include <windows.h>
34 #include <winsock2.h>
35 #include <ws2tcpip.h>
36 #undef SetPort
37 #endif
38 
39 #include <algorithm>
40 #include <map>
41 
42 #include "webrtc/base/basictypes.h"
43 #include "webrtc/base/byteorder.h"
44 #include "webrtc/base/common.h"
45 #include "webrtc/base/logging.h"
46 #include "webrtc/base/nethelpers.h"
47 #include "webrtc/base/physicalsocketserver.h"
48 #include "webrtc/base/timeutils.h"
49 #include "webrtc/base/winping.h"
50 #include "webrtc/base/win32socketinit.h"
51 
52 // stm: this will tell us if we are on OSX
53 #ifdef HAVE_CONFIG_H
54 #include "config.h"
55 #endif
56 
57 #if defined(WEBRTC_POSIX)
58 #include <netinet/tcp.h>  // for TCP_NODELAY
59 #define IP_MTU 14 // Until this is integrated from linux/in.h to netinet/in.h
60 typedef void* SockOptArg;
61 #endif  // WEBRTC_POSIX
62 
63 #if defined(WEBRTC_WIN)
64 typedef char* SockOptArg;
65 #endif
66 
67 namespace rtc {
68 
69 #if defined(WEBRTC_WIN)
// Table of standard MTU sizes taken from RFC 1191, ordered from largest to
// smallest and terminated by a 0 sentinel. Entries that are commented out are
// kept for reference only and are not probed.
const uint16_t PACKET_MAXIMUMS[] = {
    65535,  // Theoretical maximum, Hyperchannel
    32000,  // Nothing
    17914,  // 16Mb IBM Token Ring
    8166,   // IEEE 802.4
    // 4464,   // IEEE 802.5 (4Mb max)
    4352,   // FDDI
    // 2048,   // Wideband Network
    2002,   // IEEE 802.5 (4Mb recommended)
    // 1536,   // Experimental Ethernet Networks
    // 1500,   // Ethernet, Point-to-Point (default)
    1492,   // IEEE 802.3
    1006,   // SLIP, ARPANET
    // 576,    // X.25 Networks
    // 544,    // DEC IP Portal
    // 512,    // NETBIOS
    508,    // IEEE 802/Source-Rt Bridge, ARCNET
    296,    // Point-to-Point (low delay)
    68,     // Official minimum
    0,      // End of list marker
};

// Header sizes (in bytes) subtracted from a candidate MTU to obtain the ping
// payload size, and the timeout (in milliseconds) applied to each ping.
static const int IP_HEADER_SIZE = 20;
static const int IPV6_HEADER_SIZE = 40;
static const int ICMP_HEADER_SIZE = 8;
static const int ICMP_PING_TIMEOUT_MILLIS = 10000;
97 #endif
98 
99 class PhysicalSocket : public AsyncSocket, public sigslot::has_slots<> {
100  public:
PhysicalSocket(PhysicalSocketServer * ss,SOCKET s=INVALID_SOCKET)101   PhysicalSocket(PhysicalSocketServer* ss, SOCKET s = INVALID_SOCKET)
102     : ss_(ss), s_(s), enabled_events_(0), error_(0),
103       state_((s == INVALID_SOCKET) ? CS_CLOSED : CS_CONNECTED),
104       resolver_(NULL) {
105 #if defined(WEBRTC_WIN)
106     // EnsureWinsockInit() ensures that winsock is initialized. The default
107     // version of this function doesn't do anything because winsock is
108     // initialized by constructor of a static object. If neccessary libjingle
109     // users can link it with a different version of this function by replacing
110     // win32socketinit.cc. See win32socketinit.cc for more details.
111     EnsureWinsockInit();
112 #endif
113     if (s_ != INVALID_SOCKET) {
114       enabled_events_ = DE_READ | DE_WRITE;
115 
116       int type = SOCK_STREAM;
117       socklen_t len = sizeof(type);
118       VERIFY(0 == getsockopt(s_, SOL_SOCKET, SO_TYPE, (SockOptArg)&type, &len));
119       udp_ = (SOCK_DGRAM == type);
120     }
121   }
122 
~PhysicalSocket()123   ~PhysicalSocket() override {
124     Close();
125   }
126 
127   // Creates the underlying OS socket (same as the "socket" function).
Create(int family,int type)128   virtual bool Create(int family, int type) {
129     Close();
130     s_ = ::socket(family, type, 0);
131     udp_ = (SOCK_DGRAM == type);
132     UpdateLastError();
133     if (udp_)
134       enabled_events_ = DE_READ | DE_WRITE;
135     return s_ != INVALID_SOCKET;
136   }
137 
GetLocalAddress() const138   SocketAddress GetLocalAddress() const override {
139     sockaddr_storage addr_storage = {0};
140     socklen_t addrlen = sizeof(addr_storage);
141     sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage);
142     int result = ::getsockname(s_, addr, &addrlen);
143     SocketAddress address;
144     if (result >= 0) {
145       SocketAddressFromSockAddrStorage(addr_storage, &address);
146     } else {
147       LOG(LS_WARNING) << "GetLocalAddress: unable to get local addr, socket="
148                       << s_;
149     }
150     return address;
151   }
152 
GetRemoteAddress() const153   SocketAddress GetRemoteAddress() const override {
154     sockaddr_storage addr_storage = {0};
155     socklen_t addrlen = sizeof(addr_storage);
156     sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage);
157     int result = ::getpeername(s_, addr, &addrlen);
158     SocketAddress address;
159     if (result >= 0) {
160       SocketAddressFromSockAddrStorage(addr_storage, &address);
161     } else {
162       LOG(LS_WARNING) << "GetRemoteAddress: unable to get remote addr, socket="
163                       << s_;
164     }
165     return address;
166   }
167 
Bind(const SocketAddress & bind_addr)168   int Bind(const SocketAddress& bind_addr) override {
169     sockaddr_storage addr_storage;
170     size_t len = bind_addr.ToSockAddrStorage(&addr_storage);
171     sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage);
172     int err = ::bind(s_, addr, static_cast<int>(len));
173     UpdateLastError();
174 #if !defined(NDEBUG)
175     if (0 == err) {
176       dbg_addr_ = "Bound @ ";
177       dbg_addr_.append(GetLocalAddress().ToString());
178     }
179 #endif
180     return err;
181   }
182 
Connect(const SocketAddress & addr)183   int Connect(const SocketAddress& addr) override {
184     // TODO: Implicit creation is required to reconnect...
185     // ...but should we make it more explicit?
186     if (state_ != CS_CLOSED) {
187       SetError(EALREADY);
188       return SOCKET_ERROR;
189     }
190     if (addr.IsUnresolvedIP()) {
191       LOG(LS_VERBOSE) << "Resolving addr in PhysicalSocket::Connect";
192       resolver_ = new AsyncResolver();
193       resolver_->SignalDone.connect(this, &PhysicalSocket::OnResolveResult);
194       resolver_->Start(addr);
195       state_ = CS_CONNECTING;
196       return 0;
197     }
198 
199     return DoConnect(addr);
200   }
201 
DoConnect(const SocketAddress & connect_addr)202   int DoConnect(const SocketAddress& connect_addr) {
203     if ((s_ == INVALID_SOCKET) &&
204         !Create(connect_addr.family(), SOCK_STREAM)) {
205       return SOCKET_ERROR;
206     }
207     sockaddr_storage addr_storage;
208     size_t len = connect_addr.ToSockAddrStorage(&addr_storage);
209     sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage);
210     int err = ::connect(s_, addr, static_cast<int>(len));
211     UpdateLastError();
212     if (err == 0) {
213       state_ = CS_CONNECTED;
214     } else if (IsBlockingError(GetError())) {
215       state_ = CS_CONNECTING;
216       enabled_events_ |= DE_CONNECT;
217     } else {
218       return SOCKET_ERROR;
219     }
220 
221     enabled_events_ |= DE_READ | DE_WRITE;
222     return 0;
223   }
224 
GetError() const225   int GetError() const override {
226     CritScope cs(&crit_);
227     return error_;
228   }
229 
SetError(int error)230   void SetError(int error) override {
231     CritScope cs(&crit_);
232     error_ = error;
233   }
234 
GetState() const235   ConnState GetState() const override { return state_; }
236 
GetOption(Option opt,int * value)237   int GetOption(Option opt, int* value) override {
238     int slevel;
239     int sopt;
240     if (TranslateOption(opt, &slevel, &sopt) == -1)
241       return -1;
242     socklen_t optlen = sizeof(*value);
243     int ret = ::getsockopt(s_, slevel, sopt, (SockOptArg)value, &optlen);
244     if (ret != -1 && opt == OPT_DONTFRAGMENT) {
245 #if defined(WEBRTC_LINUX) && !defined(WEBRTC_ANDROID)
246       *value = (*value != IP_PMTUDISC_DONT) ? 1 : 0;
247 #endif
248     }
249     return ret;
250   }
251 
SetOption(Option opt,int value)252   int SetOption(Option opt, int value) override {
253     int slevel;
254     int sopt;
255     if (TranslateOption(opt, &slevel, &sopt) == -1)
256       return -1;
257     if (opt == OPT_DONTFRAGMENT) {
258 #if defined(WEBRTC_LINUX) && !defined(WEBRTC_ANDROID)
259       value = (value) ? IP_PMTUDISC_DO : IP_PMTUDISC_DONT;
260 #endif
261     }
262     return ::setsockopt(s_, slevel, sopt, (SockOptArg)&value, sizeof(value));
263   }
264 
Send(const void * pv,size_t cb)265   int Send(const void* pv, size_t cb) override {
266     int sent = ::send(s_, reinterpret_cast<const char *>(pv), (int)cb,
267 #if defined(WEBRTC_LINUX) && !defined(WEBRTC_ANDROID)
268         // Suppress SIGPIPE. Without this, attempting to send on a socket whose
269         // other end is closed will result in a SIGPIPE signal being raised to
270         // our process, which by default will terminate the process, which we
271         // don't want. By specifying this flag, we'll just get the error EPIPE
272         // instead and can handle the error gracefully.
273         MSG_NOSIGNAL
274 #else
275         0
276 #endif
277         );
278     UpdateLastError();
279     MaybeRemapSendError();
280     // We have seen minidumps where this may be false.
281     ASSERT(sent <= static_cast<int>(cb));
282     if ((sent < 0) && IsBlockingError(GetError())) {
283       enabled_events_ |= DE_WRITE;
284     }
285     return sent;
286   }
287 
SendTo(const void * buffer,size_t length,const SocketAddress & addr)288   int SendTo(const void* buffer,
289              size_t length,
290              const SocketAddress& addr) override {
291     sockaddr_storage saddr;
292     size_t len = addr.ToSockAddrStorage(&saddr);
293     int sent = ::sendto(
294         s_, static_cast<const char *>(buffer), static_cast<int>(length),
295 #if defined(WEBRTC_LINUX) && !defined(WEBRTC_ANDROID)
296         // Suppress SIGPIPE. See above for explanation.
297         MSG_NOSIGNAL,
298 #else
299         0,
300 #endif
301         reinterpret_cast<sockaddr*>(&saddr), static_cast<int>(len));
302     UpdateLastError();
303     MaybeRemapSendError();
304     // We have seen minidumps where this may be false.
305     ASSERT(sent <= static_cast<int>(length));
306     if ((sent < 0) && IsBlockingError(GetError())) {
307       enabled_events_ |= DE_WRITE;
308     }
309     return sent;
310   }
311 
Recv(void * buffer,size_t length)312   int Recv(void* buffer, size_t length) override {
313     int received = ::recv(s_, static_cast<char*>(buffer),
314                           static_cast<int>(length), 0);
315     if ((received == 0) && (length != 0)) {
316       // Note: on graceful shutdown, recv can return 0.  In this case, we
317       // pretend it is blocking, and then signal close, so that simplifying
318       // assumptions can be made about Recv.
319       LOG(LS_WARNING) << "EOF from socket; deferring close event";
320       // Must turn this back on so that the select() loop will notice the close
321       // event.
322       enabled_events_ |= DE_READ;
323       SetError(EWOULDBLOCK);
324       return SOCKET_ERROR;
325     }
326     UpdateLastError();
327     int error = GetError();
328     bool success = (received >= 0) || IsBlockingError(error);
329     if (udp_ || success) {
330       enabled_events_ |= DE_READ;
331     }
332     if (!success) {
333       LOG_F(LS_VERBOSE) << "Error = " << error;
334     }
335     return received;
336   }
337 
RecvFrom(void * buffer,size_t length,SocketAddress * out_addr)338   int RecvFrom(void* buffer, size_t length, SocketAddress* out_addr) override {
339     sockaddr_storage addr_storage;
340     socklen_t addr_len = sizeof(addr_storage);
341     sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage);
342     int received = ::recvfrom(s_, static_cast<char*>(buffer),
343                               static_cast<int>(length), 0, addr, &addr_len);
344     UpdateLastError();
345     if ((received >= 0) && (out_addr != NULL))
346       SocketAddressFromSockAddrStorage(addr_storage, out_addr);
347     int error = GetError();
348     bool success = (received >= 0) || IsBlockingError(error);
349     if (udp_ || success) {
350       enabled_events_ |= DE_READ;
351     }
352     if (!success) {
353       LOG_F(LS_VERBOSE) << "Error = " << error;
354     }
355     return received;
356   }
357 
Listen(int backlog)358   int Listen(int backlog) override {
359     int err = ::listen(s_, backlog);
360     UpdateLastError();
361     if (err == 0) {
362       state_ = CS_CONNECTING;
363       enabled_events_ |= DE_ACCEPT;
364 #if !defined(NDEBUG)
365       dbg_addr_ = "Listening @ ";
366       dbg_addr_.append(GetLocalAddress().ToString());
367 #endif
368     }
369     return err;
370   }
371 
Accept(SocketAddress * out_addr)372   AsyncSocket* Accept(SocketAddress* out_addr) override {
373     sockaddr_storage addr_storage;
374     socklen_t addr_len = sizeof(addr_storage);
375     sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage);
376     SOCKET s = ::accept(s_, addr, &addr_len);
377     UpdateLastError();
378     if (s == INVALID_SOCKET)
379       return NULL;
380     enabled_events_ |= DE_ACCEPT;
381     if (out_addr != NULL)
382       SocketAddressFromSockAddrStorage(addr_storage, out_addr);
383     return ss_->WrapSocket(s);
384   }
385 
Close()386   int Close() override {
387     if (s_ == INVALID_SOCKET)
388       return 0;
389     int err = ::closesocket(s_);
390     UpdateLastError();
391     s_ = INVALID_SOCKET;
392     state_ = CS_CLOSED;
393     enabled_events_ = 0;
394     if (resolver_) {
395       resolver_->Destroy(false);
396       resolver_ = NULL;
397     }
398     return err;
399   }
400 
EstimateMTU(uint16_t * mtu)401   int EstimateMTU(uint16_t* mtu) override {
402     SocketAddress addr = GetRemoteAddress();
403     if (addr.IsAnyIP()) {
404       SetError(ENOTCONN);
405       return -1;
406     }
407 
408 #if defined(WEBRTC_WIN)
409     // Gets the interface MTU (TTL=1) for the interface used to reach |addr|.
410     WinPing ping;
411     if (!ping.IsValid()) {
412       SetError(EINVAL);  // can't think of a better error ID
413       return -1;
414     }
415     int header_size = ICMP_HEADER_SIZE;
416     if (addr.family() == AF_INET6) {
417       header_size += IPV6_HEADER_SIZE;
418     } else if (addr.family() == AF_INET) {
419       header_size += IP_HEADER_SIZE;
420     }
421 
422     for (int level = 0; PACKET_MAXIMUMS[level + 1] > 0; ++level) {
423       int32_t size = PACKET_MAXIMUMS[level] - header_size;
424       WinPing::PingResult result = ping.Ping(addr.ipaddr(), size,
425                                              ICMP_PING_TIMEOUT_MILLIS,
426                                              1, false);
427       if (result == WinPing::PING_FAIL) {
428         SetError(EINVAL);  // can't think of a better error ID
429         return -1;
430       } else if (result != WinPing::PING_TOO_LARGE) {
431         *mtu = PACKET_MAXIMUMS[level];
432         return 0;
433       }
434     }
435 
436     ASSERT(false);
437     return -1;
438 #elif defined(WEBRTC_MAC)
439     // No simple way to do this on Mac OS X.
440     // SIOCGIFMTU would work if we knew which interface would be used, but
441     // figuring that out is pretty complicated. For now we'll return an error
442     // and let the caller pick a default MTU.
443     SetError(EINVAL);
444     return -1;
445 #elif defined(WEBRTC_LINUX)
446     // Gets the path MTU.
447     int value;
448     socklen_t vlen = sizeof(value);
449     int err = getsockopt(s_, IPPROTO_IP, IP_MTU, &value, &vlen);
450     if (err < 0) {
451       UpdateLastError();
452       return err;
453     }
454 
455     ASSERT((0 <= value) && (value <= 65536));
456     *mtu = value;
457     return 0;
458 #elif defined(__native_client__)
459     // Most socket operations, including this, will fail in NaCl's sandbox.
460     error_ = EACCES;
461     return -1;
462 #endif
463   }
464 
socketserver()465   SocketServer* socketserver() { return ss_; }
466 
467  protected:
OnResolveResult(AsyncResolverInterface * resolver)468   void OnResolveResult(AsyncResolverInterface* resolver) {
469     if (resolver != resolver_) {
470       return;
471     }
472 
473     int error = resolver_->GetError();
474     if (error == 0) {
475       error = DoConnect(resolver_->address());
476     } else {
477       Close();
478     }
479 
480     if (error) {
481       SetError(error);
482       SignalCloseEvent(this, error);
483     }
484   }
485 
UpdateLastError()486   void UpdateLastError() {
487     SetError(LAST_SYSTEM_ERROR);
488   }
489 
MaybeRemapSendError()490   void MaybeRemapSendError() {
491 #if defined(WEBRTC_MAC)
492     // https://developer.apple.com/library/mac/documentation/Darwin/
493     // Reference/ManPages/man2/sendto.2.html
494     // ENOBUFS - The output queue for a network interface is full.
495     // This generally indicates that the interface has stopped sending,
496     // but may be caused by transient congestion.
497     if (GetError() == ENOBUFS) {
498       SetError(EWOULDBLOCK);
499     }
500 #endif
501   }
502 
TranslateOption(Option opt,int * slevel,int * sopt)503   static int TranslateOption(Option opt, int* slevel, int* sopt) {
504     switch (opt) {
505       case OPT_DONTFRAGMENT:
506 #if defined(WEBRTC_WIN)
507         *slevel = IPPROTO_IP;
508         *sopt = IP_DONTFRAGMENT;
509         break;
510 #elif defined(WEBRTC_MAC) || defined(BSD) || defined(__native_client__)
511         LOG(LS_WARNING) << "Socket::OPT_DONTFRAGMENT not supported.";
512         return -1;
513 #elif defined(WEBRTC_POSIX)
514         *slevel = IPPROTO_IP;
515         *sopt = IP_MTU_DISCOVER;
516         break;
517 #endif
518       case OPT_RCVBUF:
519         *slevel = SOL_SOCKET;
520         *sopt = SO_RCVBUF;
521         break;
522       case OPT_SNDBUF:
523         *slevel = SOL_SOCKET;
524         *sopt = SO_SNDBUF;
525         break;
526       case OPT_NODELAY:
527         *slevel = IPPROTO_TCP;
528         *sopt = TCP_NODELAY;
529         break;
530       case OPT_DSCP:
531         LOG(LS_WARNING) << "Socket::OPT_DSCP not supported.";
532         return -1;
533       case OPT_RTP_SENDTIME_EXTN_ID:
534         return -1;  // No logging is necessary as this not a OS socket option.
535       default:
536         ASSERT(false);
537         return -1;
538     }
539     return 0;
540   }
541 
542   PhysicalSocketServer* ss_;
543   SOCKET s_;
544   uint8_t enabled_events_;
545   bool udp_;
546   int error_;
547   // Protects |error_| that is accessed from different threads.
548   mutable CriticalSection crit_;
549   ConnState state_;
550   AsyncResolver* resolver_;
551 
552 #if !defined(NDEBUG)
553   std::string dbg_addr_;
554 #endif
555 };
556 
557 #if defined(WEBRTC_POSIX)
558 class EventDispatcher : public Dispatcher {
559  public:
EventDispatcher(PhysicalSocketServer * ss)560   EventDispatcher(PhysicalSocketServer* ss) : ss_(ss), fSignaled_(false) {
561     if (pipe(afd_) < 0)
562       LOG(LERROR) << "pipe failed";
563     ss_->Add(this);
564   }
565 
~EventDispatcher()566   ~EventDispatcher() override {
567     ss_->Remove(this);
568     close(afd_[0]);
569     close(afd_[1]);
570   }
571 
Signal()572   virtual void Signal() {
573     CritScope cs(&crit_);
574     if (!fSignaled_) {
575       const uint8_t b[1] = {0};
576       if (VERIFY(1 == write(afd_[1], b, sizeof(b)))) {
577         fSignaled_ = true;
578       }
579     }
580   }
581 
GetRequestedEvents()582   uint32_t GetRequestedEvents() override { return DE_READ; }
583 
OnPreEvent(uint32_t ff)584   void OnPreEvent(uint32_t ff) override {
585     // It is not possible to perfectly emulate an auto-resetting event with
586     // pipes.  This simulates it by resetting before the event is handled.
587 
588     CritScope cs(&crit_);
589     if (fSignaled_) {
590       uint8_t b[4];  // Allow for reading more than 1 byte, but expect 1.
591       VERIFY(1 == read(afd_[0], b, sizeof(b)));
592       fSignaled_ = false;
593     }
594   }
595 
OnEvent(uint32_t ff,int err)596   void OnEvent(uint32_t ff, int err) override { ASSERT(false); }
597 
GetDescriptor()598   int GetDescriptor() override { return afd_[0]; }
599 
IsDescriptorClosed()600   bool IsDescriptorClosed() override { return false; }
601 
602  private:
603   PhysicalSocketServer *ss_;
604   int afd_[2];
605   bool fSignaled_;
606   CriticalSection crit_;
607 };
608 
609 // These two classes use the self-pipe trick to deliver POSIX signals to our
610 // select loop. This is the only safe, reliable, cross-platform way to do
611 // non-trivial things with a POSIX signal in an event-driven program (until
612 // proper pselect() implementations become ubiquitous).
613 
614 class PosixSignalHandler {
615  public:
616   // POSIX only specifies 32 signals, but in principle the system might have
617   // more and the programmer might choose to use them, so we size our array
618   // for 128.
619   static const int kNumPosixSignals = 128;
620 
621   // There is just a single global instance. (Signal handlers do not get any
622   // sort of user-defined void * parameter, so they can't access anything that
623   // isn't global.)
Instance()624   static PosixSignalHandler* Instance() {
625     RTC_DEFINE_STATIC_LOCAL(PosixSignalHandler, instance, ());
626     return &instance;
627   }
628 
629   // Returns true if the given signal number is set.
IsSignalSet(int signum) const630   bool IsSignalSet(int signum) const {
631     ASSERT(signum < ARRAY_SIZE(received_signal_));
632     if (signum < ARRAY_SIZE(received_signal_)) {
633       return received_signal_[signum];
634     } else {
635       return false;
636     }
637   }
638 
639   // Clears the given signal number.
ClearSignal(int signum)640   void ClearSignal(int signum) {
641     ASSERT(signum < ARRAY_SIZE(received_signal_));
642     if (signum < ARRAY_SIZE(received_signal_)) {
643       received_signal_[signum] = false;
644     }
645   }
646 
647   // Returns the file descriptor to monitor for signal events.
GetDescriptor() const648   int GetDescriptor() const {
649     return afd_[0];
650   }
651 
652   // This is called directly from our real signal handler, so it must be
653   // signal-handler-safe. That means it cannot assume anything about the
654   // user-level state of the process, since the handler could be executed at any
655   // time on any thread.
OnPosixSignalReceived(int signum)656   void OnPosixSignalReceived(int signum) {
657     if (signum >= ARRAY_SIZE(received_signal_)) {
658       // We don't have space in our array for this.
659       return;
660     }
661     // Set a flag saying we've seen this signal.
662     received_signal_[signum] = true;
663     // Notify application code that we got a signal.
664     const uint8_t b[1] = {0};
665     if (-1 == write(afd_[1], b, sizeof(b))) {
666       // Nothing we can do here. If there's an error somehow then there's
667       // nothing we can safely do from a signal handler.
668       // No, we can't even safely log it.
669       // But, we still have to check the return value here. Otherwise,
670       // GCC 4.4.1 complains ignoring return value. Even (void) doesn't help.
671       return;
672     }
673   }
674 
675  private:
PosixSignalHandler()676   PosixSignalHandler() {
677     if (pipe(afd_) < 0) {
678       LOG_ERR(LS_ERROR) << "pipe failed";
679       return;
680     }
681     if (fcntl(afd_[0], F_SETFL, O_NONBLOCK) < 0) {
682       LOG_ERR(LS_WARNING) << "fcntl #1 failed";
683     }
684     if (fcntl(afd_[1], F_SETFL, O_NONBLOCK) < 0) {
685       LOG_ERR(LS_WARNING) << "fcntl #2 failed";
686     }
687     memset(const_cast<void *>(static_cast<volatile void *>(received_signal_)),
688            0,
689            sizeof(received_signal_));
690   }
691 
~PosixSignalHandler()692   ~PosixSignalHandler() {
693     int fd1 = afd_[0];
694     int fd2 = afd_[1];
695     // We clobber the stored file descriptor numbers here or else in principle
696     // a signal that happens to be delivered during application termination
697     // could erroneously write a zero byte to an unrelated file handle in
698     // OnPosixSignalReceived() if some other file happens to be opened later
699     // during shutdown and happens to be given the same file descriptor number
700     // as our pipe had. Unfortunately even with this precaution there is still a
701     // race where that could occur if said signal happens to be handled
702     // concurrently with this code and happens to have already read the value of
703     // afd_[1] from memory before we clobber it, but that's unlikely.
704     afd_[0] = -1;
705     afd_[1] = -1;
706     close(fd1);
707     close(fd2);
708   }
709 
710   int afd_[2];
711   // These are boolean flags that will be set in our signal handler and read
712   // and cleared from Wait(). There is a race involved in this, but it is
713   // benign. The signal handler sets the flag before signaling the pipe, so
714   // we'll never end up blocking in select() while a flag is still true.
715   // However, if two of the same signal arrive close to each other then it's
716   // possible that the second time the handler may set the flag while it's still
717   // true, meaning that signal will be missed. But the first occurrence of it
718   // will still be handled, so this isn't a problem.
719   // Volatile is not necessary here for correctness, but this data _is_ volatile
720   // so I've marked it as such.
721   volatile uint8_t received_signal_[kNumPosixSignals];
722 };
723 
724 class PosixSignalDispatcher : public Dispatcher {
725  public:
PosixSignalDispatcher(PhysicalSocketServer * owner)726   PosixSignalDispatcher(PhysicalSocketServer *owner) : owner_(owner) {
727     owner_->Add(this);
728   }
729 
~PosixSignalDispatcher()730   ~PosixSignalDispatcher() override {
731     owner_->Remove(this);
732   }
733 
GetRequestedEvents()734   uint32_t GetRequestedEvents() override { return DE_READ; }
735 
OnPreEvent(uint32_t ff)736   void OnPreEvent(uint32_t ff) override {
737     // Events might get grouped if signals come very fast, so we read out up to
738     // 16 bytes to make sure we keep the pipe empty.
739     uint8_t b[16];
740     ssize_t ret = read(GetDescriptor(), b, sizeof(b));
741     if (ret < 0) {
742       LOG_ERR(LS_WARNING) << "Error in read()";
743     } else if (ret == 0) {
744       LOG(LS_WARNING) << "Should have read at least one byte";
745     }
746   }
747 
OnEvent(uint32_t ff,int err)748   void OnEvent(uint32_t ff, int err) override {
749     for (int signum = 0; signum < PosixSignalHandler::kNumPosixSignals;
750          ++signum) {
751       if (PosixSignalHandler::Instance()->IsSignalSet(signum)) {
752         PosixSignalHandler::Instance()->ClearSignal(signum);
753         HandlerMap::iterator i = handlers_.find(signum);
754         if (i == handlers_.end()) {
755           // This can happen if a signal is delivered to our process at around
756           // the same time as we unset our handler for it. It is not an error
757           // condition, but it's unusual enough to be worth logging.
758           LOG(LS_INFO) << "Received signal with no handler: " << signum;
759         } else {
760           // Otherwise, execute our handler.
761           (*i->second)(signum);
762         }
763       }
764     }
765   }
766 
GetDescriptor()767   int GetDescriptor() override {
768     return PosixSignalHandler::Instance()->GetDescriptor();
769   }
770 
IsDescriptorClosed()771   bool IsDescriptorClosed() override { return false; }
772 
SetHandler(int signum,void (* handler)(int))773   void SetHandler(int signum, void (*handler)(int)) {
774     handlers_[signum] = handler;
775   }
776 
ClearHandler(int signum)777   void ClearHandler(int signum) {
778     handlers_.erase(signum);
779   }
780 
HasHandlers()781   bool HasHandlers() {
782     return !handlers_.empty();
783   }
784 
785  private:
786   typedef std::map<int, void (*)(int)> HandlerMap;
787 
788   HandlerMap handlers_;
789   // Our owner.
790   PhysicalSocketServer *owner_;
791 };
792 
793 class SocketDispatcher : public Dispatcher, public PhysicalSocket {
794  public:
SocketDispatcher(PhysicalSocketServer * ss)795   explicit SocketDispatcher(PhysicalSocketServer *ss) : PhysicalSocket(ss) {
796   }
SocketDispatcher(SOCKET s,PhysicalSocketServer * ss)797   SocketDispatcher(SOCKET s, PhysicalSocketServer *ss) : PhysicalSocket(ss, s) {
798   }
799 
~SocketDispatcher()800   ~SocketDispatcher() override {
801     Close();
802   }
803 
Initialize()804   bool Initialize() {
805     ss_->Add(this);
806     fcntl(s_, F_SETFL, fcntl(s_, F_GETFL, 0) | O_NONBLOCK);
807     return true;
808   }
809 
Create(int type)810   virtual bool Create(int type) {
811     return Create(AF_INET, type);
812   }
813 
Create(int family,int type)814   bool Create(int family, int type) override {
815     // Change the socket to be non-blocking.
816     if (!PhysicalSocket::Create(family, type))
817       return false;
818 
819     return Initialize();
820   }
821 
GetDescriptor()822   int GetDescriptor() override { return s_; }
823 
IsDescriptorClosed()824   bool IsDescriptorClosed() override {
825     // We don't have a reliable way of distinguishing end-of-stream
826     // from readability.  So test on each readable call.  Is this
827     // inefficient?  Probably.
828     char ch;
829     ssize_t res = ::recv(s_, &ch, 1, MSG_PEEK);
830     if (res > 0) {
831       // Data available, so not closed.
832       return false;
833     } else if (res == 0) {
834       // EOF, so closed.
835       return true;
836     } else {  // error
837       switch (errno) {
838         // Returned if we've already closed s_.
839         case EBADF:
840         // Returned during ungraceful peer shutdown.
841         case ECONNRESET:
842           return true;
843         default:
844           // Assume that all other errors are just blocking errors, meaning the
845           // connection is still good but we just can't read from it right now.
846           // This should only happen when connecting (and at most once), because
847           // in all other cases this function is only called if the file
848           // descriptor is already known to be in the readable state. However,
849           // it's not necessary a problem if we spuriously interpret a
850           // "connection lost"-type error as a blocking error, because typically
851           // the next recv() will get EOF, so we'll still eventually notice that
852           // the socket is closed.
853           LOG_ERR(LS_WARNING) << "Assuming benign blocking error";
854           return false;
855       }
856     }
857   }
858 
GetRequestedEvents()859   uint32_t GetRequestedEvents() override { return enabled_events_; }
860 
OnPreEvent(uint32_t ff)861   void OnPreEvent(uint32_t ff) override {
862     if ((ff & DE_CONNECT) != 0)
863       state_ = CS_CONNECTED;
864     if ((ff & DE_CLOSE) != 0)
865       state_ = CS_CLOSED;
866   }
867 
OnEvent(uint32_t ff,int err)868   void OnEvent(uint32_t ff, int err) override {
869     // Make sure we deliver connect/accept first. Otherwise, consumers may see
870     // something like a READ followed by a CONNECT, which would be odd.
871     if ((ff & DE_CONNECT) != 0) {
872       enabled_events_ &= ~DE_CONNECT;
873       SignalConnectEvent(this);
874     }
875     if ((ff & DE_ACCEPT) != 0) {
876       enabled_events_ &= ~DE_ACCEPT;
877       SignalReadEvent(this);
878     }
879     if ((ff & DE_READ) != 0) {
880       enabled_events_ &= ~DE_READ;
881       SignalReadEvent(this);
882     }
883     if ((ff & DE_WRITE) != 0) {
884       enabled_events_ &= ~DE_WRITE;
885       SignalWriteEvent(this);
886     }
887     if ((ff & DE_CLOSE) != 0) {
888       // The socket is now dead to us, so stop checking it.
889       enabled_events_ = 0;
890       SignalCloseEvent(this, err);
891     }
892   }
893 
Close()894   int Close() override {
895     if (s_ == INVALID_SOCKET)
896       return 0;
897 
898     ss_->Remove(this);
899     return PhysicalSocket::Close();
900   }
901 };
902 
903 class FileDispatcher: public Dispatcher, public AsyncFile {
904  public:
FileDispatcher(int fd,PhysicalSocketServer * ss)905   FileDispatcher(int fd, PhysicalSocketServer *ss) : ss_(ss), fd_(fd) {
906     set_readable(true);
907 
908     ss_->Add(this);
909 
910     fcntl(fd_, F_SETFL, fcntl(fd_, F_GETFL, 0) | O_NONBLOCK);
911   }
912 
~FileDispatcher()913   ~FileDispatcher() override {
914     ss_->Remove(this);
915   }
916 
socketserver()917   SocketServer* socketserver() { return ss_; }
918 
GetDescriptor()919   int GetDescriptor() override { return fd_; }
920 
IsDescriptorClosed()921   bool IsDescriptorClosed() override { return false; }
922 
GetRequestedEvents()923   uint32_t GetRequestedEvents() override { return flags_; }
924 
OnPreEvent(uint32_t ff)925   void OnPreEvent(uint32_t ff) override {}
926 
OnEvent(uint32_t ff,int err)927   void OnEvent(uint32_t ff, int err) override {
928     if ((ff & DE_READ) != 0)
929       SignalReadEvent(this);
930     if ((ff & DE_WRITE) != 0)
931       SignalWriteEvent(this);
932     if ((ff & DE_CLOSE) != 0)
933       SignalCloseEvent(this, err);
934   }
935 
readable()936   bool readable() override { return (flags_ & DE_READ) != 0; }
937 
set_readable(bool value)938   void set_readable(bool value) override {
939     flags_ = value ? (flags_ | DE_READ) : (flags_ & ~DE_READ);
940   }
941 
writable()942   bool writable() override { return (flags_ & DE_WRITE) != 0; }
943 
set_writable(bool value)944   void set_writable(bool value) override {
945     flags_ = value ? (flags_ | DE_WRITE) : (flags_ & ~DE_WRITE);
946   }
947 
948  private:
949   PhysicalSocketServer* ss_;
950   int fd_;
951   int flags_;
952 };
953 
CreateFile(int fd)954 AsyncFile* PhysicalSocketServer::CreateFile(int fd) {
955   return new FileDispatcher(fd, this);
956 }
957 
958 #endif // WEBRTC_POSIX
959 
960 #if defined(WEBRTC_WIN)
FlagsToEvents(uint32_t events)961 static uint32_t FlagsToEvents(uint32_t events) {
962   uint32_t ffFD = FD_CLOSE;
963   if (events & DE_READ)
964     ffFD |= FD_READ;
965   if (events & DE_WRITE)
966     ffFD |= FD_WRITE;
967   if (events & DE_CONNECT)
968     ffFD |= FD_CONNECT;
969   if (events & DE_ACCEPT)
970     ffFD |= FD_ACCEPT;
971   return ffFD;
972 }
973 
974 class EventDispatcher : public Dispatcher {
975  public:
EventDispatcher(PhysicalSocketServer * ss)976   EventDispatcher(PhysicalSocketServer *ss) : ss_(ss) {
977     hev_ = WSACreateEvent();
978     if (hev_) {
979       ss_->Add(this);
980     }
981   }
982 
~EventDispatcher()983   ~EventDispatcher() {
984     if (hev_ != NULL) {
985       ss_->Remove(this);
986       WSACloseEvent(hev_);
987       hev_ = NULL;
988     }
989   }
990 
Signal()991   virtual void Signal() {
992     if (hev_ != NULL)
993       WSASetEvent(hev_);
994   }
995 
GetRequestedEvents()996   virtual uint32_t GetRequestedEvents() { return 0; }
997 
OnPreEvent(uint32_t ff)998   virtual void OnPreEvent(uint32_t ff) { WSAResetEvent(hev_); }
999 
OnEvent(uint32_t ff,int err)1000   virtual void OnEvent(uint32_t ff, int err) {}
1001 
GetWSAEvent()1002   virtual WSAEVENT GetWSAEvent() {
1003     return hev_;
1004   }
1005 
GetSocket()1006   virtual SOCKET GetSocket() {
1007     return INVALID_SOCKET;
1008   }
1009 
CheckSignalClose()1010   virtual bool CheckSignalClose() { return false; }
1011 
1012 private:
1013   PhysicalSocketServer* ss_;
1014   WSAEVENT hev_;
1015 };
1016 
1017 class SocketDispatcher : public Dispatcher, public PhysicalSocket {
1018  public:
1019   static int next_id_;
1020   int id_;
1021   bool signal_close_;
1022   int signal_err_;
1023 
SocketDispatcher(PhysicalSocketServer * ss)1024   SocketDispatcher(PhysicalSocketServer* ss)
1025       : PhysicalSocket(ss),
1026         id_(0),
1027         signal_close_(false) {
1028   }
1029 
SocketDispatcher(SOCKET s,PhysicalSocketServer * ss)1030   SocketDispatcher(SOCKET s, PhysicalSocketServer* ss)
1031       : PhysicalSocket(ss, s),
1032         id_(0),
1033         signal_close_(false) {
1034   }
1035 
~SocketDispatcher()1036   virtual ~SocketDispatcher() {
1037     Close();
1038   }
1039 
Initialize()1040   bool Initialize() {
1041     ASSERT(s_ != INVALID_SOCKET);
1042     // Must be a non-blocking
1043     u_long argp = 1;
1044     ioctlsocket(s_, FIONBIO, &argp);
1045     ss_->Add(this);
1046     return true;
1047   }
1048 
Create(int type)1049   virtual bool Create(int type) {
1050     return Create(AF_INET, type);
1051   }
1052 
Create(int family,int type)1053   virtual bool Create(int family, int type) {
1054     // Create socket
1055     if (!PhysicalSocket::Create(family, type))
1056       return false;
1057 
1058     if (!Initialize())
1059       return false;
1060 
1061     do { id_ = ++next_id_; } while (id_ == 0);
1062     return true;
1063   }
1064 
Close()1065   virtual int Close() {
1066     if (s_ == INVALID_SOCKET)
1067       return 0;
1068 
1069     id_ = 0;
1070     signal_close_ = false;
1071     ss_->Remove(this);
1072     return PhysicalSocket::Close();
1073   }
1074 
GetRequestedEvents()1075   virtual uint32_t GetRequestedEvents() { return enabled_events_; }
1076 
OnPreEvent(uint32_t ff)1077   virtual void OnPreEvent(uint32_t ff) {
1078     if ((ff & DE_CONNECT) != 0)
1079       state_ = CS_CONNECTED;
1080     // We set CS_CLOSED from CheckSignalClose.
1081   }
1082 
OnEvent(uint32_t ff,int err)1083   virtual void OnEvent(uint32_t ff, int err) {
1084     int cache_id = id_;
1085     // Make sure we deliver connect/accept first. Otherwise, consumers may see
1086     // something like a READ followed by a CONNECT, which would be odd.
1087     if (((ff & DE_CONNECT) != 0) && (id_ == cache_id)) {
1088       if (ff != DE_CONNECT)
1089         LOG(LS_VERBOSE) << "Signalled with DE_CONNECT: " << ff;
1090       enabled_events_ &= ~DE_CONNECT;
1091 #if !defined(NDEBUG)
1092       dbg_addr_ = "Connected @ ";
1093       dbg_addr_.append(GetRemoteAddress().ToString());
1094 #endif
1095       SignalConnectEvent(this);
1096     }
1097     if (((ff & DE_ACCEPT) != 0) && (id_ == cache_id)) {
1098       enabled_events_ &= ~DE_ACCEPT;
1099       SignalReadEvent(this);
1100     }
1101     if ((ff & DE_READ) != 0) {
1102       enabled_events_ &= ~DE_READ;
1103       SignalReadEvent(this);
1104     }
1105     if (((ff & DE_WRITE) != 0) && (id_ == cache_id)) {
1106       enabled_events_ &= ~DE_WRITE;
1107       SignalWriteEvent(this);
1108     }
1109     if (((ff & DE_CLOSE) != 0) && (id_ == cache_id)) {
1110       signal_close_ = true;
1111       signal_err_ = err;
1112     }
1113   }
1114 
GetWSAEvent()1115   virtual WSAEVENT GetWSAEvent() {
1116     return WSA_INVALID_EVENT;
1117   }
1118 
GetSocket()1119   virtual SOCKET GetSocket() {
1120     return s_;
1121   }
1122 
CheckSignalClose()1123   virtual bool CheckSignalClose() {
1124     if (!signal_close_)
1125       return false;
1126 
1127     char ch;
1128     if (recv(s_, &ch, 1, MSG_PEEK) > 0)
1129       return false;
1130 
1131     state_ = CS_CLOSED;
1132     signal_close_ = false;
1133     SignalCloseEvent(this, signal_err_);
1134     return true;
1135   }
1136 };
1137 
1138 int SocketDispatcher::next_id_ = 0;
1139 
1140 #endif  // WEBRTC_WIN
1141 
1142 // Sets the value of a boolean value to false when signaled.
1143 class Signaler : public EventDispatcher {
1144  public:
Signaler(PhysicalSocketServer * ss,bool * pf)1145   Signaler(PhysicalSocketServer* ss, bool* pf)
1146       : EventDispatcher(ss), pf_(pf) {
1147   }
~Signaler()1148   ~Signaler() override { }
1149 
OnEvent(uint32_t ff,int err)1150   void OnEvent(uint32_t ff, int err) override {
1151     if (pf_)
1152       *pf_ = false;
1153   }
1154 
1155  private:
1156   bool *pf_;
1157 };
1158 
PhysicalSocketServer()1159 PhysicalSocketServer::PhysicalSocketServer()
1160     : fWait_(false) {
1161   signal_wakeup_ = new Signaler(this, &fWait_);
1162 #if defined(WEBRTC_WIN)
1163   socket_ev_ = WSACreateEvent();
1164 #endif
1165 }
1166 
~PhysicalSocketServer()1167 PhysicalSocketServer::~PhysicalSocketServer() {
1168 #if defined(WEBRTC_WIN)
1169   WSACloseEvent(socket_ev_);
1170 #endif
1171 #if defined(WEBRTC_POSIX)
1172   signal_dispatcher_.reset();
1173 #endif
1174   delete signal_wakeup_;
1175   ASSERT(dispatchers_.empty());
1176 }
1177 
WakeUp()1178 void PhysicalSocketServer::WakeUp() {
1179   signal_wakeup_->Signal();
1180 }
1181 
CreateSocket(int type)1182 Socket* PhysicalSocketServer::CreateSocket(int type) {
1183   return CreateSocket(AF_INET, type);
1184 }
1185 
CreateSocket(int family,int type)1186 Socket* PhysicalSocketServer::CreateSocket(int family, int type) {
1187   PhysicalSocket* socket = new PhysicalSocket(this);
1188   if (socket->Create(family, type)) {
1189     return socket;
1190   } else {
1191     delete socket;
1192     return 0;
1193   }
1194 }
1195 
CreateAsyncSocket(int type)1196 AsyncSocket* PhysicalSocketServer::CreateAsyncSocket(int type) {
1197   return CreateAsyncSocket(AF_INET, type);
1198 }
1199 
CreateAsyncSocket(int family,int type)1200 AsyncSocket* PhysicalSocketServer::CreateAsyncSocket(int family, int type) {
1201   SocketDispatcher* dispatcher = new SocketDispatcher(this);
1202   if (dispatcher->Create(family, type)) {
1203     return dispatcher;
1204   } else {
1205     delete dispatcher;
1206     return 0;
1207   }
1208 }
1209 
WrapSocket(SOCKET s)1210 AsyncSocket* PhysicalSocketServer::WrapSocket(SOCKET s) {
1211   SocketDispatcher* dispatcher = new SocketDispatcher(s, this);
1212   if (dispatcher->Initialize()) {
1213     return dispatcher;
1214   } else {
1215     delete dispatcher;
1216     return 0;
1217   }
1218 }
1219 
Add(Dispatcher * pdispatcher)1220 void PhysicalSocketServer::Add(Dispatcher *pdispatcher) {
1221   CritScope cs(&crit_);
1222   // Prevent duplicates. This can cause dead dispatchers to stick around.
1223   DispatcherList::iterator pos = std::find(dispatchers_.begin(),
1224                                            dispatchers_.end(),
1225                                            pdispatcher);
1226   if (pos != dispatchers_.end())
1227     return;
1228   dispatchers_.push_back(pdispatcher);
1229 }
1230 
Remove(Dispatcher * pdispatcher)1231 void PhysicalSocketServer::Remove(Dispatcher *pdispatcher) {
1232   CritScope cs(&crit_);
1233   DispatcherList::iterator pos = std::find(dispatchers_.begin(),
1234                                            dispatchers_.end(),
1235                                            pdispatcher);
1236   // We silently ignore duplicate calls to Add, so we should silently ignore
1237   // the (expected) symmetric calls to Remove. Note that this may still hide
1238   // a real issue, so we at least log a warning about it.
1239   if (pos == dispatchers_.end()) {
1240     LOG(LS_WARNING) << "PhysicalSocketServer asked to remove a unknown "
1241                     << "dispatcher, potentially from a duplicate call to Add.";
1242     return;
1243   }
1244   size_t index = pos - dispatchers_.begin();
1245   dispatchers_.erase(pos);
1246   for (IteratorList::iterator it = iterators_.begin(); it != iterators_.end();
1247        ++it) {
1248     if (index < **it) {
1249       --**it;
1250     }
1251   }
1252 }
1253 
1254 #if defined(WEBRTC_POSIX)
Wait(int cmsWait,bool process_io)1255 bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) {
1256   // Calculate timing information
1257 
1258   struct timeval *ptvWait = NULL;
1259   struct timeval tvWait;
1260   struct timeval tvStop;
1261   if (cmsWait != kForever) {
1262     // Calculate wait timeval
1263     tvWait.tv_sec = cmsWait / 1000;
1264     tvWait.tv_usec = (cmsWait % 1000) * 1000;
1265     ptvWait = &tvWait;
1266 
1267     // Calculate when to return in a timeval
1268     gettimeofday(&tvStop, NULL);
1269     tvStop.tv_sec += tvWait.tv_sec;
1270     tvStop.tv_usec += tvWait.tv_usec;
1271     if (tvStop.tv_usec >= 1000000) {
1272       tvStop.tv_usec -= 1000000;
1273       tvStop.tv_sec += 1;
1274     }
1275   }
1276 
1277   // Zero all fd_sets. Don't need to do this inside the loop since
1278   // select() zeros the descriptors not signaled
1279 
1280   fd_set fdsRead;
1281   FD_ZERO(&fdsRead);
1282   fd_set fdsWrite;
1283   FD_ZERO(&fdsWrite);
1284   // Explicitly unpoison these FDs on MemorySanitizer which doesn't handle the
1285   // inline assembly in FD_ZERO.
1286   // http://crbug.com/344505
1287 #ifdef MEMORY_SANITIZER
1288   __msan_unpoison(&fdsRead, sizeof(fdsRead));
1289   __msan_unpoison(&fdsWrite, sizeof(fdsWrite));
1290 #endif
1291 
1292   fWait_ = true;
1293 
1294   while (fWait_) {
1295     int fdmax = -1;
1296     {
1297       CritScope cr(&crit_);
1298       for (size_t i = 0; i < dispatchers_.size(); ++i) {
1299         // Query dispatchers for read and write wait state
1300         Dispatcher *pdispatcher = dispatchers_[i];
1301         ASSERT(pdispatcher);
1302         if (!process_io && (pdispatcher != signal_wakeup_))
1303           continue;
1304         int fd = pdispatcher->GetDescriptor();
1305         if (fd > fdmax)
1306           fdmax = fd;
1307 
1308         uint32_t ff = pdispatcher->GetRequestedEvents();
1309         if (ff & (DE_READ | DE_ACCEPT))
1310           FD_SET(fd, &fdsRead);
1311         if (ff & (DE_WRITE | DE_CONNECT))
1312           FD_SET(fd, &fdsWrite);
1313       }
1314     }
1315 
1316     // Wait then call handlers as appropriate
1317     // < 0 means error
1318     // 0 means timeout
1319     // > 0 means count of descriptors ready
1320     int n = select(fdmax + 1, &fdsRead, &fdsWrite, NULL, ptvWait);
1321 
1322     // If error, return error.
1323     if (n < 0) {
1324       if (errno != EINTR) {
1325         LOG_E(LS_ERROR, EN, errno) << "select";
1326         return false;
1327       }
1328       // Else ignore the error and keep going. If this EINTR was for one of the
1329       // signals managed by this PhysicalSocketServer, the
1330       // PosixSignalDeliveryDispatcher will be in the signaled state in the next
1331       // iteration.
1332     } else if (n == 0) {
1333       // If timeout, return success
1334       return true;
1335     } else {
1336       // We have signaled descriptors
1337       CritScope cr(&crit_);
1338       for (size_t i = 0; i < dispatchers_.size(); ++i) {
1339         Dispatcher *pdispatcher = dispatchers_[i];
1340         int fd = pdispatcher->GetDescriptor();
1341         uint32_t ff = 0;
1342         int errcode = 0;
1343 
1344         // Reap any error code, which can be signaled through reads or writes.
1345         // TODO: Should we set errcode if getsockopt fails?
1346         if (FD_ISSET(fd, &fdsRead) || FD_ISSET(fd, &fdsWrite)) {
1347           socklen_t len = sizeof(errcode);
1348           ::getsockopt(fd, SOL_SOCKET, SO_ERROR, &errcode, &len);
1349         }
1350 
1351         // Check readable descriptors. If we're waiting on an accept, signal
1352         // that. Otherwise we're waiting for data, check to see if we're
1353         // readable or really closed.
1354         // TODO: Only peek at TCP descriptors.
1355         if (FD_ISSET(fd, &fdsRead)) {
1356           FD_CLR(fd, &fdsRead);
1357           if (pdispatcher->GetRequestedEvents() & DE_ACCEPT) {
1358             ff |= DE_ACCEPT;
1359           } else if (errcode || pdispatcher->IsDescriptorClosed()) {
1360             ff |= DE_CLOSE;
1361           } else {
1362             ff |= DE_READ;
1363           }
1364         }
1365 
1366         // Check writable descriptors. If we're waiting on a connect, detect
1367         // success versus failure by the reaped error code.
1368         if (FD_ISSET(fd, &fdsWrite)) {
1369           FD_CLR(fd, &fdsWrite);
1370           if (pdispatcher->GetRequestedEvents() & DE_CONNECT) {
1371             if (!errcode) {
1372               ff |= DE_CONNECT;
1373             } else {
1374               ff |= DE_CLOSE;
1375             }
1376           } else {
1377             ff |= DE_WRITE;
1378           }
1379         }
1380 
1381         // Tell the descriptor about the event.
1382         if (ff != 0) {
1383           pdispatcher->OnPreEvent(ff);
1384           pdispatcher->OnEvent(ff, errcode);
1385         }
1386       }
1387     }
1388 
1389     // Recalc the time remaining to wait. Doing it here means it doesn't get
1390     // calced twice the first time through the loop
1391     if (ptvWait) {
1392       ptvWait->tv_sec = 0;
1393       ptvWait->tv_usec = 0;
1394       struct timeval tvT;
1395       gettimeofday(&tvT, NULL);
1396       if ((tvStop.tv_sec > tvT.tv_sec)
1397           || ((tvStop.tv_sec == tvT.tv_sec)
1398               && (tvStop.tv_usec > tvT.tv_usec))) {
1399         ptvWait->tv_sec = tvStop.tv_sec - tvT.tv_sec;
1400         ptvWait->tv_usec = tvStop.tv_usec - tvT.tv_usec;
1401         if (ptvWait->tv_usec < 0) {
1402           ASSERT(ptvWait->tv_sec > 0);
1403           ptvWait->tv_usec += 1000000;
1404           ptvWait->tv_sec -= 1;
1405         }
1406       }
1407     }
1408   }
1409 
1410   return true;
1411 }
1412 
GlobalSignalHandler(int signum)1413 static void GlobalSignalHandler(int signum) {
1414   PosixSignalHandler::Instance()->OnPosixSignalReceived(signum);
1415 }
1416 
SetPosixSignalHandler(int signum,void (* handler)(int))1417 bool PhysicalSocketServer::SetPosixSignalHandler(int signum,
1418                                                  void (*handler)(int)) {
1419   // If handler is SIG_IGN or SIG_DFL then clear our user-level handler,
1420   // otherwise set one.
1421   if (handler == SIG_IGN || handler == SIG_DFL) {
1422     if (!InstallSignal(signum, handler)) {
1423       return false;
1424     }
1425     if (signal_dispatcher_) {
1426       signal_dispatcher_->ClearHandler(signum);
1427       if (!signal_dispatcher_->HasHandlers()) {
1428         signal_dispatcher_.reset();
1429       }
1430     }
1431   } else {
1432     if (!signal_dispatcher_) {
1433       signal_dispatcher_.reset(new PosixSignalDispatcher(this));
1434     }
1435     signal_dispatcher_->SetHandler(signum, handler);
1436     if (!InstallSignal(signum, &GlobalSignalHandler)) {
1437       return false;
1438     }
1439   }
1440   return true;
1441 }
1442 
signal_dispatcher()1443 Dispatcher* PhysicalSocketServer::signal_dispatcher() {
1444   return signal_dispatcher_.get();
1445 }
1446 
InstallSignal(int signum,void (* handler)(int))1447 bool PhysicalSocketServer::InstallSignal(int signum, void (*handler)(int)) {
1448   struct sigaction act;
1449   // It doesn't really matter what we set this mask to.
1450   if (sigemptyset(&act.sa_mask) != 0) {
1451     LOG_ERR(LS_ERROR) << "Couldn't set mask";
1452     return false;
1453   }
1454   act.sa_handler = handler;
1455 #if !defined(__native_client__)
1456   // Use SA_RESTART so that our syscalls don't get EINTR, since we don't need it
1457   // and it's a nuisance. Though some syscalls still return EINTR and there's no
1458   // real standard for which ones. :(
1459   act.sa_flags = SA_RESTART;
1460 #else
1461   act.sa_flags = 0;
1462 #endif
1463   if (sigaction(signum, &act, NULL) != 0) {
1464     LOG_ERR(LS_ERROR) << "Couldn't set sigaction";
1465     return false;
1466   }
1467   return true;
1468 }
1469 #endif  // WEBRTC_POSIX
1470 
1471 #if defined(WEBRTC_WIN)
// Windows implementation of Wait().
//
// Waits on WSA events until one fires and is handled, the timeout expires,
// or fWait_ is cleared. |cmsWait| is the maximum wait in milliseconds, or
// kForever for no timeout. When |process_io| is false only the wake-up
// dispatcher is considered and socket events are not dispatched.
//
// Returns false only when WSAWaitForMultipleEvents fails; true otherwise
// (including on timeout).
bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) {
  int cmsTotal = cmsWait;
  int cmsElapsed = 0;
  uint32_t msStart = Time();

  fWait_ = true;
  while (fWait_) {
    // events[0] is always the shared socket event; events[k] for k > 0
    // belongs to event_owners[k - 1].
    std::vector<WSAEVENT> events;
    std::vector<Dispatcher *> event_owners;

    events.push_back(socket_ev_);

    {
      CritScope cr(&crit_);
      // Register the loop index so that Remove() can adjust it if a
      // dispatcher is erased while this loop is running.
      size_t i = 0;
      iterators_.push_back(&i);
      // Don't track dispatchers_.size(), because we want to pick up any new
      // dispatchers that were added while processing the loop.
      while (i < dispatchers_.size()) {
        Dispatcher* disp = dispatchers_[i++];
        if (!process_io && (disp != signal_wakeup_))
          continue;
        SOCKET s = disp->GetSocket();
        if (disp->CheckSignalClose()) {
          // We just signalled close, don't poll this socket
        } else if (s != INVALID_SOCKET) {
          // Socket dispatchers all report through the shared socket event.
          WSAEventSelect(s,
                         events[0],
                         FlagsToEvents(disp->GetRequestedEvents()));
        } else {
          // Dispatchers without a socket are waited on via their own event.
          events.push_back(disp->GetWSAEvent());
          event_owners.push_back(disp);
        }
      }
      ASSERT(iterators_.back() == &i);
      iterators_.pop_back();
    }

    // Which is shorter, the delay wait or the asked wait?

    int cmsNext;
    if (cmsWait == kForever) {
      cmsNext = cmsWait;
    } else {
      cmsNext = std::max(0, cmsTotal - cmsElapsed);
    }

    // Wait for one of the events to signal
    DWORD dw = WSAWaitForMultipleEvents(static_cast<DWORD>(events.size()),
                                        &events[0],
                                        false,
                                        cmsNext,
                                        false);

    if (dw == WSA_WAIT_FAILED) {
      // Failed?
      // TODO: need a better strategy than this!
      WSAGetLastError();
      ASSERT(false);
      return false;
    } else if (dw == WSA_WAIT_TIMEOUT) {
      // Timeout?
      return true;
    } else {
      // Figure out which one it is and call it
      CritScope cr(&crit_);
      int index = dw - WSA_WAIT_EVENT_0;
      if (index > 0) {
        // A dispatcher-owned event fired: notify its owner with an empty
        // event mask.
        --index; // The first event is the socket event
        event_owners[index]->OnPreEvent(0);
        event_owners[index]->OnEvent(0, 0);
      } else if (process_io) {
        // The shared socket event fired: query every socket dispatcher for
        // its pending network events. Both |i| and |end| are registered so
        // that Remove() keeps them consistent during the callbacks below.
        size_t i = 0, end = dispatchers_.size();
        iterators_.push_back(&i);
        iterators_.push_back(&end);  // Don't iterate over new dispatchers.
        while (i < end) {
          Dispatcher* disp = dispatchers_[i++];
          SOCKET s = disp->GetSocket();
          if (s == INVALID_SOCKET)
            continue;

          WSANETWORKEVENTS wsaEvents;
          int err = WSAEnumNetworkEvents(s, events[0], &wsaEvents);
          if (err == 0) {

#if LOGGING
            // Diagnostics only: log any per-event error codes reported.
            {
              if ((wsaEvents.lNetworkEvents & FD_READ) &&
                  wsaEvents.iErrorCode[FD_READ_BIT] != 0) {
                LOG(WARNING) << "PhysicalSocketServer got FD_READ_BIT error "
                             << wsaEvents.iErrorCode[FD_READ_BIT];
              }
              if ((wsaEvents.lNetworkEvents & FD_WRITE) &&
                  wsaEvents.iErrorCode[FD_WRITE_BIT] != 0) {
                LOG(WARNING) << "PhysicalSocketServer got FD_WRITE_BIT error "
                             << wsaEvents.iErrorCode[FD_WRITE_BIT];
              }
              if ((wsaEvents.lNetworkEvents & FD_CONNECT) &&
                  wsaEvents.iErrorCode[FD_CONNECT_BIT] != 0) {
                LOG(WARNING) << "PhysicalSocketServer got FD_CONNECT_BIT error "
                             << wsaEvents.iErrorCode[FD_CONNECT_BIT];
              }
              if ((wsaEvents.lNetworkEvents & FD_ACCEPT) &&
                  wsaEvents.iErrorCode[FD_ACCEPT_BIT] != 0) {
                LOG(WARNING) << "PhysicalSocketServer got FD_ACCEPT_BIT error "
                             << wsaEvents.iErrorCode[FD_ACCEPT_BIT];
              }
              if ((wsaEvents.lNetworkEvents & FD_CLOSE) &&
                  wsaEvents.iErrorCode[FD_CLOSE_BIT] != 0) {
                LOG(WARNING) << "PhysicalSocketServer got FD_CLOSE_BIT error "
                             << wsaEvents.iErrorCode[FD_CLOSE_BIT];
              }
            }
#endif
            // Translate the Winsock FD_* events into DE_* dispatcher events.
            uint32_t ff = 0;
            int errcode = 0;
            if (wsaEvents.lNetworkEvents & FD_READ)
              ff |= DE_READ;
            if (wsaEvents.lNetworkEvents & FD_WRITE)
              ff |= DE_WRITE;
            if (wsaEvents.lNetworkEvents & FD_CONNECT) {
              // A connect that completed with an error is reported as a
              // close carrying that error code.
              if (wsaEvents.iErrorCode[FD_CONNECT_BIT] == 0) {
                ff |= DE_CONNECT;
              } else {
                ff |= DE_CLOSE;
                errcode = wsaEvents.iErrorCode[FD_CONNECT_BIT];
              }
            }
            if (wsaEvents.lNetworkEvents & FD_ACCEPT)
              ff |= DE_ACCEPT;
            if (wsaEvents.lNetworkEvents & FD_CLOSE) {
              ff |= DE_CLOSE;
              errcode = wsaEvents.iErrorCode[FD_CLOSE_BIT];
            }
            if (ff != 0) {
              disp->OnPreEvent(ff);
              disp->OnEvent(ff, errcode);
            }
          }
        }
        // Unregister in reverse order of registration.
        ASSERT(iterators_.back() == &end);
        iterators_.pop_back();
        ASSERT(iterators_.back() == &i);
        iterators_.pop_back();
      }

      // Reset the network event until new activity occurs
      WSAResetEvent(socket_ev_);
    }

    // Break?
    if (!fWait_)
      break;
    // Stop once the requested wait time has been used up.
    cmsElapsed = TimeSince(msStart);
    if ((cmsWait != kForever) && (cmsElapsed >= cmsWait)) {
       break;
    }
  }

  // Done
  return true;
}
1634 #endif  // WEBRTC_WIN
1635 
1636 }  // namespace rtc
1637