1 /** @file
2 
3   A brief file description
4 
5   @section license License
6 
7   Licensed to the Apache Software Foundation (ASF) under one
8   or more contributor license agreements.  See the NOTICE file
9   distributed with this work for additional information
10   regarding copyright ownership.  The ASF licenses this file
11   to you under the Apache License, Version 2.0 (the
12   "License"); you may not use this file except in compliance
13   with the License.  You may obtain a copy of the License at
14 
15       http://www.apache.org/licenses/LICENSE-2.0
16 
17   Unless required by applicable law or agreed to in writing, software
18   distributed under the License is distributed on an "AS IS" BASIS,
19   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20   See the License for the specific language governing permissions and
21   limitations under the License.
22  */
23 
24 /****************************************************************************
25 
26   UnixUDPNet.cc
27   UDPNet implementation
28 
29 
30  ****************************************************************************/
31 
32 #if defined(darwin)
33 /* This is for IPV6_PKTINFO and IPV6_RECVPKTINFO */
34 #define __APPLE_USE_RFC_3542
35 #endif
36 
37 #include "P_Net.h"
38 #include "P_UDPNet.h"
39 
40 using UDPNetContHandler = int (UDPNetHandler::*)(int, void *);
41 
42 inkcoreapi ClassAllocator<UDPPacketInternal> udpPacketAllocator("udpPacketAllocator");
43 EventType ET_UDP;
44 
45 //
46 // Global Data
47 //
48 
49 UDPNetProcessorInternal udpNetInternal;
50 UDPNetProcessor &udpNet = udpNetInternal;
51 
52 int32_t g_udp_periodicCleanupSlots;
53 int32_t g_udp_periodicFreeCancelledPkts;
54 int32_t g_udp_numSendRetries;
55 
56 //
57 // Public functions
58 // See header for documentation
59 //
60 int G_bwGrapherFd;
61 sockaddr_in6 G_bwGrapherLoc;
62 
63 void
initialize_thread_for_udp_net(EThread * thread)64 initialize_thread_for_udp_net(EThread *thread)
65 {
66   UDPNetHandler *nh = get_UDPNetHandler(thread);
67 
68   new (reinterpret_cast<ink_dummy_for_new *>(nh)) UDPNetHandler;
69   new (reinterpret_cast<ink_dummy_for_new *>(get_UDPPollCont(thread))) PollCont(thread->mutex);
70   // The UDPNetHandler cannot be accessed across EThreads.
71   // Because the UDPNetHandler should be called back immediately after UDPPollCont.
72   nh->mutex  = thread->mutex.get();
73   nh->thread = thread;
74 
75   PollCont *upc       = get_UDPPollCont(thread);
76   PollDescriptor *upd = upc->pollDescriptor;
77   // due to ET_UDP is really simple, it should sleep for a long time
78   // TODO: fixed size
79   upc->poll_timeout = 100;
80   // This variable controls how often we cleanup the cancelled packets.
81   // If it is set to 0, then cleanup never occurs.
82   REC_ReadConfigInt32(g_udp_periodicFreeCancelledPkts, "proxy.config.udp.free_cancelled_pkts_sec");
83 
84   // This variable controls how many "slots" of the udp calendar queue we cleanup.
85   // If it is set to 0, then cleanup never occurs.  This value makes sense
86   // only if the above variable is set.
87   REC_ReadConfigInt32(g_udp_periodicCleanupSlots, "proxy.config.udp.periodic_cleanup");
88 
89   // UDP sends can fail with errno=EAGAIN.  This variable determines the # of
90   // times the UDP thread retries before giving up.  Set to 0 to keep trying forever.
91   REC_ReadConfigInt32(g_udp_numSendRetries, "proxy.config.udp.send_retries");
92   g_udp_numSendRetries = g_udp_numSendRetries < 0 ? 0 : g_udp_numSendRetries;
93 
94   thread->set_tail_handler(nh);
95   thread->ep = static_cast<EventIO *>(ats_malloc(sizeof(EventIO)));
96   new (thread->ep) EventIO();
97   thread->ep->type = EVENTIO_ASYNC_SIGNAL;
98 #if HAVE_EVENTFD
99   thread->ep->start(upd, thread->evfd, nullptr, EVENTIO_READ);
100 #else
101   thread->ep->start(upd, thread->evpipe[0], nullptr, EVENTIO_READ);
102 #endif
103 }
104 
105 int
start(int n_upd_threads,size_t stacksize)106 UDPNetProcessorInternal::start(int n_upd_threads, size_t stacksize)
107 {
108   if (n_upd_threads < 1) {
109     return -1;
110   }
111 
112   pollCont_offset      = eventProcessor.allocate(sizeof(PollCont));
113   udpNetHandler_offset = eventProcessor.allocate(sizeof(UDPNetHandler));
114 
115   ET_UDP = eventProcessor.register_event_type("ET_UDP");
116   eventProcessor.schedule_spawn(&initialize_thread_for_udp_net, ET_UDP);
117   eventProcessor.spawn_event_threads(ET_UDP, n_upd_threads, stacksize);
118 
119   return 0;
120 }
121 
122 void
udp_read_from_net(UDPNetHandler * nh,UDPConnection * xuc)123 UDPNetProcessorInternal::udp_read_from_net(UDPNetHandler *nh, UDPConnection *xuc)
124 {
125   UnixUDPConnection *uc = (UnixUDPConnection *)xuc;
126 
127   // receive packet and queue onto UDPConnection.
128   // don't call back connection at this time.
129   int64_t r;
130   int iters         = 0;
131   unsigned max_niov = 32;
132 
133   struct msghdr msg;
134   Ptr<IOBufferBlock> chain, next_chain;
135   struct iovec tiovec[max_niov];
136   int64_t size_index  = BUFFER_SIZE_INDEX_2K;
137   int64_t buffer_size = BUFFER_SIZE_FOR_INDEX(size_index);
138   // The max length of receive buffer is 32 * buffer_size (2048) = 65536 bytes.
139   // Because the 'UDP Length' is type of uint16_t defined in RFC 768.
140   // And there is 8 octets in 'User Datagram Header' which means the max length of payload is no more than 65527 bytes.
141   do {
142     // create IOBufferBlock chain to receive data
143     unsigned int niov;
144     IOBufferBlock *b, *last;
145 
146     // build struct iov
147     // reuse the block in chain if available
148     b    = chain.get();
149     last = nullptr;
150     for (niov = 0; niov < max_niov; niov++) {
151       if (b == nullptr) {
152         b = new_IOBufferBlock();
153         b->alloc(size_index);
154         if (last == nullptr) {
155           chain = b;
156         } else {
157           last->next = b;
158         }
159       }
160 
161       tiovec[niov].iov_base = b->buf();
162       tiovec[niov].iov_len  = b->block_size();
163 
164       last = b;
165       b    = b->next.get();
166     }
167 
168     // build struct msghdr
169     sockaddr_in6 fromaddr;
170     sockaddr_in6 toaddr;
171     int toaddr_len = sizeof(toaddr);
172     char *cbuf[1024];
173     msg.msg_name       = &fromaddr;
174     msg.msg_namelen    = sizeof(fromaddr);
175     msg.msg_iov        = tiovec;
176     msg.msg_iovlen     = niov;
177     msg.msg_control    = cbuf;
178     msg.msg_controllen = sizeof(cbuf);
179 
180     // receive data by recvmsg
181     r = socketManager.recvmsg(uc->getFd(), &msg, 0);
182     if (r <= 0) {
183       // error
184       break;
185     }
186 
187     // truncated check
188     if (msg.msg_flags & MSG_TRUNC) {
189       Debug("udp-read", "The UDP packet is truncated");
190     }
191 
192     // fill the IOBufferBlock chain
193     int64_t saved = r;
194     b             = chain.get();
195     while (b && saved > 0) {
196       if (saved > buffer_size) {
197         b->fill(buffer_size);
198         saved -= buffer_size;
199         b = b->next.get();
200       } else {
201         b->fill(saved);
202         saved      = 0;
203         next_chain = b->next.get();
204         b->next    = nullptr;
205       }
206     }
207 
208     safe_getsockname(xuc->getFd(), reinterpret_cast<struct sockaddr *>(&toaddr), &toaddr_len);
209     for (auto cmsg = CMSG_FIRSTHDR(&msg); cmsg != nullptr; cmsg = CMSG_NXTHDR(&msg, cmsg)) {
210       switch (cmsg->cmsg_type) {
211 #ifdef IP_PKTINFO
212       case IP_PKTINFO:
213         if (cmsg->cmsg_level == IPPROTO_IP) {
214           struct in_pktinfo *pktinfo                                = reinterpret_cast<struct in_pktinfo *>(CMSG_DATA(cmsg));
215           reinterpret_cast<sockaddr_in *>(&toaddr)->sin_addr.s_addr = pktinfo->ipi_addr.s_addr;
216         }
217         break;
218 #endif
219 #ifdef IP_RECVDSTADDR
220       case IP_RECVDSTADDR:
221         if (cmsg->cmsg_level == IPPROTO_IP) {
222           struct in_addr *addr                                      = reinterpret_cast<struct in_addr *>(CMSG_DATA(cmsg));
223           reinterpret_cast<sockaddr_in *>(&toaddr)->sin_addr.s_addr = addr->s_addr;
224         }
225         break;
226 #endif
227 #if defined(IPV6_PKTINFO) || defined(IPV6_RECVPKTINFO)
228       case IPV6_PKTINFO: // IPV6_RECVPKTINFO uses IPV6_PKTINFO too
229         if (cmsg->cmsg_level == IPPROTO_IPV6) {
230           struct in6_pktinfo *pktinfo = reinterpret_cast<struct in6_pktinfo *>(CMSG_DATA(cmsg));
231           memcpy(toaddr.sin6_addr.s6_addr, &pktinfo->ipi6_addr, 16);
232         }
233         break;
234 #endif
235       }
236     }
237 
238     // create packet
239     UDPPacket *p = new_incoming_UDPPacket(ats_ip_sa_cast(&fromaddr), ats_ip_sa_cast(&toaddr), chain);
240     p->setConnection(uc);
241     // queue onto the UDPConnection
242     uc->inQueue.push((UDPPacketInternal *)p);
243 
244     // reload the unused block
245     chain      = next_chain;
246     next_chain = nullptr;
247     iters++;
248   } while (r > 0);
249   if (iters >= 1) {
250     Debug("udp-read", "read %d at a time", iters);
251   }
252   // if not already on to-be-called-back queue, then add it.
253   if (!uc->onCallbackQueue) {
254     ink_assert(uc->callback_link.next == nullptr);
255     ink_assert(uc->callback_link.prev == nullptr);
256     uc->AddRef();
257     nh->udp_callbacks.enqueue(uc);
258     uc->onCallbackQueue = 1;
259   }
260 }
261 
262 int
udp_callback(UDPNetHandler * nh,UDPConnection * xuc,EThread * thread)263 UDPNetProcessorInternal::udp_callback(UDPNetHandler *nh, UDPConnection *xuc, EThread *thread)
264 {
265   (void)nh;
266   UnixUDPConnection *uc = (UnixUDPConnection *)xuc;
267 
268   if (uc->continuation && uc->mutex) {
269     MUTEX_TRY_LOCK(lock, uc->mutex, thread);
270     if (!lock.is_locked()) {
271       return 1;
272     }
273     uc->AddRef();
274     uc->callbackHandler(0, nullptr);
275     return 0;
276   } else {
277     ink_assert(!"doesn't reach here");
278     if (!uc->callbackAction) {
279       uc->AddRef();
280       uc->callbackAction = eventProcessor.schedule_imm(uc);
281     }
282     return 0;
283   }
284 }
285 
286 #define UNINITIALIZED_EVENT_PTR (Event *)0xdeadbeef
287 
288 // cheesy implementation of a asynchronous read and callback for Unix
289 class UDPReadContinuation : public Continuation
290 {
291 public:
292   UDPReadContinuation(Event *completionToken);
293   UDPReadContinuation();
294   ~UDPReadContinuation() override;
295   inline void free();
296   inline void init_token(Event *completionToken);
297   inline void init_read(int fd, IOBufferBlock *buf, int len, struct sockaddr *fromaddr, socklen_t *fromaddrlen);
298 
299   void
set_timer(int seconds)300   set_timer(int seconds)
301   {
302     timeout_interval = HRTIME_SECONDS(seconds);
303   }
304 
305   void cancel();
306   int readPollEvent(int event, Event *e);
307 
308   Action *
getAction()309   getAction()
310   {
311     return event;
312   }
313 
314   void setupPollDescriptor();
315 
316 private:
317   Event *event = UNINITIALIZED_EVENT_PTR; // the completion event token created
318   // on behalf of the client
319   Ptr<IOBufferBlock> readbuf{nullptr};
320   int readlen                   = 0;
321   struct sockaddr_in6 *fromaddr = nullptr;
322   socklen_t *fromaddrlen        = nullptr;
323   int fd                        = NO_FD; // fd we are reading from
324   int ifd                       = NO_FD; // poll fd index
325   ink_hrtime period             = 0;     // polling period
326   ink_hrtime elapsed_time       = 0;
327   ink_hrtime timeout_interval   = 0;
328 };
329 
330 ClassAllocator<UDPReadContinuation> udpReadContAllocator("udpReadContAllocator");
331 
UDPReadContinuation(Event * completionToken)332 UDPReadContinuation::UDPReadContinuation(Event *completionToken)
333   : Continuation(nullptr),
334     event(completionToken),
335     readbuf(nullptr),
336 
337     fd(-1),
338     ifd(-1)
339 
340 {
341   if (completionToken->continuation) {
342     this->mutex = completionToken->continuation->mutex;
343   } else {
344     this->mutex = new_ProxyMutex();
345   }
346 }
347 
UDPReadContinuation()348 UDPReadContinuation::UDPReadContinuation() : Continuation(nullptr) {}
349 
350 inline void
free()351 UDPReadContinuation::free()
352 {
353   ink_assert(event != nullptr);
354   completionUtil::destroy(event);
355   event            = nullptr;
356   readbuf          = nullptr;
357   readlen          = 0;
358   fromaddrlen      = nullptr;
359   fd               = -1;
360   ifd              = -1;
361   period           = 0;
362   elapsed_time     = 0;
363   timeout_interval = 0;
364   mutex            = nullptr;
365   udpReadContAllocator.free(this);
366 }
367 
368 inline void
init_token(Event * completionToken)369 UDPReadContinuation::init_token(Event *completionToken)
370 {
371   if (completionToken->continuation) {
372     this->mutex = completionToken->continuation->mutex;
373   } else {
374     this->mutex = new_ProxyMutex();
375   }
376   event = completionToken;
377 }
378 
379 inline void
init_read(int rfd,IOBufferBlock * buf,int len,struct sockaddr * fromaddr_,socklen_t * fromaddrlen_)380 UDPReadContinuation::init_read(int rfd, IOBufferBlock *buf, int len, struct sockaddr *fromaddr_, socklen_t *fromaddrlen_)
381 {
382   ink_assert(rfd >= 0 && buf != nullptr && fromaddr_ != nullptr && fromaddrlen_ != nullptr);
383   fd          = rfd;
384   readbuf     = buf;
385   readlen     = len;
386   fromaddr    = ats_ip6_cast(fromaddr_);
387   fromaddrlen = fromaddrlen_;
388   SET_HANDLER(&UDPReadContinuation::readPollEvent);
389   period = -HRTIME_MSECONDS(net_event_period);
390   setupPollDescriptor();
391   this_ethread()->schedule_every(this, period);
392 }
393 
~UDPReadContinuation()394 UDPReadContinuation::~UDPReadContinuation()
395 {
396   if (event != UNINITIALIZED_EVENT_PTR) {
397     ink_assert(event != nullptr);
398     completionUtil::destroy(event);
399     event = nullptr;
400   }
401 }
402 
403 void
cancel()404 UDPReadContinuation::cancel()
405 {
406   // I don't think this actually cancels it correctly right now.
407   event->cancel();
408 }
409 
410 void
setupPollDescriptor()411 UDPReadContinuation::setupPollDescriptor()
412 {
413 #if TS_USE_EPOLL
414   Pollfd *pfd;
415   EThread *et  = (EThread *)this_thread();
416   PollCont *pc = get_PollCont(et);
417   if (pc->nextPollDescriptor == nullptr) {
418     pc->nextPollDescriptor = new PollDescriptor();
419   }
420   pfd     = pc->nextPollDescriptor->alloc();
421   pfd->fd = fd;
422   ifd     = pfd - pc->nextPollDescriptor->pfd;
423   ink_assert(pc->nextPollDescriptor->nfds > ifd);
424   pfd->events  = POLLIN;
425   pfd->revents = 0;
426 #endif
427 }
428 
429 int
readPollEvent(int event_,Event * e)430 UDPReadContinuation::readPollEvent(int event_, Event *e)
431 {
432   (void)event_;
433   (void)e;
434 
435   // PollCont *pc = get_PollCont(e->ethread);
436   Continuation *c;
437 
438   if (event->cancelled) {
439     e->cancel();
440     free();
441     return EVENT_DONE;
442   }
443 
444   // See if the request has timed out
445   if (timeout_interval) {
446     elapsed_time += -period;
447     if (elapsed_time >= timeout_interval) {
448       c = completionUtil::getContinuation(event);
449       // TODO: Should we deal with the return code?
450       c->handleEvent(NET_EVENT_DATAGRAM_READ_ERROR, event);
451       e->cancel();
452       free();
453       return EVENT_DONE;
454     }
455   }
456 
457   c = completionUtil::getContinuation(event);
458   // do read
459   socklen_t tmp_fromlen = *fromaddrlen;
460   int rlen              = socketManager.recvfrom(fd, readbuf->end(), readlen, 0, ats_ip_sa_cast(fromaddr), &tmp_fromlen);
461 
462   completionUtil::setThread(event, e->ethread);
463   // call back user with their event
464   if (rlen > 0) {
465     // do callback if read is successful
466     *fromaddrlen = tmp_fromlen;
467     completionUtil::setInfo(event, fd, readbuf, rlen, errno);
468     readbuf->fill(rlen);
469     // TODO: Should we deal with the return code?
470     c->handleEvent(NET_EVENT_DATAGRAM_READ_COMPLETE, event);
471     e->cancel();
472     free();
473 
474     return EVENT_DONE;
475   } else if (rlen < 0 && rlen != -EAGAIN) {
476     // signal error.
477     *fromaddrlen = tmp_fromlen;
478     completionUtil::setInfo(event, fd, readbuf, rlen, errno);
479     c = completionUtil::getContinuation(event);
480     // TODO: Should we deal with the return code?
481     c->handleEvent(NET_EVENT_DATAGRAM_READ_ERROR, event);
482     e->cancel();
483     free();
484 
485     return EVENT_DONE;
486   } else {
487     completionUtil::setThread(event, nullptr);
488   }
489 
490   if (event->cancelled) {
491     e->cancel();
492     free();
493 
494     return EVENT_DONE;
495   }
496   // reestablish poll
497   setupPollDescriptor();
498 
499   return EVENT_CONT;
500 }
501 
502 /* recvfrom:
503  * Unix:
504  *   assert(buf->write_avail() >= len);
505  *   *actual_len = recvfrom(fd,addr,buf->end(),len)
506  *   if successful then
507  *      buf->fill(*actual_len);
508  *	    return ACTION_RESULT_DONE
509  *   else if nothing read
510  *      *actual_len is 0
511  *      create "UDP read continuation" C with 'cont's lock
512  *         set user callback to 'cont'
513  *      return C's action.
514  *   else
515  *      return error;
516  */
517 Action *
recvfrom_re(Continuation * cont,void * token,int fd,struct sockaddr * fromaddr,socklen_t * fromaddrlen,IOBufferBlock * buf,int len,bool useReadCont,int timeout)518 UDPNetProcessor::recvfrom_re(Continuation *cont, void *token, int fd, struct sockaddr *fromaddr, socklen_t *fromaddrlen,
519                              IOBufferBlock *buf, int len, bool useReadCont, int timeout)
520 {
521   (void)useReadCont;
522   ink_assert(buf->write_avail() >= len);
523   int actual;
524   Event *event = completionUtil::create();
525 
526   completionUtil::setContinuation(event, cont);
527   completionUtil::setHandle(event, token);
528   actual = socketManager.recvfrom(fd, buf->end(), len, 0, fromaddr, fromaddrlen);
529 
530   if (actual > 0) {
531     completionUtil::setThread(event, this_ethread());
532     completionUtil::setInfo(event, fd, make_ptr(buf), actual, errno);
533     buf->fill(actual);
534     cont->handleEvent(NET_EVENT_DATAGRAM_READ_COMPLETE, event);
535     completionUtil::destroy(event);
536     return ACTION_RESULT_DONE;
537   } else if (actual == 0 || (actual < 0 && actual == -EAGAIN)) {
538     UDPReadContinuation *c = udpReadContAllocator.alloc();
539     c->init_token(event);
540     c->init_read(fd, buf, len, fromaddr, fromaddrlen);
541     if (timeout) {
542       c->set_timer(timeout);
543     }
544     return event;
545   } else {
546     completionUtil::setThread(event, this_ethread());
547     completionUtil::setInfo(event, fd, make_ptr(buf), actual, errno);
548     cont->handleEvent(NET_EVENT_DATAGRAM_READ_ERROR, event);
549     completionUtil::destroy(event);
550     return ACTION_IO_ERROR;
551   }
552 }
553 
554 /* sendmsg:
555  * Unix:
556  *   *actual_len = sendmsg(fd,msg,default-flags);
557  *   if successful,
558  *      return ACTION_RESULT_DONE
559  *   else
560  *      return error
561  */
562 Action *
sendmsg_re(Continuation * cont,void * token,int fd,struct msghdr * msg)563 UDPNetProcessor::sendmsg_re(Continuation *cont, void *token, int fd, struct msghdr *msg)
564 {
565   int actual;
566   Event *event = completionUtil::create();
567 
568   completionUtil::setContinuation(event, cont);
569   completionUtil::setHandle(event, token);
570 
571   actual = socketManager.sendmsg(fd, msg, 0);
572   if (actual >= 0) {
573     completionUtil::setThread(event, this_ethread());
574     completionUtil::setInfo(event, fd, msg, actual, errno);
575     cont->handleEvent(NET_EVENT_DATAGRAM_WRITE_COMPLETE, event);
576     completionUtil::destroy(event);
577     return ACTION_RESULT_DONE;
578   } else {
579     completionUtil::setThread(event, this_ethread());
580     completionUtil::setInfo(event, fd, msg, actual, errno);
581     cont->handleEvent(NET_EVENT_DATAGRAM_WRITE_ERROR, event);
582     completionUtil::destroy(event);
583     return ACTION_IO_ERROR;
584   }
585 }
586 
587 /* sendto:
588  * If this were implemented, it might be implemented like this:
589  * Unix:
590  *   call sendto(fd,addr,buf->reader()->start(),len);
591  *   if successful,
592  *      buf->consume(len);
593  *      return ACTION_RESULT_DONE
594  *   else
595  *      return error
596  *
597  */
598 Action *
sendto_re(Continuation * cont,void * token,int fd,struct sockaddr const * toaddr,int toaddrlen,IOBufferBlock * buf,int len)599 UDPNetProcessor::sendto_re(Continuation *cont, void *token, int fd, struct sockaddr const *toaddr, int toaddrlen,
600                            IOBufferBlock *buf, int len)
601 {
602   (void)token;
603   ink_assert(buf->read_avail() >= len);
604   int nbytes_sent = socketManager.sendto(fd, buf->start(), len, 0, toaddr, toaddrlen);
605 
606   if (nbytes_sent >= 0) {
607     ink_assert(nbytes_sent == len);
608     buf->consume(nbytes_sent);
609     cont->handleEvent(NET_EVENT_DATAGRAM_WRITE_COMPLETE, (void *)-1);
610     return ACTION_RESULT_DONE;
611   } else {
612     cont->handleEvent(NET_EVENT_DATAGRAM_WRITE_ERROR, (void *)static_cast<intptr_t>(nbytes_sent));
613     return ACTION_IO_ERROR;
614   }
615 }
616 
617 bool
CreateUDPSocket(int * resfd,sockaddr const * remote_addr,Action ** status,NetVCOptions & opt)618 UDPNetProcessor::CreateUDPSocket(int *resfd, sockaddr const *remote_addr, Action **status, NetVCOptions &opt)
619 {
620   int res = 0, fd = -1;
621   int local_addr_len;
622   IpEndpoint local_addr;
623 
624   // Need to do address calculations first, so we can determine the
625   // address family for socket creation.
626   ink_zero(local_addr);
627 
628   bool is_any_address = false;
629   if (NetVCOptions::FOREIGN_ADDR == opt.addr_binding || NetVCOptions::INTF_ADDR == opt.addr_binding) {
630     // Same for now, transparency for foreign addresses must be handled
631     // *after* the socket is created, and we need to do this calculation
632     // before the socket to get the IP family correct.
633     ink_release_assert(opt.local_ip.isValid());
634     local_addr.assign(opt.local_ip, htons(opt.local_port));
635     ink_assert(ats_ip_are_compatible(remote_addr, &local_addr.sa));
636   } else {
637     // No local address specified, so use family option if possible.
638     int family = ats_is_ip(opt.ip_family) ? opt.ip_family : AF_INET;
639     local_addr.setToAnyAddr(family);
640     is_any_address    = true;
641     local_addr.port() = htons(opt.local_port);
642   }
643 
644   *resfd = -1;
645   if ((res = socketManager.socket(remote_addr->sa_family, SOCK_DGRAM, 0)) < 0) {
646     goto HardError;
647   }
648 
649   fd = res;
650   if (safe_fcntl(fd, F_SETFL, O_NONBLOCK) < 0) {
651     goto HardError;
652   }
653 
654   if (opt.socket_recv_bufsize > 0) {
655     if (unlikely(socketManager.set_rcvbuf_size(fd, opt.socket_recv_bufsize))) {
656       Debug("udpnet", "set_dnsbuf_size(%d) failed", opt.socket_recv_bufsize);
657     }
658   }
659   if (opt.socket_send_bufsize > 0) {
660     if (unlikely(socketManager.set_sndbuf_size(fd, opt.socket_send_bufsize))) {
661       Debug("udpnet", "set_dnsbuf_size(%d) failed", opt.socket_send_bufsize);
662     }
663   }
664 
665   if (opt.ip_family == AF_INET) {
666     bool succeeded = false;
667     int enable     = 1;
668 #ifdef IP_PKTINFO
669     if (safe_setsockopt(fd, IPPROTO_IP, IP_PKTINFO, reinterpret_cast<char *>(&enable), sizeof(enable)) == 0) {
670       succeeded = true;
671     }
672 #endif
673 #ifdef IP_RECVDSTADDR
674     if (safe_setsockopt(fd, IPPROTO_IP, IP_RECVDSTADDR, reinterpret_cast<char *>(&enable), sizeof(enable)) == 0) {
675       succeeded = true;
676     }
677 #endif
678     if (!succeeded) {
679       Debug("udpnet", "setsockeopt for pktinfo failed");
680       goto HardError;
681     }
682   } else if (opt.ip_family == AF_INET6) {
683     bool succeeded = false;
684     int enable     = 1;
685 #ifdef IPV6_PKTINFO
686     if (safe_setsockopt(fd, IPPROTO_IPV6, IPV6_PKTINFO, reinterpret_cast<char *>(&enable), sizeof(enable)) == 0) {
687       succeeded = true;
688     }
689 #endif
690 #ifdef IPV6_RECVPKTINFO
691     if (safe_setsockopt(fd, IPPROTO_IPV6, IPV6_RECVPKTINFO, reinterpret_cast<char *>(&enable), sizeof(enable)) == 0) {
692       succeeded = true;
693     }
694 #endif
695     if (!succeeded) {
696       Debug("udpnet", "setsockeopt for pktinfo failed");
697       goto HardError;
698     }
699   }
700 
701   if (local_addr.port() || !is_any_address) {
702     if (-1 == socketManager.ink_bind(fd, &local_addr.sa, ats_ip_size(&local_addr.sa))) {
703       char buff[INET6_ADDRPORTSTRLEN];
704       Debug("udpnet", "ink bind failed on %s", ats_ip_nptop(local_addr, buff, sizeof(buff)));
705       goto SoftError;
706     }
707 
708     if (safe_getsockname(fd, &local_addr.sa, &local_addr_len) < 0) {
709       Debug("udpnet", "CreateUdpsocket: getsockname didn't work");
710       goto HardError;
711     }
712   }
713 
714   *resfd  = fd;
715   *status = nullptr;
716   Debug("udpnet", "creating a udp socket port = %d, %d---success", ats_ip_port_host_order(remote_addr),
717         ats_ip_port_host_order(local_addr));
718   return true;
719 SoftError:
720   Debug("udpnet", "creating a udp socket port = %d---soft failure", ats_ip_port_host_order(local_addr));
721   if (fd != -1) {
722     socketManager.close(fd);
723   }
724   *resfd  = -1;
725   *status = nullptr;
726   return false;
727 HardError:
728   Debug("udpnet", "creating a udp socket port = %d---hard failure", ats_ip_port_host_order(local_addr));
729   if (fd != -1) {
730     socketManager.close(fd);
731   }
732   *resfd  = -1;
733   *status = ACTION_IO_ERROR;
734   return false;
735 }
736 
737 Action *
UDPBind(Continuation * cont,sockaddr const * addr,int fd,int send_bufsize,int recv_bufsize)738 UDPNetProcessor::UDPBind(Continuation *cont, sockaddr const *addr, int fd, int send_bufsize, int recv_bufsize)
739 {
740   int res              = 0;
741   UnixUDPConnection *n = nullptr;
742   IpEndpoint myaddr;
743   int myaddr_len     = sizeof(myaddr);
744   PollCont *pc       = nullptr;
745   PollDescriptor *pd = nullptr;
746   bool need_bind     = true;
747 
748   if (fd == -1) {
749     if ((res = socketManager.socket(addr->sa_family, SOCK_DGRAM, 0)) < 0) {
750       goto Lerror;
751     }
752     fd = res;
753   } else {
754     need_bind = false;
755   }
756   if (fcntl(fd, F_SETFL, O_NONBLOCK) < 0) {
757     goto Lerror;
758   }
759 
760   if (addr->sa_family == AF_INET) {
761     bool succeeded = false;
762     int enable     = 1;
763 #ifdef IP_PKTINFO
764     if (safe_setsockopt(fd, IPPROTO_IP, IP_PKTINFO, reinterpret_cast<char *>(&enable), sizeof(enable)) == 0) {
765       succeeded = true;
766     }
767 #endif
768 #ifdef IP_RECVDSTADDR
769     if (safe_setsockopt(fd, IPPROTO_IP, IP_RECVDSTADDR, reinterpret_cast<char *>(&enable), sizeof(enable)) == 0) {
770       succeeded = true;
771     }
772 #endif
773     if (!succeeded) {
774       Debug("udpnet", "setsockeopt for pktinfo failed");
775       goto Lerror;
776     }
777   } else if (addr->sa_family == AF_INET6) {
778     bool succeeded = false;
779     int enable     = 1;
780 #ifdef IPV6_PKTINFO
781     if (safe_setsockopt(fd, IPPROTO_IPV6, IPV6_PKTINFO, reinterpret_cast<char *>(&enable), sizeof(enable)) == 0) {
782       succeeded = true;
783     }
784 #endif
785 #ifdef IPV6_RECVPKTINFO
786     if (safe_setsockopt(fd, IPPROTO_IPV6, IPV6_RECVPKTINFO, reinterpret_cast<char *>(&enable), sizeof(enable)) == 0) {
787       succeeded = true;
788     }
789 #endif
790     if (!succeeded) {
791       Debug("udpnet", "setsockeopt for pktinfo failed");
792       goto Lerror;
793     }
794   }
795 
796   // If this is a class D address (i.e. multicast address), use REUSEADDR.
797   if (ats_is_ip_multicast(addr)) {
798     int enable_reuseaddr = 1;
799 
800     if (safe_setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<char *>(&enable_reuseaddr), sizeof(enable_reuseaddr)) < 0) {
801       goto Lerror;
802     }
803   }
804 
805   if (need_bind && ats_is_ip6(addr) && safe_setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, SOCKOPT_ON, sizeof(int)) < 0) {
806     goto Lerror;
807   }
808 
809   if (need_bind && (socketManager.ink_bind(fd, addr, ats_ip_size(addr)) < 0)) {
810     Debug("udpnet", "ink_bind failed");
811     goto Lerror;
812   }
813 
814   if (recv_bufsize) {
815     if (unlikely(socketManager.set_rcvbuf_size(fd, recv_bufsize))) {
816       Debug("udpnet", "set_dnsbuf_size(%d) failed", recv_bufsize);
817     }
818   }
819   if (send_bufsize) {
820     if (unlikely(socketManager.set_sndbuf_size(fd, send_bufsize))) {
821       Debug("udpnet", "set_dnsbuf_size(%d) failed", send_bufsize);
822     }
823   }
824   if (safe_getsockname(fd, &myaddr.sa, &myaddr_len) < 0) {
825     goto Lerror;
826   }
827   n = new UnixUDPConnection(fd);
828 
829   Debug("udpnet", "UDPNetProcessor::UDPBind: %p fd=%d", n, fd);
830   n->setBinding(&myaddr.sa);
831   n->bindToThread(cont);
832 
833   pc = get_UDPPollCont(n->ethread);
834   pd = pc->pollDescriptor;
835 
836   n->ep.start(pd, n, EVENTIO_READ);
837 
838   cont->handleEvent(NET_EVENT_DATAGRAM_OPEN, n);
839   return ACTION_RESULT_DONE;
840 Lerror:
841   if (fd != NO_FD) {
842     socketManager.close(fd);
843   }
844   Debug("udpnet", "Error: %s (%d)", strerror(errno), errno);
845 
846   cont->handleEvent(NET_EVENT_DATAGRAM_ERROR, nullptr);
847   return ACTION_IO_ERROR;
848 }
849 
850 // send out all packets that need to be sent out as of time=now
UDPQueue()851 UDPQueue::UDPQueue() {}
852 
~UDPQueue()853 UDPQueue::~UDPQueue() {}
854 
855 /*
856  * Driver function that aggregates packets across cont's and sends them
857  */
858 void
service(UDPNetHandler * nh)859 UDPQueue::service(UDPNetHandler *nh)
860 {
861   (void)nh;
862   ink_hrtime now     = Thread::get_hrtime_updated();
863   uint64_t timeSpent = 0;
864   uint64_t pktSendStartTime;
865   ink_hrtime pktSendTime;
866   UDPPacketInternal *p = nullptr;
867 
868   SList(UDPPacketInternal, alink) aq(outQueue.popall());
869   Queue<UDPPacketInternal> stk;
870   while ((p = aq.pop())) {
871     stk.push(p);
872   }
873 
874   // walk backwards down list since this is actually an atomic stack.
875   while ((p = stk.pop())) {
876     ink_assert(p->link.prev == nullptr);
877     ink_assert(p->link.next == nullptr);
878     // insert into our queue.
879     Debug("udp-send", "Adding %p", p);
880     if (p->conn->lastPktStartTime == 0) {
881       pktSendStartTime = std::max(now, p->delivery_time);
882     } else {
883       pktSendTime      = p->delivery_time;
884       pktSendStartTime = std::max(std::max(now, pktSendTime), p->delivery_time);
885     }
886     p->conn->lastPktStartTime = pktSendStartTime;
887     p->delivery_time          = pktSendStartTime;
888 
889     pipeInfo.addPacket(p, now);
890   }
891 
892   pipeInfo.advanceNow(now);
893   SendPackets();
894 
895   timeSpent = ink_hrtime_to_msec(now - last_report);
896   if (timeSpent > 10000) {
897     last_report = now;
898     added       = 0;
899     packets     = 0;
900   }
901   last_service = now;
902 }
903 
904 void
SendPackets()905 UDPQueue::SendPackets()
906 {
907   UDPPacketInternal *p;
908   static ink_hrtime lastCleanupTime = Thread::get_hrtime_updated();
909   ink_hrtime now                    = Thread::get_hrtime_updated();
910   ink_hrtime send_threshold_time    = now + SLOT_TIME;
911   int32_t bytesThisSlot = INT_MAX, bytesUsed = 0;
912   int32_t bytesThisPipe, sentOne;
913   int64_t pktLen;
914 
915   bytesThisSlot = INT_MAX;
916 
917 sendPackets:
918   sentOne       = false;
919   bytesThisPipe = bytesThisSlot;
920 
921   while ((bytesThisPipe > 0) && (pipeInfo.firstPacket(send_threshold_time))) {
922     p      = pipeInfo.getFirstPacket();
923     pktLen = p->getPktLength();
924 
925     if (p->conn->shouldDestroy()) {
926       goto next_pkt;
927     }
928     if (p->conn->GetSendGenerationNumber() != p->reqGenerationNum) {
929       goto next_pkt;
930     }
931 
932     SendUDPPacket(p, pktLen);
933     bytesUsed += pktLen;
934     bytesThisPipe -= pktLen;
935   next_pkt:
936     sentOne = true;
937     p->free();
938 
939     if (bytesThisPipe < 0) {
940       break;
941     }
942   }
943 
944   bytesThisSlot -= bytesUsed;
945 
946   if ((bytesThisSlot > 0) && sentOne) {
947     // redistribute the slack...
948     now = Thread::get_hrtime_updated();
949     if (pipeInfo.firstPacket(now) == nullptr) {
950       pipeInfo.advanceNow(now);
951     }
952     goto sendPackets;
953   }
954 
955   if ((g_udp_periodicFreeCancelledPkts) && (now - lastCleanupTime > ink_hrtime_from_sec(g_udp_periodicFreeCancelledPkts))) {
956     pipeInfo.FreeCancelledPackets(g_udp_periodicCleanupSlots);
957     lastCleanupTime = now;
958   }
959 }
960 
961 void
SendUDPPacket(UDPPacketInternal * p,int32_t)962 UDPQueue::SendUDPPacket(UDPPacketInternal *p, int32_t /* pktLen ATS_UNUSED */)
963 {
964   struct msghdr msg;
965   struct iovec iov[32];
966   int real_len = 0;
967   int n, count, iov_len = 0;
968 
969   p->conn->lastSentPktStartTime = p->delivery_time;
970   Debug("udp-send", "Sending %p", p);
971 
972 #if !defined(solaris)
973   msg.msg_control    = nullptr;
974   msg.msg_controllen = 0;
975   msg.msg_flags      = 0;
976 #endif
977   msg.msg_name    = reinterpret_cast<caddr_t>(&p->to.sa);
978   msg.msg_namelen = ats_ip_size(p->to);
979   iov_len         = 0;
980 
981   for (IOBufferBlock *b = p->chain.get(); b != nullptr; b = b->next.get()) {
982     iov[iov_len].iov_base = static_cast<caddr_t>(b->start());
983     iov[iov_len].iov_len  = b->size();
984     real_len += iov[iov_len].iov_len;
985     iov_len++;
986   }
987   msg.msg_iov    = iov;
988   msg.msg_iovlen = iov_len;
989 
990   count = 0;
991   while (true) {
992     // stupid Linux problem: sendmsg can return EAGAIN
993     n = ::sendmsg(p->conn->getFd(), &msg, 0);
994     if ((n >= 0) || ((n < 0) && (errno != EAGAIN))) {
995       // send succeeded or some random error happened.
996       if (n < 0) {
997         Debug("udp-send", "Error: %s (%d)", strerror(errno), errno);
998       }
999 
1000       break;
1001     }
1002     if (errno == EAGAIN) {
1003       ++count;
1004       if ((g_udp_numSendRetries > 0) && (count >= g_udp_numSendRetries)) {
1005         // tried too many times; give up
1006         Debug("udpnet", "Send failed: too many retries");
1007         break;
1008       }
1009     }
1010   }
1011 }
1012 
1013 void
send(UDPPacket * p)1014 UDPQueue::send(UDPPacket *p)
1015 {
1016   // XXX: maybe fastpath for immediate send?
1017   outQueue.push((UDPPacketInternal *)p);
1018 }
1019 
// Drop the LINK macro definition so it cannot affect the code that follows.
#undef LINK
1021 
1022 static void
net_signal_hook_callback(EThread * thread)1023 net_signal_hook_callback(EThread *thread)
1024 {
1025 #if HAVE_EVENTFD
1026   uint64_t counter;
1027   ATS_UNUSED_RETURN(read(thread->evfd, &counter, sizeof(uint64_t)));
1028 #elif TS_USE_PORT
1029 /* Nothing to drain or do */
1030 #else
1031   char dummy[1024];
1032   ATS_UNUSED_RETURN(read(thread->evpipe[0], &dummy[0], 1024));
1033 #endif
1034 }
1035 
UDPNetHandler()1036 UDPNetHandler::UDPNetHandler()
1037 {
1038   nextCheck = Thread::get_hrtime_updated() + HRTIME_MSECONDS(1000);
1039   lastCheck = 0;
1040   SET_HANDLER((UDPNetContHandler)&UDPNetHandler::startNetEvent);
1041 }
1042 
1043 int
startNetEvent(int event,Event * e)1044 UDPNetHandler::startNetEvent(int event, Event *e)
1045 {
1046   (void)event;
1047   SET_HANDLER((UDPNetContHandler)&UDPNetHandler::mainNetEvent);
1048   trigger_event = e;
1049   e->schedule_every(-HRTIME_MSECONDS(UDP_NH_PERIOD));
1050   return EVENT_CONT;
1051 }
1052 
1053 int
mainNetEvent(int event,Event * e)1054 UDPNetHandler::mainNetEvent(int event, Event *e)
1055 {
1056   ink_assert(trigger_event == e && event == EVENT_POLL);
1057   return this->waitForActivity(net_config_poll_timeout);
1058 }
1059 
1060 int
waitForActivity(ink_hrtime timeout)1061 UDPNetHandler::waitForActivity(ink_hrtime timeout)
1062 {
1063   UnixUDPConnection *uc;
1064   PollCont *pc = get_UDPPollCont(this->thread);
1065   pc->do_poll(timeout);
1066 
1067   /* Notice: the race between traversal of newconn_list and UDPBind()
1068    *
1069    * If the UDPBind() is called after the traversal of newconn_list,
1070    * the UDPConnection, the one from the pollDescriptor->result, did not push into the open_list.
1071    *
1072    * TODO:
1073    *
1074    * Take UnixNetVConnection::acceptEvent() as reference to create UnixUDPConnection::newconnEvent().
1075    */
1076 
1077   // handle new UDP connection
1078   SList(UnixUDPConnection, newconn_alink) ncq(newconn_list.popall());
1079   while ((uc = ncq.pop())) {
1080     if (uc->shouldDestroy()) {
1081       open_list.remove(uc); // due to the above race
1082       uc->Release();
1083     } else {
1084       ink_assert(uc->mutex && uc->continuation);
1085       open_list.in_or_enqueue(uc); // due to the above race
1086     }
1087   }
1088 
1089   // handle UDP outgoing engine
1090   udpOutQueue.service(this);
1091 
1092   // handle UDP read operations
1093   int i        = 0;
1094   EventIO *epd = nullptr;
1095   for (i = 0; i < pc->pollDescriptor->result; i++) {
1096     epd = static_cast<EventIO *> get_ev_data(pc->pollDescriptor, i);
1097     if (epd->type == EVENTIO_UDP_CONNECTION) {
1098       // TODO: handle EVENTIO_ERROR
1099       if (get_ev_events(pc->pollDescriptor, i) & EVENTIO_READ) {
1100         uc = epd->data.uc;
1101         ink_assert(uc && uc->mutex && uc->continuation);
1102         ink_assert(uc->refcount >= 1);
1103         open_list.in_or_enqueue(uc); // due to the above race
1104         if (uc->shouldDestroy()) {
1105           open_list.remove(uc);
1106           uc->Release();
1107         } else {
1108           udpNetInternal.udp_read_from_net(this, uc);
1109         }
1110       } else {
1111         Debug("iocore_udp_main", "Unhandled epoll event: 0x%04x", get_ev_events(pc->pollDescriptor, i));
1112       }
1113     } else if (epd->type == EVENTIO_DNS_CONNECTION) {
1114       // TODO: handle DNS conn if there is ET_UDP
1115       if (epd->data.dnscon != nullptr) {
1116         epd->data.dnscon->trigger();
1117 #if defined(USE_EDGE_TRIGGER)
1118         epd->refresh(EVENTIO_READ);
1119 #endif
1120       }
1121     } else if (epd->type == EVENTIO_ASYNC_SIGNAL) {
1122       net_signal_hook_callback(this->thread);
1123     }
1124   } // end for
1125 
1126   // remove dead UDP connections
1127   ink_hrtime now = Thread::get_hrtime_updated();
1128   if (now >= nextCheck) {
1129     forl_LL(UnixUDPConnection, xuc, open_list)
1130     {
1131       ink_assert(xuc->mutex && xuc->continuation);
1132       ink_assert(xuc->refcount >= 1);
1133       if (xuc->shouldDestroy()) {
1134         open_list.remove(xuc);
1135         xuc->Release();
1136       }
1137     }
1138     nextCheck = Thread::get_hrtime_updated() + HRTIME_MSECONDS(1000);
1139   }
1140   // service UDPConnections with data ready for callback.
1141   Que(UnixUDPConnection, callback_link) q = udp_callbacks;
1142   udp_callbacks.clear();
1143   while ((uc = q.dequeue())) {
1144     ink_assert(uc->mutex && uc->continuation);
1145     if (udpNetInternal.udp_callback(this, uc, this->thread)) { // not successful
1146       // schedule on a thread of its own.
1147       ink_assert(uc->callback_link.next == nullptr);
1148       ink_assert(uc->callback_link.prev == nullptr);
1149       udp_callbacks.enqueue(uc);
1150     } else {
1151       ink_assert(uc->callback_link.next == nullptr);
1152       ink_assert(uc->callback_link.prev == nullptr);
1153       uc->onCallbackQueue = 0;
1154       uc->Release();
1155     }
1156   }
1157 
1158   return EVENT_CONT;
1159 }
1160 
1161 void
signalActivity()1162 UDPNetHandler::signalActivity()
1163 {
1164 #if HAVE_EVENTFD
1165   uint64_t counter = 1;
1166   ATS_UNUSED_RETURN(write(thread->evfd, &counter, sizeof(uint64_t)));
1167 #elif TS_USE_PORT
1168   PollDescriptor *pd = get_PollDescriptor(thread);
1169   ATS_UNUSED_RETURN(port_send(pd->port_fd, 0, thread->ep));
1170 #else
1171   char dummy = 1;
1172   ATS_UNUSED_RETURN(write(thread->evpipe[1], &dummy, 1));
1173 #endif
1174 }
1175