1 /*
2 * Copyright 2004 The WebRTC Project Authors. All rights reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10 #include "rtc_base/physical_socket_server.h"
11
12 #if defined(_MSC_VER) && _MSC_VER < 1300
13 #pragma warning(disable : 4786)
14 #endif
15
16 #ifdef MEMORY_SANITIZER
17 #include <sanitizer/msan_interface.h>
18 #endif
19
20 #if defined(WEBRTC_POSIX)
21 #include <fcntl.h>
22 #include <string.h>
23 #if defined(WEBRTC_USE_EPOLL)
24 // "poll" will be used to wait for the signal dispatcher.
25 #include <poll.h>
26 #endif
27 #include <sys/ioctl.h>
28 #include <sys/select.h>
29 #include <sys/time.h>
30 #include <unistd.h>
31 #endif
32
33 #if defined(WEBRTC_WIN)
34 #include <windows.h>
35 #include <winsock2.h>
36 #include <ws2tcpip.h>
37 #undef SetPort
38 #endif
39
40 #include <errno.h>
41
42 #include <algorithm>
43 #include <map>
44
45 #include "rtc_base/arraysize.h"
46 #include "rtc_base/byte_order.h"
47 #include "rtc_base/checks.h"
48 #include "rtc_base/logging.h"
49 #include "rtc_base/network_monitor.h"
50 #include "rtc_base/null_socket_server.h"
51 #include "rtc_base/time_utils.h"
52
53 #if defined(WEBRTC_LINUX)
54 #include <linux/sockios.h>
55 #endif
56
57 #if defined(WEBRTC_WIN)
58 #define LAST_SYSTEM_ERROR (::GetLastError())
59 #elif defined(__native_client__) && __native_client__
60 #define LAST_SYSTEM_ERROR (0)
61 #elif defined(WEBRTC_POSIX)
62 #define LAST_SYSTEM_ERROR (errno)
63 #endif // WEBRTC_WIN
64
65 #if defined(WEBRTC_POSIX)
66 #include <netinet/tcp.h> // for TCP_NODELAY
67 #define IP_MTU 14 // Until this is integrated from linux/in.h to netinet/in.h
68 typedef void* SockOptArg;
69
70 #endif // WEBRTC_POSIX
71
72 #if defined(WEBRTC_POSIX) && !defined(WEBRTC_MAC) && !defined(WEBRTC_BSD) && !defined(__native_client__)
73 #if defined(WEBRTC_LINUX)
74 #include <linux/sockios.h>
75 #endif
76
GetSocketRecvTimestamp(int socket)77 int64_t GetSocketRecvTimestamp(int socket) {
78 struct timeval tv_ioctl;
79 int ret = ioctl(socket, SIOCGSTAMP, &tv_ioctl);
80 if (ret != 0)
81 return -1;
82 int64_t timestamp =
83 rtc::kNumMicrosecsPerSec * static_cast<int64_t>(tv_ioctl.tv_sec) +
84 static_cast<int64_t>(tv_ioctl.tv_usec);
85 return timestamp;
86 }
87
88 #else
89
GetSocketRecvTimestamp(int socket)90 int64_t GetSocketRecvTimestamp(int socket) {
91 return -1;
92 }
93 #endif
94
95 #if defined(WEBRTC_WIN)
96 typedef char* SockOptArg;
97 #endif
98
99 #if defined(WEBRTC_USE_EPOLL)
100 // POLLRDHUP / EPOLLRDHUP are only defined starting with Linux 2.6.17.
101 #if !defined(POLLRDHUP)
102 #define POLLRDHUP 0x2000
103 #endif
104 #if !defined(EPOLLRDHUP)
105 #define EPOLLRDHUP 0x2000
106 #endif
107 #endif
108
109 namespace {
110 class ScopedSetTrue {
111 public:
ScopedSetTrue(bool * value)112 ScopedSetTrue(bool* value) : value_(value) {
113 RTC_DCHECK(!*value_);
114 *value_ = true;
115 }
~ScopedSetTrue()116 ~ScopedSetTrue() { *value_ = false; }
117
118 private:
119 bool* value_;
120 };
121 } // namespace
122
123 namespace rtc {
124
CreateDefault()125 std::unique_ptr<SocketServer> SocketServer::CreateDefault() {
126 #if defined(__native_client__)
127 return std::unique_ptr<SocketServer>(new rtc::NullSocketServer);
128 #else
129 return std::unique_ptr<SocketServer>(new rtc::PhysicalSocketServer);
130 #endif
131 }
132
PhysicalSocket(PhysicalSocketServer * ss,SOCKET s)133 PhysicalSocket::PhysicalSocket(PhysicalSocketServer* ss, SOCKET s)
134 : ss_(ss),
135 s_(s),
136 error_(0),
137 state_((s == INVALID_SOCKET) ? CS_CLOSED : CS_CONNECTED),
138 resolver_(nullptr) {
139 if (s_ != INVALID_SOCKET) {
140 SetEnabledEvents(DE_READ | DE_WRITE);
141
142 int type = SOCK_STREAM;
143 socklen_t len = sizeof(type);
144 const int res =
145 getsockopt(s_, SOL_SOCKET, SO_TYPE, (SockOptArg)&type, &len);
146 RTC_DCHECK_EQ(0, res);
147 udp_ = (SOCK_DGRAM == type);
148 }
149 }
150
~PhysicalSocket()151 PhysicalSocket::~PhysicalSocket() {
152 Close();
153 }
154
Create(int family,int type)155 bool PhysicalSocket::Create(int family, int type) {
156 Close();
157 s_ = ::socket(family, type, 0);
158 udp_ = (SOCK_DGRAM == type);
159 family_ = family;
160 UpdateLastError();
161 if (udp_) {
162 SetEnabledEvents(DE_READ | DE_WRITE);
163 }
164 return s_ != INVALID_SOCKET;
165 }
166
GetLocalAddress() const167 SocketAddress PhysicalSocket::GetLocalAddress() const {
168 sockaddr_storage addr_storage = {};
169 socklen_t addrlen = sizeof(addr_storage);
170 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage);
171 int result = ::getsockname(s_, addr, &addrlen);
172 SocketAddress address;
173 if (result >= 0) {
174 SocketAddressFromSockAddrStorage(addr_storage, &address);
175 } else {
176 RTC_LOG(LS_WARNING) << "GetLocalAddress: unable to get local addr, socket="
177 << s_;
178 }
179 return address;
180 }
181
GetRemoteAddress() const182 SocketAddress PhysicalSocket::GetRemoteAddress() const {
183 sockaddr_storage addr_storage = {};
184 socklen_t addrlen = sizeof(addr_storage);
185 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage);
186 int result = ::getpeername(s_, addr, &addrlen);
187 SocketAddress address;
188 if (result >= 0) {
189 SocketAddressFromSockAddrStorage(addr_storage, &address);
190 } else {
191 RTC_LOG(LS_WARNING)
192 << "GetRemoteAddress: unable to get remote addr, socket=" << s_;
193 }
194 return address;
195 }
196
Bind(const SocketAddress & bind_addr)197 int PhysicalSocket::Bind(const SocketAddress& bind_addr) {
198 SocketAddress copied_bind_addr = bind_addr;
199 // If a network binder is available, use it to bind a socket to an interface
200 // instead of bind(), since this is more reliable on an OS with a weak host
201 // model.
202 if (ss_->network_binder() && !bind_addr.IsAnyIP()) {
203 NetworkBindingResult result =
204 ss_->network_binder()->BindSocketToNetwork(s_, bind_addr.ipaddr());
205 if (result == NetworkBindingResult::SUCCESS) {
206 // Since the network binder handled binding the socket to the desired
207 // network interface, we don't need to (and shouldn't) include an IP in
208 // the bind() call; bind() just needs to assign a port.
209 copied_bind_addr.SetIP(GetAnyIP(copied_bind_addr.ipaddr().family()));
210 } else if (result == NetworkBindingResult::NOT_IMPLEMENTED) {
211 RTC_LOG(LS_INFO) << "Can't bind socket to network because "
212 "network binding is not implemented for this OS.";
213 } else {
214 if (bind_addr.IsLoopbackIP()) {
215 // If we couldn't bind to a loopback IP (which should only happen in
216 // test scenarios), continue on. This may be expected behavior.
217 RTC_LOG(LS_VERBOSE) << "Binding socket to loopback address"
218 << " failed; result: " << static_cast<int>(result);
219 } else {
220 RTC_LOG(LS_WARNING) << "Binding socket to network address"
221 << " failed; result: " << static_cast<int>(result);
222 // If a network binding was attempted and failed, we should stop here
223 // and not try to use the socket. Otherwise, we may end up sending
224 // packets with an invalid source address.
225 // See: https://bugs.chromium.org/p/webrtc/issues/detail?id=7026
226 return -1;
227 }
228 }
229 }
230 sockaddr_storage addr_storage;
231 size_t len = copied_bind_addr.ToSockAddrStorage(&addr_storage);
232 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage);
233 int err = ::bind(s_, addr, static_cast<int>(len));
234 UpdateLastError();
235 #if !defined(NDEBUG)
236 if (0 == err) {
237 dbg_addr_ = "Bound @ ";
238 dbg_addr_.append(GetLocalAddress().ToString());
239 }
240 #endif
241 return err;
242 }
243
Connect(const SocketAddress & addr)244 int PhysicalSocket::Connect(const SocketAddress& addr) {
245 // TODO(pthatcher): Implicit creation is required to reconnect...
246 // ...but should we make it more explicit?
247 if (state_ != CS_CLOSED) {
248 SetError(EALREADY);
249 return SOCKET_ERROR;
250 }
251 if (addr.IsUnresolvedIP()) {
252 RTC_LOG(LS_VERBOSE) << "Resolving addr in PhysicalSocket::Connect";
253 resolver_ = new AsyncResolver();
254 resolver_->SignalDone.connect(this, &PhysicalSocket::OnResolveResult);
255 resolver_->Start(addr);
256 state_ = CS_CONNECTING;
257 return 0;
258 }
259
260 return DoConnect(addr);
261 }
262
DoConnect(const SocketAddress & connect_addr)263 int PhysicalSocket::DoConnect(const SocketAddress& connect_addr) {
264 if ((s_ == INVALID_SOCKET) && !Create(connect_addr.family(), SOCK_STREAM)) {
265 return SOCKET_ERROR;
266 }
267 sockaddr_storage addr_storage;
268 size_t len = connect_addr.ToSockAddrStorage(&addr_storage);
269 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage);
270 int err = ::connect(s_, addr, static_cast<int>(len));
271 UpdateLastError();
272 uint8_t events = DE_READ | DE_WRITE;
273 if (err == 0) {
274 state_ = CS_CONNECTED;
275 } else if (IsBlockingError(GetError())) {
276 state_ = CS_CONNECTING;
277 events |= DE_CONNECT;
278 } else {
279 return SOCKET_ERROR;
280 }
281
282 EnableEvents(events);
283 return 0;
284 }
285
GetError() const286 int PhysicalSocket::GetError() const {
287 CritScope cs(&crit_);
288 return error_;
289 }
290
SetError(int error)291 void PhysicalSocket::SetError(int error) {
292 CritScope cs(&crit_);
293 error_ = error;
294 }
295
GetState() const296 AsyncSocket::ConnState PhysicalSocket::GetState() const {
297 return state_;
298 }
299
GetOption(Option opt,int * value)300 int PhysicalSocket::GetOption(Option opt, int* value) {
301 int slevel;
302 int sopt;
303 if (TranslateOption(opt, &slevel, &sopt) == -1)
304 return -1;
305 socklen_t optlen = sizeof(*value);
306 int ret = ::getsockopt(s_, slevel, sopt, (SockOptArg)value, &optlen);
307 if (ret == -1) {
308 return -1;
309 }
310 if (opt == OPT_DONTFRAGMENT) {
311 #if defined(WEBRTC_LINUX) && !defined(WEBRTC_ANDROID)
312 *value = (*value != IP_PMTUDISC_DONT) ? 1 : 0;
313 #endif
314 } else if (opt == OPT_DSCP) {
315 #if defined(WEBRTC_POSIX)
316 // unshift DSCP value to get six most significant bits of IP DiffServ field
317 *value >>= 2;
318 #endif
319 }
320 return ret;
321 }
322
SetOption(Option opt,int value)323 int PhysicalSocket::SetOption(Option opt, int value) {
324 int slevel;
325 int sopt;
326 if (TranslateOption(opt, &slevel, &sopt) == -1)
327 return -1;
328 if (opt == OPT_DONTFRAGMENT) {
329 #if defined(WEBRTC_LINUX) && !defined(WEBRTC_ANDROID)
330 value = (value) ? IP_PMTUDISC_DO : IP_PMTUDISC_DONT;
331 #endif
332 } else if (opt == OPT_DSCP) {
333 #if defined(WEBRTC_POSIX)
334 // shift DSCP value to fit six most significant bits of IP DiffServ field
335 value <<= 2;
336 #endif
337 }
338 #if defined(WEBRTC_POSIX)
339 if (sopt == IPV6_TCLASS) {
340 // Set the IPv4 option in all cases to support dual-stack sockets.
341 ::setsockopt(s_, IPPROTO_IP, IP_TOS, (SockOptArg)&value, sizeof(value));
342 }
343 #endif
344 return ::setsockopt(s_, slevel, sopt, (SockOptArg)&value, sizeof(value));
345 }
346
Send(const void * pv,size_t cb)347 int PhysicalSocket::Send(const void* pv, size_t cb) {
348 int sent = DoSend(
349 s_, reinterpret_cast<const char*>(pv), static_cast<int>(cb),
350 #if defined(WEBRTC_LINUX) && !defined(WEBRTC_ANDROID)
351 // Suppress SIGPIPE. Without this, attempting to send on a socket whose
352 // other end is closed will result in a SIGPIPE signal being raised to
353 // our process, which by default will terminate the process, which we
354 // don't want. By specifying this flag, we'll just get the error EPIPE
355 // instead and can handle the error gracefully.
356 MSG_NOSIGNAL
357 #else
358 0
359 #endif
360 );
361 UpdateLastError();
362 MaybeRemapSendError();
363 // We have seen minidumps where this may be false.
364 RTC_DCHECK(sent <= static_cast<int>(cb));
365 if ((sent > 0 && sent < static_cast<int>(cb)) ||
366 (sent < 0 && IsBlockingError(GetError()))) {
367 EnableEvents(DE_WRITE);
368 }
369 return sent;
370 }
371
SendTo(const void * buffer,size_t length,const SocketAddress & addr)372 int PhysicalSocket::SendTo(const void* buffer,
373 size_t length,
374 const SocketAddress& addr) {
375 sockaddr_storage saddr;
376 size_t len = addr.ToSockAddrStorage(&saddr);
377 int sent =
378 DoSendTo(s_, static_cast<const char*>(buffer), static_cast<int>(length),
379 #if defined(WEBRTC_LINUX) && !defined(WEBRTC_ANDROID)
380 // Suppress SIGPIPE. See above for explanation.
381 MSG_NOSIGNAL,
382 #else
383 0,
384 #endif
385 reinterpret_cast<sockaddr*>(&saddr), static_cast<int>(len));
386 UpdateLastError();
387 MaybeRemapSendError();
388 // We have seen minidumps where this may be false.
389 RTC_DCHECK(sent <= static_cast<int>(length));
390 if ((sent > 0 && sent < static_cast<int>(length)) ||
391 (sent < 0 && IsBlockingError(GetError()))) {
392 EnableEvents(DE_WRITE);
393 }
394 return sent;
395 }
396
Recv(void * buffer,size_t length,int64_t * timestamp)397 int PhysicalSocket::Recv(void* buffer, size_t length, int64_t* timestamp) {
398 int received =
399 ::recv(s_, static_cast<char*>(buffer), static_cast<int>(length), 0);
400 if ((received == 0) && (length != 0)) {
401 // Note: on graceful shutdown, recv can return 0. In this case, we
402 // pretend it is blocking, and then signal close, so that simplifying
403 // assumptions can be made about Recv.
404 RTC_LOG(LS_WARNING) << "EOF from socket; deferring close event";
405 // Must turn this back on so that the select() loop will notice the close
406 // event.
407 EnableEvents(DE_READ);
408 SetError(EWOULDBLOCK);
409 return SOCKET_ERROR;
410 }
411 if (timestamp) {
412 *timestamp = GetSocketRecvTimestamp(s_);
413 }
414 UpdateLastError();
415 int error = GetError();
416 bool success = (received >= 0) || IsBlockingError(error);
417 if (udp_ || success) {
418 EnableEvents(DE_READ);
419 }
420 if (!success) {
421 RTC_LOG_F(LS_VERBOSE) << "Error = " << error;
422 }
423 return received;
424 }
425
RecvFrom(void * buffer,size_t length,SocketAddress * out_addr,int64_t * timestamp)426 int PhysicalSocket::RecvFrom(void* buffer,
427 size_t length,
428 SocketAddress* out_addr,
429 int64_t* timestamp) {
430 sockaddr_storage addr_storage;
431 socklen_t addr_len = sizeof(addr_storage);
432 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage);
433 int received = ::recvfrom(s_, static_cast<char*>(buffer),
434 static_cast<int>(length), 0, addr, &addr_len);
435 if (timestamp) {
436 *timestamp = GetSocketRecvTimestamp(s_);
437 }
438 UpdateLastError();
439 if ((received >= 0) && (out_addr != nullptr))
440 SocketAddressFromSockAddrStorage(addr_storage, out_addr);
441 int error = GetError();
442 bool success = (received >= 0) || IsBlockingError(error);
443 if (udp_ || success) {
444 EnableEvents(DE_READ);
445 }
446 if (!success) {
447 RTC_LOG_F(LS_VERBOSE) << "Error = " << error;
448 }
449 return received;
450 }
451
Listen(int backlog)452 int PhysicalSocket::Listen(int backlog) {
453 int err = ::listen(s_, backlog);
454 UpdateLastError();
455 if (err == 0) {
456 state_ = CS_CONNECTING;
457 EnableEvents(DE_ACCEPT);
458 #if !defined(NDEBUG)
459 dbg_addr_ = "Listening @ ";
460 dbg_addr_.append(GetLocalAddress().ToString());
461 #endif
462 }
463 return err;
464 }
465
Accept(SocketAddress * out_addr)466 AsyncSocket* PhysicalSocket::Accept(SocketAddress* out_addr) {
467 // Always re-subscribe DE_ACCEPT to make sure new incoming connections will
468 // trigger an event even if DoAccept returns an error here.
469 EnableEvents(DE_ACCEPT);
470 sockaddr_storage addr_storage;
471 socklen_t addr_len = sizeof(addr_storage);
472 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage);
473 SOCKET s = DoAccept(s_, addr, &addr_len);
474 UpdateLastError();
475 if (s == INVALID_SOCKET)
476 return nullptr;
477 if (out_addr != nullptr)
478 SocketAddressFromSockAddrStorage(addr_storage, out_addr);
479 return ss_->WrapSocket(s);
480 }
481
Close()482 int PhysicalSocket::Close() {
483 if (s_ == INVALID_SOCKET)
484 return 0;
485 int err = ::closesocket(s_);
486 UpdateLastError();
487 s_ = INVALID_SOCKET;
488 state_ = CS_CLOSED;
489 SetEnabledEvents(0);
490 if (resolver_) {
491 resolver_->Destroy(false);
492 resolver_ = nullptr;
493 }
494 return err;
495 }
496
DoAccept(SOCKET socket,sockaddr * addr,socklen_t * addrlen)497 SOCKET PhysicalSocket::DoAccept(SOCKET socket,
498 sockaddr* addr,
499 socklen_t* addrlen) {
500 return ::accept(socket, addr, addrlen);
501 }
502
DoSend(SOCKET socket,const char * buf,int len,int flags)503 int PhysicalSocket::DoSend(SOCKET socket, const char* buf, int len, int flags) {
504 return ::send(socket, buf, len, flags);
505 }
506
DoSendTo(SOCKET socket,const char * buf,int len,int flags,const struct sockaddr * dest_addr,socklen_t addrlen)507 int PhysicalSocket::DoSendTo(SOCKET socket,
508 const char* buf,
509 int len,
510 int flags,
511 const struct sockaddr* dest_addr,
512 socklen_t addrlen) {
513 return ::sendto(socket, buf, len, flags, dest_addr, addrlen);
514 }
515
OnResolveResult(AsyncResolverInterface * resolver)516 void PhysicalSocket::OnResolveResult(AsyncResolverInterface* resolver) {
517 if (resolver != resolver_) {
518 return;
519 }
520
521 int error = resolver_->GetError();
522 if (error == 0) {
523 error = DoConnect(resolver_->address());
524 } else {
525 Close();
526 }
527
528 if (error) {
529 SetError(error);
530 SignalCloseEvent(this, error);
531 }
532 }
533
UpdateLastError()534 void PhysicalSocket::UpdateLastError() {
535 SetError(LAST_SYSTEM_ERROR);
536 }
537
MaybeRemapSendError()538 void PhysicalSocket::MaybeRemapSendError() {
539 #if defined(WEBRTC_MAC)
540 // https://developer.apple.com/library/mac/documentation/Darwin/
541 // Reference/ManPages/man2/sendto.2.html
542 // ENOBUFS - The output queue for a network interface is full.
543 // This generally indicates that the interface has stopped sending,
544 // but may be caused by transient congestion.
545 if (GetError() == ENOBUFS) {
546 SetError(EWOULDBLOCK);
547 }
548 #endif
549 }
550
SetEnabledEvents(uint8_t events)551 void PhysicalSocket::SetEnabledEvents(uint8_t events) {
552 enabled_events_ = events;
553 }
554
EnableEvents(uint8_t events)555 void PhysicalSocket::EnableEvents(uint8_t events) {
556 enabled_events_ |= events;
557 }
558
DisableEvents(uint8_t events)559 void PhysicalSocket::DisableEvents(uint8_t events) {
560 enabled_events_ &= ~events;
561 }
562
TranslateOption(Option opt,int * slevel,int * sopt)563 int PhysicalSocket::TranslateOption(Option opt, int* slevel, int* sopt) {
564 switch (opt) {
565 case OPT_DONTFRAGMENT:
566 #if defined(WEBRTC_WIN)
567 *slevel = IPPROTO_IP;
568 *sopt = IP_DONTFRAGMENT;
569 break;
570 #elif defined(WEBRTC_MAC) || defined(WEBRTC_BSD) || defined(__native_client__)
571 RTC_LOG(LS_WARNING) << "Socket::OPT_DONTFRAGMENT not supported.";
572 return -1;
573 #elif defined(WEBRTC_POSIX)
574 *slevel = IPPROTO_IP;
575 *sopt = IP_MTU_DISCOVER;
576 break;
577 #endif
578 case OPT_RCVBUF:
579 *slevel = SOL_SOCKET;
580 *sopt = SO_RCVBUF;
581 break;
582 case OPT_SNDBUF:
583 *slevel = SOL_SOCKET;
584 *sopt = SO_SNDBUF;
585 break;
586 case OPT_NODELAY:
587 *slevel = IPPROTO_TCP;
588 *sopt = TCP_NODELAY;
589 break;
590 case OPT_DSCP:
591 #if defined(WEBRTC_POSIX)
592 if (family_ == AF_INET6) {
593 *slevel = IPPROTO_IPV6;
594 *sopt = IPV6_TCLASS;
595 } else {
596 *slevel = IPPROTO_IP;
597 *sopt = IP_TOS;
598 }
599 break;
600 #else
601 RTC_LOG(LS_WARNING) << "Socket::OPT_DSCP not supported.";
602 return -1;
603 #endif
604 case OPT_RTP_SENDTIME_EXTN_ID:
605 return -1; // No logging is necessary as this not a OS socket option.
606 default:
607 RTC_NOTREACHED();
608 return -1;
609 }
610 return 0;
611 }
612
SocketDispatcher(PhysicalSocketServer * ss)613 SocketDispatcher::SocketDispatcher(PhysicalSocketServer* ss)
614 #if defined(WEBRTC_WIN)
615 : PhysicalSocket(ss),
616 id_(0),
617 signal_close_(false)
618 #else
619 : PhysicalSocket(ss)
620 #endif
621 {
622 }
623
SocketDispatcher(SOCKET s,PhysicalSocketServer * ss)624 SocketDispatcher::SocketDispatcher(SOCKET s, PhysicalSocketServer* ss)
625 #if defined(WEBRTC_WIN)
626 : PhysicalSocket(ss, s),
627 id_(0),
628 signal_close_(false)
629 #else
630 : PhysicalSocket(ss, s)
631 #endif
632 {
633 }
634
~SocketDispatcher()635 SocketDispatcher::~SocketDispatcher() {
636 Close();
637 }
638
Initialize()639 bool SocketDispatcher::Initialize() {
640 RTC_DCHECK(s_ != INVALID_SOCKET);
641 // Must be a non-blocking
642 #if defined(WEBRTC_WIN)
643 u_long argp = 1;
644 ioctlsocket(s_, FIONBIO, &argp);
645 #elif defined(WEBRTC_POSIX)
646 fcntl(s_, F_SETFL, fcntl(s_, F_GETFL, 0) | O_NONBLOCK);
647 #endif
648 #if defined(WEBRTC_IOS)
649 // iOS may kill sockets when the app is moved to the background
650 // (specifically, if the app doesn't use the "voip" UIBackgroundMode). When
651 // we attempt to write to such a socket, SIGPIPE will be raised, which by
652 // default will terminate the process, which we don't want. By specifying
653 // this socket option, SIGPIPE will be disabled for the socket.
654 int value = 1;
655 ::setsockopt(s_, SOL_SOCKET, SO_NOSIGPIPE, &value, sizeof(value));
656 #endif
657 ss_->Add(this);
658 return true;
659 }
660
Create(int type)661 bool SocketDispatcher::Create(int type) {
662 return Create(AF_INET, type);
663 }
664
Create(int family,int type)665 bool SocketDispatcher::Create(int family, int type) {
666 // Change the socket to be non-blocking.
667 if (!PhysicalSocket::Create(family, type))
668 return false;
669
670 if (!Initialize())
671 return false;
672
673 #if defined(WEBRTC_WIN)
674 do {
675 id_ = ++next_id_;
676 } while (id_ == 0);
677 #endif
678 return true;
679 }
680
681 #if defined(WEBRTC_WIN)
682
GetWSAEvent()683 WSAEVENT SocketDispatcher::GetWSAEvent() {
684 return WSA_INVALID_EVENT;
685 }
686
GetSocket()687 SOCKET SocketDispatcher::GetSocket() {
688 return s_;
689 }
690
CheckSignalClose()691 bool SocketDispatcher::CheckSignalClose() {
692 if (!signal_close_)
693 return false;
694
695 char ch;
696 if (recv(s_, &ch, 1, MSG_PEEK) > 0)
697 return false;
698
699 state_ = CS_CLOSED;
700 signal_close_ = false;
701 SignalCloseEvent(this, signal_err_);
702 return true;
703 }
704
705 int SocketDispatcher::next_id_ = 0;
706
707 #elif defined(WEBRTC_POSIX)
708
GetDescriptor()709 int SocketDispatcher::GetDescriptor() {
710 return s_;
711 }
712
IsDescriptorClosed()713 bool SocketDispatcher::IsDescriptorClosed() {
714 if (udp_) {
715 // The MSG_PEEK trick doesn't work for UDP, since (at least in some
716 // circumstances) it requires reading an entire UDP packet, which would be
717 // bad for performance here. So, just check whether |s_| has been closed,
718 // which should be sufficient.
719 return s_ == INVALID_SOCKET;
720 }
721 // We don't have a reliable way of distinguishing end-of-stream
722 // from readability. So test on each readable call. Is this
723 // inefficient? Probably.
724 char ch;
725 ssize_t res = ::recv(s_, &ch, 1, MSG_PEEK);
726 if (res > 0) {
727 // Data available, so not closed.
728 return false;
729 } else if (res == 0) {
730 // EOF, so closed.
731 return true;
732 } else { // error
733 switch (errno) {
734 // Returned if we've already closed s_.
735 case EBADF:
736 // Returned during ungraceful peer shutdown.
737 case ECONNRESET:
738 return true;
739 // The normal blocking error; don't log anything.
740 case EWOULDBLOCK:
741 // Interrupted system call.
742 case EINTR:
743 return false;
744 default:
745 // Assume that all other errors are just blocking errors, meaning the
746 // connection is still good but we just can't read from it right now.
747 // This should only happen when connecting (and at most once), because
748 // in all other cases this function is only called if the file
749 // descriptor is already known to be in the readable state. However,
750 // it's not necessary a problem if we spuriously interpret a
751 // "connection lost"-type error as a blocking error, because typically
752 // the next recv() will get EOF, so we'll still eventually notice that
753 // the socket is closed.
754 RTC_LOG_ERR(LS_WARNING) << "Assuming benign blocking error";
755 return false;
756 }
757 }
758 }
759
760 #endif // WEBRTC_POSIX
761
GetRequestedEvents()762 uint32_t SocketDispatcher::GetRequestedEvents() {
763 return enabled_events();
764 }
765
OnPreEvent(uint32_t ff)766 void SocketDispatcher::OnPreEvent(uint32_t ff) {
767 if ((ff & DE_CONNECT) != 0)
768 state_ = CS_CONNECTED;
769
770 #if defined(WEBRTC_WIN)
771 // We set CS_CLOSED from CheckSignalClose.
772 #elif defined(WEBRTC_POSIX)
773 if ((ff & DE_CLOSE) != 0)
774 state_ = CS_CLOSED;
775 #endif
776 }
777
778 #if defined(WEBRTC_WIN)
779
OnEvent(uint32_t ff,int err)780 void SocketDispatcher::OnEvent(uint32_t ff, int err) {
781 int cache_id = id_;
782 // Make sure we deliver connect/accept first. Otherwise, consumers may see
783 // something like a READ followed by a CONNECT, which would be odd.
784 if (((ff & DE_CONNECT) != 0) && (id_ == cache_id)) {
785 if (ff != DE_CONNECT)
786 RTC_LOG(LS_VERBOSE) << "Signalled with DE_CONNECT: " << ff;
787 DisableEvents(DE_CONNECT);
788 #if !defined(NDEBUG)
789 dbg_addr_ = "Connected @ ";
790 dbg_addr_.append(GetRemoteAddress().ToString());
791 #endif
792 SignalConnectEvent(this);
793 }
794 if (((ff & DE_ACCEPT) != 0) && (id_ == cache_id)) {
795 DisableEvents(DE_ACCEPT);
796 SignalReadEvent(this);
797 }
798 if ((ff & DE_READ) != 0) {
799 DisableEvents(DE_READ);
800 SignalReadEvent(this);
801 }
802 if (((ff & DE_WRITE) != 0) && (id_ == cache_id)) {
803 DisableEvents(DE_WRITE);
804 SignalWriteEvent(this);
805 }
806 if (((ff & DE_CLOSE) != 0) && (id_ == cache_id)) {
807 signal_close_ = true;
808 signal_err_ = err;
809 }
810 }
811
812 #elif defined(WEBRTC_POSIX)
813
OnEvent(uint32_t ff,int err)814 void SocketDispatcher::OnEvent(uint32_t ff, int err) {
815 #if defined(WEBRTC_USE_EPOLL)
816 // Remember currently enabled events so we can combine multiple changes
817 // into one update call later.
818 // The signal handlers might re-enable events disabled here, so we can't
819 // keep a list of events to disable at the end of the method. This list
820 // would not be updated with the events enabled by the signal handlers.
821 StartBatchedEventUpdates();
822 #endif
823 // Make sure we deliver connect/accept first. Otherwise, consumers may see
824 // something like a READ followed by a CONNECT, which would be odd.
825 if ((ff & DE_CONNECT) != 0) {
826 DisableEvents(DE_CONNECT);
827 SignalConnectEvent(this);
828 }
829 if ((ff & DE_ACCEPT) != 0) {
830 DisableEvents(DE_ACCEPT);
831 SignalReadEvent(this);
832 }
833 if ((ff & DE_READ) != 0) {
834 DisableEvents(DE_READ);
835 SignalReadEvent(this);
836 }
837 if ((ff & DE_WRITE) != 0) {
838 DisableEvents(DE_WRITE);
839 SignalWriteEvent(this);
840 }
841 if ((ff & DE_CLOSE) != 0) {
842 // The socket is now dead to us, so stop checking it.
843 SetEnabledEvents(0);
844 SignalCloseEvent(this, err);
845 }
846 #if defined(WEBRTC_USE_EPOLL)
847 FinishBatchedEventUpdates();
848 #endif
849 }
850
851 #endif // WEBRTC_POSIX
852
853 #if defined(WEBRTC_USE_EPOLL)
854
GetEpollEvents(uint32_t ff)855 inline static int GetEpollEvents(uint32_t ff) {
856 int events = 0;
857 if (ff & (DE_READ | DE_ACCEPT)) {
858 events |= EPOLLIN;
859 }
860 if (ff & (DE_WRITE | DE_CONNECT)) {
861 events |= EPOLLOUT;
862 }
863 return events;
864 }
865
StartBatchedEventUpdates()866 void SocketDispatcher::StartBatchedEventUpdates() {
867 RTC_DCHECK_EQ(saved_enabled_events_, -1);
868 saved_enabled_events_ = enabled_events();
869 }
870
FinishBatchedEventUpdates()871 void SocketDispatcher::FinishBatchedEventUpdates() {
872 RTC_DCHECK_NE(saved_enabled_events_, -1);
873 uint8_t old_events = static_cast<uint8_t>(saved_enabled_events_);
874 saved_enabled_events_ = -1;
875 MaybeUpdateDispatcher(old_events);
876 }
877
MaybeUpdateDispatcher(uint8_t old_events)878 void SocketDispatcher::MaybeUpdateDispatcher(uint8_t old_events) {
879 if (GetEpollEvents(enabled_events()) != GetEpollEvents(old_events) &&
880 saved_enabled_events_ == -1) {
881 ss_->Update(this);
882 }
883 }
884
SetEnabledEvents(uint8_t events)885 void SocketDispatcher::SetEnabledEvents(uint8_t events) {
886 uint8_t old_events = enabled_events();
887 PhysicalSocket::SetEnabledEvents(events);
888 MaybeUpdateDispatcher(old_events);
889 }
890
EnableEvents(uint8_t events)891 void SocketDispatcher::EnableEvents(uint8_t events) {
892 uint8_t old_events = enabled_events();
893 PhysicalSocket::EnableEvents(events);
894 MaybeUpdateDispatcher(old_events);
895 }
896
DisableEvents(uint8_t events)897 void SocketDispatcher::DisableEvents(uint8_t events) {
898 uint8_t old_events = enabled_events();
899 PhysicalSocket::DisableEvents(events);
900 MaybeUpdateDispatcher(old_events);
901 }
902
903 #endif // WEBRTC_USE_EPOLL
904
Close()905 int SocketDispatcher::Close() {
906 if (s_ == INVALID_SOCKET)
907 return 0;
908
909 #if defined(WEBRTC_WIN)
910 id_ = 0;
911 signal_close_ = false;
912 #endif
913 #if defined(WEBRTC_USE_EPOLL)
914 // If we're batching events, the socket can be closed and reopened
915 // during the batch. Set saved_enabled_events_ to 0 here so the new
916 // socket, if any, has the correct old events bitfield
917 if (saved_enabled_events_ != -1) {
918 saved_enabled_events_ = 0;
919 }
920 #endif
921 ss_->Remove(this);
922 return PhysicalSocket::Close();
923 }
924
925 #if defined(WEBRTC_POSIX)
926 class EventDispatcher : public Dispatcher {
927 public:
EventDispatcher(PhysicalSocketServer * ss)928 EventDispatcher(PhysicalSocketServer* ss) : ss_(ss), fSignaled_(false) {
929 if (pipe(afd_) < 0)
930 RTC_LOG(LERROR) << "pipe failed";
931 ss_->Add(this);
932 }
933
~EventDispatcher()934 ~EventDispatcher() override {
935 ss_->Remove(this);
936 close(afd_[0]);
937 close(afd_[1]);
938 }
939
Signal()940 virtual void Signal() {
941 CritScope cs(&crit_);
942 if (!fSignaled_) {
943 const uint8_t b[1] = {0};
944 const ssize_t res = write(afd_[1], b, sizeof(b));
945 RTC_DCHECK_EQ(1, res);
946 fSignaled_ = true;
947 }
948 }
949
GetRequestedEvents()950 uint32_t GetRequestedEvents() override { return DE_READ; }
951
OnPreEvent(uint32_t ff)952 void OnPreEvent(uint32_t ff) override {
953 // It is not possible to perfectly emulate an auto-resetting event with
954 // pipes. This simulates it by resetting before the event is handled.
955
956 CritScope cs(&crit_);
957 if (fSignaled_) {
958 uint8_t b[4]; // Allow for reading more than 1 byte, but expect 1.
959 const ssize_t res = read(afd_[0], b, sizeof(b));
960 RTC_DCHECK_EQ(1, res);
961 fSignaled_ = false;
962 }
963 }
964
OnEvent(uint32_t ff,int err)965 void OnEvent(uint32_t ff, int err) override { RTC_NOTREACHED(); }
966
GetDescriptor()967 int GetDescriptor() override { return afd_[0]; }
968
IsDescriptorClosed()969 bool IsDescriptorClosed() override { return false; }
970
971 private:
972 PhysicalSocketServer* ss_;
973 int afd_[2];
974 bool fSignaled_;
975 RecursiveCriticalSection crit_;
976 };
977
978 #endif // WEBRTC_POSIX
979
980 #if defined(WEBRTC_WIN)
FlagsToEvents(uint32_t events)981 static uint32_t FlagsToEvents(uint32_t events) {
982 uint32_t ffFD = FD_CLOSE;
983 if (events & DE_READ)
984 ffFD |= FD_READ;
985 if (events & DE_WRITE)
986 ffFD |= FD_WRITE;
987 if (events & DE_CONNECT)
988 ffFD |= FD_CONNECT;
989 if (events & DE_ACCEPT)
990 ffFD |= FD_ACCEPT;
991 return ffFD;
992 }
993
994 class EventDispatcher : public Dispatcher {
995 public:
EventDispatcher(PhysicalSocketServer * ss)996 EventDispatcher(PhysicalSocketServer* ss) : ss_(ss) {
997 hev_ = WSACreateEvent();
998 if (hev_) {
999 ss_->Add(this);
1000 }
1001 }
1002
~EventDispatcher()1003 ~EventDispatcher() override {
1004 if (hev_ != nullptr) {
1005 ss_->Remove(this);
1006 WSACloseEvent(hev_);
1007 hev_ = nullptr;
1008 }
1009 }
1010
Signal()1011 virtual void Signal() {
1012 if (hev_ != nullptr)
1013 WSASetEvent(hev_);
1014 }
1015
GetRequestedEvents()1016 uint32_t GetRequestedEvents() override { return 0; }
1017
OnPreEvent(uint32_t ff)1018 void OnPreEvent(uint32_t ff) override { WSAResetEvent(hev_); }
1019
OnEvent(uint32_t ff,int err)1020 void OnEvent(uint32_t ff, int err) override {}
1021
GetWSAEvent()1022 WSAEVENT GetWSAEvent() override { return hev_; }
1023
GetSocket()1024 SOCKET GetSocket() override { return INVALID_SOCKET; }
1025
CheckSignalClose()1026 bool CheckSignalClose() override { return false; }
1027
1028 private:
1029 PhysicalSocketServer* ss_;
1030 WSAEVENT hev_;
1031 };
1032 #endif // WEBRTC_WIN
1033
1034 // Sets the value of a boolean value to false when signaled.
1035 class Signaler : public EventDispatcher {
1036 public:
Signaler(PhysicalSocketServer * ss,bool * pf)1037 Signaler(PhysicalSocketServer* ss, bool* pf) : EventDispatcher(ss), pf_(pf) {}
~Signaler()1038 ~Signaler() override {}
1039
OnEvent(uint32_t ff,int err)1040 void OnEvent(uint32_t ff, int err) override {
1041 if (pf_)
1042 *pf_ = false;
1043 }
1044
1045 private:
1046 bool* pf_;
1047 };
1048
PhysicalSocketServer()1049 PhysicalSocketServer::PhysicalSocketServer()
1050 :
1051 #if defined(WEBRTC_USE_EPOLL)
1052 // Since Linux 2.6.8, the size argument is ignored, but must be greater
1053 // than zero. Before that the size served as hint to the kernel for the
1054 // amount of space to initially allocate in internal data structures.
1055 epoll_fd_(epoll_create(FD_SETSIZE)),
1056 #endif
1057 #if defined(WEBRTC_WIN)
1058 socket_ev_(WSACreateEvent()),
1059 #endif
1060 fWait_(false) {
1061 #if defined(WEBRTC_USE_EPOLL)
1062 if (epoll_fd_ == -1) {
1063 // Not an error, will fall back to "select" below.
1064 RTC_LOG_E(LS_WARNING, EN, errno) << "epoll_create";
1065 // Note that -1 == INVALID_SOCKET, the alias used by later checks.
1066 }
1067 #endif
1068 signal_wakeup_ = new Signaler(this, &fWait_);
1069 }
1070
~PhysicalSocketServer()1071 PhysicalSocketServer::~PhysicalSocketServer() {
1072 #if defined(WEBRTC_WIN)
1073 WSACloseEvent(socket_ev_);
1074 #endif
1075 delete signal_wakeup_;
1076 #if defined(WEBRTC_USE_EPOLL)
1077 if (epoll_fd_ != INVALID_SOCKET) {
1078 close(epoll_fd_);
1079 }
1080 #endif
1081 RTC_DCHECK(dispatcher_by_key_.empty());
1082 RTC_DCHECK(key_by_dispatcher_.empty());
1083 }
1084
WakeUp()1085 void PhysicalSocketServer::WakeUp() {
1086 signal_wakeup_->Signal();
1087 }
1088
CreateSocket(int family,int type)1089 Socket* PhysicalSocketServer::CreateSocket(int family, int type) {
1090 PhysicalSocket* socket = new PhysicalSocket(this);
1091 if (socket->Create(family, type)) {
1092 return socket;
1093 } else {
1094 delete socket;
1095 return nullptr;
1096 }
1097 }
1098
CreateAsyncSocket(int family,int type)1099 AsyncSocket* PhysicalSocketServer::CreateAsyncSocket(int family, int type) {
1100 SocketDispatcher* dispatcher = new SocketDispatcher(this);
1101 if (dispatcher->Create(family, type)) {
1102 return dispatcher;
1103 } else {
1104 delete dispatcher;
1105 return nullptr;
1106 }
1107 }
1108
WrapSocket(SOCKET s)1109 AsyncSocket* PhysicalSocketServer::WrapSocket(SOCKET s) {
1110 SocketDispatcher* dispatcher = new SocketDispatcher(s, this);
1111 if (dispatcher->Initialize()) {
1112 return dispatcher;
1113 } else {
1114 delete dispatcher;
1115 return nullptr;
1116 }
1117 }
1118
Add(Dispatcher * pdispatcher)1119 void PhysicalSocketServer::Add(Dispatcher* pdispatcher) {
1120 CritScope cs(&crit_);
1121 if (key_by_dispatcher_.count(pdispatcher)) {
1122 RTC_LOG(LS_WARNING)
1123 << "PhysicalSocketServer asked to add a duplicate dispatcher.";
1124 return;
1125 }
1126 uint64_t key = next_dispatcher_key_++;
1127 dispatcher_by_key_.emplace(key, pdispatcher);
1128 key_by_dispatcher_.emplace(pdispatcher, key);
1129 #if defined(WEBRTC_USE_EPOLL)
1130 if (epoll_fd_ != INVALID_SOCKET) {
1131 AddEpoll(pdispatcher, key);
1132 }
1133 #endif // WEBRTC_USE_EPOLL
1134 }
1135
Remove(Dispatcher * pdispatcher)1136 void PhysicalSocketServer::Remove(Dispatcher* pdispatcher) {
1137 CritScope cs(&crit_);
1138 if (!key_by_dispatcher_.count(pdispatcher)) {
1139 RTC_LOG(LS_WARNING)
1140 << "PhysicalSocketServer asked to remove a unknown "
1141 "dispatcher, potentially from a duplicate call to Add.";
1142 return;
1143 }
1144 uint64_t key = key_by_dispatcher_.at(pdispatcher);
1145 key_by_dispatcher_.erase(pdispatcher);
1146 dispatcher_by_key_.erase(key);
1147 #if defined(WEBRTC_USE_EPOLL)
1148 if (epoll_fd_ != INVALID_SOCKET) {
1149 RemoveEpoll(pdispatcher);
1150 }
1151 #endif // WEBRTC_USE_EPOLL
1152 }
1153
Update(Dispatcher * pdispatcher)1154 void PhysicalSocketServer::Update(Dispatcher* pdispatcher) {
1155 #if defined(WEBRTC_USE_EPOLL)
1156 if (epoll_fd_ == INVALID_SOCKET) {
1157 return;
1158 }
1159
1160 // Don't update dispatchers that haven't yet been added.
1161 CritScope cs(&crit_);
1162 if (!key_by_dispatcher_.count(pdispatcher)) {
1163 return;
1164 }
1165
1166 UpdateEpoll(pdispatcher, key_by_dispatcher_.at(pdispatcher));
1167 #endif
1168 }
1169
1170 #if defined(WEBRTC_POSIX)
1171
Wait(int cmsWait,bool process_io)1172 bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) {
1173 // We don't support reentrant waiting.
1174 RTC_DCHECK(!waiting_);
1175 ScopedSetTrue s(&waiting_);
1176 #if defined(WEBRTC_USE_EPOLL)
1177 // We don't keep a dedicated "epoll" descriptor containing only the non-IO
1178 // (i.e. signaling) dispatcher, so "poll" will be used instead of the default
1179 // "select" to support sockets larger than FD_SETSIZE.
1180 if (!process_io) {
1181 return WaitPoll(cmsWait, signal_wakeup_);
1182 } else if (epoll_fd_ != INVALID_SOCKET) {
1183 return WaitEpoll(cmsWait);
1184 }
1185 #endif
1186 return WaitSelect(cmsWait, process_io);
1187 }
1188
ProcessEvents(Dispatcher * dispatcher,bool readable,bool writable,bool check_error)1189 static void ProcessEvents(Dispatcher* dispatcher,
1190 bool readable,
1191 bool writable,
1192 bool check_error) {
1193 int errcode = 0;
1194 // TODO(pthatcher): Should we set errcode if getsockopt fails?
1195 if (check_error) {
1196 socklen_t len = sizeof(errcode);
1197 ::getsockopt(dispatcher->GetDescriptor(), SOL_SOCKET, SO_ERROR, &errcode,
1198 &len);
1199 }
1200
1201 // Most often the socket is writable or readable or both, so make a single
1202 // virtual call to get requested events
1203 const uint32_t requested_events = dispatcher->GetRequestedEvents();
1204 uint32_t ff = 0;
1205
1206 // Check readable descriptors. If we're waiting on an accept, signal
1207 // that. Otherwise we're waiting for data, check to see if we're
1208 // readable or really closed.
1209 // TODO(pthatcher): Only peek at TCP descriptors.
1210 if (readable) {
1211 if (requested_events & DE_ACCEPT) {
1212 ff |= DE_ACCEPT;
1213 } else if (errcode || dispatcher->IsDescriptorClosed()) {
1214 ff |= DE_CLOSE;
1215 } else {
1216 ff |= DE_READ;
1217 }
1218 }
1219
1220 // Check writable descriptors. If we're waiting on a connect, detect
1221 // success versus failure by the reaped error code.
1222 if (writable) {
1223 if (requested_events & DE_CONNECT) {
1224 if (!errcode) {
1225 ff |= DE_CONNECT;
1226 } else {
1227 ff |= DE_CLOSE;
1228 }
1229 } else {
1230 ff |= DE_WRITE;
1231 }
1232 }
1233
1234 // Tell the descriptor about the event.
1235 if (ff != 0) {
1236 dispatcher->OnPreEvent(ff);
1237 dispatcher->OnEvent(ff, errcode);
1238 }
1239 }
1240
WaitSelect(int cmsWait,bool process_io)1241 bool PhysicalSocketServer::WaitSelect(int cmsWait, bool process_io) {
1242 // Calculate timing information
1243
1244 struct timeval* ptvWait = nullptr;
1245 struct timeval tvWait;
1246 int64_t stop_us;
1247 if (cmsWait != kForever) {
1248 // Calculate wait timeval
1249 tvWait.tv_sec = cmsWait / 1000;
1250 tvWait.tv_usec = (cmsWait % 1000) * 1000;
1251 ptvWait = &tvWait;
1252
1253 // Calculate when to return
1254 stop_us = rtc::TimeMicros() + cmsWait * 1000;
1255 }
1256
1257
1258 fd_set fdsRead;
1259 fd_set fdsWrite;
1260 // Explicitly unpoison these FDs on MemorySanitizer which doesn't handle the
1261 // inline assembly in FD_ZERO.
1262 // http://crbug.com/344505
1263 #ifdef MEMORY_SANITIZER
1264 __msan_unpoison(&fdsRead, sizeof(fdsRead));
1265 __msan_unpoison(&fdsWrite, sizeof(fdsWrite));
1266 #endif
1267
1268 fWait_ = true;
1269
1270 while (fWait_) {
1271 // Zero all fd_sets. Although select() zeros the descriptors not signaled,
1272 // we may need to do this for dispatchers that were deleted while
1273 // iterating.
1274 FD_ZERO(&fdsRead);
1275 FD_ZERO(&fdsWrite);
1276 int fdmax = -1;
1277 {
1278 CritScope cr(&crit_);
1279 current_dispatcher_keys_.clear();
1280 for (auto const& kv : dispatcher_by_key_) {
1281 uint64_t key = kv.first;
1282 Dispatcher* pdispatcher = kv.second;
1283 // Query dispatchers for read and write wait state
1284 if (!process_io && (pdispatcher != signal_wakeup_))
1285 continue;
1286 current_dispatcher_keys_.push_back(key);
1287 int fd = pdispatcher->GetDescriptor();
1288 // "select"ing a file descriptor that is equal to or larger than
1289 // FD_SETSIZE will result in undefined behavior.
1290 RTC_CHECK_LT(fd, FD_SETSIZE);
1291 if (fd > fdmax)
1292 fdmax = fd;
1293
1294 uint32_t ff = pdispatcher->GetRequestedEvents();
1295 if (ff & (DE_READ | DE_ACCEPT))
1296 FD_SET(fd, &fdsRead);
1297 if (ff & (DE_WRITE | DE_CONNECT))
1298 FD_SET(fd, &fdsWrite);
1299 }
1300 }
1301
1302 // Wait then call handlers as appropriate
1303 // < 0 means error
1304 // 0 means timeout
1305 // > 0 means count of descriptors ready
1306 int n = select(fdmax + 1, &fdsRead, &fdsWrite, nullptr, ptvWait);
1307
1308 // If error, return error.
1309 if (n < 0) {
1310 if (errno != EINTR) {
1311 RTC_LOG_E(LS_ERROR, EN, errno) << "select";
1312 return false;
1313 }
1314 // Else ignore the error and keep going. If this EINTR was for one of the
1315 // signals managed by this PhysicalSocketServer, the
1316 // PosixSignalDeliveryDispatcher will be in the signaled state in the next
1317 // iteration.
1318 } else if (n == 0) {
1319 // If timeout, return success
1320 return true;
1321 } else {
1322 // We have signaled descriptors
1323 CritScope cr(&crit_);
1324 // Iterate only on the dispatchers whose sockets were passed into
1325 // WSAEventSelect; this avoids the ABA problem (a socket being
1326 // destroyed and a new one created with the same file descriptor).
1327 for (uint64_t key : current_dispatcher_keys_) {
1328 if (!dispatcher_by_key_.count(key))
1329 continue;
1330 Dispatcher* pdispatcher = dispatcher_by_key_.at(key);
1331
1332 int fd = pdispatcher->GetDescriptor();
1333
1334 bool readable = FD_ISSET(fd, &fdsRead);
1335 if (readable) {
1336 FD_CLR(fd, &fdsRead);
1337 }
1338
1339 bool writable = FD_ISSET(fd, &fdsWrite);
1340 if (writable) {
1341 FD_CLR(fd, &fdsWrite);
1342 }
1343
1344 // The error code can be signaled through reads or writes.
1345 ProcessEvents(pdispatcher, readable, writable, readable || writable);
1346 }
1347 }
1348
1349 // Recalc the time remaining to wait. Doing it here means it doesn't get
1350 // calced twice the first time through the loop
1351 if (ptvWait) {
1352 ptvWait->tv_sec = 0;
1353 ptvWait->tv_usec = 0;
1354 int64_t time_left_us = stop_us - rtc::TimeMicros();
1355 if (time_left_us > 0) {
1356 ptvWait->tv_sec = time_left_us / rtc::kNumMicrosecsPerSec;
1357 ptvWait->tv_usec = time_left_us % rtc::kNumMicrosecsPerSec;
1358 }
1359 }
1360 }
1361
1362 return true;
1363 }
1364
1365 #if defined(WEBRTC_USE_EPOLL)
1366
AddEpoll(Dispatcher * pdispatcher,uint64_t key)1367 void PhysicalSocketServer::AddEpoll(Dispatcher* pdispatcher, uint64_t key) {
1368 RTC_DCHECK(epoll_fd_ != INVALID_SOCKET);
1369 int fd = pdispatcher->GetDescriptor();
1370 RTC_DCHECK(fd != INVALID_SOCKET);
1371 if (fd == INVALID_SOCKET) {
1372 return;
1373 }
1374
1375 struct epoll_event event = {0};
1376 event.events = GetEpollEvents(pdispatcher->GetRequestedEvents());
1377 event.data.u64 = key;
1378 int err = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &event);
1379 RTC_DCHECK_EQ(err, 0);
1380 if (err == -1) {
1381 RTC_LOG_E(LS_ERROR, EN, errno) << "epoll_ctl EPOLL_CTL_ADD";
1382 }
1383 }
1384
RemoveEpoll(Dispatcher * pdispatcher)1385 void PhysicalSocketServer::RemoveEpoll(Dispatcher* pdispatcher) {
1386 RTC_DCHECK(epoll_fd_ != INVALID_SOCKET);
1387 int fd = pdispatcher->GetDescriptor();
1388 RTC_DCHECK(fd != INVALID_SOCKET);
1389 if (fd == INVALID_SOCKET) {
1390 return;
1391 }
1392
1393 struct epoll_event event = {0};
1394 int err = epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, &event);
1395 RTC_DCHECK(err == 0 || errno == ENOENT);
1396 if (err == -1) {
1397 if (errno == ENOENT) {
1398 // Socket has already been closed.
1399 RTC_LOG_E(LS_VERBOSE, EN, errno) << "epoll_ctl EPOLL_CTL_DEL";
1400 } else {
1401 RTC_LOG_E(LS_ERROR, EN, errno) << "epoll_ctl EPOLL_CTL_DEL";
1402 }
1403 }
1404 }
1405
UpdateEpoll(Dispatcher * pdispatcher,uint64_t key)1406 void PhysicalSocketServer::UpdateEpoll(Dispatcher* pdispatcher, uint64_t key) {
1407 RTC_DCHECK(epoll_fd_ != INVALID_SOCKET);
1408 int fd = pdispatcher->GetDescriptor();
1409 RTC_DCHECK(fd != INVALID_SOCKET);
1410 if (fd == INVALID_SOCKET) {
1411 return;
1412 }
1413
1414 struct epoll_event event = {0};
1415 event.events = GetEpollEvents(pdispatcher->GetRequestedEvents());
1416 event.data.u64 = key;
1417 int err = epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, fd, &event);
1418 RTC_DCHECK_EQ(err, 0);
1419 if (err == -1) {
1420 RTC_LOG_E(LS_ERROR, EN, errno) << "epoll_ctl EPOLL_CTL_MOD";
1421 }
1422 }
1423
WaitEpoll(int cmsWait)1424 bool PhysicalSocketServer::WaitEpoll(int cmsWait) {
1425 RTC_DCHECK(epoll_fd_ != INVALID_SOCKET);
1426 int64_t tvWait = -1;
1427 int64_t tvStop = -1;
1428 if (cmsWait != kForever) {
1429 tvWait = cmsWait;
1430 tvStop = TimeAfter(cmsWait);
1431 }
1432
1433 fWait_ = true;
1434 while (fWait_) {
1435 // Wait then call handlers as appropriate
1436 // < 0 means error
1437 // 0 means timeout
1438 // > 0 means count of descriptors ready
1439 int n = epoll_wait(epoll_fd_, epoll_events_.data(), epoll_events_.size(),
1440 static_cast<int>(tvWait));
1441 if (n < 0) {
1442 if (errno != EINTR) {
1443 RTC_LOG_E(LS_ERROR, EN, errno) << "epoll";
1444 return false;
1445 }
1446 // Else ignore the error and keep going. If this EINTR was for one of the
1447 // signals managed by this PhysicalSocketServer, the
1448 // PosixSignalDeliveryDispatcher will be in the signaled state in the next
1449 // iteration.
1450 } else if (n == 0) {
1451 // If timeout, return success
1452 return true;
1453 } else {
1454 // We have signaled descriptors
1455 CritScope cr(&crit_);
1456 for (int i = 0; i < n; ++i) {
1457 const epoll_event& event = epoll_events_[i];
1458 uint64_t key = event.data.u64;
1459 if (!dispatcher_by_key_.count(key)) {
1460 // The dispatcher for this socket no longer exists.
1461 continue;
1462 }
1463 Dispatcher* pdispatcher = dispatcher_by_key_.at(key);
1464
1465 bool readable = (event.events & (EPOLLIN | EPOLLPRI));
1466 bool writable = (event.events & EPOLLOUT);
1467 bool check_error = (event.events & (EPOLLRDHUP | EPOLLERR | EPOLLHUP));
1468
1469 ProcessEvents(pdispatcher, readable, writable, check_error);
1470 }
1471 }
1472
1473 if (cmsWait != kForever) {
1474 tvWait = TimeDiff(tvStop, TimeMillis());
1475 if (tvWait <= 0) {
1476 // Return success on timeout.
1477 return true;
1478 }
1479 }
1480 }
1481
1482 return true;
1483 }
1484
WaitPoll(int cmsWait,Dispatcher * dispatcher)1485 bool PhysicalSocketServer::WaitPoll(int cmsWait, Dispatcher* dispatcher) {
1486 RTC_DCHECK(dispatcher);
1487 int64_t tvWait = -1;
1488 int64_t tvStop = -1;
1489 if (cmsWait != kForever) {
1490 tvWait = cmsWait;
1491 tvStop = TimeAfter(cmsWait);
1492 }
1493
1494 fWait_ = true;
1495
1496 struct pollfd fds = {0};
1497 int fd = dispatcher->GetDescriptor();
1498 fds.fd = fd;
1499
1500 while (fWait_) {
1501 uint32_t ff = dispatcher->GetRequestedEvents();
1502 fds.events = 0;
1503 if (ff & (DE_READ | DE_ACCEPT)) {
1504 fds.events |= POLLIN;
1505 }
1506 if (ff & (DE_WRITE | DE_CONNECT)) {
1507 fds.events |= POLLOUT;
1508 }
1509 fds.revents = 0;
1510
1511 // Wait then call handlers as appropriate
1512 // < 0 means error
1513 // 0 means timeout
1514 // > 0 means count of descriptors ready
1515 int n = poll(&fds, 1, static_cast<int>(tvWait));
1516 if (n < 0) {
1517 if (errno != EINTR) {
1518 RTC_LOG_E(LS_ERROR, EN, errno) << "poll";
1519 return false;
1520 }
1521 // Else ignore the error and keep going. If this EINTR was for one of the
1522 // signals managed by this PhysicalSocketServer, the
1523 // PosixSignalDeliveryDispatcher will be in the signaled state in the next
1524 // iteration.
1525 } else if (n == 0) {
1526 // If timeout, return success
1527 return true;
1528 } else {
1529 // We have signaled descriptors (should only be the passed dispatcher).
1530 RTC_DCHECK_EQ(n, 1);
1531 RTC_DCHECK_EQ(fds.fd, fd);
1532
1533 bool readable = (fds.revents & (POLLIN | POLLPRI));
1534 bool writable = (fds.revents & POLLOUT);
1535 bool check_error = (fds.revents & (POLLRDHUP | POLLERR | POLLHUP));
1536
1537 ProcessEvents(dispatcher, readable, writable, check_error);
1538 }
1539
1540 if (cmsWait != kForever) {
1541 tvWait = TimeDiff(tvStop, TimeMillis());
1542 if (tvWait < 0) {
1543 // Return success on timeout.
1544 return true;
1545 }
1546 }
1547 }
1548
1549 return true;
1550 }
1551
1552 #endif // WEBRTC_USE_EPOLL
1553
1554 #endif // WEBRTC_POSIX
1555
1556 #if defined(WEBRTC_WIN)
Wait(int cmsWait,bool process_io)1557 bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) {
1558 // We don't support reentrant waiting.
1559 RTC_DCHECK(!waiting_);
1560 ScopedSetTrue s(&waiting_);
1561
1562 int64_t cmsTotal = cmsWait;
1563 int64_t cmsElapsed = 0;
1564 int64_t msStart = Time();
1565
1566 fWait_ = true;
1567 while (fWait_) {
1568 std::vector<WSAEVENT> events;
1569 std::vector<uint64_t> event_owners;
1570
1571 events.push_back(socket_ev_);
1572
1573 {
1574 CritScope cr(&crit_);
1575 // Get a snapshot of all current dispatchers; this is used to avoid the
1576 // ABA problem (see later comment) and avoids the dispatcher_by_key_
1577 // iterator being invalidated by calling CheckSignalClose, which may
1578 // remove the dispatcher from the list.
1579 current_dispatcher_keys_.clear();
1580 for (auto const& kv : dispatcher_by_key_) {
1581 current_dispatcher_keys_.push_back(kv.first);
1582 }
1583 for (uint64_t key : current_dispatcher_keys_) {
1584 if (!dispatcher_by_key_.count(key)) {
1585 continue;
1586 }
1587 Dispatcher* disp = dispatcher_by_key_.at(key);
1588 if (!disp)
1589 continue;
1590 if (!process_io && (disp != signal_wakeup_))
1591 continue;
1592 SOCKET s = disp->GetSocket();
1593 if (disp->CheckSignalClose()) {
1594 // We just signalled close, don't poll this socket.
1595 } else if (s != INVALID_SOCKET) {
1596 WSAEventSelect(s, events[0],
1597 FlagsToEvents(disp->GetRequestedEvents()));
1598 } else {
1599 events.push_back(disp->GetWSAEvent());
1600 event_owners.push_back(key);
1601 }
1602 }
1603 }
1604
1605 // Which is shorter, the delay wait or the asked wait?
1606
1607 int64_t cmsNext;
1608 if (cmsWait == kForever) {
1609 cmsNext = cmsWait;
1610 } else {
1611 cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed);
1612 }
1613
1614 // Wait for one of the events to signal
1615 DWORD dw =
1616 WSAWaitForMultipleEvents(static_cast<DWORD>(events.size()), &events[0],
1617 false, static_cast<DWORD>(cmsNext), false);
1618
1619 if (dw == WSA_WAIT_FAILED) {
1620 // Failed?
1621 // TODO(pthatcher): need a better strategy than this!
1622 WSAGetLastError();
1623 RTC_NOTREACHED();
1624 return false;
1625 } else if (dw == WSA_WAIT_TIMEOUT) {
1626 // Timeout?
1627 return true;
1628 } else {
1629 // Figure out which one it is and call it
1630 CritScope cr(&crit_);
1631 int index = dw - WSA_WAIT_EVENT_0;
1632 if (index > 0) {
1633 --index; // The first event is the socket event
1634 uint64_t key = event_owners[index];
1635 if (!dispatcher_by_key_.count(key)) {
1636 // The dispatcher could have been removed while waiting for events.
1637 continue;
1638 }
1639 Dispatcher* disp = dispatcher_by_key_.at(key);
1640 disp->OnPreEvent(0);
1641 disp->OnEvent(0, 0);
1642 } else if (process_io) {
1643 // Iterate only on the dispatchers whose sockets were passed into
1644 // WSAEventSelect; this avoids the ABA problem (a socket being
1645 // destroyed and a new one created with the same SOCKET handle).
1646 for (uint64_t key : current_dispatcher_keys_) {
1647 if (!dispatcher_by_key_.count(key)) {
1648 continue;
1649 }
1650 Dispatcher* disp = dispatcher_by_key_.at(key);
1651 SOCKET s = disp->GetSocket();
1652 if (s == INVALID_SOCKET)
1653 continue;
1654
1655 WSANETWORKEVENTS wsaEvents;
1656 int err = WSAEnumNetworkEvents(s, events[0], &wsaEvents);
1657 if (err == 0) {
1658 {
1659 if ((wsaEvents.lNetworkEvents & FD_READ) &&
1660 wsaEvents.iErrorCode[FD_READ_BIT] != 0) {
1661 RTC_LOG(WARNING)
1662 << "PhysicalSocketServer got FD_READ_BIT error "
1663 << wsaEvents.iErrorCode[FD_READ_BIT];
1664 }
1665 if ((wsaEvents.lNetworkEvents & FD_WRITE) &&
1666 wsaEvents.iErrorCode[FD_WRITE_BIT] != 0) {
1667 RTC_LOG(WARNING)
1668 << "PhysicalSocketServer got FD_WRITE_BIT error "
1669 << wsaEvents.iErrorCode[FD_WRITE_BIT];
1670 }
1671 if ((wsaEvents.lNetworkEvents & FD_CONNECT) &&
1672 wsaEvents.iErrorCode[FD_CONNECT_BIT] != 0) {
1673 RTC_LOG(WARNING)
1674 << "PhysicalSocketServer got FD_CONNECT_BIT error "
1675 << wsaEvents.iErrorCode[FD_CONNECT_BIT];
1676 }
1677 if ((wsaEvents.lNetworkEvents & FD_ACCEPT) &&
1678 wsaEvents.iErrorCode[FD_ACCEPT_BIT] != 0) {
1679 RTC_LOG(WARNING)
1680 << "PhysicalSocketServer got FD_ACCEPT_BIT error "
1681 << wsaEvents.iErrorCode[FD_ACCEPT_BIT];
1682 }
1683 if ((wsaEvents.lNetworkEvents & FD_CLOSE) &&
1684 wsaEvents.iErrorCode[FD_CLOSE_BIT] != 0) {
1685 RTC_LOG(WARNING)
1686 << "PhysicalSocketServer got FD_CLOSE_BIT error "
1687 << wsaEvents.iErrorCode[FD_CLOSE_BIT];
1688 }
1689 }
1690 uint32_t ff = 0;
1691 int errcode = 0;
1692 if (wsaEvents.lNetworkEvents & FD_READ)
1693 ff |= DE_READ;
1694 if (wsaEvents.lNetworkEvents & FD_WRITE)
1695 ff |= DE_WRITE;
1696 if (wsaEvents.lNetworkEvents & FD_CONNECT) {
1697 if (wsaEvents.iErrorCode[FD_CONNECT_BIT] == 0) {
1698 ff |= DE_CONNECT;
1699 } else {
1700 ff |= DE_CLOSE;
1701 errcode = wsaEvents.iErrorCode[FD_CONNECT_BIT];
1702 }
1703 }
1704 if (wsaEvents.lNetworkEvents & FD_ACCEPT)
1705 ff |= DE_ACCEPT;
1706 if (wsaEvents.lNetworkEvents & FD_CLOSE) {
1707 ff |= DE_CLOSE;
1708 errcode = wsaEvents.iErrorCode[FD_CLOSE_BIT];
1709 }
1710 if (ff != 0) {
1711 disp->OnPreEvent(ff);
1712 disp->OnEvent(ff, errcode);
1713 }
1714 }
1715 }
1716 }
1717
1718 // Reset the network event until new activity occurs
1719 WSAResetEvent(socket_ev_);
1720 }
1721
1722 // Break?
1723 if (!fWait_)
1724 break;
1725 cmsElapsed = TimeSince(msStart);
1726 if ((cmsWait != kForever) && (cmsElapsed >= cmsWait)) {
1727 break;
1728 }
1729 }
1730
1731 // Done
1732 return true;
1733 }
1734 #endif // WEBRTC_WIN
1735
1736 } // namespace rtc
1737