1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "net/socket/udp_socket_posix.h"
6 
7 #include <errno.h>
8 #include <fcntl.h>
9 #include <net/if.h>
10 #include <netdb.h>
11 #include <netinet/in.h>
12 #include <sys/ioctl.h>
13 
14 #include "base/bind.h"
15 #include "base/callback.h"
16 #include "base/callback_helpers.h"
17 #include "base/containers/stack_container.h"
18 #include "base/debug/alias.h"
19 #include "base/files/file_util.h"
20 #include "base/logging.h"
21 #include "base/message_loop/message_loop_current.h"
22 #include "base/metrics/histogram_functions.h"
23 #include "base/posix/eintr_wrapper.h"
24 #include "base/rand_util.h"
25 #include "base/task/post_task.h"
26 #include "base/task/thread_pool.h"
27 #include "base/task_runner_util.h"
28 #include "base/trace_event/trace_event.h"
29 #include "build/build_config.h"
30 #include "net/base/io_buffer.h"
31 #include "net/base/ip_address.h"
32 #include "net/base/ip_endpoint.h"
33 #include "net/base/net_errors.h"
34 #include "net/base/network_activity_monitor.h"
35 #include "net/base/sockaddr_storage.h"
36 #include "net/base/trace_constants.h"
37 #include "net/log/net_log.h"
38 #include "net/log/net_log_event_type.h"
39 #include "net/log/net_log_source.h"
40 #include "net/log/net_log_source_type.h"
41 #include "net/socket/socket_descriptor.h"
42 #include "net/socket/socket_options.h"
43 #include "net/socket/socket_tag.h"
44 #include "net/socket/udp_net_log_parameters.h"
45 #include "net/traffic_annotation/network_traffic_annotation.h"
46 
47 #if defined(OS_ANDROID)
48 #include <dlfcn.h>
49 #include "base/android/build_info.h"
50 #include "base/native_library.h"
51 #include "base/strings/utf_string_conversions.h"
52 #endif  // defined(OS_ANDROID)
53 
54 #if defined(OS_MACOSX) && !defined(OS_IOS)
55 // This was needed to debug crbug.com/640281.
56 // TODO(zhongyi): Remove once the bug is resolved.
57 #include <dlfcn.h>
58 #include <pthread.h>
59 #endif  // defined(OS_MACOSX) && !defined(OS_IOS)
60 
61 namespace net {
62 
63 namespace {
64 
65 const int kBindRetries = 10;
66 const int kPortStart = 1024;
67 const int kPortEnd = 65535;
68 const int kActivityMonitorBytesThreshold = 65535;
69 const int kActivityMonitorMinimumSamplesForThroughputEstimate = 2;
70 const base::TimeDelta kActivityMonitorMsThreshold =
71     base::TimeDelta::FromMilliseconds(100);
72 
73 #if defined(OS_MACOSX) || defined(OS_BSD)
74 // When enabling multicast using setsockopt(IP_MULTICAST_IF) MacOS
75 // requires passing IPv4 address instead of interface index. This function
76 // resolves IPv4 address by interface index. The |address| is returned in
77 // network order.
GetIPv4AddressFromIndex(int socket,uint32_t index,uint32_t * address)78 int GetIPv4AddressFromIndex(int socket, uint32_t index, uint32_t* address) {
79   if (!index) {
80     *address = htonl(INADDR_ANY);
81     return OK;
82   }
83 
84   sockaddr_in* result = nullptr;
85 
86   ifreq ifr;
87   ifr.ifr_addr.sa_family = AF_INET;
88   if (!if_indextoname(index, ifr.ifr_name))
89     return MapSystemError(errno);
90   int rv = ioctl(socket, SIOCGIFADDR, &ifr);
91   if (rv == -1)
92     return MapSystemError(errno);
93   result = reinterpret_cast<sockaddr_in*>(&ifr.ifr_addr);
94 
95   if (!result)
96     return ERR_ADDRESS_INVALID;
97 
98   *address = result->sin_addr.s_addr;
99   return OK;
100 }
101 
102 #endif  // OS_MACOSX || OS_BSD
103 
104 #if defined(OS_MACOSX) && !defined(OS_IOS)
105 
// On OSX the file descriptor is guarded to detect the cause of
// crbug.com/640281. The guarded API is supported only on newer versions of
// OSX, so the symbols need to be resolved dynamically.
// TODO(zhongyi): Remove this code once the bug is resolved.
110 
// Identifier type used by the guarded file descriptor API.
using guardid_t = uint64_t;

// Pointer types matching the two guarded-fd entry points that are looked up
// at run time.
using GuardedCloseNpFunction = int (*)(int fd, const guardid_t* guard);
using ChangeFdguardNpFunction = int (*)(int fd,
                                        const guardid_t* guard,
                                        u_int flags,
                                        const guardid_t* nguard,
                                        u_int nflags,
                                        int* fdflagsp);

// Dynamically resolved entry points; both start out null.
GuardedCloseNpFunction g_guarded_close_np = nullptr;
ChangeFdguardNpFunction g_change_fdguard_np = nullptr;

// Once-control used with pthread_once() for the symbol lookup.
pthread_once_t g_guarded_functions_once = PTHREAD_ONCE_INIT;
125 
InitGuardedFunctions()126 void InitGuardedFunctions() {
127   void* libsystem_handle =
128       dlopen("/usr/lib/libSystem.dylib", RTLD_LAZY | RTLD_LOCAL | RTLD_NOLOAD);
129   if (libsystem_handle) {
130     g_guarded_close_np = reinterpret_cast<GuardedCloseNpFunction>(
131         dlsym(libsystem_handle, "guarded_close_np"));
132     g_change_fdguard_np = reinterpret_cast<ChangeFdguardNpFunction>(
133         dlsym(libsystem_handle, "change_fdguard_np"));
134 
135     // If for any reason only one of the functions is found, set both of them to
136     // nullptr.
137     if (!g_guarded_close_np || !g_change_fdguard_np) {
138       g_guarded_close_np = nullptr;
139       g_change_fdguard_np = nullptr;
140     }
141   }
142 }
143 
change_fdguard_np(int fd,const guardid_t * guard,u_int flags,const guardid_t * nguard,u_int nflags,int * fdflagsp)144 int change_fdguard_np(int fd,
145                       const guardid_t* guard,
146                       u_int flags,
147                       const guardid_t* nguard,
148                       u_int nflags,
149                       int* fdflagsp) {
150   CHECK_EQ(pthread_once(&g_guarded_functions_once, InitGuardedFunctions), 0);
151   // Older version of OSX may not support guarded API.
152   if (!g_change_fdguard_np)
153     return 0;
154   return g_change_fdguard_np(fd, guard, flags, nguard, nflags, fdflagsp);
155 }
156 
guarded_close_np(int fd,const guardid_t * guard)157 int guarded_close_np(int fd, const guardid_t* guard) {
158   // Older version of OSX may not support guarded API.
159   if (!g_guarded_close_np)
160     return close(fd);
161 
162   return g_guarded_close_np(fd, guard);
163 }
164 
165 const unsigned int GUARD_CLOSE = 1u << 0;
166 const unsigned int GUARD_DUP = 1u << 1;
167 
168 const guardid_t kSocketFdGuard = 0xD712BC0BC9A4EAD4;
169 
170 #endif  // defined(OS_MACOSX) && !defined(OS_IOS)
171 
// Derives a check value from |fd| by XOR-ing it with a fixed constant. The
// value is stored when the socket is opened and re-verified in Close() to
// detect corruption of the stored descriptor.
int GetSocketFDHash(int fd) {
  constexpr int kHashMask = 1595649551;
  return fd ^ kHashMask;
}
175 
176 }  // namespace
177 
UDPSocketPosix(DatagramSocket::BindType bind_type,net::NetLog * net_log,const net::NetLogSource & source)178 UDPSocketPosix::UDPSocketPosix(DatagramSocket::BindType bind_type,
179                                net::NetLog* net_log,
180                                const net::NetLogSource& source)
181     : write_async_watcher_(std::make_unique<WriteAsyncWatcher>(this)),
182       sender_(new UDPSocketPosixSender()),
183       socket_(kInvalidSocket),
184       socket_hash_(0),
185       addr_family_(0),
186       is_connected_(false),
187       socket_options_(SOCKET_OPTION_MULTICAST_LOOP),
188       sendto_flags_(0),
189       multicast_interface_(0),
190       multicast_time_to_live_(1),
191       bind_type_(bind_type),
192       read_socket_watcher_(FROM_HERE),
193       write_socket_watcher_(FROM_HERE),
194       read_watcher_(this),
195       write_watcher_(this),
196       last_async_result_(0),
197       write_async_timer_running_(false),
198       write_async_outstanding_(0),
199       read_buf_len_(0),
200       recv_from_address_(NULL),
201       write_buf_len_(0),
202       net_log_(NetLogWithSource::Make(net_log, NetLogSourceType::UDP_SOCKET)),
203       bound_network_(NetworkChangeNotifier::kInvalidNetworkHandle),
204       experimental_recv_optimization_enabled_(false) {
205   net_log_.BeginEventReferencingSource(NetLogEventType::SOCKET_ALIVE, source);
206 }
207 
// Closes the socket (a no-op for the descriptor if it is already closed) and
// then ends the SOCKET_ALIVE NetLog event. Must run on the thread checked by
// |thread_checker_|.
UDPSocketPosix::~UDPSocketPosix() {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
  Close();
  net_log_.EndEvent(NetLogEventType::SOCKET_ALIVE);
}
213 
Open(AddressFamily address_family)214 int UDPSocketPosix::Open(AddressFamily address_family) {
215   DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
216   DCHECK_EQ(socket_, kInvalidSocket);
217 
218   auto owned_socket_count = TryAcquireGlobalUDPSocketCount();
219   if (owned_socket_count.empty())
220     return ERR_INSUFFICIENT_RESOURCES;
221 
222   addr_family_ = ConvertAddressFamily(address_family);
223   socket_ = CreatePlatformSocket(addr_family_, SOCK_DGRAM, 0);
224   if (socket_ == kInvalidSocket)
225     return MapSystemError(errno);
226 #if defined(OS_MACOSX) && !defined(OS_IOS)
227   PCHECK(change_fdguard_np(socket_, NULL, 0, &kSocketFdGuard,
228                            GUARD_CLOSE | GUARD_DUP, NULL) == 0);
229 #endif  // defined(OS_MACOSX) && !defined(OS_IOS)
230   socket_hash_ = GetSocketFDHash(socket_);
231   if (!base::SetNonBlocking(socket_)) {
232     const int err = MapSystemError(errno);
233     Close();
234     return err;
235   }
236   if (tag_ != SocketTag())
237     tag_.Apply(socket_);
238 
239   owned_socket_count_ = std::move(owned_socket_count);
240   return OK;
241 }
242 
Increment(uint32_t bytes)243 void UDPSocketPosix::ActivityMonitor::Increment(uint32_t bytes) {
244   if (!bytes)
245     return;
246   bool timer_running = timer_.IsRunning();
247   bytes_ += bytes;
248   increments_++;
249   // Allow initial updates to make sure throughput estimator has
250   // enough samples to generate a value. (low water mark)
251   // Or once the bytes threshold has be met. (high water mark)
252   if (increments_ < kActivityMonitorMinimumSamplesForThroughputEstimate ||
253       bytes_ > kActivityMonitorBytesThreshold) {
254     Update();
255     if (timer_running)
256       timer_.Reset();
257   }
258   if (!timer_running) {
259     timer_.Start(FROM_HERE, kActivityMonitorMsThreshold, this,
260                  &UDPSocketPosix::ActivityMonitor::OnTimerFired);
261   }
262 }
263 
Update()264 void UDPSocketPosix::ActivityMonitor::Update() {
265   if (!bytes_)
266     return;
267   NetworkActivityMonitorIncrement(bytes_);
268   bytes_ = 0;
269 }
270 
// Called when the socket closes: stops the timer and reports any bytes that
// are still pending.
void UDPSocketPosix::ActivityMonitor::OnClose() {
  timer_.Stop();
  Update();
}
275 
OnTimerFired()276 void UDPSocketPosix::ActivityMonitor::OnTimerFired() {
277   increments_ = 0;
278   if (!bytes_) {
279     // Can happen if the socket has been idle and have had no
280     // increments since the timer previously fired.  Don't bother
281     // keeping the timer running in this case.
282     timer_.Stop();
283     return;
284   }
285   Update();
286 }
287 
// Reports |bytes| to the NetworkActivityMonitor singleton as bytes sent.
void UDPSocketPosix::SentActivityMonitor::NetworkActivityMonitorIncrement(
    uint32_t bytes) {
  NetworkActivityMonitor::GetInstance()->IncrementBytesSent(bytes);
}
292 
// Reports |bytes| to the NetworkActivityMonitor singleton as bytes received.
void UDPSocketPosix::ReceivedActivityMonitor::NetworkActivityMonitorIncrement(
    uint32_t bytes) {
  NetworkActivityMonitor::GetInstance()->IncrementBytesReceived(bytes);
}
297 
Close()298 void UDPSocketPosix::Close() {
299   DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
300 
301   owned_socket_count_.Reset();
302 
303   if (socket_ == kInvalidSocket)
304     return;
305 
306   // Zero out any pending read/write callback state.
307   read_buf_.reset();
308   read_buf_len_ = 0;
309   read_callback_.Reset();
310   recv_from_address_ = NULL;
311   write_buf_.reset();
312   write_buf_len_ = 0;
313   write_callback_.Reset();
314   send_to_address_.reset();
315 
316   bool ok = read_socket_watcher_.StopWatchingFileDescriptor();
317   DCHECK(ok);
318   ok = write_socket_watcher_.StopWatchingFileDescriptor();
319   DCHECK(ok);
320 
321   // Verify that |socket_| hasn't been corrupted. Needed to debug
322   // crbug.com/906005.
323   CHECK_EQ(socket_hash_, GetSocketFDHash(socket_));
324 #if defined(OS_MACOSX) && !defined(OS_IOS)
325   PCHECK(IGNORE_EINTR(guarded_close_np(socket_, &kSocketFdGuard)) == 0);
326 #else
327   PCHECK(IGNORE_EINTR(close(socket_)) == 0);
328 #endif  // defined(OS_MACOSX) && !defined(OS_IOS)
329 
330   socket_ = kInvalidSocket;
331   addr_family_ = 0;
332   is_connected_ = false;
333   tag_ = SocketTag();
334 
335   write_async_timer_.Stop();
336   sent_activity_monitor_.OnClose();
337   received_activity_monitor_.OnClose();
338 }
339 
GetPeerAddress(IPEndPoint * address) const340 int UDPSocketPosix::GetPeerAddress(IPEndPoint* address) const {
341   DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
342   DCHECK(address);
343   if (!is_connected())
344     return ERR_SOCKET_NOT_CONNECTED;
345 
346   if (!remote_address_.get()) {
347     SockaddrStorage storage;
348     if (getpeername(socket_, storage.addr, &storage.addr_len))
349       return MapSystemError(errno);
350     std::unique_ptr<IPEndPoint> address(new IPEndPoint());
351     if (!address->FromSockAddr(storage.addr, storage.addr_len))
352       return ERR_ADDRESS_INVALID;
353     remote_address_ = std::move(address);
354   }
355 
356   *address = *remote_address_;
357   return OK;
358 }
359 
GetLocalAddress(IPEndPoint * address) const360 int UDPSocketPosix::GetLocalAddress(IPEndPoint* address) const {
361   DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
362   DCHECK(address);
363   if (!is_connected())
364     return ERR_SOCKET_NOT_CONNECTED;
365 
366   if (!local_address_.get()) {
367     SockaddrStorage storage;
368     if (getsockname(socket_, storage.addr, &storage.addr_len))
369       return MapSystemError(errno);
370     std::unique_ptr<IPEndPoint> address(new IPEndPoint());
371     if (!address->FromSockAddr(storage.addr, storage.addr_len))
372       return ERR_ADDRESS_INVALID;
373     local_address_ = std::move(address);
374     net_log_.AddEvent(NetLogEventType::UDP_LOCAL_ADDRESS, [&] {
375       return CreateNetLogUDPConnectParams(*local_address_, bound_network_);
376     });
377   }
378 
379   *address = *local_address_;
380   return OK;
381 }
382 
Read(IOBuffer * buf,int buf_len,CompletionOnceCallback callback)383 int UDPSocketPosix::Read(IOBuffer* buf,
384                          int buf_len,
385                          CompletionOnceCallback callback) {
386   return RecvFrom(buf, buf_len, NULL, std::move(callback));
387 }
388 
RecvFrom(IOBuffer * buf,int buf_len,IPEndPoint * address,CompletionOnceCallback callback)389 int UDPSocketPosix::RecvFrom(IOBuffer* buf,
390                              int buf_len,
391                              IPEndPoint* address,
392                              CompletionOnceCallback callback) {
393   DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
394   DCHECK_NE(kInvalidSocket, socket_);
395   CHECK(read_callback_.is_null());
396   DCHECK(!recv_from_address_);
397   DCHECK(!callback.is_null());  // Synchronous operation not supported
398   DCHECK_GT(buf_len, 0);
399 
400   int nread = InternalRecvFrom(buf, buf_len, address);
401   if (nread != ERR_IO_PENDING)
402     return nread;
403 
404   if (!base::MessageLoopCurrentForIO::Get()->WatchFileDescriptor(
405           socket_, true, base::MessagePumpForIO::WATCH_READ,
406           &read_socket_watcher_, &read_watcher_)) {
407     PLOG(ERROR) << "WatchFileDescriptor failed on read";
408     int result = MapSystemError(errno);
409     LogRead(result, NULL, 0, NULL);
410     return result;
411   }
412 
413   read_buf_ = buf;
414   read_buf_len_ = buf_len;
415   recv_from_address_ = address;
416   read_callback_ = std::move(callback);
417   return ERR_IO_PENDING;
418 }
419 
Write(IOBuffer * buf,int buf_len,CompletionOnceCallback callback,const NetworkTrafficAnnotationTag & traffic_annotation)420 int UDPSocketPosix::Write(
421     IOBuffer* buf,
422     int buf_len,
423     CompletionOnceCallback callback,
424     const NetworkTrafficAnnotationTag& traffic_annotation) {
425   return SendToOrWrite(buf, buf_len, NULL, std::move(callback));
426 }
427 
// Sends |buf| (|buf_len| bytes) to the explicit destination |address| by
// forwarding to SendToOrWrite(); see that method for the return value and
// callback contract.
int UDPSocketPosix::SendTo(IOBuffer* buf,
                           int buf_len,
                           const IPEndPoint& address,
                           CompletionOnceCallback callback) {
  return SendToOrWrite(buf, buf_len, &address, std::move(callback));
}
434 
SendToOrWrite(IOBuffer * buf,int buf_len,const IPEndPoint * address,CompletionOnceCallback callback)435 int UDPSocketPosix::SendToOrWrite(IOBuffer* buf,
436                                   int buf_len,
437                                   const IPEndPoint* address,
438                                   CompletionOnceCallback callback) {
439   DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
440   DCHECK_NE(kInvalidSocket, socket_);
441   CHECK(write_callback_.is_null());
442   DCHECK(!callback.is_null());  // Synchronous operation not supported
443   DCHECK_GT(buf_len, 0);
444 
445   int result = InternalSendTo(buf, buf_len, address);
446   if (result != ERR_IO_PENDING)
447     return result;
448 
449   if (!base::MessageLoopCurrentForIO::Get()->WatchFileDescriptor(
450           socket_, true, base::MessagePumpForIO::WATCH_WRITE,
451           &write_socket_watcher_, &write_watcher_)) {
452     DVPLOG(1) << "WatchFileDescriptor failed on write";
453     int result = MapSystemError(errno);
454     LogWrite(result, NULL, NULL);
455     return result;
456   }
457 
458   write_buf_ = buf;
459   write_buf_len_ = buf_len;
460   DCHECK(!send_to_address_.get());
461   if (address) {
462     send_to_address_.reset(new IPEndPoint(*address));
463   }
464   write_callback_ = std::move(callback);
465   return ERR_IO_PENDING;
466 }
467 
Connect(const IPEndPoint & address)468 int UDPSocketPosix::Connect(const IPEndPoint& address) {
469   DCHECK_NE(socket_, kInvalidSocket);
470   net_log_.BeginEvent(NetLogEventType::UDP_CONNECT, [&] {
471     return CreateNetLogUDPConnectParams(address, bound_network_);
472   });
473   int rv = SetMulticastOptions();
474   if (rv != OK)
475     return rv;
476   rv = InternalConnect(address);
477   net_log_.EndEventWithNetErrorCode(NetLogEventType::UDP_CONNECT, rv);
478   is_connected_ = (rv == OK);
479   if (rv != OK)
480     tag_ = SocketTag();
481   return rv;
482 }
483 
InternalConnect(const IPEndPoint & address)484 int UDPSocketPosix::InternalConnect(const IPEndPoint& address) {
485   DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
486   DCHECK(!is_connected());
487   DCHECK(!remote_address_.get());
488 
489   int rv = 0;
490   if (bind_type_ == DatagramSocket::RANDOM_BIND) {
491     // Construct IPAddress of appropriate size (IPv4 or IPv6) of 0s,
492     // representing INADDR_ANY or in6addr_any.
493     size_t addr_size = address.GetSockAddrFamily() == AF_INET
494                            ? IPAddress::kIPv4AddressSize
495                            : IPAddress::kIPv6AddressSize;
496     rv = RandomBind(IPAddress::AllZeros(addr_size));
497   }
498   // else connect() does the DatagramSocket::DEFAULT_BIND
499 
500   if (rv < 0) {
501     base::UmaHistogramSparse("Net.UdpSocketRandomBindErrorCode", -rv);
502     return rv;
503   }
504 
505   SockaddrStorage storage;
506   if (!address.ToSockAddr(storage.addr, &storage.addr_len))
507     return ERR_ADDRESS_INVALID;
508 
509   rv = HANDLE_EINTR(connect(socket_, storage.addr, storage.addr_len));
510   if (rv < 0)
511     return MapSystemError(errno);
512 
513   remote_address_.reset(new IPEndPoint(address));
514   return rv;
515 }
516 
Bind(const IPEndPoint & address)517 int UDPSocketPosix::Bind(const IPEndPoint& address) {
518   DCHECK_NE(socket_, kInvalidSocket);
519   DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
520   DCHECK(!is_connected());
521 
522   int rv = SetMulticastOptions();
523   if (rv < 0)
524     return rv;
525 
526   rv = DoBind(address);
527   if (rv < 0)
528     return rv;
529 
530   is_connected_ = true;
531   local_address_.reset();
532   return rv;
533 }
534 
// Binds the (open, not yet connected) socket to the network identified by
// |network|.
//
// Returns ERR_INVALID_ARGUMENT for kInvalidNetworkHandle. On Android it
// returns ERR_NOT_IMPLEMENTED when the OS is older than Lollipop or the
// required platform function cannot be resolved, ERR_NETWORK_CHANGED when the
// platform call reports ENONET, and otherwise the mapped result of the
// platform call; |bound_network_| is updated only on success. On every other
// platform it returns ERR_NOT_IMPLEMENTED.
int UDPSocketPosix::BindToNetwork(
    NetworkChangeNotifier::NetworkHandle network) {
  DCHECK_NE(socket_, kInvalidSocket);
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
  DCHECK(!is_connected());
  if (network == NetworkChangeNotifier::kInvalidNetworkHandle)
    return ERR_INVALID_ARGUMENT;
#if defined(OS_ANDROID)
  // Android prior to Lollipop didn't have support for binding sockets to
  // networks.
  if (base::android::BuildInfo::GetInstance()->sdk_int() <
      base::android::SDK_VERSION_LOLLIPOP) {
    return ERR_NOT_IMPLEMENTED;
  }
  // Holds a positive errno value (or 0 on success) once either branch below
  // has run.
  int rv;
  // On Android M and newer releases use supported NDK API. On Android L use
  // setNetworkForSocket from libnetd_client.so.
  if (base::android::BuildInfo::GetInstance()->sdk_int() >=
      base::android::SDK_VERSION_MARSHMALLOW) {
    // See declaration of android_setsocknetwork() here:
    // http://androidxref.com/6.0.0_r1/xref/development/ndk/platforms/android-M/include/android/multinetwork.h#65
    // Function cannot be called directly as it will cause app to fail to load
    // on pre-marshmallow devices.
    typedef int (*MarshmallowSetNetworkForSocket)(int64_t netId, int socketFd);
    // Cached across calls; resolved lazily on first use.
    static MarshmallowSetNetworkForSocket marshmallowSetNetworkForSocket;
    // This is racy, but all racers should come out with the same answer so it
    // shouldn't matter.
    if (!marshmallowSetNetworkForSocket) {
      base::FilePath file(base::GetNativeLibraryName("android"));
      // NOTE(review): the dlopen() result is passed to dlsym() unchecked.
      void* dl = dlopen(file.value().c_str(), RTLD_NOW);
      marshmallowSetNetworkForSocket =
          reinterpret_cast<MarshmallowSetNetworkForSocket>(
              dlsym(dl, "android_setsocknetwork"));
    }
    if (!marshmallowSetNetworkForSocket)
      return ERR_NOT_IMPLEMENTED;
    rv = marshmallowSetNetworkForSocket(network, socket_);
    // A non-zero return signals failure; the error code is taken from errno.
    if (rv)
      rv = errno;
  } else {
    // NOTE(pauljensen): This does rely on Android implementation details, but
    // they won't change because Lollipop is already released.
    typedef int (*LollipopSetNetworkForSocket)(unsigned netId, int socketFd);
    // Cached across calls; resolved lazily on first use.
    static LollipopSetNetworkForSocket lollipopSetNetworkForSocket;
    // This is racy, but all racers should come out with the same answer so it
    // shouldn't matter.
    if (!lollipopSetNetworkForSocket) {
      // Android's netd client library should always be loaded in our address
      // space as it shims socket() which was used to create |socket_|.
      base::FilePath file(base::GetNativeLibraryName("netd_client"));
      // Use RTLD_NOW to match Android's prior loading of the library:
      // http://androidxref.com/6.0.0_r5/xref/bionic/libc/bionic/NetdClient.cpp#37
      // Use RTLD_NOLOAD to assert that the library is already loaded and
      // avoid doing any disk IO.
      void* dl = dlopen(file.value().c_str(), RTLD_NOW | RTLD_NOLOAD);
      lollipopSetNetworkForSocket =
          reinterpret_cast<LollipopSetNetworkForSocket>(
              dlsym(dl, "setNetworkForSocket"));
    }
    if (!lollipopSetNetworkForSocket)
      return ERR_NOT_IMPLEMENTED;
    // The result is negated here; presumably the function returns a negative
    // errno value on failure -- TODO confirm.
    rv = -lollipopSetNetworkForSocket(network, socket_);
  }
  // If |network| has since disconnected, |rv| will be ENONET.  Surface this as
  // ERR_NETWORK_CHANGED, rather than MapSystemError(ENONET) which gives back
  // the less descriptive ERR_FAILED.
  if (rv == ENONET)
    return ERR_NETWORK_CHANGED;
  if (rv == 0)
    bound_network_ = network;
  return MapSystemError(rv);
#else
  NOTIMPLEMENTED();
  return ERR_NOT_IMPLEMENTED;
#endif
}
611 
// Requests a receive buffer of |size| for the open socket by delegating to
// SetSocketReceiveBufferSize() and returns that helper's result.
int UDPSocketPosix::SetReceiveBufferSize(int32_t size) {
  DCHECK_NE(socket_, kInvalidSocket);
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
  return SetSocketReceiveBufferSize(socket_, size);
}
617 
// Requests a send buffer of |size| for the open socket by delegating to
// SetSocketSendBufferSize() and returns that helper's result.
int UDPSocketPosix::SetSendBufferSize(int32_t size) {
  DCHECK_NE(socket_, kInvalidSocket);
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
  return SetSocketSendBufferSize(socket_, size);
}
623 
SetDoNotFragment()624 int UDPSocketPosix::SetDoNotFragment() {
625   DCHECK_NE(socket_, kInvalidSocket);
626   DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
627 
628 #if !defined(IP_PMTUDISC_DO)
629   return ERR_NOT_IMPLEMENTED;
630 #else
631   if (addr_family_ == AF_INET6) {
632     int val = IPV6_PMTUDISC_DO;
633     if (setsockopt(socket_, IPPROTO_IPV6, IPV6_MTU_DISCOVER, &val,
634                    sizeof(val)) != 0) {
635       return MapSystemError(errno);
636     }
637 
638     int v6_only = false;
639     socklen_t v6_only_len = sizeof(v6_only);
640     if (getsockopt(socket_, IPPROTO_IPV6, IPV6_V6ONLY, &v6_only,
641                    &v6_only_len) != 0) {
642       return MapSystemError(errno);
643     }
644 
645     if (v6_only)
646       return OK;
647   }
648 
649   int val = IP_PMTUDISC_DO;
650   int rv = setsockopt(socket_, IPPROTO_IP, IP_MTU_DISCOVER, &val, sizeof(val));
651   return rv == 0 ? OK : MapSystemError(errno);
652 #endif
653 }
654 
SetMsgConfirm(bool confirm)655 void UDPSocketPosix::SetMsgConfirm(bool confirm) {
656 #if !defined(OS_MACOSX) && !defined(OS_IOS) && !defined(OS_BSD)
657   if (confirm) {
658     sendto_flags_ |= MSG_CONFIRM;
659   } else {
660     sendto_flags_ &= ~MSG_CONFIRM;
661   }
662 #endif  // !defined(OS_MACOSX) && !defined(OS_IOS) && !defined(OS_BSD)
663 }
664 
// Enables address reuse on the open, not yet connected socket by delegating
// to SetReuseAddr() and returns that helper's result.
int UDPSocketPosix::AllowAddressReuse() {
  DCHECK_NE(socket_, kInvalidSocket);
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
  DCHECK(!is_connected());
  return SetReuseAddr(socket_, true);
}
671 
SetBroadcast(bool broadcast)672 int UDPSocketPosix::SetBroadcast(bool broadcast) {
673   DCHECK_NE(socket_, kInvalidSocket);
674   DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
675   int value = broadcast ? 1 : 0;
676   int rv;
677 #if defined(OS_MACOSX) || defined(OS_BSD)
678   // SO_REUSEPORT on OSX permits multiple processes to each receive
679   // UDP multicast or broadcast datagrams destined for the bound
680   // port.
681   // This is only being set on OSX because its behavior is platform dependent
682   // and we are playing it safe by only setting it on platforms where things
683   // break.
684   rv = setsockopt(socket_, SOL_SOCKET, SO_REUSEADDR, &value, sizeof(value));
685   if (rv != 0)
686     return MapSystemError(errno);
687   rv = setsockopt(socket_, SOL_SOCKET, SO_REUSEPORT, &value, sizeof(value));
688   if (rv != 0)
689     return MapSystemError(errno);
690 #endif  // defined(OS_MACOSX) || defined(OS_BSD)
691   rv = setsockopt(socket_, SOL_SOCKET, SO_BROADCAST, &value, sizeof(value));
692 
693   return rv == 0 ? OK : MapSystemError(errno);
694 }
695 
AllowAddressSharingForMulticast()696 int UDPSocketPosix::AllowAddressSharingForMulticast() {
697   DCHECK_NE(socket_, kInvalidSocket);
698   DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
699   DCHECK(!is_connected());
700 
701   int rv = AllowAddressReuse();
702   if (rv != OK)
703     return rv;
704 
705 #ifdef SO_REUSEPORT
706   // Attempt to set SO_REUSEPORT if available. On some platforms, this is
707   // necessary to allow the address to be fully shared between separate sockets.
708   // On platforms where the option does not exist, SO_REUSEADDR should be
709   // sufficient to share multicast packets if such sharing is at all possible.
710   int value = 1;
711   rv = setsockopt(socket_, SOL_SOCKET, SO_REUSEPORT, &value, sizeof(value));
712   // Ignore errors that the option does not exist.
713   if (rv != 0 && errno != ENOPROTOOPT)
714     return MapSystemError(errno);
715 #endif  // SO_REUSEPORT
716 
717   return OK;
718 }
719 
OnFileCanReadWithoutBlocking(int)720 void UDPSocketPosix::ReadWatcher::OnFileCanReadWithoutBlocking(int) {
721   TRACE_EVENT0(NetTracingCategory(),
722                "UDPSocketPosix::ReadWatcher::OnFileCanReadWithoutBlocking");
723   if (!socket_->read_callback_.is_null())
724     socket_->DidCompleteRead();
725 }
726 
OnFileCanWriteWithoutBlocking(int)727 void UDPSocketPosix::WriteWatcher::OnFileCanWriteWithoutBlocking(int) {
728   if (!socket_->write_callback_.is_null())
729     socket_->DidCompleteWrite();
730 }
731 
// Runs the pending read callback with |rv|. |rv| must be a final result (not
// ERR_IO_PENDING) and a read callback must be pending.
void UDPSocketPosix::DoReadCallback(int rv) {
  DCHECK_NE(rv, ERR_IO_PENDING);
  DCHECK(!read_callback_.is_null());

  // Since Run() may result in Read() being called,
  // clear |read_callback_| up front.
  // NOTE(review): the clearing relies on the callback being consumed when it
  // is run through std::move() -- confirm against the callback type.
  std::move(read_callback_).Run(rv);
}
740 
// Runs the pending write callback with |rv|. |rv| must be a final result (not
// ERR_IO_PENDING) and a write callback must be pending.
void UDPSocketPosix::DoWriteCallback(int rv) {
  DCHECK_NE(rv, ERR_IO_PENDING);
  DCHECK(!write_callback_.is_null());

  // Since Run() may result in Write() being called,
  // clear |write_callback_| up front.
  // NOTE(review): the clearing relies on the callback being consumed when it
  // is run through std::move() -- confirm against the callback type.
  std::move(write_callback_).Run(rv);
}
749 
DidCompleteRead()750 void UDPSocketPosix::DidCompleteRead() {
751   int result =
752       InternalRecvFrom(read_buf_.get(), read_buf_len_, recv_from_address_);
753   if (result != ERR_IO_PENDING) {
754     read_buf_.reset();
755     read_buf_len_ = 0;
756     recv_from_address_ = NULL;
757     bool ok = read_socket_watcher_.StopWatchingFileDescriptor();
758     DCHECK(ok);
759     DoReadCallback(result);
760   }
761 }
762 
LogRead(int result,const char * bytes,socklen_t addr_len,const sockaddr * addr)763 void UDPSocketPosix::LogRead(int result,
764                              const char* bytes,
765                              socklen_t addr_len,
766                              const sockaddr* addr) {
767   if (result < 0) {
768     net_log_.AddEventWithNetErrorCode(NetLogEventType::UDP_RECEIVE_ERROR,
769                                       result);
770     return;
771   }
772 
773   if (net_log_.IsCapturing()) {
774     DCHECK(addr_len > 0);
775     DCHECK(addr);
776 
777     IPEndPoint address;
778     bool is_address_valid = address.FromSockAddr(addr, addr_len);
779     NetLogUDPDataTransfer(net_log_, NetLogEventType::UDP_BYTES_RECEIVED, result,
780                           bytes, is_address_valid ? &address : nullptr);
781   }
782 
783   received_activity_monitor_.Increment(result);
784 }
785 
DidCompleteWrite()786 void UDPSocketPosix::DidCompleteWrite() {
787   int result =
788       InternalSendTo(write_buf_.get(), write_buf_len_, send_to_address_.get());
789 
790   if (result != ERR_IO_PENDING) {
791     write_buf_.reset();
792     write_buf_len_ = 0;
793     send_to_address_.reset();
794     write_socket_watcher_.StopWatchingFileDescriptor();
795     DoWriteCallback(result);
796   }
797 }
798 
LogWrite(int result,const char * bytes,const IPEndPoint * address)799 void UDPSocketPosix::LogWrite(int result,
800                               const char* bytes,
801                               const IPEndPoint* address) {
802   if (result < 0) {
803     net_log_.AddEventWithNetErrorCode(NetLogEventType::UDP_SEND_ERROR, result);
804     return;
805   }
806 
807   if (net_log_.IsCapturing()) {
808     NetLogUDPDataTransfer(net_log_, NetLogEventType::UDP_BYTES_SENT, result,
809                           bytes, address);
810   }
811 
812   sent_activity_monitor_.Increment(result);
813 }
814 
InternalRecvFrom(IOBuffer * buf,int buf_len,IPEndPoint * address)815 int UDPSocketPosix::InternalRecvFrom(IOBuffer* buf,
816                                      int buf_len,
817                                      IPEndPoint* address) {
818   // If the socket is connected and the remote address is known
819   // use the more efficient method that uses read() instead of recvmsg().
820   if (experimental_recv_optimization_enabled_ && is_connected_ &&
821       remote_address_) {
822     return InternalRecvFromConnectedSocket(buf, buf_len, address);
823   }
824   return InternalRecvFromNonConnectedSocket(buf, buf_len, address);
825 }
826 
InternalRecvFromConnectedSocket(IOBuffer * buf,int buf_len,IPEndPoint * address)827 int UDPSocketPosix::InternalRecvFromConnectedSocket(IOBuffer* buf,
828                                                     int buf_len,
829                                                     IPEndPoint* address) {
830   DCHECK(is_connected_);
831   DCHECK(remote_address_);
832   int bytes_transferred;
833   bytes_transferred = HANDLE_EINTR(read(socket_, buf->data(), buf_len));
834   int result;
835 
836   if (bytes_transferred < 0) {
837     result = MapSystemError(errno);
838   } else if (bytes_transferred == buf_len) {
839     result = ERR_MSG_TOO_BIG;
840   } else {
841     result = bytes_transferred;
842     if (address)
843       *address = *remote_address_.get();
844   }
845 
846   if (result != ERR_IO_PENDING) {
847     SockaddrStorage sock_addr;
848     bool success =
849         remote_address_->ToSockAddr(sock_addr.addr, &sock_addr.addr_len);
850     DCHECK(success);
851     LogRead(result, buf->data(), sock_addr.addr_len, sock_addr.addr);
852   }
853   return result;
854 }
855 
InternalRecvFromNonConnectedSocket(IOBuffer * buf,int buf_len,IPEndPoint * address)856 int UDPSocketPosix::InternalRecvFromNonConnectedSocket(IOBuffer* buf,
857                                                        int buf_len,
858                                                        IPEndPoint* address) {
859   int bytes_transferred;
860 
861   struct iovec iov = {};
862   iov.iov_base = buf->data();
863   iov.iov_len = buf_len;
864 
865   struct msghdr msg = {};
866   msg.msg_iov = &iov;
867   msg.msg_iovlen = 1;
868 
869   SockaddrStorage storage;
870   msg.msg_name = storage.addr;
871   msg.msg_namelen = storage.addr_len;
872 
873   bytes_transferred = HANDLE_EINTR(recvmsg(socket_, &msg, 0));
874   storage.addr_len = msg.msg_namelen;
875   int result;
876   if (bytes_transferred >= 0) {
877     if (msg.msg_flags & MSG_TRUNC) {
878       result = ERR_MSG_TOO_BIG;
879     } else {
880       result = bytes_transferred;
881       if (address && !address->FromSockAddr(storage.addr, storage.addr_len))
882         result = ERR_ADDRESS_INVALID;
883     }
884   } else {
885     result = MapSystemError(errno);
886   }
887   if (result != ERR_IO_PENDING)
888     LogRead(result, buf->data(), storage.addr_len, storage.addr);
889   return result;
890 }
891 
InternalSendTo(IOBuffer * buf,int buf_len,const IPEndPoint * address)892 int UDPSocketPosix::InternalSendTo(IOBuffer* buf,
893                                    int buf_len,
894                                    const IPEndPoint* address) {
895   SockaddrStorage storage;
896   struct sockaddr* addr = storage.addr;
897   if (!address) {
898     addr = NULL;
899     storage.addr_len = 0;
900   } else {
901     if (!address->ToSockAddr(storage.addr, &storage.addr_len)) {
902       int result = ERR_ADDRESS_INVALID;
903       LogWrite(result, NULL, NULL);
904       return result;
905     }
906   }
907 
908   int result = HANDLE_EINTR(sendto(socket_, buf->data(), buf_len, sendto_flags_,
909                                    addr, storage.addr_len));
910   if (result < 0)
911     result = MapSystemError(errno);
912   if (result != ERR_IO_PENDING)
913     LogWrite(result, buf->data(), address);
914   return result;
915 }
916 
SetMulticastOptions()917 int UDPSocketPosix::SetMulticastOptions() {
918   if (!(socket_options_ & SOCKET_OPTION_MULTICAST_LOOP)) {
919     int rv;
920     if (addr_family_ == AF_INET) {
921       u_char loop = 0;
922       rv = setsockopt(socket_, IPPROTO_IP, IP_MULTICAST_LOOP,
923                       &loop, sizeof(loop));
924     } else {
925       u_int loop = 0;
926       rv = setsockopt(socket_, IPPROTO_IPV6, IPV6_MULTICAST_LOOP,
927                       &loop, sizeof(loop));
928     }
929     if (rv < 0)
930       return MapSystemError(errno);
931   }
932   if (multicast_time_to_live_ != IP_DEFAULT_MULTICAST_TTL) {
933     int rv;
934     if (addr_family_ == AF_INET) {
935       u_char ttl = multicast_time_to_live_;
936       rv = setsockopt(socket_, IPPROTO_IP, IP_MULTICAST_TTL,
937                       &ttl, sizeof(ttl));
938     } else {
939       // Signed integer. -1 to use route default.
940       int ttl = multicast_time_to_live_;
941       rv = setsockopt(socket_, IPPROTO_IPV6, IPV6_MULTICAST_HOPS,
942                       &ttl, sizeof(ttl));
943     }
944     if (rv < 0)
945       return MapSystemError(errno);
946   }
947   if (multicast_interface_ != 0) {
948     switch (addr_family_) {
949       case AF_INET: {
950 #if defined(OS_MACOSX) || defined(OS_BSD)
951         ip_mreq mreq = {};
952         int error = GetIPv4AddressFromIndex(socket_, multicast_interface_,
953                                             &mreq.imr_interface.s_addr);
954         if (error != OK)
955           return error;
956 #else   //  defined(OS_MACOSX) || defined(OS_BSD)
957         ip_mreqn mreq = {};
958         mreq.imr_ifindex = multicast_interface_;
959         mreq.imr_address.s_addr = htonl(INADDR_ANY);
960 #endif  //  !defined(OS_MACOSX) || defined(OS_BSD)
961         int rv = setsockopt(socket_, IPPROTO_IP, IP_MULTICAST_IF,
962 #if defined(OS_BSD)
963                             reinterpret_cast<const char*>(&mreq.imr_interface.s_addr),
964                             sizeof(mreq.imr_interface.s_addr));
965 #else
966                             reinterpret_cast<const char*>(&mreq), sizeof(mreq));
967 #endif
968         if (rv)
969           return MapSystemError(errno);
970         break;
971       }
972       case AF_INET6: {
973         uint32_t interface_index = multicast_interface_;
974         int rv = setsockopt(socket_, IPPROTO_IPV6, IPV6_MULTICAST_IF,
975                             reinterpret_cast<const char*>(&interface_index),
976                             sizeof(interface_index));
977         if (rv)
978           return MapSystemError(errno);
979         break;
980       }
981       default:
982         NOTREACHED() << "Invalid address family";
983         return ERR_ADDRESS_INVALID;
984     }
985   }
986   return OK;
987 }
988 
DoBind(const IPEndPoint & address)989 int UDPSocketPosix::DoBind(const IPEndPoint& address) {
990   SockaddrStorage storage;
991   if (!address.ToSockAddr(storage.addr, &storage.addr_len))
992     return ERR_ADDRESS_INVALID;
993   int rv = bind(socket_, storage.addr, storage.addr_len);
994   if (rv == 0)
995     return OK;
996   int last_error = errno;
997 #if defined(OS_CHROMEOS)
998   if (last_error == EINVAL)
999     return ERR_ADDRESS_IN_USE;
1000 #elif defined(OS_MACOSX)
1001   if (last_error == EADDRNOTAVAIL)
1002     return ERR_ADDRESS_IN_USE;
1003 #endif
1004   return MapSystemError(last_error);
1005 }
1006 
RandomBind(const IPAddress & address)1007 int UDPSocketPosix::RandomBind(const IPAddress& address) {
1008   DCHECK_EQ(bind_type_, DatagramSocket::RANDOM_BIND);
1009 
1010   for (int i = 0; i < kBindRetries; ++i) {
1011     int rv = DoBind(IPEndPoint(address, base::RandInt(kPortStart, kPortEnd)));
1012     if (rv != ERR_ADDRESS_IN_USE)
1013       return rv;
1014   }
1015 
1016   return DoBind(IPEndPoint(address, 0));
1017 }
1018 
JoinGroup(const IPAddress & group_address) const1019 int UDPSocketPosix::JoinGroup(const IPAddress& group_address) const {
1020   DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
1021   if (!is_connected())
1022     return ERR_SOCKET_NOT_CONNECTED;
1023 
1024   switch (group_address.size()) {
1025     case IPAddress::kIPv4AddressSize: {
1026       if (addr_family_ != AF_INET)
1027         return ERR_ADDRESS_INVALID;
1028 
1029 #if defined(OS_MACOSX) || defined(OS_BSD)
1030       ip_mreq mreq = {};
1031       int error = GetIPv4AddressFromIndex(socket_, multicast_interface_,
1032                                           &mreq.imr_interface.s_addr);
1033       if (error != OK)
1034         return error;
1035 #else
1036       ip_mreqn mreq = {};
1037       mreq.imr_ifindex = multicast_interface_;
1038       mreq.imr_address.s_addr = htonl(INADDR_ANY);
1039 #endif
1040       memcpy(&mreq.imr_multiaddr, group_address.bytes().data(),
1041              IPAddress::kIPv4AddressSize);
1042       int rv = setsockopt(socket_, IPPROTO_IP, IP_ADD_MEMBERSHIP,
1043                           &mreq, sizeof(mreq));
1044       if (rv < 0)
1045         return MapSystemError(errno);
1046       return OK;
1047     }
1048     case IPAddress::kIPv6AddressSize: {
1049       if (addr_family_ != AF_INET6)
1050         return ERR_ADDRESS_INVALID;
1051       ipv6_mreq mreq;
1052       mreq.ipv6mr_interface = multicast_interface_;
1053       memcpy(&mreq.ipv6mr_multiaddr, group_address.bytes().data(),
1054              IPAddress::kIPv6AddressSize);
1055       int rv = setsockopt(socket_, IPPROTO_IPV6, IPV6_JOIN_GROUP,
1056                           &mreq, sizeof(mreq));
1057       if (rv < 0)
1058         return MapSystemError(errno);
1059       return OK;
1060     }
1061     default:
1062       NOTREACHED() << "Invalid address family";
1063       return ERR_ADDRESS_INVALID;
1064   }
1065 }
1066 
LeaveGroup(const IPAddress & group_address) const1067 int UDPSocketPosix::LeaveGroup(const IPAddress& group_address) const {
1068   DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
1069 
1070   if (!is_connected())
1071     return ERR_SOCKET_NOT_CONNECTED;
1072 
1073   switch (group_address.size()) {
1074     case IPAddress::kIPv4AddressSize: {
1075       if (addr_family_ != AF_INET)
1076         return ERR_ADDRESS_INVALID;
1077 #if defined(OS_BSD)
1078       ip_mreq mreq = {};
1079       int error = GetIPv4AddressFromIndex(socket_, multicast_interface_,
1080                                           &mreq.imr_interface.s_addr);
1081 
1082       if (error != OK)
1083         return error;
1084 #else
1085       ip_mreqn mreq = {};
1086       mreq.imr_ifindex = multicast_interface_;
1087       mreq.imr_address.s_addr = INADDR_ANY;
1088 #endif
1089       memcpy(&mreq.imr_multiaddr, group_address.bytes().data(),
1090              IPAddress::kIPv4AddressSize);
1091       int rv = setsockopt(socket_, IPPROTO_IP, IP_DROP_MEMBERSHIP,
1092                           &mreq, sizeof(mreq));
1093       if (rv < 0)
1094         return MapSystemError(errno);
1095       return OK;
1096     }
1097     case IPAddress::kIPv6AddressSize: {
1098       if (addr_family_ != AF_INET6)
1099         return ERR_ADDRESS_INVALID;
1100       ipv6_mreq mreq;
1101 #if defined(OS_FUCHSIA)
1102       mreq.ipv6mr_interface = multicast_interface_;
1103 #else   // defined(OS_FUCHSIA)
1104       mreq.ipv6mr_interface = 0;  // 0 indicates default multicast interface.
1105 #endif  // !defined(OS_FUCHSIA)
1106       memcpy(&mreq.ipv6mr_multiaddr, group_address.bytes().data(),
1107              IPAddress::kIPv6AddressSize);
1108       int rv = setsockopt(socket_, IPPROTO_IPV6, IPV6_LEAVE_GROUP,
1109                           &mreq, sizeof(mreq));
1110       if (rv < 0)
1111         return MapSystemError(errno);
1112       return OK;
1113     }
1114     default:
1115       NOTREACHED() << "Invalid address family";
1116       return ERR_ADDRESS_INVALID;
1117   }
1118 }
1119 
SetMulticastInterface(uint32_t interface_index)1120 int UDPSocketPosix::SetMulticastInterface(uint32_t interface_index) {
1121   DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
1122   if (is_connected())
1123     return ERR_SOCKET_IS_CONNECTED;
1124   multicast_interface_ = interface_index;
1125   return OK;
1126 }
1127 
SetMulticastTimeToLive(int time_to_live)1128 int UDPSocketPosix::SetMulticastTimeToLive(int time_to_live) {
1129   DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
1130   if (is_connected())
1131     return ERR_SOCKET_IS_CONNECTED;
1132 
1133   if (time_to_live < 0 || time_to_live > 255)
1134     return ERR_INVALID_ARGUMENT;
1135   multicast_time_to_live_ = time_to_live;
1136   return OK;
1137 }
1138 
SetMulticastLoopbackMode(bool loopback)1139 int UDPSocketPosix::SetMulticastLoopbackMode(bool loopback) {
1140   DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
1141   if (is_connected())
1142     return ERR_SOCKET_IS_CONNECTED;
1143 
1144   if (loopback)
1145     socket_options_ |= SOCKET_OPTION_MULTICAST_LOOP;
1146   else
1147     socket_options_ &= ~SOCKET_OPTION_MULTICAST_LOOP;
1148   return OK;
1149 }
1150 
SetDiffServCodePoint(DiffServCodePoint dscp)1151 int UDPSocketPosix::SetDiffServCodePoint(DiffServCodePoint dscp) {
1152   if (dscp == DSCP_NO_CHANGE) {
1153     return OK;
1154   }
1155 
1156   int dscp_and_ecn = dscp << 2;
1157   // Set the IPv4 option in all cases to support dual-stack sockets.
1158   int rv = setsockopt(socket_, IPPROTO_IP, IP_TOS, &dscp_and_ecn,
1159                       sizeof(dscp_and_ecn));
1160   if (addr_family_ == AF_INET6) {
1161     // In the IPv6 case, the previous socksetopt may fail because of a lack of
1162     // dual-stack support. Therefore ignore the previous return value.
1163     rv = setsockopt(socket_, IPPROTO_IPV6, IPV6_TCLASS,
1164                     &dscp_and_ecn, sizeof(dscp_and_ecn));
1165   }
1166   if (rv < 0)
1167     return MapSystemError(errno);
1168 
1169   return OK;
1170 }
1171 
DetachFromThread()1172 void UDPSocketPosix::DetachFromThread() {
1173   DETACH_FROM_THREAD(thread_checker_);
1174 }
1175 
ApplySocketTag(const SocketTag & tag)1176 void UDPSocketPosix::ApplySocketTag(const SocketTag& tag) {
1177   DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
1178   if (socket_ != kInvalidSocket && tag != tag_) {
1179     tag.Apply(socket_);
1180   }
1181   tag_ = tag;
1182 }
1183 
UDPSocketPosixSender()1184 UDPSocketPosixSender::UDPSocketPosixSender() : sendmmsg_enabled_(false) {}
~UDPSocketPosixSender()1185 UDPSocketPosixSender::~UDPSocketPosixSender() {}
1186 
SendResult()1187 SendResult::SendResult() : rv(0), write_count(0) {}
~SendResult()1188 SendResult::~SendResult() {}
SendResult(int _rv,int _write_count,DatagramBuffers _buffers)1189 SendResult::SendResult(int _rv, int _write_count, DatagramBuffers _buffers)
1190     : rv(_rv), write_count(_write_count), buffers(std::move(_buffers)) {}
1191 SendResult::SendResult(SendResult&& other) = default;
1192 
InternalSendBuffers(int fd,DatagramBuffers buffers) const1193 SendResult UDPSocketPosixSender::InternalSendBuffers(
1194     int fd,
1195     DatagramBuffers buffers) const {
1196   int rv = 0;
1197   int write_count = 0;
1198   for (auto& buffer : buffers) {
1199     int result = HANDLE_EINTR(Send(fd, buffer->data(), buffer->length(), 0));
1200     if (result < 0) {
1201       rv = MapSystemError(errno);
1202       break;
1203     }
1204     write_count++;
1205   }
1206   return SendResult(rv, write_count, std::move(buffers));
1207 }
1208 
#if HAVE_SENDMMSG
// Sends all of |buffers| with a single Sendmmsg() call.
// One iovec and one mmsghdr are built per datagram. The returned SendResult
// owns |buffers| again; |write_count| is the number of messages the call
// reported as sent, or |rv| is the mapped system error if the call failed.
SendResult UDPSocketPosixSender::InternalSendmmsgBuffers(
    int fd,
    DatagramBuffers buffers) const {
  base::StackVector<struct iovec, kWriteAsyncMaxBuffersThreshold + 1> msg_iov;
  base::StackVector<struct mmsghdr, kWriteAsyncMaxBuffersThreshold + 1> msgvec;

  // |msg_iov| must be completely filled before |msgvec| is built, because
  // each mmsghdr stores a pointer into it.
  msg_iov->reserve(buffers.size());
  for (const auto& buffer : buffers)
    msg_iov->push_back({const_cast<char*>(buffer->data()), buffer->length()});

  msgvec->reserve(buffers.size());
  for (size_t j = 0; j < buffers.size(); j++)
    msgvec->push_back({{nullptr, 0, &msg_iov[j], 1, nullptr, 0, 0}, 0});

  // Use data()/size() rather than &msgvec[0]: indexing element 0 is undefined
  // when |buffers| (and therefore |msgvec|) is empty.
  int result =
      HANDLE_EINTR(Sendmmsg(fd, msgvec->data(), msgvec->size(), 0));

  SendResult send_result(0, 0, std::move(buffers));
  if (result < 0) {
    send_result.rv = MapSystemError(errno);
  } else {
    send_result.write_count = result;
  }
  return send_result;
}
#endif
1231 
SendBuffers(int fd,DatagramBuffers buffers)1232 SendResult UDPSocketPosixSender::SendBuffers(int fd, DatagramBuffers buffers) {
1233 #if HAVE_SENDMMSG
1234   if (sendmmsg_enabled_) {
1235     auto result = InternalSendmmsgBuffers(fd, std::move(buffers));
1236     if (LIKELY(result.rv != ERR_NOT_IMPLEMENTED)) {
1237       return result;
1238     }
1239     DLOG(WARNING) << "senddmsg() not implemented, falling back to send()";
1240     sendmmsg_enabled_ = false;
1241     buffers = std::move(result.buffers);
1242   }
1243 #endif
1244   return InternalSendBuffers(fd, std::move(buffers));
1245 }
1246 
Send(int sockfd,const void * buf,size_t len,int flags) const1247 ssize_t UDPSocketPosixSender::Send(int sockfd,
1248                                    const void* buf,
1249                                    size_t len,
1250                                    int flags) const {
1251   return send(sockfd, buf, len, flags);
1252 }
1253 
#if HAVE_SENDMMSG
// Thin wrapper that forwards its arguments unchanged to the sendmmsg() system
// call and returns its result.
int UDPSocketPosixSender::Sendmmsg(int sockfd,
                                   struct mmsghdr* msgvec,
                                   unsigned int vlen,
                                   unsigned int flags) const {
  return ::sendmmsg(sockfd, msgvec, vlen, flags);
}
#endif
1262 
WriteAsync(const char * buffer,size_t buf_len,CompletionOnceCallback callback,const NetworkTrafficAnnotationTag & traffic_annotation)1263 int UDPSocketPosix::WriteAsync(
1264     const char* buffer,
1265     size_t buf_len,
1266     CompletionOnceCallback callback,
1267     const NetworkTrafficAnnotationTag& traffic_annotation) {
1268   DCHECK(datagram_buffer_pool_ != nullptr);
1269   IncreaseWriteAsyncOutstanding(1);
1270   datagram_buffer_pool_->Enqueue(buffer, buf_len, &pending_writes_);
1271   return InternalWriteAsync(std::move(callback), traffic_annotation);
1272 }
1273 
WriteAsync(DatagramBuffers buffers,CompletionOnceCallback callback,const NetworkTrafficAnnotationTag & traffic_annotation)1274 int UDPSocketPosix::WriteAsync(
1275     DatagramBuffers buffers,
1276     CompletionOnceCallback callback,
1277     const NetworkTrafficAnnotationTag& traffic_annotation) {
1278   IncreaseWriteAsyncOutstanding(buffers.size());
1279   pending_writes_.splice(pending_writes_.end(), std::move(buffers));
1280   return InternalWriteAsync(std::move(callback), traffic_annotation);
1281 }
1282 
InternalWriteAsync(CompletionOnceCallback callback,const NetworkTrafficAnnotationTag & traffic_annotation)1283 int UDPSocketPosix::InternalWriteAsync(
1284     CompletionOnceCallback callback,
1285     const NetworkTrafficAnnotationTag& traffic_annotation) {
1286   CHECK(write_callback_.is_null());
1287 
1288   // Surface error immediately if one is pending.
1289   if (last_async_result_ < 0) {
1290     return ResetLastAsyncResult();
1291   }
1292 
1293   size_t flush_threshold =
1294       write_batching_active_ ? kWriteAsyncPostBuffersThreshold : 1;
1295   if (pending_writes_.size() >= flush_threshold) {
1296     FlushPending();
1297     // Surface error immediately if one is pending.
1298     if (last_async_result_ < 0) {
1299       return ResetLastAsyncResult();
1300     }
1301   }
1302 
1303   if (!write_async_timer_running_) {
1304     write_async_timer_running_ = true;
1305     write_async_timer_.Start(FROM_HERE, kWriteAsyncMsThreshold, this,
1306                              &UDPSocketPosix::OnWriteAsyncTimerFired);
1307   }
1308 
1309   int blocking_threshold =
1310       write_batching_active_ ? kWriteAsyncMaxBuffersThreshold : 1;
1311   if (write_async_outstanding_ >= blocking_threshold) {
1312     write_callback_ = std::move(callback);
1313     return ERR_IO_PENDING;
1314   }
1315 
1316   DVLOG(2) << __func__ << " pending " << pending_writes_.size()
1317            << " outstanding " << write_async_outstanding_;
1318   return ResetWrittenBytes();
1319 }
1320 
GetUnwrittenBuffers()1321 DatagramBuffers UDPSocketPosix::GetUnwrittenBuffers() {
1322   write_async_outstanding_ -= pending_writes_.size();
1323   return std::move(pending_writes_);
1324 }
1325 
FlushPending()1326 void UDPSocketPosix::FlushPending() {
1327   // Nothing to do if socket is blocked.
1328   if (write_async_watcher_->watching())
1329     return;
1330 
1331   if (pending_writes_.empty())
1332     return;
1333 
1334   if (write_async_timer_running_)
1335     write_async_timer_.Reset();
1336 
1337   int num_pending_writes = static_cast<int>(pending_writes_.size());
1338   if (!write_multi_core_enabled_ ||
1339       // Don't bother with post if not enough buffers
1340       (num_pending_writes <= kWriteAsyncMinBuffersThreshold &&
1341        // but not if there is a previous post
1342        // outstanding, to prevent out of order transmission.
1343        (num_pending_writes == write_async_outstanding_))) {
1344     LocalSendBuffers();
1345   } else {
1346     PostSendBuffers();
1347   }
1348 }
1349 
1350 // TODO(ckrasic) Sad face.  Do this lazily because many tests exploded
1351 // otherwise.  |threading_and_tasks.md| advises to instantiate a
1352 // |base::test::TaskEnvironment| in the test, implementing that
1353 // for all tests that might exercise QUIC is too daunting.  Also, in
1354 // some tests it seemed like following the advice just broke in other
1355 // ways.
GetTaskRunner()1356 base::SequencedTaskRunner* UDPSocketPosix::GetTaskRunner() {
1357   if (task_runner_ == nullptr)
1358     task_runner_ = base::ThreadPool::CreateSequencedTaskRunner({});
1359   return task_runner_.get();
1360 }
1361 
OnWriteAsyncTimerFired()1362 void UDPSocketPosix::OnWriteAsyncTimerFired() {
1363   DVLOG(2) << __func__ << " pending writes " << pending_writes_.size();
1364   if (pending_writes_.empty()) {
1365     write_async_timer_.Stop();
1366     write_async_timer_running_ = false;
1367     return;
1368   }
1369   if (last_async_result_ < 0) {
1370     DVLOG(1) << __func__ << " socket not writeable";
1371     return;
1372   }
1373   FlushPending();
1374 }
1375 
LocalSendBuffers()1376 void UDPSocketPosix::LocalSendBuffers() {
1377   DVLOG(1) << __func__ << " queue " << pending_writes_.size() << " out of "
1378            << write_async_outstanding_ << " total";
1379   DidSendBuffers(sender_->SendBuffers(socket_, std::move(pending_writes_)));
1380 }
1381 
PostSendBuffers()1382 void UDPSocketPosix::PostSendBuffers() {
1383   DVLOG(1) << __func__ << " queue " << pending_writes_.size() << " out of "
1384            << write_async_outstanding_ << " total";
1385   base::PostTaskAndReplyWithResult(
1386       GetTaskRunner(), FROM_HERE,
1387       base::BindOnce(&UDPSocketPosixSender::SendBuffers, sender_, socket_,
1388                      std::move(pending_writes_)),
1389       base::BindOnce(&UDPSocketPosix::DidSendBuffers,
1390                      weak_factory_.GetWeakPtr()));
1391 }
1392 
DidSendBuffers(SendResult send_result)1393 void UDPSocketPosix::DidSendBuffers(SendResult send_result) {
1394   DVLOG(3) << __func__;
1395   int write_count = send_result.write_count;
1396   DatagramBuffers& buffers = send_result.buffers;
1397 
1398   DCHECK(!buffers.empty());
1399   int num_buffers = buffers.size();
1400 
1401   // Dequeue buffers that have been written.
1402   if (write_count > 0) {
1403     write_async_outstanding_ -= write_count;
1404 
1405     DatagramBuffers::const_iterator it;
1406     // Generate logs for written buffers
1407     it = buffers.cbegin();
1408     for (int i = 0; i < write_count; i++, it++) {
1409       auto& buffer = *it;
1410       LogWrite(buffer->length(), buffer->data(), NULL);
1411       written_bytes_ += buffer->length();
1412     }
1413     // Return written buffers to pool
1414     DatagramBuffers written_buffers;
1415     if (write_count == num_buffers) {
1416       it = buffers.cend();
1417     } else {
1418       it = buffers.cbegin();
1419       for (int i = 0; i < write_count; i++) {
1420         it++;
1421       }
1422     }
1423     written_buffers.splice(written_buffers.cend(), buffers, buffers.cbegin(),
1424                            it);
1425     DCHECK(datagram_buffer_pool_ != nullptr);
1426     datagram_buffer_pool_->Dequeue(&written_buffers);
1427   }
1428 
1429   // Requeue left-over (unwritten) buffers.
1430   if (!buffers.empty()) {
1431     DVLOG(2) << __func__ << " requeue " << buffers.size() << " buffers";
1432     pending_writes_.splice(pending_writes_.begin(), std::move(buffers));
1433   }
1434 
1435   last_async_result_ = send_result.rv;
1436   if (last_async_result_ == ERR_IO_PENDING) {
1437     DVLOG(2) << __func__ << " WatchFileDescriptor start";
1438     if (!WatchFileDescriptor()) {
1439       DVPLOG(1) << "WatchFileDescriptor failed on write";
1440       last_async_result_ = MapSystemError(errno);
1441       LogWrite(last_async_result_, NULL, NULL);
1442     } else {
1443       last_async_result_ = 0;
1444     }
1445   } else if (last_async_result_ < 0 || pending_writes_.empty()) {
1446     DVLOG(2) << __func__ << " WatchFileDescriptor stop: result "
1447              << ErrorToShortString(last_async_result_) << " pending_writes "
1448              << pending_writes_.size();
1449     StopWatchingFileDescriptor();
1450   }
1451   DCHECK(last_async_result_ != ERR_IO_PENDING);
1452 
1453   if (write_callback_.is_null())
1454     return;
1455 
1456   if (last_async_result_ < 0) {
1457     DVLOG(1) << last_async_result_;
1458     // Update the writer with the latest result.
1459     DoWriteCallback(ResetLastAsyncResult());
1460   } else if (write_async_outstanding_ < kWriteAsyncCallbackBuffersThreshold) {
1461     DVLOG(1) << write_async_outstanding_ << " < "
1462              << kWriteAsyncCallbackBuffersThreshold;
1463     DoWriteCallback(ResetWrittenBytes());
1464   }
1465 }
1466 
OnFileCanWriteWithoutBlocking(int)1467 void UDPSocketPosix::WriteAsyncWatcher::OnFileCanWriteWithoutBlocking(int) {
1468   DVLOG(1) << __func__ << " queue " << socket_->pending_writes_.size()
1469            << " out of " << socket_->write_async_outstanding_ << " total";
1470   socket_->StopWatchingFileDescriptor();
1471   socket_->FlushPending();
1472 }
1473 
WatchFileDescriptor()1474 bool UDPSocketPosix::WatchFileDescriptor() {
1475   if (write_async_watcher_->watching())
1476     return true;
1477   bool result = InternalWatchFileDescriptor();
1478   if (result) {
1479     write_async_watcher_->set_watching(true);
1480   }
1481   return result;
1482 }
1483 
StopWatchingFileDescriptor()1484 void UDPSocketPosix::StopWatchingFileDescriptor() {
1485   if (!write_async_watcher_->watching())
1486     return;
1487   InternalStopWatchingFileDescriptor();
1488   write_async_watcher_->set_watching(false);
1489 }
1490 
InternalWatchFileDescriptor()1491 bool UDPSocketPosix::InternalWatchFileDescriptor() {
1492   return base::MessageLoopCurrentForIO::Get()->WatchFileDescriptor(
1493       socket_, true, base::MessagePumpForIO::WATCH_WRITE,
1494       &write_socket_watcher_, write_async_watcher_.get());
1495 }
1496 
InternalStopWatchingFileDescriptor()1497 void UDPSocketPosix::InternalStopWatchingFileDescriptor() {
1498   bool ok = write_socket_watcher_.StopWatchingFileDescriptor();
1499   DCHECK(ok);
1500 }
1501 
SetMaxPacketSize(size_t max_packet_size)1502 void UDPSocketPosix::SetMaxPacketSize(size_t max_packet_size) {
1503   datagram_buffer_pool_ = std::make_unique<DatagramBufferPool>(max_packet_size);
1504 }
1505 
ResetLastAsyncResult()1506 int UDPSocketPosix::ResetLastAsyncResult() {
1507   int result = last_async_result_;
1508   last_async_result_ = 0;
1509   return result;
1510 }
1511 
ResetWrittenBytes()1512 int UDPSocketPosix::ResetWrittenBytes() {
1513   int bytes = written_bytes_;
1514   written_bytes_ = 0;
1515   return bytes;
1516 }
1517 
1518 }  // namespace net
1519