1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/socket/udp_socket_posix.h"
6
7 #include <errno.h>
8 #include <fcntl.h>
9 #include <net/if.h>
10 #include <netdb.h>
11 #include <netinet/in.h>
12 #include <sys/ioctl.h>
13
14 #include "base/bind.h"
15 #include "base/callback.h"
16 #include "base/callback_helpers.h"
17 #include "base/containers/stack_container.h"
18 #include "base/debug/alias.h"
19 #include "base/files/file_util.h"
20 #include "base/logging.h"
21 #include "base/metrics/histogram_functions.h"
22 #include "base/posix/eintr_wrapper.h"
23 #include "base/rand_util.h"
24 #include "base/task/current_thread.h"
25 #include "base/task/post_task.h"
26 #include "base/task/thread_pool.h"
27 #include "base/task_runner_util.h"
28 #include "base/trace_event/trace_event.h"
29 #include "build/build_config.h"
30 #include "net/base/io_buffer.h"
31 #include "net/base/ip_address.h"
32 #include "net/base/ip_endpoint.h"
33 #include "net/base/net_errors.h"
34 #include "net/base/network_activity_monitor.h"
35 #include "net/base/sockaddr_storage.h"
36 #include "net/base/trace_constants.h"
37 #include "net/log/net_log.h"
38 #include "net/log/net_log_event_type.h"
39 #include "net/log/net_log_source.h"
40 #include "net/log/net_log_source_type.h"
41 #include "net/socket/socket_descriptor.h"
42 #include "net/socket/socket_options.h"
43 #include "net/socket/socket_tag.h"
44 #include "net/socket/udp_net_log_parameters.h"
45 #include "net/traffic_annotation/network_traffic_annotation.h"
46
47 #if defined(OS_ANDROID)
48 #include <dlfcn.h>
49 #include "base/android/build_info.h"
50 #include "base/native_library.h"
51 #include "base/strings/utf_string_conversions.h"
52 #endif // defined(OS_ANDROID)
53
54 #if defined(OS_MAC)
55 // This was needed to debug crbug.com/640281.
56 // TODO(zhongyi): Remove once the bug is resolved.
57 #include <dlfcn.h>
58 #include <pthread.h>
59 #endif // defined(OS_MAC)
60
61 namespace net {
62
63 namespace {
64
65 const int kBindRetries = 10;
66 const int kPortStart = 1024;
67 const int kPortEnd = 65535;
68 const int kActivityMonitorBytesThreshold = 65535;
69 const int kActivityMonitorMinimumSamplesForThroughputEstimate = 2;
70 const base::TimeDelta kActivityMonitorMsThreshold =
71 base::TimeDelta::FromMilliseconds(100);
72
73 #if defined(OS_BSD)
GetIPv4AddressFromIndex(int socket,uint32_t index,uint32_t * address)74 int GetIPv4AddressFromIndex(int socket, uint32_t index, uint32_t* address) {
75 if (!index) {
76 *address = htonl(INADDR_ANY);
77 return OK;
78 }
79
80 sockaddr_in* result = nullptr;
81
82 ifreq ifr;
83 ifr.ifr_addr.sa_family = AF_INET;
84 if (!if_indextoname(index, ifr.ifr_name))
85 return MapSystemError(errno);
86 int rv = ioctl(socket, SIOCGIFADDR, &ifr);
87 if (rv == -1)
88 return MapSystemError(errno);
89 result = reinterpret_cast<sockaddr_in*>(&ifr.ifr_addr);
90
91 if (!result)
92 return ERR_ADDRESS_INVALID;
93
94 *address = result->sin_addr.s_addr;
95 return OK;
96 }
97 #endif
98
99 #if defined(OS_MAC)
100
101 // On OSX the file descriptor is guarded to detect the cause of
102 // crbug.com/640281. guarded API is supported only on newer versions of OSX,
103 // so the symbols need to be resolved dynamically.
104 // TODO(zhongyi): Removed this code once the bug is resolved.
105
106 typedef uint64_t guardid_t;
107
108 typedef int (*GuardedCloseNpFunction)(int fd, const guardid_t* guard);
109 typedef int (*ChangeFdguardNpFunction)(int fd,
110 const guardid_t* guard,
111 u_int flags,
112 const guardid_t* nguard,
113 u_int nflags,
114 int* fdflagsp);
115
116 GuardedCloseNpFunction g_guarded_close_np = nullptr;
117 ChangeFdguardNpFunction g_change_fdguard_np = nullptr;
118
119 pthread_once_t g_guarded_functions_once = PTHREAD_ONCE_INIT;
120
InitGuardedFunctions()121 void InitGuardedFunctions() {
122 void* libsystem_handle =
123 dlopen("/usr/lib/libSystem.dylib", RTLD_LAZY | RTLD_LOCAL | RTLD_NOLOAD);
124 if (libsystem_handle) {
125 g_guarded_close_np = reinterpret_cast<GuardedCloseNpFunction>(
126 dlsym(libsystem_handle, "guarded_close_np"));
127 g_change_fdguard_np = reinterpret_cast<ChangeFdguardNpFunction>(
128 dlsym(libsystem_handle, "change_fdguard_np"));
129
130 // If for any reason only one of the functions is found, set both of them to
131 // nullptr.
132 if (!g_guarded_close_np || !g_change_fdguard_np) {
133 g_guarded_close_np = nullptr;
134 g_change_fdguard_np = nullptr;
135 }
136 }
137 }
138
change_fdguard_np(int fd,const guardid_t * guard,u_int flags,const guardid_t * nguard,u_int nflags,int * fdflagsp)139 int change_fdguard_np(int fd,
140 const guardid_t* guard,
141 u_int flags,
142 const guardid_t* nguard,
143 u_int nflags,
144 int* fdflagsp) {
145 CHECK_EQ(pthread_once(&g_guarded_functions_once, InitGuardedFunctions), 0);
146 // Older version of OSX may not support guarded API.
147 if (!g_change_fdguard_np)
148 return 0;
149 return g_change_fdguard_np(fd, guard, flags, nguard, nflags, fdflagsp);
150 }
151
guarded_close_np(int fd,const guardid_t * guard)152 int guarded_close_np(int fd, const guardid_t* guard) {
153 // Older version of OSX may not support guarded API.
154 if (!g_guarded_close_np)
155 return close(fd);
156
157 return g_guarded_close_np(fd, guard);
158 }
159
160 const unsigned int GUARD_CLOSE = 1u << 0;
161 const unsigned int GUARD_DUP = 1u << 1;
162
163 const guardid_t kSocketFdGuard = 0xD712BC0BC9A4EAD4;
164
165 #endif // defined(OS_MAC)
166
GetSocketFDHash(int fd)167 int GetSocketFDHash(int fd) {
168 return fd ^ 1595649551;
169 }
170
171 } // namespace
172
UDPSocketPosix(DatagramSocket::BindType bind_type,net::NetLog * net_log,const net::NetLogSource & source)173 UDPSocketPosix::UDPSocketPosix(DatagramSocket::BindType bind_type,
174 net::NetLog* net_log,
175 const net::NetLogSource& source)
176 : write_async_watcher_(std::make_unique<WriteAsyncWatcher>(this)),
177 sender_(new UDPSocketPosixSender()),
178 socket_(kInvalidSocket),
179 socket_hash_(0),
180 addr_family_(0),
181 is_connected_(false),
182 socket_options_(SOCKET_OPTION_MULTICAST_LOOP),
183 sendto_flags_(0),
184 multicast_interface_(0),
185 multicast_time_to_live_(1),
186 bind_type_(bind_type),
187 read_socket_watcher_(FROM_HERE),
188 write_socket_watcher_(FROM_HERE),
189 read_watcher_(this),
190 write_watcher_(this),
191 last_async_result_(0),
192 write_async_timer_running_(false),
193 write_async_outstanding_(0),
194 read_buf_len_(0),
195 recv_from_address_(nullptr),
196 write_buf_len_(0),
197 net_log_(NetLogWithSource::Make(net_log, NetLogSourceType::UDP_SOCKET)),
198 bound_network_(NetworkChangeNotifier::kInvalidNetworkHandle),
199 experimental_recv_optimization_enabled_(false) {
200 net_log_.BeginEventReferencingSource(NetLogEventType::SOCKET_ALIVE, source);
201 }
202
~UDPSocketPosix()203 UDPSocketPosix::~UDPSocketPosix() {
204 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
205 Close();
206 net_log_.EndEvent(NetLogEventType::SOCKET_ALIVE);
207 }
208
Open(AddressFamily address_family)209 int UDPSocketPosix::Open(AddressFamily address_family) {
210 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
211 DCHECK_EQ(socket_, kInvalidSocket);
212
213 auto owned_socket_count = TryAcquireGlobalUDPSocketCount();
214 if (owned_socket_count.empty())
215 return ERR_INSUFFICIENT_RESOURCES;
216
217 addr_family_ = ConvertAddressFamily(address_family);
218 socket_ = CreatePlatformSocket(addr_family_, SOCK_DGRAM, 0);
219 if (socket_ == kInvalidSocket)
220 return MapSystemError(errno);
221 #if defined(OS_MAC)
222 PCHECK(change_fdguard_np(socket_, nullptr, 0, &kSocketFdGuard,
223 GUARD_CLOSE | GUARD_DUP, nullptr) == 0);
224 #endif // defined(OS_MAC)
225 socket_hash_ = GetSocketFDHash(socket_);
226 if (!base::SetNonBlocking(socket_)) {
227 const int err = MapSystemError(errno);
228 Close();
229 return err;
230 }
231 if (tag_ != SocketTag())
232 tag_.Apply(socket_);
233
234 owned_socket_count_ = std::move(owned_socket_count);
235 return OK;
236 }
237
Increment(uint32_t bytes)238 void UDPSocketPosix::ActivityMonitor::Increment(uint32_t bytes) {
239 if (!bytes)
240 return;
241 bool timer_running = timer_.IsRunning();
242 bytes_ += bytes;
243 increments_++;
244 // Allow initial updates to make sure throughput estimator has
245 // enough samples to generate a value. (low water mark)
246 // Or once the bytes threshold has be met. (high water mark)
247 if (increments_ < kActivityMonitorMinimumSamplesForThroughputEstimate ||
248 bytes_ > kActivityMonitorBytesThreshold) {
249 Update();
250 if (timer_running)
251 timer_.Reset();
252 }
253 if (!timer_running) {
254 timer_.Start(FROM_HERE, kActivityMonitorMsThreshold, this,
255 &UDPSocketPosix::ActivityMonitor::OnTimerFired);
256 }
257 }
258
Update()259 void UDPSocketPosix::ActivityMonitor::Update() {
260 if (!bytes_)
261 return;
262 NetworkActivityMonitorIncrement(bytes_);
263 bytes_ = 0;
264 }
265
OnClose()266 void UDPSocketPosix::ActivityMonitor::OnClose() {
267 timer_.Stop();
268 Update();
269 }
270
OnTimerFired()271 void UDPSocketPosix::ActivityMonitor::OnTimerFired() {
272 increments_ = 0;
273 if (!bytes_) {
274 // Can happen if the socket has been idle and have had no
275 // increments since the timer previously fired. Don't bother
276 // keeping the timer running in this case.
277 timer_.Stop();
278 return;
279 }
280 Update();
281 }
282
NetworkActivityMonitorIncrement(uint32_t bytes)283 void UDPSocketPosix::SentActivityMonitor::NetworkActivityMonitorIncrement(
284 uint32_t bytes) {
285 NetworkActivityMonitor::GetInstance()->IncrementBytesSent(bytes);
286 }
287
NetworkActivityMonitorIncrement(uint32_t bytes)288 void UDPSocketPosix::ReceivedActivityMonitor::NetworkActivityMonitorIncrement(
289 uint32_t bytes) {
290 NetworkActivityMonitor::GetInstance()->IncrementBytesReceived(bytes);
291 }
292
Close()293 void UDPSocketPosix::Close() {
294 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
295
296 owned_socket_count_.Reset();
297
298 if (socket_ == kInvalidSocket)
299 return;
300
301 // Zero out any pending read/write callback state.
302 read_buf_.reset();
303 read_buf_len_ = 0;
304 read_callback_.Reset();
305 recv_from_address_ = nullptr;
306 write_buf_.reset();
307 write_buf_len_ = 0;
308 write_callback_.Reset();
309 send_to_address_.reset();
310
311 bool ok = read_socket_watcher_.StopWatchingFileDescriptor();
312 DCHECK(ok);
313 ok = write_socket_watcher_.StopWatchingFileDescriptor();
314 DCHECK(ok);
315
316 // Verify that |socket_| hasn't been corrupted. Needed to debug
317 // crbug.com/906005.
318 CHECK_EQ(socket_hash_, GetSocketFDHash(socket_));
319 #if defined(OS_MAC)
320 PCHECK(IGNORE_EINTR(guarded_close_np(socket_, &kSocketFdGuard)) == 0);
321 #else
322 PCHECK(IGNORE_EINTR(close(socket_)) == 0);
323 #endif // defined(OS_MAC)
324
325 socket_ = kInvalidSocket;
326 addr_family_ = 0;
327 is_connected_ = false;
328 tag_ = SocketTag();
329
330 write_async_timer_.Stop();
331 sent_activity_monitor_.OnClose();
332 received_activity_monitor_.OnClose();
333 }
334
GetPeerAddress(IPEndPoint * address) const335 int UDPSocketPosix::GetPeerAddress(IPEndPoint* address) const {
336 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
337 DCHECK(address);
338 if (!is_connected())
339 return ERR_SOCKET_NOT_CONNECTED;
340
341 if (!remote_address_.get()) {
342 SockaddrStorage storage;
343 if (getpeername(socket_, storage.addr, &storage.addr_len))
344 return MapSystemError(errno);
345 std::unique_ptr<IPEndPoint> address(new IPEndPoint());
346 if (!address->FromSockAddr(storage.addr, storage.addr_len))
347 return ERR_ADDRESS_INVALID;
348 remote_address_ = std::move(address);
349 }
350
351 *address = *remote_address_;
352 return OK;
353 }
354
GetLocalAddress(IPEndPoint * address) const355 int UDPSocketPosix::GetLocalAddress(IPEndPoint* address) const {
356 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
357 DCHECK(address);
358 if (!is_connected())
359 return ERR_SOCKET_NOT_CONNECTED;
360
361 if (!local_address_.get()) {
362 SockaddrStorage storage;
363 if (getsockname(socket_, storage.addr, &storage.addr_len))
364 return MapSystemError(errno);
365 std::unique_ptr<IPEndPoint> address(new IPEndPoint());
366 if (!address->FromSockAddr(storage.addr, storage.addr_len))
367 return ERR_ADDRESS_INVALID;
368 local_address_ = std::move(address);
369 net_log_.AddEvent(NetLogEventType::UDP_LOCAL_ADDRESS, [&] {
370 return CreateNetLogUDPConnectParams(*local_address_, bound_network_);
371 });
372 }
373
374 *address = *local_address_;
375 return OK;
376 }
377
Read(IOBuffer * buf,int buf_len,CompletionOnceCallback callback)378 int UDPSocketPosix::Read(IOBuffer* buf,
379 int buf_len,
380 CompletionOnceCallback callback) {
381 return RecvFrom(buf, buf_len, nullptr, std::move(callback));
382 }
383
RecvFrom(IOBuffer * buf,int buf_len,IPEndPoint * address,CompletionOnceCallback callback)384 int UDPSocketPosix::RecvFrom(IOBuffer* buf,
385 int buf_len,
386 IPEndPoint* address,
387 CompletionOnceCallback callback) {
388 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
389 DCHECK_NE(kInvalidSocket, socket_);
390 CHECK(read_callback_.is_null());
391 DCHECK(!recv_from_address_);
392 DCHECK(!callback.is_null()); // Synchronous operation not supported
393 DCHECK_GT(buf_len, 0);
394
395 int nread = InternalRecvFrom(buf, buf_len, address);
396 if (nread != ERR_IO_PENDING)
397 return nread;
398
399 if (!base::CurrentIOThread::Get()->WatchFileDescriptor(
400 socket_, true, base::MessagePumpForIO::WATCH_READ,
401 &read_socket_watcher_, &read_watcher_)) {
402 PLOG(ERROR) << "WatchFileDescriptor failed on read";
403 int result = MapSystemError(errno);
404 LogRead(result, nullptr, 0, nullptr);
405 return result;
406 }
407
408 read_buf_ = buf;
409 read_buf_len_ = buf_len;
410 recv_from_address_ = address;
411 read_callback_ = std::move(callback);
412 return ERR_IO_PENDING;
413 }
414
Write(IOBuffer * buf,int buf_len,CompletionOnceCallback callback,const NetworkTrafficAnnotationTag & traffic_annotation)415 int UDPSocketPosix::Write(
416 IOBuffer* buf,
417 int buf_len,
418 CompletionOnceCallback callback,
419 const NetworkTrafficAnnotationTag& traffic_annotation) {
420 return SendToOrWrite(buf, buf_len, nullptr, std::move(callback));
421 }
422
SendTo(IOBuffer * buf,int buf_len,const IPEndPoint & address,CompletionOnceCallback callback)423 int UDPSocketPosix::SendTo(IOBuffer* buf,
424 int buf_len,
425 const IPEndPoint& address,
426 CompletionOnceCallback callback) {
427 return SendToOrWrite(buf, buf_len, &address, std::move(callback));
428 }
429
SendToOrWrite(IOBuffer * buf,int buf_len,const IPEndPoint * address,CompletionOnceCallback callback)430 int UDPSocketPosix::SendToOrWrite(IOBuffer* buf,
431 int buf_len,
432 const IPEndPoint* address,
433 CompletionOnceCallback callback) {
434 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
435 DCHECK_NE(kInvalidSocket, socket_);
436 CHECK(write_callback_.is_null());
437 DCHECK(!callback.is_null()); // Synchronous operation not supported
438 DCHECK_GT(buf_len, 0);
439
440 int result = InternalSendTo(buf, buf_len, address);
441 if (result != ERR_IO_PENDING)
442 return result;
443
444 if (!base::CurrentIOThread::Get()->WatchFileDescriptor(
445 socket_, true, base::MessagePumpForIO::WATCH_WRITE,
446 &write_socket_watcher_, &write_watcher_)) {
447 DVPLOG(1) << "WatchFileDescriptor failed on write";
448 int result = MapSystemError(errno);
449 LogWrite(result, nullptr, nullptr);
450 return result;
451 }
452
453 write_buf_ = buf;
454 write_buf_len_ = buf_len;
455 DCHECK(!send_to_address_.get());
456 if (address) {
457 send_to_address_.reset(new IPEndPoint(*address));
458 }
459 write_callback_ = std::move(callback);
460 return ERR_IO_PENDING;
461 }
462
Connect(const IPEndPoint & address)463 int UDPSocketPosix::Connect(const IPEndPoint& address) {
464 DCHECK_NE(socket_, kInvalidSocket);
465 net_log_.BeginEvent(NetLogEventType::UDP_CONNECT, [&] {
466 return CreateNetLogUDPConnectParams(address, bound_network_);
467 });
468 int rv = SetMulticastOptions();
469 if (rv != OK)
470 return rv;
471 rv = InternalConnect(address);
472 net_log_.EndEventWithNetErrorCode(NetLogEventType::UDP_CONNECT, rv);
473 is_connected_ = (rv == OK);
474 if (rv != OK)
475 tag_ = SocketTag();
476 return rv;
477 }
478
InternalConnect(const IPEndPoint & address)479 int UDPSocketPosix::InternalConnect(const IPEndPoint& address) {
480 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
481 DCHECK(!is_connected());
482 DCHECK(!remote_address_.get());
483
484 int rv = 0;
485 if (bind_type_ == DatagramSocket::RANDOM_BIND) {
486 // Construct IPAddress of appropriate size (IPv4 or IPv6) of 0s,
487 // representing INADDR_ANY or in6addr_any.
488 size_t addr_size = address.GetSockAddrFamily() == AF_INET
489 ? IPAddress::kIPv4AddressSize
490 : IPAddress::kIPv6AddressSize;
491 rv = RandomBind(IPAddress::AllZeros(addr_size));
492 }
493 // else connect() does the DatagramSocket::DEFAULT_BIND
494
495 if (rv < 0) {
496 base::UmaHistogramSparse("Net.UdpSocketRandomBindErrorCode", -rv);
497 return rv;
498 }
499
500 SockaddrStorage storage;
501 if (!address.ToSockAddr(storage.addr, &storage.addr_len))
502 return ERR_ADDRESS_INVALID;
503
504 rv = HANDLE_EINTR(connect(socket_, storage.addr, storage.addr_len));
505 if (rv < 0)
506 return MapSystemError(errno);
507
508 remote_address_.reset(new IPEndPoint(address));
509 return rv;
510 }
511
Bind(const IPEndPoint & address)512 int UDPSocketPosix::Bind(const IPEndPoint& address) {
513 DCHECK_NE(socket_, kInvalidSocket);
514 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
515 DCHECK(!is_connected());
516
517 int rv = SetMulticastOptions();
518 if (rv < 0)
519 return rv;
520
521 rv = DoBind(address);
522 if (rv < 0)
523 return rv;
524
525 is_connected_ = true;
526 local_address_.reset();
527 return rv;
528 }
529
BindToNetwork(NetworkChangeNotifier::NetworkHandle network)530 int UDPSocketPosix::BindToNetwork(
531 NetworkChangeNotifier::NetworkHandle network) {
532 DCHECK_NE(socket_, kInvalidSocket);
533 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
534 DCHECK(!is_connected());
535 if (network == NetworkChangeNotifier::kInvalidNetworkHandle)
536 return ERR_INVALID_ARGUMENT;
537 #if defined(OS_ANDROID)
538 // Android prior to Lollipop didn't have support for binding sockets to
539 // networks.
540 if (base::android::BuildInfo::GetInstance()->sdk_int() <
541 base::android::SDK_VERSION_LOLLIPOP) {
542 return ERR_NOT_IMPLEMENTED;
543 }
544 int rv;
545 // On Android M and newer releases use supported NDK API. On Android L use
546 // setNetworkForSocket from libnetd_client.so.
547 if (base::android::BuildInfo::GetInstance()->sdk_int() >=
548 base::android::SDK_VERSION_MARSHMALLOW) {
549 // See declaration of android_setsocknetwork() here:
550 // http://androidxref.com/6.0.0_r1/xref/development/ndk/platforms/android-M/include/android/multinetwork.h#65
551 // Function cannot be called directly as it will cause app to fail to load
552 // on pre-marshmallow devices.
553 typedef int (*MarshmallowSetNetworkForSocket)(int64_t netId, int socketFd);
554 static MarshmallowSetNetworkForSocket marshmallowSetNetworkForSocket;
555 // This is racy, but all racers should come out with the same answer so it
556 // shouldn't matter.
557 if (!marshmallowSetNetworkForSocket) {
558 base::FilePath file(base::GetNativeLibraryName("android"));
559 void* dl = dlopen(file.value().c_str(), RTLD_NOW);
560 marshmallowSetNetworkForSocket =
561 reinterpret_cast<MarshmallowSetNetworkForSocket>(
562 dlsym(dl, "android_setsocknetwork"));
563 }
564 if (!marshmallowSetNetworkForSocket)
565 return ERR_NOT_IMPLEMENTED;
566 rv = marshmallowSetNetworkForSocket(network, socket_);
567 if (rv)
568 rv = errno;
569 } else {
570 // NOTE(pauljensen): This does rely on Android implementation details, but
571 // they won't change because Lollipop is already released.
572 typedef int (*LollipopSetNetworkForSocket)(unsigned netId, int socketFd);
573 static LollipopSetNetworkForSocket lollipopSetNetworkForSocket;
574 // This is racy, but all racers should come out with the same answer so it
575 // shouldn't matter.
576 if (!lollipopSetNetworkForSocket) {
577 // Android's netd client library should always be loaded in our address
578 // space as it shims socket() which was used to create |socket_|.
579 base::FilePath file(base::GetNativeLibraryName("netd_client"));
580 // Use RTLD_NOW to match Android's prior loading of the library:
581 // http://androidxref.com/6.0.0_r5/xref/bionic/libc/bionic/NetdClient.cpp#37
582 // Use RTLD_NOLOAD to assert that the library is already loaded and
583 // avoid doing any disk IO.
584 void* dl = dlopen(file.value().c_str(), RTLD_NOW | RTLD_NOLOAD);
585 lollipopSetNetworkForSocket =
586 reinterpret_cast<LollipopSetNetworkForSocket>(
587 dlsym(dl, "setNetworkForSocket"));
588 }
589 if (!lollipopSetNetworkForSocket)
590 return ERR_NOT_IMPLEMENTED;
591 rv = -lollipopSetNetworkForSocket(network, socket_);
592 }
593 // If |network| has since disconnected, |rv| will be ENONET. Surface this as
594 // ERR_NETWORK_CHANGED, rather than MapSystemError(ENONET) which gives back
595 // the less descriptive ERR_FAILED.
596 if (rv == ENONET)
597 return ERR_NETWORK_CHANGED;
598 if (rv == 0)
599 bound_network_ = network;
600 return MapSystemError(rv);
601 #else
602 NOTIMPLEMENTED();
603 return ERR_NOT_IMPLEMENTED;
604 #endif
605 }
606
SetReceiveBufferSize(int32_t size)607 int UDPSocketPosix::SetReceiveBufferSize(int32_t size) {
608 DCHECK_NE(socket_, kInvalidSocket);
609 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
610 return SetSocketReceiveBufferSize(socket_, size);
611 }
612
SetSendBufferSize(int32_t size)613 int UDPSocketPosix::SetSendBufferSize(int32_t size) {
614 DCHECK_NE(socket_, kInvalidSocket);
615 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
616 return SetSocketSendBufferSize(socket_, size);
617 }
618
SetDoNotFragment()619 int UDPSocketPosix::SetDoNotFragment() {
620 DCHECK_NE(socket_, kInvalidSocket);
621 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
622
623 #if !defined(IP_PMTUDISC_DO)
624 return ERR_NOT_IMPLEMENTED;
625 #else
626 if (addr_family_ == AF_INET6) {
627 int val = IPV6_PMTUDISC_DO;
628 if (setsockopt(socket_, IPPROTO_IPV6, IPV6_MTU_DISCOVER, &val,
629 sizeof(val)) != 0) {
630 return MapSystemError(errno);
631 }
632
633 int v6_only = false;
634 socklen_t v6_only_len = sizeof(v6_only);
635 if (getsockopt(socket_, IPPROTO_IPV6, IPV6_V6ONLY, &v6_only,
636 &v6_only_len) != 0) {
637 return MapSystemError(errno);
638 }
639
640 if (v6_only)
641 return OK;
642 }
643
644 int val = IP_PMTUDISC_DO;
645 int rv = setsockopt(socket_, IPPROTO_IP, IP_MTU_DISCOVER, &val, sizeof(val));
646 return rv == 0 ? OK : MapSystemError(errno);
647 #endif
648 }
649
SetMsgConfirm(bool confirm)650 void UDPSocketPosix::SetMsgConfirm(bool confirm) {
651 #if !defined(OS_APPLE) && !defined(OS_BSD)
652 if (confirm) {
653 sendto_flags_ |= MSG_CONFIRM;
654 } else {
655 sendto_flags_ &= ~MSG_CONFIRM;
656 }
657 #endif // !defined(OS_APPLE) && !defined(OS_BSD)
658 }
659
AllowAddressReuse()660 int UDPSocketPosix::AllowAddressReuse() {
661 DCHECK_NE(socket_, kInvalidSocket);
662 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
663 DCHECK(!is_connected());
664 return SetReuseAddr(socket_, true);
665 }
666
SetBroadcast(bool broadcast)667 int UDPSocketPosix::SetBroadcast(bool broadcast) {
668 DCHECK_NE(socket_, kInvalidSocket);
669 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
670 int value = broadcast ? 1 : 0;
671 int rv;
672 #if defined(OS_APPLE)
673 // SO_REUSEPORT on OSX permits multiple processes to each receive
674 // UDP multicast or broadcast datagrams destined for the bound
675 // port.
676 // This is only being set on OSX because its behavior is platform dependent
677 // and we are playing it safe by only setting it on platforms where things
678 // break.
679 rv = setsockopt(socket_, SOL_SOCKET, SO_REUSEPORT, &value, sizeof(value));
680 if (rv != 0)
681 return MapSystemError(errno);
682 #endif // defined(OS_APPLE)
683 rv = setsockopt(socket_, SOL_SOCKET, SO_BROADCAST, &value, sizeof(value));
684
685 return rv == 0 ? OK : MapSystemError(errno);
686 }
687
AllowAddressSharingForMulticast()688 int UDPSocketPosix::AllowAddressSharingForMulticast() {
689 DCHECK_NE(socket_, kInvalidSocket);
690 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
691 DCHECK(!is_connected());
692
693 int rv = AllowAddressReuse();
694 if (rv != OK)
695 return rv;
696
697 #ifdef SO_REUSEPORT
698 // Attempt to set SO_REUSEPORT if available. On some platforms, this is
699 // necessary to allow the address to be fully shared between separate sockets.
700 // On platforms where the option does not exist, SO_REUSEADDR should be
701 // sufficient to share multicast packets if such sharing is at all possible.
702 int value = 1;
703 rv = setsockopt(socket_, SOL_SOCKET, SO_REUSEPORT, &value, sizeof(value));
704 // Ignore errors that the option does not exist.
705 if (rv != 0 && errno != ENOPROTOOPT)
706 return MapSystemError(errno);
707 #endif // SO_REUSEPORT
708
709 return OK;
710 }
711
OnFileCanReadWithoutBlocking(int)712 void UDPSocketPosix::ReadWatcher::OnFileCanReadWithoutBlocking(int) {
713 TRACE_EVENT0(NetTracingCategory(),
714 "UDPSocketPosix::ReadWatcher::OnFileCanReadWithoutBlocking");
715 if (!socket_->read_callback_.is_null())
716 socket_->DidCompleteRead();
717 }
718
OnFileCanWriteWithoutBlocking(int)719 void UDPSocketPosix::WriteWatcher::OnFileCanWriteWithoutBlocking(int) {
720 if (!socket_->write_callback_.is_null())
721 socket_->DidCompleteWrite();
722 }
723
DoReadCallback(int rv)724 void UDPSocketPosix::DoReadCallback(int rv) {
725 DCHECK_NE(rv, ERR_IO_PENDING);
726 DCHECK(!read_callback_.is_null());
727
728 // Since Run() may result in Read() being called,
729 // clear |read_callback_| up front.
730 std::move(read_callback_).Run(rv);
731 }
732
DoWriteCallback(int rv)733 void UDPSocketPosix::DoWriteCallback(int rv) {
734 DCHECK_NE(rv, ERR_IO_PENDING);
735 DCHECK(!write_callback_.is_null());
736
737 // Since Run() may result in Write() being called,
738 // clear |write_callback_| up front.
739 std::move(write_callback_).Run(rv);
740 }
741
DidCompleteRead()742 void UDPSocketPosix::DidCompleteRead() {
743 int result =
744 InternalRecvFrom(read_buf_.get(), read_buf_len_, recv_from_address_);
745 if (result != ERR_IO_PENDING) {
746 read_buf_.reset();
747 read_buf_len_ = 0;
748 recv_from_address_ = nullptr;
749 bool ok = read_socket_watcher_.StopWatchingFileDescriptor();
750 DCHECK(ok);
751 DoReadCallback(result);
752 }
753 }
754
LogRead(int result,const char * bytes,socklen_t addr_len,const sockaddr * addr)755 void UDPSocketPosix::LogRead(int result,
756 const char* bytes,
757 socklen_t addr_len,
758 const sockaddr* addr) {
759 if (result < 0) {
760 net_log_.AddEventWithNetErrorCode(NetLogEventType::UDP_RECEIVE_ERROR,
761 result);
762 return;
763 }
764
765 if (net_log_.IsCapturing()) {
766 DCHECK(addr_len > 0);
767 DCHECK(addr);
768
769 IPEndPoint address;
770 bool is_address_valid = address.FromSockAddr(addr, addr_len);
771 NetLogUDPDataTransfer(net_log_, NetLogEventType::UDP_BYTES_RECEIVED, result,
772 bytes, is_address_valid ? &address : nullptr);
773 }
774
775 received_activity_monitor_.Increment(result);
776 }
777
DidCompleteWrite()778 void UDPSocketPosix::DidCompleteWrite() {
779 int result =
780 InternalSendTo(write_buf_.get(), write_buf_len_, send_to_address_.get());
781
782 if (result != ERR_IO_PENDING) {
783 write_buf_.reset();
784 write_buf_len_ = 0;
785 send_to_address_.reset();
786 write_socket_watcher_.StopWatchingFileDescriptor();
787 DoWriteCallback(result);
788 }
789 }
790
LogWrite(int result,const char * bytes,const IPEndPoint * address)791 void UDPSocketPosix::LogWrite(int result,
792 const char* bytes,
793 const IPEndPoint* address) {
794 if (result < 0) {
795 net_log_.AddEventWithNetErrorCode(NetLogEventType::UDP_SEND_ERROR, result);
796 return;
797 }
798
799 if (net_log_.IsCapturing()) {
800 NetLogUDPDataTransfer(net_log_, NetLogEventType::UDP_BYTES_SENT, result,
801 bytes, address);
802 }
803
804 sent_activity_monitor_.Increment(result);
805 }
806
InternalRecvFrom(IOBuffer * buf,int buf_len,IPEndPoint * address)807 int UDPSocketPosix::InternalRecvFrom(IOBuffer* buf,
808 int buf_len,
809 IPEndPoint* address) {
810 // If the socket is connected and the remote address is known
811 // use the more efficient method that uses read() instead of recvmsg().
812 if (experimental_recv_optimization_enabled_ && is_connected_ &&
813 remote_address_) {
814 return InternalRecvFromConnectedSocket(buf, buf_len, address);
815 }
816 return InternalRecvFromNonConnectedSocket(buf, buf_len, address);
817 }
818
InternalRecvFromConnectedSocket(IOBuffer * buf,int buf_len,IPEndPoint * address)819 int UDPSocketPosix::InternalRecvFromConnectedSocket(IOBuffer* buf,
820 int buf_len,
821 IPEndPoint* address) {
822 DCHECK(is_connected_);
823 DCHECK(remote_address_);
824 int bytes_transferred;
825 bytes_transferred = HANDLE_EINTR(read(socket_, buf->data(), buf_len));
826 int result;
827
828 if (bytes_transferred < 0) {
829 result = MapSystemError(errno);
830 } else if (bytes_transferred == buf_len) {
831 result = ERR_MSG_TOO_BIG;
832 } else {
833 result = bytes_transferred;
834 if (address)
835 *address = *remote_address_.get();
836 }
837
838 if (result != ERR_IO_PENDING) {
839 SockaddrStorage sock_addr;
840 bool success =
841 remote_address_->ToSockAddr(sock_addr.addr, &sock_addr.addr_len);
842 DCHECK(success);
843 LogRead(result, buf->data(), sock_addr.addr_len, sock_addr.addr);
844 }
845 return result;
846 }
847
InternalRecvFromNonConnectedSocket(IOBuffer * buf,int buf_len,IPEndPoint * address)848 int UDPSocketPosix::InternalRecvFromNonConnectedSocket(IOBuffer* buf,
849 int buf_len,
850 IPEndPoint* address) {
851 int bytes_transferred;
852
853 struct iovec iov = {};
854 iov.iov_base = buf->data();
855 iov.iov_len = buf_len;
856
857 struct msghdr msg = {};
858 msg.msg_iov = &iov;
859 msg.msg_iovlen = 1;
860
861 SockaddrStorage storage;
862 msg.msg_name = storage.addr;
863 msg.msg_namelen = storage.addr_len;
864
865 bytes_transferred = HANDLE_EINTR(recvmsg(socket_, &msg, 0));
866 storage.addr_len = msg.msg_namelen;
867 int result;
868 if (bytes_transferred >= 0) {
869 if (msg.msg_flags & MSG_TRUNC) {
870 result = ERR_MSG_TOO_BIG;
871 } else {
872 result = bytes_transferred;
873 if (address && !address->FromSockAddr(storage.addr, storage.addr_len))
874 result = ERR_ADDRESS_INVALID;
875 }
876 } else {
877 result = MapSystemError(errno);
878 }
879 if (result != ERR_IO_PENDING)
880 LogRead(result, buf->data(), storage.addr_len, storage.addr);
881 return result;
882 }
883
InternalSendTo(IOBuffer * buf,int buf_len,const IPEndPoint * address)884 int UDPSocketPosix::InternalSendTo(IOBuffer* buf,
885 int buf_len,
886 const IPEndPoint* address) {
887 SockaddrStorage storage;
888 struct sockaddr* addr = storage.addr;
889 if (!address) {
890 addr = nullptr;
891 storage.addr_len = 0;
892 } else {
893 if (!address->ToSockAddr(storage.addr, &storage.addr_len)) {
894 int result = ERR_ADDRESS_INVALID;
895 LogWrite(result, nullptr, nullptr);
896 return result;
897 }
898 }
899
900 int result = HANDLE_EINTR(sendto(socket_, buf->data(), buf_len, sendto_flags_,
901 addr, storage.addr_len));
902 if (result < 0)
903 result = MapSystemError(errno);
904 if (result != ERR_IO_PENDING)
905 LogWrite(result, buf->data(), address);
906 return result;
907 }
908
SetMulticastOptions()909 int UDPSocketPosix::SetMulticastOptions() {
910 if (!(socket_options_ & SOCKET_OPTION_MULTICAST_LOOP)) {
911 int rv;
912 if (addr_family_ == AF_INET) {
913 u_char loop = 0;
914 rv = setsockopt(socket_, IPPROTO_IP, IP_MULTICAST_LOOP,
915 &loop, sizeof(loop));
916 } else {
917 u_int loop = 0;
918 rv = setsockopt(socket_, IPPROTO_IPV6, IPV6_MULTICAST_LOOP,
919 &loop, sizeof(loop));
920 }
921 if (rv < 0)
922 return MapSystemError(errno);
923 }
924 if (multicast_time_to_live_ != IP_DEFAULT_MULTICAST_TTL) {
925 int rv;
926 if (addr_family_ == AF_INET) {
927 u_char ttl = multicast_time_to_live_;
928 rv = setsockopt(socket_, IPPROTO_IP, IP_MULTICAST_TTL,
929 &ttl, sizeof(ttl));
930 } else {
931 // Signed integer. -1 to use route default.
932 int ttl = multicast_time_to_live_;
933 rv = setsockopt(socket_, IPPROTO_IPV6, IPV6_MULTICAST_HOPS,
934 &ttl, sizeof(ttl));
935 }
936 if (rv < 0)
937 return MapSystemError(errno);
938 }
939 if (multicast_interface_ != 0) {
940 switch (addr_family_) {
941 case AF_INET: {
942 //
943 // DragonFly BSD does not define ip_mreqn and setsockopt() doesn't allow
944 // to use this struct to set options via imr_ifindex.
945 //
946 #if defined(__FreeBSD__)
947 ip_mreqn mreq = {};
948 mreq.imr_ifindex = multicast_interface_;
949 mreq.imr_address.s_addr = htonl(INADDR_ANY);
950 #else
951 ip_mreq mreq = {};
952 int error = GetIPv4AddressFromIndex(socket_, multicast_interface_,
953 &mreq.imr_interface.s_addr);
954 if (error != OK)
955 return error;
956 #endif // defined(__FreeBSD__)
957 int rv = setsockopt(socket_, IPPROTO_IP, IP_MULTICAST_IF,
958 #if defined(OS_BSD)
959 # if defined(__FreeBSD__)
960 reinterpret_cast<const char*>(&mreq.imr_address.s_addr), sizeof(mreq.imr_address.s_addr));
961 # else
962 reinterpret_cast<const char*>(&mreq.imr_interface.s_addr), sizeof(mreq.imr_interface.s_addr));
963 # endif
964 #else
965 reinterpret_cast<const char*>(&mreq), sizeof(mreq));
966 #endif
967 if (rv)
968 return MapSystemError(errno);
969 break;
970 }
971 case AF_INET6: {
972 uint32_t interface_index = multicast_interface_;
973 int rv = setsockopt(socket_, IPPROTO_IPV6, IPV6_MULTICAST_IF,
974 reinterpret_cast<const char*>(&interface_index),
975 sizeof(interface_index));
976 if (rv)
977 return MapSystemError(errno);
978 break;
979 }
980 default:
981 NOTREACHED() << "Invalid address family";
982 return ERR_ADDRESS_INVALID;
983 }
984 }
985 return OK;
986 }
987
DoBind(const IPEndPoint & address)988 int UDPSocketPosix::DoBind(const IPEndPoint& address) {
989 SockaddrStorage storage;
990 if (!address.ToSockAddr(storage.addr, &storage.addr_len))
991 return ERR_ADDRESS_INVALID;
992 int rv = bind(socket_, storage.addr, storage.addr_len);
993 if (rv == 0)
994 return OK;
995 int last_error = errno;
996 #if defined(OS_CHROMEOS)
997 if (last_error == EINVAL)
998 return ERR_ADDRESS_IN_USE;
999 #elif defined(OS_APPLE)
1000 if (last_error == EADDRNOTAVAIL)
1001 return ERR_ADDRESS_IN_USE;
1002 #endif
1003 return MapSystemError(last_error);
1004 }
1005
RandomBind(const IPAddress & address)1006 int UDPSocketPosix::RandomBind(const IPAddress& address) {
1007 DCHECK_EQ(bind_type_, DatagramSocket::RANDOM_BIND);
1008
1009 for (int i = 0; i < kBindRetries; ++i) {
1010 int rv = DoBind(IPEndPoint(address, base::RandInt(kPortStart, kPortEnd)));
1011 if (rv != ERR_ADDRESS_IN_USE)
1012 return rv;
1013 }
1014
1015 return DoBind(IPEndPoint(address, 0));
1016 }
1017
JoinGroup(const IPAddress & group_address) const1018 int UDPSocketPosix::JoinGroup(const IPAddress& group_address) const {
1019 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
1020 if (!is_connected())
1021 return ERR_SOCKET_NOT_CONNECTED;
1022
1023 switch (group_address.size()) {
1024 case IPAddress::kIPv4AddressSize: {
1025 if (addr_family_ != AF_INET)
1026 return ERR_ADDRESS_INVALID;
1027 #if defined(OS_BSD) && defined(__DragonFly__)
1028 ip_mreq mreq = {};
1029 int error = GetIPv4AddressFromIndex(socket_, multicast_interface_,
1030 &mreq.imr_interface.s_addr);
1031 if (error != OK)
1032 return error;
1033 #else
1034 ip_mreqn mreq = {};
1035 mreq.imr_ifindex = multicast_interface_;
1036 mreq.imr_address.s_addr = htonl(INADDR_ANY);
1037 #endif
1038 memcpy(&mreq.imr_multiaddr, group_address.bytes().data(),
1039 IPAddress::kIPv4AddressSize);
1040 int rv = setsockopt(socket_, IPPROTO_IP, IP_ADD_MEMBERSHIP,
1041 &mreq, sizeof(mreq));
1042 if (rv < 0)
1043 return MapSystemError(errno);
1044 return OK;
1045 }
1046 case IPAddress::kIPv6AddressSize: {
1047 if (addr_family_ != AF_INET6)
1048 return ERR_ADDRESS_INVALID;
1049 ipv6_mreq mreq;
1050 mreq.ipv6mr_interface = multicast_interface_;
1051 memcpy(&mreq.ipv6mr_multiaddr, group_address.bytes().data(),
1052 IPAddress::kIPv6AddressSize);
1053 int rv = setsockopt(socket_, IPPROTO_IPV6, IPV6_JOIN_GROUP,
1054 &mreq, sizeof(mreq));
1055 if (rv < 0)
1056 return MapSystemError(errno);
1057 return OK;
1058 }
1059 default:
1060 NOTREACHED() << "Invalid address family";
1061 return ERR_ADDRESS_INVALID;
1062 }
1063 }
1064
LeaveGroup(const IPAddress & group_address) const1065 int UDPSocketPosix::LeaveGroup(const IPAddress& group_address) const {
1066 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
1067
1068 if (!is_connected())
1069 return ERR_SOCKET_NOT_CONNECTED;
1070
1071 switch (group_address.size()) {
1072 case IPAddress::kIPv4AddressSize: {
1073 if (addr_family_ != AF_INET)
1074 return ERR_ADDRESS_INVALID;
1075 #if defined(OS_BSD)
1076 ip_mreq mreq = {};
1077 int error = GetIPv4AddressFromIndex(socket_, multicast_interface_,
1078 &mreq.imr_interface.s_addr);
1079
1080 if (error != OK)
1081 return error;
1082 #else
1083 ip_mreqn mreq = {};
1084 mreq.imr_ifindex = multicast_interface_;
1085 mreq.imr_address.s_addr = INADDR_ANY;
1086 #endif
1087 memcpy(&mreq.imr_multiaddr, group_address.bytes().data(),
1088 IPAddress::kIPv4AddressSize);
1089 int rv = setsockopt(socket_, IPPROTO_IP, IP_DROP_MEMBERSHIP,
1090 &mreq, sizeof(mreq));
1091 if (rv < 0)
1092 return MapSystemError(errno);
1093 return OK;
1094 }
1095 case IPAddress::kIPv6AddressSize: {
1096 if (addr_family_ != AF_INET6)
1097 return ERR_ADDRESS_INVALID;
1098 ipv6_mreq mreq;
1099 #if defined(OS_FUCHSIA)
1100 mreq.ipv6mr_interface = multicast_interface_;
1101 #else // defined(OS_FUCHSIA)
1102 mreq.ipv6mr_interface = 0; // 0 indicates default multicast interface.
1103 #endif // !defined(OS_FUCHSIA)
1104 memcpy(&mreq.ipv6mr_multiaddr, group_address.bytes().data(),
1105 IPAddress::kIPv6AddressSize);
1106 int rv = setsockopt(socket_, IPPROTO_IPV6, IPV6_LEAVE_GROUP,
1107 &mreq, sizeof(mreq));
1108 if (rv < 0)
1109 return MapSystemError(errno);
1110 return OK;
1111 }
1112 default:
1113 NOTREACHED() << "Invalid address family";
1114 return ERR_ADDRESS_INVALID;
1115 }
1116 }
1117
SetMulticastInterface(uint32_t interface_index)1118 int UDPSocketPosix::SetMulticastInterface(uint32_t interface_index) {
1119 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
1120 if (is_connected())
1121 return ERR_SOCKET_IS_CONNECTED;
1122 multicast_interface_ = interface_index;
1123 return OK;
1124 }
1125
SetMulticastTimeToLive(int time_to_live)1126 int UDPSocketPosix::SetMulticastTimeToLive(int time_to_live) {
1127 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
1128 if (is_connected())
1129 return ERR_SOCKET_IS_CONNECTED;
1130
1131 if (time_to_live < 0 || time_to_live > 255)
1132 return ERR_INVALID_ARGUMENT;
1133 multicast_time_to_live_ = time_to_live;
1134 return OK;
1135 }
1136
SetMulticastLoopbackMode(bool loopback)1137 int UDPSocketPosix::SetMulticastLoopbackMode(bool loopback) {
1138 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
1139 if (is_connected())
1140 return ERR_SOCKET_IS_CONNECTED;
1141
1142 if (loopback)
1143 socket_options_ |= SOCKET_OPTION_MULTICAST_LOOP;
1144 else
1145 socket_options_ &= ~SOCKET_OPTION_MULTICAST_LOOP;
1146 return OK;
1147 }
1148
SetDiffServCodePoint(DiffServCodePoint dscp)1149 int UDPSocketPosix::SetDiffServCodePoint(DiffServCodePoint dscp) {
1150 if (dscp == DSCP_NO_CHANGE) {
1151 return OK;
1152 }
1153
1154 int dscp_and_ecn = dscp << 2;
1155 // Set the IPv4 option in all cases to support dual-stack sockets.
1156 int rv = setsockopt(socket_, IPPROTO_IP, IP_TOS, &dscp_and_ecn,
1157 sizeof(dscp_and_ecn));
1158 if (addr_family_ == AF_INET6) {
1159 // In the IPv6 case, the previous socksetopt may fail because of a lack of
1160 // dual-stack support. Therefore ignore the previous return value.
1161 rv = setsockopt(socket_, IPPROTO_IPV6, IPV6_TCLASS,
1162 &dscp_and_ecn, sizeof(dscp_and_ecn));
1163 }
1164 if (rv < 0)
1165 return MapSystemError(errno);
1166
1167 return OK;
1168 }
1169
DetachFromThread()1170 void UDPSocketPosix::DetachFromThread() {
1171 DETACH_FROM_THREAD(thread_checker_);
1172 }
1173
ApplySocketTag(const SocketTag & tag)1174 void UDPSocketPosix::ApplySocketTag(const SocketTag& tag) {
1175 DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
1176 if (socket_ != kInvalidSocket && tag != tag_) {
1177 tag.Apply(socket_);
1178 }
1179 tag_ = tag;
1180 }
1181
UDPSocketPosixSender()1182 UDPSocketPosixSender::UDPSocketPosixSender() : sendmmsg_enabled_(false) {}
~UDPSocketPosixSender()1183 UDPSocketPosixSender::~UDPSocketPosixSender() {}
1184
SendResult()1185 SendResult::SendResult() : rv(0), write_count(0) {}
~SendResult()1186 SendResult::~SendResult() {}
SendResult(int _rv,int _write_count,DatagramBuffers _buffers)1187 SendResult::SendResult(int _rv, int _write_count, DatagramBuffers _buffers)
1188 : rv(_rv), write_count(_write_count), buffers(std::move(_buffers)) {}
1189 SendResult::SendResult(SendResult&& other) = default;
1190
InternalSendBuffers(int fd,DatagramBuffers buffers) const1191 SendResult UDPSocketPosixSender::InternalSendBuffers(
1192 int fd,
1193 DatagramBuffers buffers) const {
1194 int rv = 0;
1195 int write_count = 0;
1196 for (auto& buffer : buffers) {
1197 int result = HANDLE_EINTR(Send(fd, buffer->data(), buffer->length(), 0));
1198 if (result < 0) {
1199 rv = MapSystemError(errno);
1200 break;
1201 }
1202 write_count++;
1203 }
1204 return SendResult(rv, write_count, std::move(buffers));
1205 }
1206
1207 #if HAVE_SENDMMSG
InternalSendmmsgBuffers(int fd,DatagramBuffers buffers) const1208 SendResult UDPSocketPosixSender::InternalSendmmsgBuffers(
1209 int fd,
1210 DatagramBuffers buffers) const {
1211 base::StackVector<struct iovec, kWriteAsyncMaxBuffersThreshold + 1> msg_iov;
1212 base::StackVector<struct mmsghdr, kWriteAsyncMaxBuffersThreshold + 1> msgvec;
1213 msg_iov->reserve(buffers.size());
1214 for (auto& buffer : buffers)
1215 msg_iov->push_back({const_cast<char*>(buffer->data()), buffer->length()});
1216 msgvec->reserve(buffers.size());
1217 for (size_t j = 0; j < buffers.size(); j++)
1218 msgvec->push_back({{nullptr, 0, &msg_iov[j], 1, nullptr, 0, 0}, 0});
1219 int result = HANDLE_EINTR(Sendmmsg(fd, &msgvec[0], buffers.size(), 0));
1220 SendResult send_result(0, 0, std::move(buffers));
1221 if (result < 0) {
1222 send_result.rv = MapSystemError(errno);
1223 } else {
1224 send_result.write_count = result;
1225 }
1226 return send_result;
1227 }
1228 #endif
1229
SendBuffers(int fd,DatagramBuffers buffers)1230 SendResult UDPSocketPosixSender::SendBuffers(int fd, DatagramBuffers buffers) {
1231 #if HAVE_SENDMMSG
1232 if (sendmmsg_enabled_) {
1233 auto result = InternalSendmmsgBuffers(fd, std::move(buffers));
1234 if (LIKELY(result.rv != ERR_NOT_IMPLEMENTED)) {
1235 return result;
1236 }
1237 DLOG(WARNING) << "senddmsg() not implemented, falling back to send()";
1238 sendmmsg_enabled_ = false;
1239 buffers = std::move(result.buffers);
1240 }
1241 #endif
1242 return InternalSendBuffers(fd, std::move(buffers));
1243 }
1244
Send(int sockfd,const void * buf,size_t len,int flags) const1245 ssize_t UDPSocketPosixSender::Send(int sockfd,
1246 const void* buf,
1247 size_t len,
1248 int flags) const {
1249 return send(sockfd, buf, len, flags);
1250 }
1251
1252 #if HAVE_SENDMMSG
Sendmmsg(int sockfd,struct mmsghdr * msgvec,unsigned int vlen,unsigned int flags) const1253 int UDPSocketPosixSender::Sendmmsg(int sockfd,
1254 struct mmsghdr* msgvec,
1255 unsigned int vlen,
1256 unsigned int flags) const {
1257 return sendmmsg(sockfd, msgvec, vlen, flags);
1258 }
1259 #endif
1260
WriteAsync(const char * buffer,size_t buf_len,CompletionOnceCallback callback,const NetworkTrafficAnnotationTag & traffic_annotation)1261 int UDPSocketPosix::WriteAsync(
1262 const char* buffer,
1263 size_t buf_len,
1264 CompletionOnceCallback callback,
1265 const NetworkTrafficAnnotationTag& traffic_annotation) {
1266 DCHECK(datagram_buffer_pool_ != nullptr);
1267 IncreaseWriteAsyncOutstanding(1);
1268 datagram_buffer_pool_->Enqueue(buffer, buf_len, &pending_writes_);
1269 return InternalWriteAsync(std::move(callback), traffic_annotation);
1270 }
1271
WriteAsync(DatagramBuffers buffers,CompletionOnceCallback callback,const NetworkTrafficAnnotationTag & traffic_annotation)1272 int UDPSocketPosix::WriteAsync(
1273 DatagramBuffers buffers,
1274 CompletionOnceCallback callback,
1275 const NetworkTrafficAnnotationTag& traffic_annotation) {
1276 IncreaseWriteAsyncOutstanding(buffers.size());
1277 pending_writes_.splice(pending_writes_.end(), std::move(buffers));
1278 return InternalWriteAsync(std::move(callback), traffic_annotation);
1279 }
1280
InternalWriteAsync(CompletionOnceCallback callback,const NetworkTrafficAnnotationTag & traffic_annotation)1281 int UDPSocketPosix::InternalWriteAsync(
1282 CompletionOnceCallback callback,
1283 const NetworkTrafficAnnotationTag& traffic_annotation) {
1284 CHECK(write_callback_.is_null());
1285
1286 // Surface error immediately if one is pending.
1287 if (last_async_result_ < 0) {
1288 return ResetLastAsyncResult();
1289 }
1290
1291 size_t flush_threshold =
1292 write_batching_active_ ? kWriteAsyncPostBuffersThreshold : 1;
1293 if (pending_writes_.size() >= flush_threshold) {
1294 FlushPending();
1295 // Surface error immediately if one is pending.
1296 if (last_async_result_ < 0) {
1297 return ResetLastAsyncResult();
1298 }
1299 }
1300
1301 if (!write_async_timer_running_) {
1302 write_async_timer_running_ = true;
1303 write_async_timer_.Start(FROM_HERE, kWriteAsyncMsThreshold, this,
1304 &UDPSocketPosix::OnWriteAsyncTimerFired);
1305 }
1306
1307 int blocking_threshold =
1308 write_batching_active_ ? kWriteAsyncMaxBuffersThreshold : 1;
1309 if (write_async_outstanding_ >= blocking_threshold) {
1310 write_callback_ = std::move(callback);
1311 return ERR_IO_PENDING;
1312 }
1313
1314 DVLOG(2) << __func__ << " pending " << pending_writes_.size()
1315 << " outstanding " << write_async_outstanding_;
1316 return ResetWrittenBytes();
1317 }
1318
GetUnwrittenBuffers()1319 DatagramBuffers UDPSocketPosix::GetUnwrittenBuffers() {
1320 write_async_outstanding_ -= pending_writes_.size();
1321 return std::move(pending_writes_);
1322 }
1323
FlushPending()1324 void UDPSocketPosix::FlushPending() {
1325 // Nothing to do if socket is blocked.
1326 if (write_async_watcher_->watching())
1327 return;
1328
1329 if (pending_writes_.empty())
1330 return;
1331
1332 if (write_async_timer_running_)
1333 write_async_timer_.Reset();
1334
1335 int num_pending_writes = static_cast<int>(pending_writes_.size());
1336 if (!write_multi_core_enabled_ ||
1337 // Don't bother with post if not enough buffers
1338 (num_pending_writes <= kWriteAsyncMinBuffersThreshold &&
1339 // but not if there is a previous post
1340 // outstanding, to prevent out of order transmission.
1341 (num_pending_writes == write_async_outstanding_))) {
1342 LocalSendBuffers();
1343 } else {
1344 PostSendBuffers();
1345 }
1346 }
1347
1348 // TODO(ckrasic) Sad face. Do this lazily because many tests exploded
1349 // otherwise. |threading_and_tasks.md| advises to instantiate a
1350 // |base::test::TaskEnvironment| in the test, implementing that
1351 // for all tests that might exercise QUIC is too daunting. Also, in
1352 // some tests it seemed like following the advice just broke in other
1353 // ways.
GetTaskRunner()1354 base::SequencedTaskRunner* UDPSocketPosix::GetTaskRunner() {
1355 if (task_runner_ == nullptr)
1356 task_runner_ = base::ThreadPool::CreateSequencedTaskRunner({});
1357 return task_runner_.get();
1358 }
1359
OnWriteAsyncTimerFired()1360 void UDPSocketPosix::OnWriteAsyncTimerFired() {
1361 DVLOG(2) << __func__ << " pending writes " << pending_writes_.size();
1362 if (pending_writes_.empty()) {
1363 write_async_timer_.Stop();
1364 write_async_timer_running_ = false;
1365 return;
1366 }
1367 if (last_async_result_ < 0) {
1368 DVLOG(1) << __func__ << " socket not writeable";
1369 return;
1370 }
1371 FlushPending();
1372 }
1373
LocalSendBuffers()1374 void UDPSocketPosix::LocalSendBuffers() {
1375 DVLOG(1) << __func__ << " queue " << pending_writes_.size() << " out of "
1376 << write_async_outstanding_ << " total";
1377 DidSendBuffers(sender_->SendBuffers(socket_, std::move(pending_writes_)));
1378 }
1379
PostSendBuffers()1380 void UDPSocketPosix::PostSendBuffers() {
1381 DVLOG(1) << __func__ << " queue " << pending_writes_.size() << " out of "
1382 << write_async_outstanding_ << " total";
1383 base::PostTaskAndReplyWithResult(
1384 GetTaskRunner(), FROM_HERE,
1385 base::BindOnce(&UDPSocketPosixSender::SendBuffers, sender_, socket_,
1386 std::move(pending_writes_)),
1387 base::BindOnce(&UDPSocketPosix::DidSendBuffers,
1388 weak_factory_.GetWeakPtr()));
1389 }
1390
DidSendBuffers(SendResult send_result)1391 void UDPSocketPosix::DidSendBuffers(SendResult send_result) {
1392 DVLOG(3) << __func__;
1393 int write_count = send_result.write_count;
1394 DatagramBuffers& buffers = send_result.buffers;
1395
1396 DCHECK(!buffers.empty());
1397 int num_buffers = buffers.size();
1398
1399 // Dequeue buffers that have been written.
1400 if (write_count > 0) {
1401 write_async_outstanding_ -= write_count;
1402
1403 DatagramBuffers::const_iterator it;
1404 // Generate logs for written buffers
1405 it = buffers.cbegin();
1406 for (int i = 0; i < write_count; i++, it++) {
1407 auto& buffer = *it;
1408 LogWrite(buffer->length(), buffer->data(), nullptr);
1409 written_bytes_ += buffer->length();
1410 }
1411 // Return written buffers to pool
1412 DatagramBuffers written_buffers;
1413 if (write_count == num_buffers) {
1414 it = buffers.cend();
1415 } else {
1416 it = buffers.cbegin();
1417 for (int i = 0; i < write_count; i++) {
1418 it++;
1419 }
1420 }
1421 written_buffers.splice(written_buffers.cend(), buffers, buffers.cbegin(),
1422 it);
1423 DCHECK(datagram_buffer_pool_ != nullptr);
1424 datagram_buffer_pool_->Dequeue(&written_buffers);
1425 }
1426
1427 // Requeue left-over (unwritten) buffers.
1428 if (!buffers.empty()) {
1429 DVLOG(2) << __func__ << " requeue " << buffers.size() << " buffers";
1430 pending_writes_.splice(pending_writes_.begin(), std::move(buffers));
1431 }
1432
1433 last_async_result_ = send_result.rv;
1434 if (last_async_result_ == ERR_IO_PENDING) {
1435 DVLOG(2) << __func__ << " WatchFileDescriptor start";
1436 if (!WatchFileDescriptor()) {
1437 DVPLOG(1) << "WatchFileDescriptor failed on write";
1438 last_async_result_ = MapSystemError(errno);
1439 LogWrite(last_async_result_, nullptr, nullptr);
1440 } else {
1441 last_async_result_ = 0;
1442 }
1443 } else if (last_async_result_ < 0 || pending_writes_.empty()) {
1444 DVLOG(2) << __func__ << " WatchFileDescriptor stop: result "
1445 << ErrorToShortString(last_async_result_) << " pending_writes "
1446 << pending_writes_.size();
1447 StopWatchingFileDescriptor();
1448 }
1449 DCHECK(last_async_result_ != ERR_IO_PENDING);
1450
1451 if (write_callback_.is_null())
1452 return;
1453
1454 if (last_async_result_ < 0) {
1455 DVLOG(1) << last_async_result_;
1456 // Update the writer with the latest result.
1457 DoWriteCallback(ResetLastAsyncResult());
1458 } else if (write_async_outstanding_ < kWriteAsyncCallbackBuffersThreshold) {
1459 DVLOG(1) << write_async_outstanding_ << " < "
1460 << kWriteAsyncCallbackBuffersThreshold;
1461 DoWriteCallback(ResetWrittenBytes());
1462 }
1463 }
1464
OnFileCanWriteWithoutBlocking(int)1465 void UDPSocketPosix::WriteAsyncWatcher::OnFileCanWriteWithoutBlocking(int) {
1466 DVLOG(1) << __func__ << " queue " << socket_->pending_writes_.size()
1467 << " out of " << socket_->write_async_outstanding_ << " total";
1468 socket_->StopWatchingFileDescriptor();
1469 socket_->FlushPending();
1470 }
1471
WatchFileDescriptor()1472 bool UDPSocketPosix::WatchFileDescriptor() {
1473 if (write_async_watcher_->watching())
1474 return true;
1475 bool result = InternalWatchFileDescriptor();
1476 if (result) {
1477 write_async_watcher_->set_watching(true);
1478 }
1479 return result;
1480 }
1481
StopWatchingFileDescriptor()1482 void UDPSocketPosix::StopWatchingFileDescriptor() {
1483 if (!write_async_watcher_->watching())
1484 return;
1485 InternalStopWatchingFileDescriptor();
1486 write_async_watcher_->set_watching(false);
1487 }
1488
InternalWatchFileDescriptor()1489 bool UDPSocketPosix::InternalWatchFileDescriptor() {
1490 return base::CurrentIOThread::Get()->WatchFileDescriptor(
1491 socket_, true, base::MessagePumpForIO::WATCH_WRITE,
1492 &write_socket_watcher_, write_async_watcher_.get());
1493 }
1494
InternalStopWatchingFileDescriptor()1495 void UDPSocketPosix::InternalStopWatchingFileDescriptor() {
1496 bool ok = write_socket_watcher_.StopWatchingFileDescriptor();
1497 DCHECK(ok);
1498 }
1499
SetMaxPacketSize(size_t max_packet_size)1500 void UDPSocketPosix::SetMaxPacketSize(size_t max_packet_size) {
1501 datagram_buffer_pool_ = std::make_unique<DatagramBufferPool>(max_packet_size);
1502 }
1503
ResetLastAsyncResult()1504 int UDPSocketPosix::ResetLastAsyncResult() {
1505 int result = last_async_result_;
1506 last_async_result_ = 0;
1507 return result;
1508 }
1509
ResetWrittenBytes()1510 int UDPSocketPosix::ResetWrittenBytes() {
1511 int bytes = written_bytes_;
1512 written_bytes_ = 0;
1513 return bytes;
1514 }
1515
SetIOSNetworkServiceType(int ios_network_service_type)1516 int UDPSocketPosix::SetIOSNetworkServiceType(int ios_network_service_type) {
1517 if (ios_network_service_type == 0) {
1518 return OK;
1519 }
1520 #if defined(OS_IOS)
1521 if (setsockopt(socket_, SOL_SOCKET, SO_NET_SERVICE_TYPE,
1522 &ios_network_service_type, sizeof(ios_network_service_type))) {
1523 return MapSystemError(errno);
1524 }
1525 #endif // defined(OS_IOS)
1526 return OK;
1527 }
1528
1529 } // namespace net
1530