1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/socket/udp_socket_posix.h"
6
7 #include <errno.h>
8 #include <fcntl.h>
9 #include <net/if.h>
10 #include <netdb.h>
11 #include <netinet/in.h>
12 #include <sys/ioctl.h>
13
14 #include "base/bind.h"
15 #include "base/callback.h"
16 #include "base/callback_helpers.h"
17 #include "base/containers/stack_container.h"
18 #include "base/debug/alias.h"
19 #include "base/files/file_util.h"
20 #include "base/logging.h"
21 #include "base/message_loop/message_loop_current.h"
22 #include "base/metrics/histogram_functions.h"
23 #include "base/posix/eintr_wrapper.h"
24 #include "base/rand_util.h"
25 #include "base/task/post_task.h"
26 #include "base/task/thread_pool.h"
27 #include "base/task_runner_util.h"
28 #include "base/trace_event/trace_event.h"
29 #include "build/build_config.h"
30 #include "net/base/io_buffer.h"
31 #include "net/base/ip_address.h"
32 #include "net/base/ip_endpoint.h"
33 #include "net/base/net_errors.h"
34 #include "net/base/network_activity_monitor.h"
35 #include "net/base/sockaddr_storage.h"
36 #include "net/base/trace_constants.h"
37 #include "net/log/net_log.h"
38 #include "net/log/net_log_event_type.h"
39 #include "net/log/net_log_source.h"
40 #include "net/log/net_log_source_type.h"
41 #include "net/socket/socket_descriptor.h"
42 #include "net/socket/socket_options.h"
43 #include "net/socket/socket_tag.h"
44 #include "net/socket/udp_net_log_parameters.h"
45 #include "net/traffic_annotation/network_traffic_annotation.h"
46
47 #if defined(OS_ANDROID)
48 #include <dlfcn.h>
49 #include "base/android/build_info.h"
50 #include "base/native_library.h"
51 #include "base/strings/utf_string_conversions.h"
52 #endif // defined(OS_ANDROID)
53
54 #if defined(OS_MACOSX) && !defined(OS_IOS)
55 // This was needed to debug crbug.com/640281.
56 // TODO(zhongyi): Remove once the bug is resolved.
57 #include <dlfcn.h>
58 #include <pthread.h>
59 #endif // defined(OS_MACOSX) && !defined(OS_IOS)
60
61 namespace net {
62
63 namespace {
64
// Bind retry budget and port range. Their users are outside this section of
// the file (presumably the random-bind path -- confirm before changing).
constexpr int kBindRetries = 10;
constexpr int kPortStart = 1024;
constexpr int kPortEnd = 65535;
// ActivityMonitor::Increment() flushes the pending byte count once it exceeds
// this many bytes.
constexpr int kActivityMonitorBytesThreshold = 65535;
// ActivityMonitor::Increment() flushes on every call while fewer than this
// many increments have been recorded since the timer last fired.
constexpr int kActivityMonitorMinimumSamplesForThroughputEstimate = 2;
70 const base::TimeDelta kActivityMonitorMsThreshold =
71 base::TimeDelta::FromMilliseconds(100);
72
73 #if defined(OS_MACOSX) || defined(OS_BSD)
74 // When enabling multicast using setsockopt(IP_MULTICAST_IF) MacOS
75 // requires passing IPv4 address instead of interface index. This function
76 // resolves IPv4 address by interface index. The |address| is returned in
77 // network order.
GetIPv4AddressFromIndex(int socket,uint32_t index,uint32_t * address)78 int GetIPv4AddressFromIndex(int socket, uint32_t index, uint32_t* address) {
79 if (!index) {
80 *address = htonl(INADDR_ANY);
81 return OK;
82 }
83
84 sockaddr_in* result = nullptr;
85
86 ifreq ifr;
87 ifr.ifr_addr.sa_family = AF_INET;
88 if (!if_indextoname(index, ifr.ifr_name))
89 return MapSystemError(errno);
90 int rv = ioctl(socket, SIOCGIFADDR, &ifr);
91 if (rv == -1)
92 return MapSystemError(errno);
93 result = reinterpret_cast<sockaddr_in*>(&ifr.ifr_addr);
94
95 if (!result)
96 return ERR_ADDRESS_INVALID;
97
98 *address = result->sin_addr.s_addr;
99 return OK;
100 }
101
102 #endif // OS_MACOSX || OS_BSD
103
104 #if defined(OS_MACOSX) && !defined(OS_IOS)
105
// On OSX the file descriptor is guarded to detect the cause of
// crbug.com/640281. The guarded API is supported only on newer versions of
// OSX, so the symbols need to be resolved dynamically.
// TODO(zhongyi): Remove this code once the bug is resolved.
110
// Guard identifier type used by the guarded-fd wrappers below.
using guardid_t = uint64_t;

// Signatures of the two guarded-fd entry points that are looked up with
// dlsym() at runtime.
using GuardedCloseNpFunction = int (*)(int fd, const guardid_t* guard);
using ChangeFdguardNpFunction = int (*)(int fd,
                                        const guardid_t* guard,
                                        u_int flags,
                                        const guardid_t* nguard,
                                        u_int nflags,
                                        int* fdflagsp);

// Filled in by InitGuardedFunctions(); either both are set or both are null.
GuardedCloseNpFunction g_guarded_close_np = nullptr;
ChangeFdguardNpFunction g_change_fdguard_np = nullptr;

// Passed to pthread_once() so that InitGuardedFunctions() runs only once.
pthread_once_t g_guarded_functions_once = PTHREAD_ONCE_INIT;
125
InitGuardedFunctions()126 void InitGuardedFunctions() {
127 void* libsystem_handle =
128 dlopen("/usr/lib/libSystem.dylib", RTLD_LAZY | RTLD_LOCAL | RTLD_NOLOAD);
129 if (libsystem_handle) {
130 g_guarded_close_np = reinterpret_cast<GuardedCloseNpFunction>(
131 dlsym(libsystem_handle, "guarded_close_np"));
132 g_change_fdguard_np = reinterpret_cast<ChangeFdguardNpFunction>(
133 dlsym(libsystem_handle, "change_fdguard_np"));
134
135 // If for any reason only one of the functions is found, set both of them to
136 // nullptr.
137 if (!g_guarded_close_np || !g_change_fdguard_np) {
138 g_guarded_close_np = nullptr;
139 g_change_fdguard_np = nullptr;
140 }
141 }
142 }
143
change_fdguard_np(int fd,const guardid_t * guard,u_int flags,const guardid_t * nguard,u_int nflags,int * fdflagsp)144 int change_fdguard_np(int fd,
145 const guardid_t* guard,
146 u_int flags,
147 const guardid_t* nguard,
148 u_int nflags,
149 int* fdflagsp) {
150 CHECK_EQ(pthread_once(&g_guarded_functions_once, InitGuardedFunctions), 0);
151 // Older version of OSX may not support guarded API.
152 if (!g_change_fdguard_np)
153 return 0;
154 return g_change_fdguard_np(fd, guard, flags, nguard, nflags, fdflagsp);
155 }
156
guarded_close_np(int fd,const guardid_t * guard)157 int guarded_close_np(int fd, const guardid_t* guard) {
158 // Older version of OSX may not support guarded API.
159 if (!g_guarded_close_np)
160 return close(fd);
161
162 return g_guarded_close_np(fd, guard);
163 }
164
165 const unsigned int GUARD_CLOSE = 1u << 0;
166 const unsigned int GUARD_DUP = 1u << 1;
167
168 const guardid_t kSocketFdGuard = 0xD712BC0BC9A4EAD4;
169
170 #endif // defined(OS_MACOSX) && !defined(OS_IOS)
171
// Returns |fd| XOR-ed with a fixed constant. The result is stored next to the
// descriptor and recomputed at close time to detect a corrupted descriptor.
int GetSocketFDHash(int fd) {
  constexpr int kFdHashMask = 1595649551;
  return fd ^ kFdHashMask;
}
175
176 } // namespace
177
UDPSocketPosix(DatagramSocket::BindType bind_type,net::NetLog * net_log,const net::NetLogSource & source)178 UDPSocketPosix::UDPSocketPosix(DatagramSocket::BindType bind_type,
179 net::NetLog* net_log,
180 const net::NetLogSource& source)
181 : write_async_watcher_(std::make_unique<WriteAsyncWatcher>(this)),
182 sender_(new UDPSocketPosixSender()),
183 socket_(kInvalidSocket),
184 socket_hash_(0),
185 addr_family_(0),
186 is_connected_(false),
187 socket_options_(SOCKET_OPTION_MULTICAST_LOOP),
188 sendto_flags_(0),
189 multicast_interface_(0),
190 multicast_time_to_live_(1),
191 bind_type_(bind_type),
192 read_socket_watcher_(FROM_HERE),
193 write_socket_watcher_(FROM_HERE),
194 read_watcher_(this),
195 write_watcher_(this),
196 last_async_result_(0),
197 write_async_timer_running_(false),
198 write_async_outstanding_(0),
199 read_buf_len_(0),
200 recv_from_address_(NULL),
201 write_buf_len_(0),
202 net_log_(NetLogWithSource::Make(net_log, NetLogSourceType::UDP_SOCKET)),
203 bound_network_(NetworkChangeNotifier::kInvalidNetworkHandle),
204 experimental_recv_optimization_enabled_(false) {
205 net_log_.BeginEventReferencingSource(NetLogEventType::SOCKET_ALIVE, source);
206 }
207
// Closes the socket if it is still open and ends the SOCKET_ALIVE NetLog
// event. Must run on the thread checked by |thread_checker_|.
UDPSocketPosix::~UDPSocketPosix() {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
  Close();
  net_log_.EndEvent(NetLogEventType::SOCKET_ALIVE);
}
213
Open(AddressFamily address_family)214 int UDPSocketPosix::Open(AddressFamily address_family) {
215 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
216 DCHECK_EQ(socket_, kInvalidSocket);
217
218 auto owned_socket_count = TryAcquireGlobalUDPSocketCount();
219 if (owned_socket_count.empty())
220 return ERR_INSUFFICIENT_RESOURCES;
221
222 addr_family_ = ConvertAddressFamily(address_family);
223 socket_ = CreatePlatformSocket(addr_family_, SOCK_DGRAM, 0);
224 if (socket_ == kInvalidSocket)
225 return MapSystemError(errno);
226 #if defined(OS_MACOSX) && !defined(OS_IOS)
227 PCHECK(change_fdguard_np(socket_, NULL, 0, &kSocketFdGuard,
228 GUARD_CLOSE | GUARD_DUP, NULL) == 0);
229 #endif // defined(OS_MACOSX) && !defined(OS_IOS)
230 socket_hash_ = GetSocketFDHash(socket_);
231 if (!base::SetNonBlocking(socket_)) {
232 const int err = MapSystemError(errno);
233 Close();
234 return err;
235 }
236 if (tag_ != SocketTag())
237 tag_.Apply(socket_);
238
239 owned_socket_count_ = std::move(owned_socket_count);
240 return OK;
241 }
242
Increment(uint32_t bytes)243 void UDPSocketPosix::ActivityMonitor::Increment(uint32_t bytes) {
244 if (!bytes)
245 return;
246 bool timer_running = timer_.IsRunning();
247 bytes_ += bytes;
248 increments_++;
249 // Allow initial updates to make sure throughput estimator has
250 // enough samples to generate a value. (low water mark)
251 // Or once the bytes threshold has be met. (high water mark)
252 if (increments_ < kActivityMonitorMinimumSamplesForThroughputEstimate ||
253 bytes_ > kActivityMonitorBytesThreshold) {
254 Update();
255 if (timer_running)
256 timer_.Reset();
257 }
258 if (!timer_running) {
259 timer_.Start(FROM_HERE, kActivityMonitorMsThreshold, this,
260 &UDPSocketPosix::ActivityMonitor::OnTimerFired);
261 }
262 }
263
Update()264 void UDPSocketPosix::ActivityMonitor::Update() {
265 if (!bytes_)
266 return;
267 NetworkActivityMonitorIncrement(bytes_);
268 bytes_ = 0;
269 }
270
// Called from UDPSocketPosix::Close(): stops the timer and reports any bytes
// that are still pending.
void UDPSocketPosix::ActivityMonitor::OnClose() {
  timer_.Stop();
  Update();
}
275
OnTimerFired()276 void UDPSocketPosix::ActivityMonitor::OnTimerFired() {
277 increments_ = 0;
278 if (!bytes_) {
279 // Can happen if the socket has been idle and have had no
280 // increments since the timer previously fired. Don't bother
281 // keeping the timer running in this case.
282 timer_.Stop();
283 return;
284 }
285 Update();
286 }
287
NetworkActivityMonitorIncrement(uint32_t bytes)288 void UDPSocketPosix::SentActivityMonitor::NetworkActivityMonitorIncrement(
289 uint32_t bytes) {
290 NetworkActivityMonitor::GetInstance()->IncrementBytesSent(bytes);
291 }
292
NetworkActivityMonitorIncrement(uint32_t bytes)293 void UDPSocketPosix::ReceivedActivityMonitor::NetworkActivityMonitorIncrement(
294 uint32_t bytes) {
295 NetworkActivityMonitor::GetInstance()->IncrementBytesReceived(bytes);
296 }
297
Close()298 void UDPSocketPosix::Close() {
299 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
300
301 owned_socket_count_.Reset();
302
303 if (socket_ == kInvalidSocket)
304 return;
305
306 // Zero out any pending read/write callback state.
307 read_buf_.reset();
308 read_buf_len_ = 0;
309 read_callback_.Reset();
310 recv_from_address_ = NULL;
311 write_buf_.reset();
312 write_buf_len_ = 0;
313 write_callback_.Reset();
314 send_to_address_.reset();
315
316 bool ok = read_socket_watcher_.StopWatchingFileDescriptor();
317 DCHECK(ok);
318 ok = write_socket_watcher_.StopWatchingFileDescriptor();
319 DCHECK(ok);
320
321 // Verify that |socket_| hasn't been corrupted. Needed to debug
322 // crbug.com/906005.
323 CHECK_EQ(socket_hash_, GetSocketFDHash(socket_));
324 #if defined(OS_MACOSX) && !defined(OS_IOS)
325 PCHECK(IGNORE_EINTR(guarded_close_np(socket_, &kSocketFdGuard)) == 0);
326 #else
327 PCHECK(IGNORE_EINTR(close(socket_)) == 0);
328 #endif // defined(OS_MACOSX) && !defined(OS_IOS)
329
330 socket_ = kInvalidSocket;
331 addr_family_ = 0;
332 is_connected_ = false;
333 tag_ = SocketTag();
334
335 write_async_timer_.Stop();
336 sent_activity_monitor_.OnClose();
337 received_activity_monitor_.OnClose();
338 }
339
GetPeerAddress(IPEndPoint * address) const340 int UDPSocketPosix::GetPeerAddress(IPEndPoint* address) const {
341 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
342 DCHECK(address);
343 if (!is_connected())
344 return ERR_SOCKET_NOT_CONNECTED;
345
346 if (!remote_address_.get()) {
347 SockaddrStorage storage;
348 if (getpeername(socket_, storage.addr, &storage.addr_len))
349 return MapSystemError(errno);
350 std::unique_ptr<IPEndPoint> address(new IPEndPoint());
351 if (!address->FromSockAddr(storage.addr, storage.addr_len))
352 return ERR_ADDRESS_INVALID;
353 remote_address_ = std::move(address);
354 }
355
356 *address = *remote_address_;
357 return OK;
358 }
359
GetLocalAddress(IPEndPoint * address) const360 int UDPSocketPosix::GetLocalAddress(IPEndPoint* address) const {
361 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
362 DCHECK(address);
363 if (!is_connected())
364 return ERR_SOCKET_NOT_CONNECTED;
365
366 if (!local_address_.get()) {
367 SockaddrStorage storage;
368 if (getsockname(socket_, storage.addr, &storage.addr_len))
369 return MapSystemError(errno);
370 std::unique_ptr<IPEndPoint> address(new IPEndPoint());
371 if (!address->FromSockAddr(storage.addr, storage.addr_len))
372 return ERR_ADDRESS_INVALID;
373 local_address_ = std::move(address);
374 net_log_.AddEvent(NetLogEventType::UDP_LOCAL_ADDRESS, [&] {
375 return CreateNetLogUDPConnectParams(*local_address_, bound_network_);
376 });
377 }
378
379 *address = *local_address_;
380 return OK;
381 }
382
Read(IOBuffer * buf,int buf_len,CompletionOnceCallback callback)383 int UDPSocketPosix::Read(IOBuffer* buf,
384 int buf_len,
385 CompletionOnceCallback callback) {
386 return RecvFrom(buf, buf_len, NULL, std::move(callback));
387 }
388
RecvFrom(IOBuffer * buf,int buf_len,IPEndPoint * address,CompletionOnceCallback callback)389 int UDPSocketPosix::RecvFrom(IOBuffer* buf,
390 int buf_len,
391 IPEndPoint* address,
392 CompletionOnceCallback callback) {
393 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
394 DCHECK_NE(kInvalidSocket, socket_);
395 CHECK(read_callback_.is_null());
396 DCHECK(!recv_from_address_);
397 DCHECK(!callback.is_null()); // Synchronous operation not supported
398 DCHECK_GT(buf_len, 0);
399
400 int nread = InternalRecvFrom(buf, buf_len, address);
401 if (nread != ERR_IO_PENDING)
402 return nread;
403
404 if (!base::MessageLoopCurrentForIO::Get()->WatchFileDescriptor(
405 socket_, true, base::MessagePumpForIO::WATCH_READ,
406 &read_socket_watcher_, &read_watcher_)) {
407 PLOG(ERROR) << "WatchFileDescriptor failed on read";
408 int result = MapSystemError(errno);
409 LogRead(result, NULL, 0, NULL);
410 return result;
411 }
412
413 read_buf_ = buf;
414 read_buf_len_ = buf_len;
415 recv_from_address_ = address;
416 read_callback_ = std::move(callback);
417 return ERR_IO_PENDING;
418 }
419
Write(IOBuffer * buf,int buf_len,CompletionOnceCallback callback,const NetworkTrafficAnnotationTag & traffic_annotation)420 int UDPSocketPosix::Write(
421 IOBuffer* buf,
422 int buf_len,
423 CompletionOnceCallback callback,
424 const NetworkTrafficAnnotationTag& traffic_annotation) {
425 return SendToOrWrite(buf, buf_len, NULL, std::move(callback));
426 }
427
SendTo(IOBuffer * buf,int buf_len,const IPEndPoint & address,CompletionOnceCallback callback)428 int UDPSocketPosix::SendTo(IOBuffer* buf,
429 int buf_len,
430 const IPEndPoint& address,
431 CompletionOnceCallback callback) {
432 return SendToOrWrite(buf, buf_len, &address, std::move(callback));
433 }
434
SendToOrWrite(IOBuffer * buf,int buf_len,const IPEndPoint * address,CompletionOnceCallback callback)435 int UDPSocketPosix::SendToOrWrite(IOBuffer* buf,
436 int buf_len,
437 const IPEndPoint* address,
438 CompletionOnceCallback callback) {
439 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
440 DCHECK_NE(kInvalidSocket, socket_);
441 CHECK(write_callback_.is_null());
442 DCHECK(!callback.is_null()); // Synchronous operation not supported
443 DCHECK_GT(buf_len, 0);
444
445 int result = InternalSendTo(buf, buf_len, address);
446 if (result != ERR_IO_PENDING)
447 return result;
448
449 if (!base::MessageLoopCurrentForIO::Get()->WatchFileDescriptor(
450 socket_, true, base::MessagePumpForIO::WATCH_WRITE,
451 &write_socket_watcher_, &write_watcher_)) {
452 DVPLOG(1) << "WatchFileDescriptor failed on write";
453 int result = MapSystemError(errno);
454 LogWrite(result, NULL, NULL);
455 return result;
456 }
457
458 write_buf_ = buf;
459 write_buf_len_ = buf_len;
460 DCHECK(!send_to_address_.get());
461 if (address) {
462 send_to_address_.reset(new IPEndPoint(*address));
463 }
464 write_callback_ = std::move(callback);
465 return ERR_IO_PENDING;
466 }
467
Connect(const IPEndPoint & address)468 int UDPSocketPosix::Connect(const IPEndPoint& address) {
469 DCHECK_NE(socket_, kInvalidSocket);
470 net_log_.BeginEvent(NetLogEventType::UDP_CONNECT, [&] {
471 return CreateNetLogUDPConnectParams(address, bound_network_);
472 });
473 int rv = SetMulticastOptions();
474 if (rv != OK)
475 return rv;
476 rv = InternalConnect(address);
477 net_log_.EndEventWithNetErrorCode(NetLogEventType::UDP_CONNECT, rv);
478 is_connected_ = (rv == OK);
479 if (rv != OK)
480 tag_ = SocketTag();
481 return rv;
482 }
483
InternalConnect(const IPEndPoint & address)484 int UDPSocketPosix::InternalConnect(const IPEndPoint& address) {
485 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
486 DCHECK(!is_connected());
487 DCHECK(!remote_address_.get());
488
489 int rv = 0;
490 if (bind_type_ == DatagramSocket::RANDOM_BIND) {
491 // Construct IPAddress of appropriate size (IPv4 or IPv6) of 0s,
492 // representing INADDR_ANY or in6addr_any.
493 size_t addr_size = address.GetSockAddrFamily() == AF_INET
494 ? IPAddress::kIPv4AddressSize
495 : IPAddress::kIPv6AddressSize;
496 rv = RandomBind(IPAddress::AllZeros(addr_size));
497 }
498 // else connect() does the DatagramSocket::DEFAULT_BIND
499
500 if (rv < 0) {
501 base::UmaHistogramSparse("Net.UdpSocketRandomBindErrorCode", -rv);
502 return rv;
503 }
504
505 SockaddrStorage storage;
506 if (!address.ToSockAddr(storage.addr, &storage.addr_len))
507 return ERR_ADDRESS_INVALID;
508
509 rv = HANDLE_EINTR(connect(socket_, storage.addr, storage.addr_len));
510 if (rv < 0)
511 return MapSystemError(errno);
512
513 remote_address_.reset(new IPEndPoint(address));
514 return rv;
515 }
516
Bind(const IPEndPoint & address)517 int UDPSocketPosix::Bind(const IPEndPoint& address) {
518 DCHECK_NE(socket_, kInvalidSocket);
519 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
520 DCHECK(!is_connected());
521
522 int rv = SetMulticastOptions();
523 if (rv < 0)
524 return rv;
525
526 rv = DoBind(address);
527 if (rv < 0)
528 return rv;
529
530 is_connected_ = true;
531 local_address_.reset();
532 return rv;
533 }
534
BindToNetwork(NetworkChangeNotifier::NetworkHandle network)535 int UDPSocketPosix::BindToNetwork(
536 NetworkChangeNotifier::NetworkHandle network) {
537 DCHECK_NE(socket_, kInvalidSocket);
538 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
539 DCHECK(!is_connected());
540 if (network == NetworkChangeNotifier::kInvalidNetworkHandle)
541 return ERR_INVALID_ARGUMENT;
542 #if defined(OS_ANDROID)
543 // Android prior to Lollipop didn't have support for binding sockets to
544 // networks.
545 if (base::android::BuildInfo::GetInstance()->sdk_int() <
546 base::android::SDK_VERSION_LOLLIPOP) {
547 return ERR_NOT_IMPLEMENTED;
548 }
549 int rv;
550 // On Android M and newer releases use supported NDK API. On Android L use
551 // setNetworkForSocket from libnetd_client.so.
552 if (base::android::BuildInfo::GetInstance()->sdk_int() >=
553 base::android::SDK_VERSION_MARSHMALLOW) {
554 // See declaration of android_setsocknetwork() here:
555 // http://androidxref.com/6.0.0_r1/xref/development/ndk/platforms/android-M/include/android/multinetwork.h#65
556 // Function cannot be called directly as it will cause app to fail to load
557 // on pre-marshmallow devices.
558 typedef int (*MarshmallowSetNetworkForSocket)(int64_t netId, int socketFd);
559 static MarshmallowSetNetworkForSocket marshmallowSetNetworkForSocket;
560 // This is racy, but all racers should come out with the same answer so it
561 // shouldn't matter.
562 if (!marshmallowSetNetworkForSocket) {
563 base::FilePath file(base::GetNativeLibraryName("android"));
564 void* dl = dlopen(file.value().c_str(), RTLD_NOW);
565 marshmallowSetNetworkForSocket =
566 reinterpret_cast<MarshmallowSetNetworkForSocket>(
567 dlsym(dl, "android_setsocknetwork"));
568 }
569 if (!marshmallowSetNetworkForSocket)
570 return ERR_NOT_IMPLEMENTED;
571 rv = marshmallowSetNetworkForSocket(network, socket_);
572 if (rv)
573 rv = errno;
574 } else {
575 // NOTE(pauljensen): This does rely on Android implementation details, but
576 // they won't change because Lollipop is already released.
577 typedef int (*LollipopSetNetworkForSocket)(unsigned netId, int socketFd);
578 static LollipopSetNetworkForSocket lollipopSetNetworkForSocket;
579 // This is racy, but all racers should come out with the same answer so it
580 // shouldn't matter.
581 if (!lollipopSetNetworkForSocket) {
582 // Android's netd client library should always be loaded in our address
583 // space as it shims socket() which was used to create |socket_|.
584 base::FilePath file(base::GetNativeLibraryName("netd_client"));
585 // Use RTLD_NOW to match Android's prior loading of the library:
586 // http://androidxref.com/6.0.0_r5/xref/bionic/libc/bionic/NetdClient.cpp#37
587 // Use RTLD_NOLOAD to assert that the library is already loaded and
588 // avoid doing any disk IO.
589 void* dl = dlopen(file.value().c_str(), RTLD_NOW | RTLD_NOLOAD);
590 lollipopSetNetworkForSocket =
591 reinterpret_cast<LollipopSetNetworkForSocket>(
592 dlsym(dl, "setNetworkForSocket"));
593 }
594 if (!lollipopSetNetworkForSocket)
595 return ERR_NOT_IMPLEMENTED;
596 rv = -lollipopSetNetworkForSocket(network, socket_);
597 }
598 // If |network| has since disconnected, |rv| will be ENONET. Surface this as
599 // ERR_NETWORK_CHANGED, rather than MapSystemError(ENONET) which gives back
600 // the less descriptive ERR_FAILED.
601 if (rv == ENONET)
602 return ERR_NETWORK_CHANGED;
603 if (rv == 0)
604 bound_network_ = network;
605 return MapSystemError(rv);
606 #else
607 NOTIMPLEMENTED();
608 return ERR_NOT_IMPLEMENTED;
609 #endif
610 }
611
// Requests a receive buffer of |size| for the open socket by delegating to
// SetSocketReceiveBufferSize() and returning its result.
int UDPSocketPosix::SetReceiveBufferSize(int32_t size) {
  DCHECK_NE(socket_, kInvalidSocket);
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
  return SetSocketReceiveBufferSize(socket_, size);
}
617
// Requests a send buffer of |size| for the open socket by delegating to
// SetSocketSendBufferSize() and returning its result.
int UDPSocketPosix::SetSendBufferSize(int32_t size) {
  DCHECK_NE(socket_, kInvalidSocket);
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
  return SetSocketSendBufferSize(socket_, size);
}
623
SetDoNotFragment()624 int UDPSocketPosix::SetDoNotFragment() {
625 DCHECK_NE(socket_, kInvalidSocket);
626 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
627
628 #if !defined(IP_PMTUDISC_DO)
629 return ERR_NOT_IMPLEMENTED;
630 #else
631 if (addr_family_ == AF_INET6) {
632 int val = IPV6_PMTUDISC_DO;
633 if (setsockopt(socket_, IPPROTO_IPV6, IPV6_MTU_DISCOVER, &val,
634 sizeof(val)) != 0) {
635 return MapSystemError(errno);
636 }
637
638 int v6_only = false;
639 socklen_t v6_only_len = sizeof(v6_only);
640 if (getsockopt(socket_, IPPROTO_IPV6, IPV6_V6ONLY, &v6_only,
641 &v6_only_len) != 0) {
642 return MapSystemError(errno);
643 }
644
645 if (v6_only)
646 return OK;
647 }
648
649 int val = IP_PMTUDISC_DO;
650 int rv = setsockopt(socket_, IPPROTO_IP, IP_MTU_DISCOVER, &val, sizeof(val));
651 return rv == 0 ? OK : MapSystemError(errno);
652 #endif
653 }
654
SetMsgConfirm(bool confirm)655 void UDPSocketPosix::SetMsgConfirm(bool confirm) {
656 #if !defined(OS_MACOSX) && !defined(OS_IOS) && !defined(OS_BSD)
657 if (confirm) {
658 sendto_flags_ |= MSG_CONFIRM;
659 } else {
660 sendto_flags_ &= ~MSG_CONFIRM;
661 }
662 #endif // !defined(OS_MACOSX) && !defined(OS_IOS) && !defined(OS_BSD)
663 }
664
// Enables address reuse on the open, not yet connected socket by delegating
// to SetReuseAddr() and returning its result.
int UDPSocketPosix::AllowAddressReuse() {
  DCHECK_NE(socket_, kInvalidSocket);
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
  DCHECK(!is_connected());
  return SetReuseAddr(socket_, true);
}
671
SetBroadcast(bool broadcast)672 int UDPSocketPosix::SetBroadcast(bool broadcast) {
673 DCHECK_NE(socket_, kInvalidSocket);
674 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
675 int value = broadcast ? 1 : 0;
676 int rv;
677 #if defined(OS_MACOSX) || defined(OS_BSD)
678 // SO_REUSEPORT on OSX permits multiple processes to each receive
679 // UDP multicast or broadcast datagrams destined for the bound
680 // port.
681 // This is only being set on OSX because its behavior is platform dependent
682 // and we are playing it safe by only setting it on platforms where things
683 // break.
684 rv = setsockopt(socket_, SOL_SOCKET, SO_REUSEADDR, &value, sizeof(value));
685 if (rv != 0)
686 return MapSystemError(errno);
687 rv = setsockopt(socket_, SOL_SOCKET, SO_REUSEPORT, &value, sizeof(value));
688 if (rv != 0)
689 return MapSystemError(errno);
690 #endif // defined(OS_MACOSX) || defined(OS_BSD)
691 rv = setsockopt(socket_, SOL_SOCKET, SO_BROADCAST, &value, sizeof(value));
692
693 return rv == 0 ? OK : MapSystemError(errno);
694 }
695
AllowAddressSharingForMulticast()696 int UDPSocketPosix::AllowAddressSharingForMulticast() {
697 DCHECK_NE(socket_, kInvalidSocket);
698 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
699 DCHECK(!is_connected());
700
701 int rv = AllowAddressReuse();
702 if (rv != OK)
703 return rv;
704
705 #ifdef SO_REUSEPORT
706 // Attempt to set SO_REUSEPORT if available. On some platforms, this is
707 // necessary to allow the address to be fully shared between separate sockets.
708 // On platforms where the option does not exist, SO_REUSEADDR should be
709 // sufficient to share multicast packets if such sharing is at all possible.
710 int value = 1;
711 rv = setsockopt(socket_, SOL_SOCKET, SO_REUSEPORT, &value, sizeof(value));
712 // Ignore errors that the option does not exist.
713 if (rv != 0 && errno != ENOPROTOOPT)
714 return MapSystemError(errno);
715 #endif // SO_REUSEPORT
716
717 return OK;
718 }
719
OnFileCanReadWithoutBlocking(int)720 void UDPSocketPosix::ReadWatcher::OnFileCanReadWithoutBlocking(int) {
721 TRACE_EVENT0(NetTracingCategory(),
722 "UDPSocketPosix::ReadWatcher::OnFileCanReadWithoutBlocking");
723 if (!socket_->read_callback_.is_null())
724 socket_->DidCompleteRead();
725 }
726
OnFileCanWriteWithoutBlocking(int)727 void UDPSocketPosix::WriteWatcher::OnFileCanWriteWithoutBlocking(int) {
728 if (!socket_->write_callback_.is_null())
729 socket_->DidCompleteWrite();
730 }
731
DoReadCallback(int rv)732 void UDPSocketPosix::DoReadCallback(int rv) {
733 DCHECK_NE(rv, ERR_IO_PENDING);
734 DCHECK(!read_callback_.is_null());
735
736 // Since Run() may result in Read() being called,
737 // clear |read_callback_| up front.
738 std::move(read_callback_).Run(rv);
739 }
740
DoWriteCallback(int rv)741 void UDPSocketPosix::DoWriteCallback(int rv) {
742 DCHECK_NE(rv, ERR_IO_PENDING);
743 DCHECK(!write_callback_.is_null());
744
745 // Since Run() may result in Write() being called,
746 // clear |write_callback_| up front.
747 std::move(write_callback_).Run(rv);
748 }
749
DidCompleteRead()750 void UDPSocketPosix::DidCompleteRead() {
751 int result =
752 InternalRecvFrom(read_buf_.get(), read_buf_len_, recv_from_address_);
753 if (result != ERR_IO_PENDING) {
754 read_buf_.reset();
755 read_buf_len_ = 0;
756 recv_from_address_ = NULL;
757 bool ok = read_socket_watcher_.StopWatchingFileDescriptor();
758 DCHECK(ok);
759 DoReadCallback(result);
760 }
761 }
762
LogRead(int result,const char * bytes,socklen_t addr_len,const sockaddr * addr)763 void UDPSocketPosix::LogRead(int result,
764 const char* bytes,
765 socklen_t addr_len,
766 const sockaddr* addr) {
767 if (result < 0) {
768 net_log_.AddEventWithNetErrorCode(NetLogEventType::UDP_RECEIVE_ERROR,
769 result);
770 return;
771 }
772
773 if (net_log_.IsCapturing()) {
774 DCHECK(addr_len > 0);
775 DCHECK(addr);
776
777 IPEndPoint address;
778 bool is_address_valid = address.FromSockAddr(addr, addr_len);
779 NetLogUDPDataTransfer(net_log_, NetLogEventType::UDP_BYTES_RECEIVED, result,
780 bytes, is_address_valid ? &address : nullptr);
781 }
782
783 received_activity_monitor_.Increment(result);
784 }
785
DidCompleteWrite()786 void UDPSocketPosix::DidCompleteWrite() {
787 int result =
788 InternalSendTo(write_buf_.get(), write_buf_len_, send_to_address_.get());
789
790 if (result != ERR_IO_PENDING) {
791 write_buf_.reset();
792 write_buf_len_ = 0;
793 send_to_address_.reset();
794 write_socket_watcher_.StopWatchingFileDescriptor();
795 DoWriteCallback(result);
796 }
797 }
798
LogWrite(int result,const char * bytes,const IPEndPoint * address)799 void UDPSocketPosix::LogWrite(int result,
800 const char* bytes,
801 const IPEndPoint* address) {
802 if (result < 0) {
803 net_log_.AddEventWithNetErrorCode(NetLogEventType::UDP_SEND_ERROR, result);
804 return;
805 }
806
807 if (net_log_.IsCapturing()) {
808 NetLogUDPDataTransfer(net_log_, NetLogEventType::UDP_BYTES_SENT, result,
809 bytes, address);
810 }
811
812 sent_activity_monitor_.Increment(result);
813 }
814
InternalRecvFrom(IOBuffer * buf,int buf_len,IPEndPoint * address)815 int UDPSocketPosix::InternalRecvFrom(IOBuffer* buf,
816 int buf_len,
817 IPEndPoint* address) {
818 // If the socket is connected and the remote address is known
819 // use the more efficient method that uses read() instead of recvmsg().
820 if (experimental_recv_optimization_enabled_ && is_connected_ &&
821 remote_address_) {
822 return InternalRecvFromConnectedSocket(buf, buf_len, address);
823 }
824 return InternalRecvFromNonConnectedSocket(buf, buf_len, address);
825 }
826
InternalRecvFromConnectedSocket(IOBuffer * buf,int buf_len,IPEndPoint * address)827 int UDPSocketPosix::InternalRecvFromConnectedSocket(IOBuffer* buf,
828 int buf_len,
829 IPEndPoint* address) {
830 DCHECK(is_connected_);
831 DCHECK(remote_address_);
832 int bytes_transferred;
833 bytes_transferred = HANDLE_EINTR(read(socket_, buf->data(), buf_len));
834 int result;
835
836 if (bytes_transferred < 0) {
837 result = MapSystemError(errno);
838 } else if (bytes_transferred == buf_len) {
839 result = ERR_MSG_TOO_BIG;
840 } else {
841 result = bytes_transferred;
842 if (address)
843 *address = *remote_address_.get();
844 }
845
846 if (result != ERR_IO_PENDING) {
847 SockaddrStorage sock_addr;
848 bool success =
849 remote_address_->ToSockAddr(sock_addr.addr, &sock_addr.addr_len);
850 DCHECK(success);
851 LogRead(result, buf->data(), sock_addr.addr_len, sock_addr.addr);
852 }
853 return result;
854 }
855
InternalRecvFromNonConnectedSocket(IOBuffer * buf,int buf_len,IPEndPoint * address)856 int UDPSocketPosix::InternalRecvFromNonConnectedSocket(IOBuffer* buf,
857 int buf_len,
858 IPEndPoint* address) {
859 int bytes_transferred;
860
861 struct iovec iov = {};
862 iov.iov_base = buf->data();
863 iov.iov_len = buf_len;
864
865 struct msghdr msg = {};
866 msg.msg_iov = &iov;
867 msg.msg_iovlen = 1;
868
869 SockaddrStorage storage;
870 msg.msg_name = storage.addr;
871 msg.msg_namelen = storage.addr_len;
872
873 bytes_transferred = HANDLE_EINTR(recvmsg(socket_, &msg, 0));
874 storage.addr_len = msg.msg_namelen;
875 int result;
876 if (bytes_transferred >= 0) {
877 if (msg.msg_flags & MSG_TRUNC) {
878 result = ERR_MSG_TOO_BIG;
879 } else {
880 result = bytes_transferred;
881 if (address && !address->FromSockAddr(storage.addr, storage.addr_len))
882 result = ERR_ADDRESS_INVALID;
883 }
884 } else {
885 result = MapSystemError(errno);
886 }
887 if (result != ERR_IO_PENDING)
888 LogRead(result, buf->data(), storage.addr_len, storage.addr);
889 return result;
890 }
891
InternalSendTo(IOBuffer * buf,int buf_len,const IPEndPoint * address)892 int UDPSocketPosix::InternalSendTo(IOBuffer* buf,
893 int buf_len,
894 const IPEndPoint* address) {
895 SockaddrStorage storage;
896 struct sockaddr* addr = storage.addr;
897 if (!address) {
898 addr = NULL;
899 storage.addr_len = 0;
900 } else {
901 if (!address->ToSockAddr(storage.addr, &storage.addr_len)) {
902 int result = ERR_ADDRESS_INVALID;
903 LogWrite(result, NULL, NULL);
904 return result;
905 }
906 }
907
908 int result = HANDLE_EINTR(sendto(socket_, buf->data(), buf_len, sendto_flags_,
909 addr, storage.addr_len));
910 if (result < 0)
911 result = MapSystemError(errno);
912 if (result != ERR_IO_PENDING)
913 LogWrite(result, buf->data(), address);
914 return result;
915 }
916
SetMulticastOptions()917 int UDPSocketPosix::SetMulticastOptions() {
918 if (!(socket_options_ & SOCKET_OPTION_MULTICAST_LOOP)) {
919 int rv;
920 if (addr_family_ == AF_INET) {
921 u_char loop = 0;
922 rv = setsockopt(socket_, IPPROTO_IP, IP_MULTICAST_LOOP,
923 &loop, sizeof(loop));
924 } else {
925 u_int loop = 0;
926 rv = setsockopt(socket_, IPPROTO_IPV6, IPV6_MULTICAST_LOOP,
927 &loop, sizeof(loop));
928 }
929 if (rv < 0)
930 return MapSystemError(errno);
931 }
932 if (multicast_time_to_live_ != IP_DEFAULT_MULTICAST_TTL) {
933 int rv;
934 if (addr_family_ == AF_INET) {
935 u_char ttl = multicast_time_to_live_;
936 rv = setsockopt(socket_, IPPROTO_IP, IP_MULTICAST_TTL,
937 &ttl, sizeof(ttl));
938 } else {
939 // Signed integer. -1 to use route default.
940 int ttl = multicast_time_to_live_;
941 rv = setsockopt(socket_, IPPROTO_IPV6, IPV6_MULTICAST_HOPS,
942 &ttl, sizeof(ttl));
943 }
944 if (rv < 0)
945 return MapSystemError(errno);
946 }
947 if (multicast_interface_ != 0) {
948 switch (addr_family_) {
949 case AF_INET: {
950 #if defined(OS_MACOSX) || defined(OS_BSD)
951 ip_mreq mreq = {};
952 int error = GetIPv4AddressFromIndex(socket_, multicast_interface_,
953 &mreq.imr_interface.s_addr);
954 if (error != OK)
955 return error;
956 #else // defined(OS_MACOSX) || defined(OS_BSD)
957 ip_mreqn mreq = {};
958 mreq.imr_ifindex = multicast_interface_;
959 mreq.imr_address.s_addr = htonl(INADDR_ANY);
960 #endif // !defined(OS_MACOSX) || defined(OS_BSD)
961 int rv = setsockopt(socket_, IPPROTO_IP, IP_MULTICAST_IF,
962 #if defined(OS_BSD)
963 reinterpret_cast<const char*>(&mreq.imr_interface.s_addr),
964 sizeof(mreq.imr_interface.s_addr));
965 #else
966 reinterpret_cast<const char*>(&mreq), sizeof(mreq));
967 #endif
968 if (rv)
969 return MapSystemError(errno);
970 break;
971 }
972 case AF_INET6: {
973 uint32_t interface_index = multicast_interface_;
974 int rv = setsockopt(socket_, IPPROTO_IPV6, IPV6_MULTICAST_IF,
975 reinterpret_cast<const char*>(&interface_index),
976 sizeof(interface_index));
977 if (rv)
978 return MapSystemError(errno);
979 break;
980 }
981 default:
982 NOTREACHED() << "Invalid address family";
983 return ERR_ADDRESS_INVALID;
984 }
985 }
986 return OK;
987 }
988
DoBind(const IPEndPoint & address)989 int UDPSocketPosix::DoBind(const IPEndPoint& address) {
990 SockaddrStorage storage;
991 if (!address.ToSockAddr(storage.addr, &storage.addr_len))
992 return ERR_ADDRESS_INVALID;
993 int rv = bind(socket_, storage.addr, storage.addr_len);
994 if (rv == 0)
995 return OK;
996 int last_error = errno;
997 #if defined(OS_CHROMEOS)
998 if (last_error == EINVAL)
999 return ERR_ADDRESS_IN_USE;
1000 #elif defined(OS_MACOSX)
1001 if (last_error == EADDRNOTAVAIL)
1002 return ERR_ADDRESS_IN_USE;
1003 #endif
1004 return MapSystemError(last_error);
1005 }
1006
RandomBind(const IPAddress & address)1007 int UDPSocketPosix::RandomBind(const IPAddress& address) {
1008 DCHECK_EQ(bind_type_, DatagramSocket::RANDOM_BIND);
1009
1010 for (int i = 0; i < kBindRetries; ++i) {
1011 int rv = DoBind(IPEndPoint(address, base::RandInt(kPortStart, kPortEnd)));
1012 if (rv != ERR_ADDRESS_IN_USE)
1013 return rv;
1014 }
1015
1016 return DoBind(IPEndPoint(address, 0));
1017 }
1018
JoinGroup(const IPAddress & group_address) const1019 int UDPSocketPosix::JoinGroup(const IPAddress& group_address) const {
1020 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
1021 if (!is_connected())
1022 return ERR_SOCKET_NOT_CONNECTED;
1023
1024 switch (group_address.size()) {
1025 case IPAddress::kIPv4AddressSize: {
1026 if (addr_family_ != AF_INET)
1027 return ERR_ADDRESS_INVALID;
1028
1029 #if defined(OS_MACOSX) || defined(OS_BSD)
1030 ip_mreq mreq = {};
1031 int error = GetIPv4AddressFromIndex(socket_, multicast_interface_,
1032 &mreq.imr_interface.s_addr);
1033 if (error != OK)
1034 return error;
1035 #else
1036 ip_mreqn mreq = {};
1037 mreq.imr_ifindex = multicast_interface_;
1038 mreq.imr_address.s_addr = htonl(INADDR_ANY);
1039 #endif
1040 memcpy(&mreq.imr_multiaddr, group_address.bytes().data(),
1041 IPAddress::kIPv4AddressSize);
1042 int rv = setsockopt(socket_, IPPROTO_IP, IP_ADD_MEMBERSHIP,
1043 &mreq, sizeof(mreq));
1044 if (rv < 0)
1045 return MapSystemError(errno);
1046 return OK;
1047 }
1048 case IPAddress::kIPv6AddressSize: {
1049 if (addr_family_ != AF_INET6)
1050 return ERR_ADDRESS_INVALID;
1051 ipv6_mreq mreq;
1052 mreq.ipv6mr_interface = multicast_interface_;
1053 memcpy(&mreq.ipv6mr_multiaddr, group_address.bytes().data(),
1054 IPAddress::kIPv6AddressSize);
1055 int rv = setsockopt(socket_, IPPROTO_IPV6, IPV6_JOIN_GROUP,
1056 &mreq, sizeof(mreq));
1057 if (rv < 0)
1058 return MapSystemError(errno);
1059 return OK;
1060 }
1061 default:
1062 NOTREACHED() << "Invalid address family";
1063 return ERR_ADDRESS_INVALID;
1064 }
1065 }
1066
LeaveGroup(const IPAddress & group_address) const1067 int UDPSocketPosix::LeaveGroup(const IPAddress& group_address) const {
1068 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
1069
1070 if (!is_connected())
1071 return ERR_SOCKET_NOT_CONNECTED;
1072
1073 switch (group_address.size()) {
1074 case IPAddress::kIPv4AddressSize: {
1075 if (addr_family_ != AF_INET)
1076 return ERR_ADDRESS_INVALID;
1077 #if defined(OS_BSD)
1078 ip_mreq mreq = {};
1079 int error = GetIPv4AddressFromIndex(socket_, multicast_interface_,
1080 &mreq.imr_interface.s_addr);
1081
1082 if (error != OK)
1083 return error;
1084 #else
1085 ip_mreqn mreq = {};
1086 mreq.imr_ifindex = multicast_interface_;
1087 mreq.imr_address.s_addr = INADDR_ANY;
1088 #endif
1089 memcpy(&mreq.imr_multiaddr, group_address.bytes().data(),
1090 IPAddress::kIPv4AddressSize);
1091 int rv = setsockopt(socket_, IPPROTO_IP, IP_DROP_MEMBERSHIP,
1092 &mreq, sizeof(mreq));
1093 if (rv < 0)
1094 return MapSystemError(errno);
1095 return OK;
1096 }
1097 case IPAddress::kIPv6AddressSize: {
1098 if (addr_family_ != AF_INET6)
1099 return ERR_ADDRESS_INVALID;
1100 ipv6_mreq mreq;
1101 #if defined(OS_FUCHSIA)
1102 mreq.ipv6mr_interface = multicast_interface_;
1103 #else // defined(OS_FUCHSIA)
1104 mreq.ipv6mr_interface = 0; // 0 indicates default multicast interface.
1105 #endif // !defined(OS_FUCHSIA)
1106 memcpy(&mreq.ipv6mr_multiaddr, group_address.bytes().data(),
1107 IPAddress::kIPv6AddressSize);
1108 int rv = setsockopt(socket_, IPPROTO_IPV6, IPV6_LEAVE_GROUP,
1109 &mreq, sizeof(mreq));
1110 if (rv < 0)
1111 return MapSystemError(errno);
1112 return OK;
1113 }
1114 default:
1115 NOTREACHED() << "Invalid address family";
1116 return ERR_ADDRESS_INVALID;
1117 }
1118 }
1119
SetMulticastInterface(uint32_t interface_index)1120 int UDPSocketPosix::SetMulticastInterface(uint32_t interface_index) {
1121 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
1122 if (is_connected())
1123 return ERR_SOCKET_IS_CONNECTED;
1124 multicast_interface_ = interface_index;
1125 return OK;
1126 }
1127
SetMulticastTimeToLive(int time_to_live)1128 int UDPSocketPosix::SetMulticastTimeToLive(int time_to_live) {
1129 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
1130 if (is_connected())
1131 return ERR_SOCKET_IS_CONNECTED;
1132
1133 if (time_to_live < 0 || time_to_live > 255)
1134 return ERR_INVALID_ARGUMENT;
1135 multicast_time_to_live_ = time_to_live;
1136 return OK;
1137 }
1138
SetMulticastLoopbackMode(bool loopback)1139 int UDPSocketPosix::SetMulticastLoopbackMode(bool loopback) {
1140 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
1141 if (is_connected())
1142 return ERR_SOCKET_IS_CONNECTED;
1143
1144 if (loopback)
1145 socket_options_ |= SOCKET_OPTION_MULTICAST_LOOP;
1146 else
1147 socket_options_ &= ~SOCKET_OPTION_MULTICAST_LOOP;
1148 return OK;
1149 }
1150
SetDiffServCodePoint(DiffServCodePoint dscp)1151 int UDPSocketPosix::SetDiffServCodePoint(DiffServCodePoint dscp) {
1152 if (dscp == DSCP_NO_CHANGE) {
1153 return OK;
1154 }
1155
1156 int dscp_and_ecn = dscp << 2;
1157 // Set the IPv4 option in all cases to support dual-stack sockets.
1158 int rv = setsockopt(socket_, IPPROTO_IP, IP_TOS, &dscp_and_ecn,
1159 sizeof(dscp_and_ecn));
1160 if (addr_family_ == AF_INET6) {
1161 // In the IPv6 case, the previous socksetopt may fail because of a lack of
1162 // dual-stack support. Therefore ignore the previous return value.
1163 rv = setsockopt(socket_, IPPROTO_IPV6, IPV6_TCLASS,
1164 &dscp_and_ecn, sizeof(dscp_and_ecn));
1165 }
1166 if (rv < 0)
1167 return MapSystemError(errno);
1168
1169 return OK;
1170 }
1171
DetachFromThread()1172 void UDPSocketPosix::DetachFromThread() {
1173 DETACH_FROM_THREAD(thread_checker_);
1174 }
1175
ApplySocketTag(const SocketTag & tag)1176 void UDPSocketPosix::ApplySocketTag(const SocketTag& tag) {
1177 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
1178 if (socket_ != kInvalidSocket && tag != tag_) {
1179 tag.Apply(socket_);
1180 }
1181 tag_ = tag;
1182 }
1183
UDPSocketPosixSender()1184 UDPSocketPosixSender::UDPSocketPosixSender() : sendmmsg_enabled_(false) {}
~UDPSocketPosixSender()1185 UDPSocketPosixSender::~UDPSocketPosixSender() {}
1186
SendResult()1187 SendResult::SendResult() : rv(0), write_count(0) {}
~SendResult()1188 SendResult::~SendResult() {}
SendResult(int _rv,int _write_count,DatagramBuffers _buffers)1189 SendResult::SendResult(int _rv, int _write_count, DatagramBuffers _buffers)
1190 : rv(_rv), write_count(_write_count), buffers(std::move(_buffers)) {}
1191 SendResult::SendResult(SendResult&& other) = default;
1192
InternalSendBuffers(int fd,DatagramBuffers buffers) const1193 SendResult UDPSocketPosixSender::InternalSendBuffers(
1194 int fd,
1195 DatagramBuffers buffers) const {
1196 int rv = 0;
1197 int write_count = 0;
1198 for (auto& buffer : buffers) {
1199 int result = HANDLE_EINTR(Send(fd, buffer->data(), buffer->length(), 0));
1200 if (result < 0) {
1201 rv = MapSystemError(errno);
1202 break;
1203 }
1204 write_count++;
1205 }
1206 return SendResult(rv, write_count, std::move(buffers));
1207 }
1208
#if HAVE_SENDMMSG
// Sends every datagram in |buffers| on |fd| with a single Sendmmsg() call.
//
// One iovec and one mmsghdr are built per buffer. The returned SendResult
// always carries |buffers| back to the caller; on failure |rv| is the mapped
// errno, otherwise |write_count| is the value Sendmmsg() returned.
SendResult UDPSocketPosixSender::InternalSendmmsgBuffers(
    int fd,
    DatagramBuffers buffers) const {
  const size_t buffer_count = buffers.size();

  base::StackVector<struct iovec, kWriteAsyncMaxBuffersThreshold + 1> iovecs;
  iovecs->reserve(buffer_count);
  for (auto& datagram : buffers) {
    iovecs->push_back(
        {const_cast<char*>(datagram->data()), datagram->length()});
  }

  // Each header points at the iovec with the same index. |iovecs| is fully
  // populated before any address is taken from it.
  base::StackVector<struct mmsghdr, kWriteAsyncMaxBuffersThreshold + 1> headers;
  headers->reserve(buffer_count);
  for (size_t i = 0; i < buffer_count; ++i)
    headers->push_back({{nullptr, 0, &iovecs[i], 1, nullptr, 0, 0}, 0});

  const int sendmmsg_rv =
      HANDLE_EINTR(Sendmmsg(fd, &headers[0], buffer_count, 0));
  SendResult send_result(0, 0, std::move(buffers));
  if (sendmmsg_rv >= 0) {
    send_result.write_count = sendmmsg_rv;
  } else {
    send_result.rv = MapSystemError(errno);
  }
  return send_result;
}
#endif
1231
SendBuffers(int fd,DatagramBuffers buffers)1232 SendResult UDPSocketPosixSender::SendBuffers(int fd, DatagramBuffers buffers) {
1233 #if HAVE_SENDMMSG
1234 if (sendmmsg_enabled_) {
1235 auto result = InternalSendmmsgBuffers(fd, std::move(buffers));
1236 if (LIKELY(result.rv != ERR_NOT_IMPLEMENTED)) {
1237 return result;
1238 }
1239 DLOG(WARNING) << "senddmsg() not implemented, falling back to send()";
1240 sendmmsg_enabled_ = false;
1241 buffers = std::move(result.buffers);
1242 }
1243 #endif
1244 return InternalSendBuffers(fd, std::move(buffers));
1245 }
1246
Send(int sockfd,const void * buf,size_t len,int flags) const1247 ssize_t UDPSocketPosixSender::Send(int sockfd,
1248 const void* buf,
1249 size_t len,
1250 int flags) const {
1251 return send(sockfd, buf, len, flags);
1252 }
1253
#if HAVE_SENDMMSG
// Forwards its arguments unchanged to sendmmsg() and returns sendmmsg()'s
// result.
int UDPSocketPosixSender::Sendmmsg(int sockfd,
                                   struct mmsghdr* msgvec,
                                   unsigned int vlen,
                                   unsigned int flags) const {
  const int sendmmsg_rv = sendmmsg(sockfd, msgvec, vlen, flags);
  return sendmmsg_rv;
}
#endif
1262
WriteAsync(const char * buffer,size_t buf_len,CompletionOnceCallback callback,const NetworkTrafficAnnotationTag & traffic_annotation)1263 int UDPSocketPosix::WriteAsync(
1264 const char* buffer,
1265 size_t buf_len,
1266 CompletionOnceCallback callback,
1267 const NetworkTrafficAnnotationTag& traffic_annotation) {
1268 DCHECK(datagram_buffer_pool_ != nullptr);
1269 IncreaseWriteAsyncOutstanding(1);
1270 datagram_buffer_pool_->Enqueue(buffer, buf_len, &pending_writes_);
1271 return InternalWriteAsync(std::move(callback), traffic_annotation);
1272 }
1273
WriteAsync(DatagramBuffers buffers,CompletionOnceCallback callback,const NetworkTrafficAnnotationTag & traffic_annotation)1274 int UDPSocketPosix::WriteAsync(
1275 DatagramBuffers buffers,
1276 CompletionOnceCallback callback,
1277 const NetworkTrafficAnnotationTag& traffic_annotation) {
1278 IncreaseWriteAsyncOutstanding(buffers.size());
1279 pending_writes_.splice(pending_writes_.end(), std::move(buffers));
1280 return InternalWriteAsync(std::move(callback), traffic_annotation);
1281 }
1282
InternalWriteAsync(CompletionOnceCallback callback,const NetworkTrafficAnnotationTag & traffic_annotation)1283 int UDPSocketPosix::InternalWriteAsync(
1284 CompletionOnceCallback callback,
1285 const NetworkTrafficAnnotationTag& traffic_annotation) {
1286 CHECK(write_callback_.is_null());
1287
1288 // Surface error immediately if one is pending.
1289 if (last_async_result_ < 0) {
1290 return ResetLastAsyncResult();
1291 }
1292
1293 size_t flush_threshold =
1294 write_batching_active_ ? kWriteAsyncPostBuffersThreshold : 1;
1295 if (pending_writes_.size() >= flush_threshold) {
1296 FlushPending();
1297 // Surface error immediately if one is pending.
1298 if (last_async_result_ < 0) {
1299 return ResetLastAsyncResult();
1300 }
1301 }
1302
1303 if (!write_async_timer_running_) {
1304 write_async_timer_running_ = true;
1305 write_async_timer_.Start(FROM_HERE, kWriteAsyncMsThreshold, this,
1306 &UDPSocketPosix::OnWriteAsyncTimerFired);
1307 }
1308
1309 int blocking_threshold =
1310 write_batching_active_ ? kWriteAsyncMaxBuffersThreshold : 1;
1311 if (write_async_outstanding_ >= blocking_threshold) {
1312 write_callback_ = std::move(callback);
1313 return ERR_IO_PENDING;
1314 }
1315
1316 DVLOG(2) << __func__ << " pending " << pending_writes_.size()
1317 << " outstanding " << write_async_outstanding_;
1318 return ResetWrittenBytes();
1319 }
1320
GetUnwrittenBuffers()1321 DatagramBuffers UDPSocketPosix::GetUnwrittenBuffers() {
1322 write_async_outstanding_ -= pending_writes_.size();
1323 return std::move(pending_writes_);
1324 }
1325
FlushPending()1326 void UDPSocketPosix::FlushPending() {
1327 // Nothing to do if socket is blocked.
1328 if (write_async_watcher_->watching())
1329 return;
1330
1331 if (pending_writes_.empty())
1332 return;
1333
1334 if (write_async_timer_running_)
1335 write_async_timer_.Reset();
1336
1337 int num_pending_writes = static_cast<int>(pending_writes_.size());
1338 if (!write_multi_core_enabled_ ||
1339 // Don't bother with post if not enough buffers
1340 (num_pending_writes <= kWriteAsyncMinBuffersThreshold &&
1341 // but not if there is a previous post
1342 // outstanding, to prevent out of order transmission.
1343 (num_pending_writes == write_async_outstanding_))) {
1344 LocalSendBuffers();
1345 } else {
1346 PostSendBuffers();
1347 }
1348 }
1349
1350 // TODO(ckrasic) Sad face. Do this lazily because many tests exploded
1351 // otherwise. |threading_and_tasks.md| advises to instantiate a
1352 // |base::test::TaskEnvironment| in the test, implementing that
1353 // for all tests that might exercise QUIC is too daunting. Also, in
1354 // some tests it seemed like following the advice just broke in other
1355 // ways.
GetTaskRunner()1356 base::SequencedTaskRunner* UDPSocketPosix::GetTaskRunner() {
1357 if (task_runner_ == nullptr)
1358 task_runner_ = base::ThreadPool::CreateSequencedTaskRunner({});
1359 return task_runner_.get();
1360 }
1361
OnWriteAsyncTimerFired()1362 void UDPSocketPosix::OnWriteAsyncTimerFired() {
1363 DVLOG(2) << __func__ << " pending writes " << pending_writes_.size();
1364 if (pending_writes_.empty()) {
1365 write_async_timer_.Stop();
1366 write_async_timer_running_ = false;
1367 return;
1368 }
1369 if (last_async_result_ < 0) {
1370 DVLOG(1) << __func__ << " socket not writeable";
1371 return;
1372 }
1373 FlushPending();
1374 }
1375
LocalSendBuffers()1376 void UDPSocketPosix::LocalSendBuffers() {
1377 DVLOG(1) << __func__ << " queue " << pending_writes_.size() << " out of "
1378 << write_async_outstanding_ << " total";
1379 DidSendBuffers(sender_->SendBuffers(socket_, std::move(pending_writes_)));
1380 }
1381
PostSendBuffers()1382 void UDPSocketPosix::PostSendBuffers() {
1383 DVLOG(1) << __func__ << " queue " << pending_writes_.size() << " out of "
1384 << write_async_outstanding_ << " total";
1385 base::PostTaskAndReplyWithResult(
1386 GetTaskRunner(), FROM_HERE,
1387 base::BindOnce(&UDPSocketPosixSender::SendBuffers, sender_, socket_,
1388 std::move(pending_writes_)),
1389 base::BindOnce(&UDPSocketPosix::DidSendBuffers,
1390 weak_factory_.GetWeakPtr()));
1391 }
1392
DidSendBuffers(SendResult send_result)1393 void UDPSocketPosix::DidSendBuffers(SendResult send_result) {
1394 DVLOG(3) << __func__;
1395 int write_count = send_result.write_count;
1396 DatagramBuffers& buffers = send_result.buffers;
1397
1398 DCHECK(!buffers.empty());
1399 int num_buffers = buffers.size();
1400
1401 // Dequeue buffers that have been written.
1402 if (write_count > 0) {
1403 write_async_outstanding_ -= write_count;
1404
1405 DatagramBuffers::const_iterator it;
1406 // Generate logs for written buffers
1407 it = buffers.cbegin();
1408 for (int i = 0; i < write_count; i++, it++) {
1409 auto& buffer = *it;
1410 LogWrite(buffer->length(), buffer->data(), NULL);
1411 written_bytes_ += buffer->length();
1412 }
1413 // Return written buffers to pool
1414 DatagramBuffers written_buffers;
1415 if (write_count == num_buffers) {
1416 it = buffers.cend();
1417 } else {
1418 it = buffers.cbegin();
1419 for (int i = 0; i < write_count; i++) {
1420 it++;
1421 }
1422 }
1423 written_buffers.splice(written_buffers.cend(), buffers, buffers.cbegin(),
1424 it);
1425 DCHECK(datagram_buffer_pool_ != nullptr);
1426 datagram_buffer_pool_->Dequeue(&written_buffers);
1427 }
1428
1429 // Requeue left-over (unwritten) buffers.
1430 if (!buffers.empty()) {
1431 DVLOG(2) << __func__ << " requeue " << buffers.size() << " buffers";
1432 pending_writes_.splice(pending_writes_.begin(), std::move(buffers));
1433 }
1434
1435 last_async_result_ = send_result.rv;
1436 if (last_async_result_ == ERR_IO_PENDING) {
1437 DVLOG(2) << __func__ << " WatchFileDescriptor start";
1438 if (!WatchFileDescriptor()) {
1439 DVPLOG(1) << "WatchFileDescriptor failed on write";
1440 last_async_result_ = MapSystemError(errno);
1441 LogWrite(last_async_result_, NULL, NULL);
1442 } else {
1443 last_async_result_ = 0;
1444 }
1445 } else if (last_async_result_ < 0 || pending_writes_.empty()) {
1446 DVLOG(2) << __func__ << " WatchFileDescriptor stop: result "
1447 << ErrorToShortString(last_async_result_) << " pending_writes "
1448 << pending_writes_.size();
1449 StopWatchingFileDescriptor();
1450 }
1451 DCHECK(last_async_result_ != ERR_IO_PENDING);
1452
1453 if (write_callback_.is_null())
1454 return;
1455
1456 if (last_async_result_ < 0) {
1457 DVLOG(1) << last_async_result_;
1458 // Update the writer with the latest result.
1459 DoWriteCallback(ResetLastAsyncResult());
1460 } else if (write_async_outstanding_ < kWriteAsyncCallbackBuffersThreshold) {
1461 DVLOG(1) << write_async_outstanding_ << " < "
1462 << kWriteAsyncCallbackBuffersThreshold;
1463 DoWriteCallback(ResetWrittenBytes());
1464 }
1465 }
1466
OnFileCanWriteWithoutBlocking(int)1467 void UDPSocketPosix::WriteAsyncWatcher::OnFileCanWriteWithoutBlocking(int) {
1468 DVLOG(1) << __func__ << " queue " << socket_->pending_writes_.size()
1469 << " out of " << socket_->write_async_outstanding_ << " total";
1470 socket_->StopWatchingFileDescriptor();
1471 socket_->FlushPending();
1472 }
1473
WatchFileDescriptor()1474 bool UDPSocketPosix::WatchFileDescriptor() {
1475 if (write_async_watcher_->watching())
1476 return true;
1477 bool result = InternalWatchFileDescriptor();
1478 if (result) {
1479 write_async_watcher_->set_watching(true);
1480 }
1481 return result;
1482 }
1483
StopWatchingFileDescriptor()1484 void UDPSocketPosix::StopWatchingFileDescriptor() {
1485 if (!write_async_watcher_->watching())
1486 return;
1487 InternalStopWatchingFileDescriptor();
1488 write_async_watcher_->set_watching(false);
1489 }
1490
InternalWatchFileDescriptor()1491 bool UDPSocketPosix::InternalWatchFileDescriptor() {
1492 return base::MessageLoopCurrentForIO::Get()->WatchFileDescriptor(
1493 socket_, true, base::MessagePumpForIO::WATCH_WRITE,
1494 &write_socket_watcher_, write_async_watcher_.get());
1495 }
1496
InternalStopWatchingFileDescriptor()1497 void UDPSocketPosix::InternalStopWatchingFileDescriptor() {
1498 bool ok = write_socket_watcher_.StopWatchingFileDescriptor();
1499 DCHECK(ok);
1500 }
1501
SetMaxPacketSize(size_t max_packet_size)1502 void UDPSocketPosix::SetMaxPacketSize(size_t max_packet_size) {
1503 datagram_buffer_pool_ = std::make_unique<DatagramBufferPool>(max_packet_size);
1504 }
1505
ResetLastAsyncResult()1506 int UDPSocketPosix::ResetLastAsyncResult() {
1507 int result = last_async_result_;
1508 last_async_result_ = 0;
1509 return result;
1510 }
1511
ResetWrittenBytes()1512 int UDPSocketPosix::ResetWrittenBytes() {
1513 int bytes = written_bytes_;
1514 written_bytes_ = 0;
1515 return bytes;
1516 }
1517
1518 } // namespace net
1519