1 /** @file
2
3 A brief file description
4
5 @section license License
6
7 Licensed to the Apache Software Foundation (ASF) under one
8 or more contributor license agreements. See the NOTICE file
9 distributed with this work for additional information
10 regarding copyright ownership. The ASF licenses this file
11 to you under the Apache License, Version 2.0 (the
12 "License"); you may not use this file except in compliance
13 with the License. You may obtain a copy of the License at
14
15 http://www.apache.org/licenses/LICENSE-2.0
16
17 Unless required by applicable law or agreed to in writing, software
18 distributed under the License is distributed on an "AS IS" BASIS,
19 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20 See the License for the specific language governing permissions and
21 limitations under the License.
22 */
23
24 /****************************************************************************
25
26 UnixUDPNet.cc
27 UDPNet implementation
28
29
30 ****************************************************************************/
31
32 #if defined(darwin)
33 /* This is for IPV6_PKTINFO and IPV6_RECVPKTINFO */
34 #define __APPLE_USE_RFC_3542
35 #endif
36
37 #include "P_Net.h"
38 #include "P_UDPNet.h"
39
// Signature of UDPNetHandler's event handlers (startNetEvent / mainNetEvent),
// used when installing them with SET_HANDLER.
using UDPNetContHandler = int (UDPNetHandler::*)(int, void *);

// Freelist allocator for UDP packets.
inkcoreapi ClassAllocator<UDPPacketInternal> udpPacketAllocator("udpPacketAllocator");
// Event-thread type for the UDP threads; registered in UDPNetProcessorInternal::start().
EventType ET_UDP;

//
// Global Data
//

// The UDP net processor instance, also exposed through the public UDPNetProcessor interface.
UDPNetProcessorInternal udpNetInternal;
UDPNetProcessor &udpNet = udpNetInternal;

// UDP tunables, (re)read from records.config in initialize_thread_for_udp_net():
//   g_udp_periodicCleanupSlots      - proxy.config.udp.periodic_cleanup
//   g_udp_periodicFreeCancelledPkts - proxy.config.udp.free_cancelled_pkts_sec (0 disables cleanup)
//   g_udp_numSendRetries            - proxy.config.udp.send_retries (EAGAIN retries; 0 = forever)
int32_t g_udp_periodicCleanupSlots;
int32_t g_udp_periodicFreeCancelledPkts;
int32_t g_udp_numSendRetries;

//
// Public functions
// See header for documentation
//
// NOTE(review): the bandwidth-grapher globals below are not referenced in the
// visible part of this file; presumably used elsewhere — confirm before removing.
int G_bwGrapherFd;
sockaddr_in6 G_bwGrapherLoc;
62
63 void
initialize_thread_for_udp_net(EThread * thread)64 initialize_thread_for_udp_net(EThread *thread)
65 {
66 UDPNetHandler *nh = get_UDPNetHandler(thread);
67
68 new (reinterpret_cast<ink_dummy_for_new *>(nh)) UDPNetHandler;
69 new (reinterpret_cast<ink_dummy_for_new *>(get_UDPPollCont(thread))) PollCont(thread->mutex);
70 // The UDPNetHandler cannot be accessed across EThreads.
71 // Because the UDPNetHandler should be called back immediately after UDPPollCont.
72 nh->mutex = thread->mutex.get();
73 nh->thread = thread;
74
75 PollCont *upc = get_UDPPollCont(thread);
76 PollDescriptor *upd = upc->pollDescriptor;
77 // due to ET_UDP is really simple, it should sleep for a long time
78 // TODO: fixed size
79 upc->poll_timeout = 100;
80 // This variable controls how often we cleanup the cancelled packets.
81 // If it is set to 0, then cleanup never occurs.
82 REC_ReadConfigInt32(g_udp_periodicFreeCancelledPkts, "proxy.config.udp.free_cancelled_pkts_sec");
83
84 // This variable controls how many "slots" of the udp calendar queue we cleanup.
85 // If it is set to 0, then cleanup never occurs. This value makes sense
86 // only if the above variable is set.
87 REC_ReadConfigInt32(g_udp_periodicCleanupSlots, "proxy.config.udp.periodic_cleanup");
88
89 // UDP sends can fail with errno=EAGAIN. This variable determines the # of
90 // times the UDP thread retries before giving up. Set to 0 to keep trying forever.
91 REC_ReadConfigInt32(g_udp_numSendRetries, "proxy.config.udp.send_retries");
92 g_udp_numSendRetries = g_udp_numSendRetries < 0 ? 0 : g_udp_numSendRetries;
93
94 thread->set_tail_handler(nh);
95 thread->ep = static_cast<EventIO *>(ats_malloc(sizeof(EventIO)));
96 new (thread->ep) EventIO();
97 thread->ep->type = EVENTIO_ASYNC_SIGNAL;
98 #if HAVE_EVENTFD
99 thread->ep->start(upd, thread->evfd, nullptr, EVENTIO_READ);
100 #else
101 thread->ep->start(upd, thread->evpipe[0], nullptr, EVENTIO_READ);
102 #endif
103 }
104
105 int
start(int n_upd_threads,size_t stacksize)106 UDPNetProcessorInternal::start(int n_upd_threads, size_t stacksize)
107 {
108 if (n_upd_threads < 1) {
109 return -1;
110 }
111
112 pollCont_offset = eventProcessor.allocate(sizeof(PollCont));
113 udpNetHandler_offset = eventProcessor.allocate(sizeof(UDPNetHandler));
114
115 ET_UDP = eventProcessor.register_event_type("ET_UDP");
116 eventProcessor.schedule_spawn(&initialize_thread_for_udp_net, ET_UDP);
117 eventProcessor.spawn_event_threads(ET_UDP, n_upd_threads, stacksize);
118
119 return 0;
120 }
121
122 void
udp_read_from_net(UDPNetHandler * nh,UDPConnection * xuc)123 UDPNetProcessorInternal::udp_read_from_net(UDPNetHandler *nh, UDPConnection *xuc)
124 {
125 UnixUDPConnection *uc = (UnixUDPConnection *)xuc;
126
127 // receive packet and queue onto UDPConnection.
128 // don't call back connection at this time.
129 int64_t r;
130 int iters = 0;
131 unsigned max_niov = 32;
132
133 struct msghdr msg;
134 Ptr<IOBufferBlock> chain, next_chain;
135 struct iovec tiovec[max_niov];
136 int64_t size_index = BUFFER_SIZE_INDEX_2K;
137 int64_t buffer_size = BUFFER_SIZE_FOR_INDEX(size_index);
138 // The max length of receive buffer is 32 * buffer_size (2048) = 65536 bytes.
139 // Because the 'UDP Length' is type of uint16_t defined in RFC 768.
140 // And there is 8 octets in 'User Datagram Header' which means the max length of payload is no more than 65527 bytes.
141 do {
142 // create IOBufferBlock chain to receive data
143 unsigned int niov;
144 IOBufferBlock *b, *last;
145
146 // build struct iov
147 // reuse the block in chain if available
148 b = chain.get();
149 last = nullptr;
150 for (niov = 0; niov < max_niov; niov++) {
151 if (b == nullptr) {
152 b = new_IOBufferBlock();
153 b->alloc(size_index);
154 if (last == nullptr) {
155 chain = b;
156 } else {
157 last->next = b;
158 }
159 }
160
161 tiovec[niov].iov_base = b->buf();
162 tiovec[niov].iov_len = b->block_size();
163
164 last = b;
165 b = b->next.get();
166 }
167
168 // build struct msghdr
169 sockaddr_in6 fromaddr;
170 sockaddr_in6 toaddr;
171 int toaddr_len = sizeof(toaddr);
172 char *cbuf[1024];
173 msg.msg_name = &fromaddr;
174 msg.msg_namelen = sizeof(fromaddr);
175 msg.msg_iov = tiovec;
176 msg.msg_iovlen = niov;
177 msg.msg_control = cbuf;
178 msg.msg_controllen = sizeof(cbuf);
179
180 // receive data by recvmsg
181 r = socketManager.recvmsg(uc->getFd(), &msg, 0);
182 if (r <= 0) {
183 // error
184 break;
185 }
186
187 // truncated check
188 if (msg.msg_flags & MSG_TRUNC) {
189 Debug("udp-read", "The UDP packet is truncated");
190 }
191
192 // fill the IOBufferBlock chain
193 int64_t saved = r;
194 b = chain.get();
195 while (b && saved > 0) {
196 if (saved > buffer_size) {
197 b->fill(buffer_size);
198 saved -= buffer_size;
199 b = b->next.get();
200 } else {
201 b->fill(saved);
202 saved = 0;
203 next_chain = b->next.get();
204 b->next = nullptr;
205 }
206 }
207
208 safe_getsockname(xuc->getFd(), reinterpret_cast<struct sockaddr *>(&toaddr), &toaddr_len);
209 for (auto cmsg = CMSG_FIRSTHDR(&msg); cmsg != nullptr; cmsg = CMSG_NXTHDR(&msg, cmsg)) {
210 switch (cmsg->cmsg_type) {
211 #ifdef IP_PKTINFO
212 case IP_PKTINFO:
213 if (cmsg->cmsg_level == IPPROTO_IP) {
214 struct in_pktinfo *pktinfo = reinterpret_cast<struct in_pktinfo *>(CMSG_DATA(cmsg));
215 reinterpret_cast<sockaddr_in *>(&toaddr)->sin_addr.s_addr = pktinfo->ipi_addr.s_addr;
216 }
217 break;
218 #endif
219 #ifdef IP_RECVDSTADDR
220 case IP_RECVDSTADDR:
221 if (cmsg->cmsg_level == IPPROTO_IP) {
222 struct in_addr *addr = reinterpret_cast<struct in_addr *>(CMSG_DATA(cmsg));
223 reinterpret_cast<sockaddr_in *>(&toaddr)->sin_addr.s_addr = addr->s_addr;
224 }
225 break;
226 #endif
227 #if defined(IPV6_PKTINFO) || defined(IPV6_RECVPKTINFO)
228 case IPV6_PKTINFO: // IPV6_RECVPKTINFO uses IPV6_PKTINFO too
229 if (cmsg->cmsg_level == IPPROTO_IPV6) {
230 struct in6_pktinfo *pktinfo = reinterpret_cast<struct in6_pktinfo *>(CMSG_DATA(cmsg));
231 memcpy(toaddr.sin6_addr.s6_addr, &pktinfo->ipi6_addr, 16);
232 }
233 break;
234 #endif
235 }
236 }
237
238 // create packet
239 UDPPacket *p = new_incoming_UDPPacket(ats_ip_sa_cast(&fromaddr), ats_ip_sa_cast(&toaddr), chain);
240 p->setConnection(uc);
241 // queue onto the UDPConnection
242 uc->inQueue.push((UDPPacketInternal *)p);
243
244 // reload the unused block
245 chain = next_chain;
246 next_chain = nullptr;
247 iters++;
248 } while (r > 0);
249 if (iters >= 1) {
250 Debug("udp-read", "read %d at a time", iters);
251 }
252 // if not already on to-be-called-back queue, then add it.
253 if (!uc->onCallbackQueue) {
254 ink_assert(uc->callback_link.next == nullptr);
255 ink_assert(uc->callback_link.prev == nullptr);
256 uc->AddRef();
257 nh->udp_callbacks.enqueue(uc);
258 uc->onCallbackQueue = 1;
259 }
260 }
261
262 int
udp_callback(UDPNetHandler * nh,UDPConnection * xuc,EThread * thread)263 UDPNetProcessorInternal::udp_callback(UDPNetHandler *nh, UDPConnection *xuc, EThread *thread)
264 {
265 (void)nh;
266 UnixUDPConnection *uc = (UnixUDPConnection *)xuc;
267
268 if (uc->continuation && uc->mutex) {
269 MUTEX_TRY_LOCK(lock, uc->mutex, thread);
270 if (!lock.is_locked()) {
271 return 1;
272 }
273 uc->AddRef();
274 uc->callbackHandler(0, nullptr);
275 return 0;
276 } else {
277 ink_assert(!"doesn't reach here");
278 if (!uc->callbackAction) {
279 uc->AddRef();
280 uc->callbackAction = eventProcessor.schedule_imm(uc);
281 }
282 return 0;
283 }
284 }
285
// Sentinel stored in UDPReadContinuation::event until a completion token is
// attached; the destructor skips releasing the token while it is still set.
#define UNINITIALIZED_EVENT_PTR (Event *)0xdeadbeef

// Cheesy implementation of an asynchronous read and callback for Unix.
//
// Created by UDPNetProcessor::recvfrom_re() when no datagram is available yet.
// It polls the socket periodically (readPollEvent) until a datagram arrives, an
// error occurs, the optional timeout expires, or the completion event is
// cancelled, then calls the user back and returns itself to udpReadContAllocator.
class UDPReadContinuation : public Continuation
{
public:
  UDPReadContinuation(Event *completionToken);
  UDPReadContinuation();
  ~UDPReadContinuation() override;
  inline void free();
  inline void init_token(Event *completionToken);
  inline void init_read(int fd, IOBufferBlock *buf, int len, struct sockaddr *fromaddr, socklen_t *fromaddrlen);

  // Give up and report NET_EVENT_DATAGRAM_READ_ERROR after @a seconds; 0 means no timeout.
  void
  set_timer(int seconds)
  {
    timeout_interval = HRTIME_SECONDS(seconds);
  }

  void cancel();
  int readPollEvent(int event, Event *e);

  // The completion event doubles as the Action handed back to the caller.
  Action *
  getAction()
  {
    return event;
  }

  void setupPollDescriptor();

private:
  Event *event = UNINITIALIZED_EVENT_PTR; // the completion event token created
  // on behalf of the client
  Ptr<IOBufferBlock> readbuf{nullptr};     // destination buffer; data is written at readbuf->end()
  int readlen = 0;                         // max bytes per recvfrom()
  struct sockaddr_in6 *fromaddr = nullptr; // caller-owned storage for the sender address
  socklen_t *fromaddrlen = nullptr;        // caller-owned length of *fromaddr (value-result)
  int fd = NO_FD; // fd we are reading from
  int ifd = NO_FD; // poll fd index
  ink_hrtime period = 0; // polling period (negative, as passed to schedule_every)
  ink_hrtime elapsed_time = 0;     // time accumulated across polls, compared to timeout_interval
  ink_hrtime timeout_interval = 0; // 0 means no timeout
};

// Freelist allocator for UDPReadContinuation; objects return here via free().
ClassAllocator<UDPReadContinuation> udpReadContAllocator("udpReadContAllocator");
331
UDPReadContinuation(Event * completionToken)332 UDPReadContinuation::UDPReadContinuation(Event *completionToken)
333 : Continuation(nullptr),
334 event(completionToken),
335 readbuf(nullptr),
336
337 fd(-1),
338 ifd(-1)
339
340 {
341 if (completionToken->continuation) {
342 this->mutex = completionToken->continuation->mutex;
343 } else {
344 this->mutex = new_ProxyMutex();
345 }
346 }
347
UDPReadContinuation()348 UDPReadContinuation::UDPReadContinuation() : Continuation(nullptr) {}
349
350 inline void
free()351 UDPReadContinuation::free()
352 {
353 ink_assert(event != nullptr);
354 completionUtil::destroy(event);
355 event = nullptr;
356 readbuf = nullptr;
357 readlen = 0;
358 fromaddrlen = nullptr;
359 fd = -1;
360 ifd = -1;
361 period = 0;
362 elapsed_time = 0;
363 timeout_interval = 0;
364 mutex = nullptr;
365 udpReadContAllocator.free(this);
366 }
367
368 inline void
init_token(Event * completionToken)369 UDPReadContinuation::init_token(Event *completionToken)
370 {
371 if (completionToken->continuation) {
372 this->mutex = completionToken->continuation->mutex;
373 } else {
374 this->mutex = new_ProxyMutex();
375 }
376 event = completionToken;
377 }
378
379 inline void
init_read(int rfd,IOBufferBlock * buf,int len,struct sockaddr * fromaddr_,socklen_t * fromaddrlen_)380 UDPReadContinuation::init_read(int rfd, IOBufferBlock *buf, int len, struct sockaddr *fromaddr_, socklen_t *fromaddrlen_)
381 {
382 ink_assert(rfd >= 0 && buf != nullptr && fromaddr_ != nullptr && fromaddrlen_ != nullptr);
383 fd = rfd;
384 readbuf = buf;
385 readlen = len;
386 fromaddr = ats_ip6_cast(fromaddr_);
387 fromaddrlen = fromaddrlen_;
388 SET_HANDLER(&UDPReadContinuation::readPollEvent);
389 period = -HRTIME_MSECONDS(net_event_period);
390 setupPollDescriptor();
391 this_ethread()->schedule_every(this, period);
392 }
393
~UDPReadContinuation()394 UDPReadContinuation::~UDPReadContinuation()
395 {
396 if (event != UNINITIALIZED_EVENT_PTR) {
397 ink_assert(event != nullptr);
398 completionUtil::destroy(event);
399 event = nullptr;
400 }
401 }
402
403 void
cancel()404 UDPReadContinuation::cancel()
405 {
406 // I don't think this actually cancels it correctly right now.
407 event->cancel();
408 }
409
410 void
setupPollDescriptor()411 UDPReadContinuation::setupPollDescriptor()
412 {
413 #if TS_USE_EPOLL
414 Pollfd *pfd;
415 EThread *et = (EThread *)this_thread();
416 PollCont *pc = get_PollCont(et);
417 if (pc->nextPollDescriptor == nullptr) {
418 pc->nextPollDescriptor = new PollDescriptor();
419 }
420 pfd = pc->nextPollDescriptor->alloc();
421 pfd->fd = fd;
422 ifd = pfd - pc->nextPollDescriptor->pfd;
423 ink_assert(pc->nextPollDescriptor->nfds > ifd);
424 pfd->events = POLLIN;
425 pfd->revents = 0;
426 #endif
427 }
428
429 int
readPollEvent(int event_,Event * e)430 UDPReadContinuation::readPollEvent(int event_, Event *e)
431 {
432 (void)event_;
433 (void)e;
434
435 // PollCont *pc = get_PollCont(e->ethread);
436 Continuation *c;
437
438 if (event->cancelled) {
439 e->cancel();
440 free();
441 return EVENT_DONE;
442 }
443
444 // See if the request has timed out
445 if (timeout_interval) {
446 elapsed_time += -period;
447 if (elapsed_time >= timeout_interval) {
448 c = completionUtil::getContinuation(event);
449 // TODO: Should we deal with the return code?
450 c->handleEvent(NET_EVENT_DATAGRAM_READ_ERROR, event);
451 e->cancel();
452 free();
453 return EVENT_DONE;
454 }
455 }
456
457 c = completionUtil::getContinuation(event);
458 // do read
459 socklen_t tmp_fromlen = *fromaddrlen;
460 int rlen = socketManager.recvfrom(fd, readbuf->end(), readlen, 0, ats_ip_sa_cast(fromaddr), &tmp_fromlen);
461
462 completionUtil::setThread(event, e->ethread);
463 // call back user with their event
464 if (rlen > 0) {
465 // do callback if read is successful
466 *fromaddrlen = tmp_fromlen;
467 completionUtil::setInfo(event, fd, readbuf, rlen, errno);
468 readbuf->fill(rlen);
469 // TODO: Should we deal with the return code?
470 c->handleEvent(NET_EVENT_DATAGRAM_READ_COMPLETE, event);
471 e->cancel();
472 free();
473
474 return EVENT_DONE;
475 } else if (rlen < 0 && rlen != -EAGAIN) {
476 // signal error.
477 *fromaddrlen = tmp_fromlen;
478 completionUtil::setInfo(event, fd, readbuf, rlen, errno);
479 c = completionUtil::getContinuation(event);
480 // TODO: Should we deal with the return code?
481 c->handleEvent(NET_EVENT_DATAGRAM_READ_ERROR, event);
482 e->cancel();
483 free();
484
485 return EVENT_DONE;
486 } else {
487 completionUtil::setThread(event, nullptr);
488 }
489
490 if (event->cancelled) {
491 e->cancel();
492 free();
493
494 return EVENT_DONE;
495 }
496 // reestablish poll
497 setupPollDescriptor();
498
499 return EVENT_CONT;
500 }
501
502 /* recvfrom:
503 * Unix:
504 * assert(buf->write_avail() >= len);
505 * *actual_len = recvfrom(fd,addr,buf->end(),len)
506 * if successful then
507 * buf->fill(*actual_len);
508 * return ACTION_RESULT_DONE
509 * else if nothing read
510 * *actual_len is 0
511 * create "UDP read continuation" C with 'cont's lock
512 * set user callback to 'cont'
513 * return C's action.
514 * else
515 * return error;
516 */
517 Action *
recvfrom_re(Continuation * cont,void * token,int fd,struct sockaddr * fromaddr,socklen_t * fromaddrlen,IOBufferBlock * buf,int len,bool useReadCont,int timeout)518 UDPNetProcessor::recvfrom_re(Continuation *cont, void *token, int fd, struct sockaddr *fromaddr, socklen_t *fromaddrlen,
519 IOBufferBlock *buf, int len, bool useReadCont, int timeout)
520 {
521 (void)useReadCont;
522 ink_assert(buf->write_avail() >= len);
523 int actual;
524 Event *event = completionUtil::create();
525
526 completionUtil::setContinuation(event, cont);
527 completionUtil::setHandle(event, token);
528 actual = socketManager.recvfrom(fd, buf->end(), len, 0, fromaddr, fromaddrlen);
529
530 if (actual > 0) {
531 completionUtil::setThread(event, this_ethread());
532 completionUtil::setInfo(event, fd, make_ptr(buf), actual, errno);
533 buf->fill(actual);
534 cont->handleEvent(NET_EVENT_DATAGRAM_READ_COMPLETE, event);
535 completionUtil::destroy(event);
536 return ACTION_RESULT_DONE;
537 } else if (actual == 0 || (actual < 0 && actual == -EAGAIN)) {
538 UDPReadContinuation *c = udpReadContAllocator.alloc();
539 c->init_token(event);
540 c->init_read(fd, buf, len, fromaddr, fromaddrlen);
541 if (timeout) {
542 c->set_timer(timeout);
543 }
544 return event;
545 } else {
546 completionUtil::setThread(event, this_ethread());
547 completionUtil::setInfo(event, fd, make_ptr(buf), actual, errno);
548 cont->handleEvent(NET_EVENT_DATAGRAM_READ_ERROR, event);
549 completionUtil::destroy(event);
550 return ACTION_IO_ERROR;
551 }
552 }
553
554 /* sendmsg:
555 * Unix:
556 * *actual_len = sendmsg(fd,msg,default-flags);
557 * if successful,
558 * return ACTION_RESULT_DONE
559 * else
560 * return error
561 */
562 Action *
sendmsg_re(Continuation * cont,void * token,int fd,struct msghdr * msg)563 UDPNetProcessor::sendmsg_re(Continuation *cont, void *token, int fd, struct msghdr *msg)
564 {
565 int actual;
566 Event *event = completionUtil::create();
567
568 completionUtil::setContinuation(event, cont);
569 completionUtil::setHandle(event, token);
570
571 actual = socketManager.sendmsg(fd, msg, 0);
572 if (actual >= 0) {
573 completionUtil::setThread(event, this_ethread());
574 completionUtil::setInfo(event, fd, msg, actual, errno);
575 cont->handleEvent(NET_EVENT_DATAGRAM_WRITE_COMPLETE, event);
576 completionUtil::destroy(event);
577 return ACTION_RESULT_DONE;
578 } else {
579 completionUtil::setThread(event, this_ethread());
580 completionUtil::setInfo(event, fd, msg, actual, errno);
581 cont->handleEvent(NET_EVENT_DATAGRAM_WRITE_ERROR, event);
582 completionUtil::destroy(event);
583 return ACTION_IO_ERROR;
584 }
585 }
586
587 /* sendto:
588 * If this were implemented, it might be implemented like this:
589 * Unix:
590 * call sendto(fd,addr,buf->reader()->start(),len);
591 * if successful,
592 * buf->consume(len);
593 * return ACTION_RESULT_DONE
594 * else
595 * return error
596 *
597 */
598 Action *
sendto_re(Continuation * cont,void * token,int fd,struct sockaddr const * toaddr,int toaddrlen,IOBufferBlock * buf,int len)599 UDPNetProcessor::sendto_re(Continuation *cont, void *token, int fd, struct sockaddr const *toaddr, int toaddrlen,
600 IOBufferBlock *buf, int len)
601 {
602 (void)token;
603 ink_assert(buf->read_avail() >= len);
604 int nbytes_sent = socketManager.sendto(fd, buf->start(), len, 0, toaddr, toaddrlen);
605
606 if (nbytes_sent >= 0) {
607 ink_assert(nbytes_sent == len);
608 buf->consume(nbytes_sent);
609 cont->handleEvent(NET_EVENT_DATAGRAM_WRITE_COMPLETE, (void *)-1);
610 return ACTION_RESULT_DONE;
611 } else {
612 cont->handleEvent(NET_EVENT_DATAGRAM_WRITE_ERROR, (void *)static_cast<intptr_t>(nbytes_sent));
613 return ACTION_IO_ERROR;
614 }
615 }
616
617 bool
CreateUDPSocket(int * resfd,sockaddr const * remote_addr,Action ** status,NetVCOptions & opt)618 UDPNetProcessor::CreateUDPSocket(int *resfd, sockaddr const *remote_addr, Action **status, NetVCOptions &opt)
619 {
620 int res = 0, fd = -1;
621 int local_addr_len;
622 IpEndpoint local_addr;
623
624 // Need to do address calculations first, so we can determine the
625 // address family for socket creation.
626 ink_zero(local_addr);
627
628 bool is_any_address = false;
629 if (NetVCOptions::FOREIGN_ADDR == opt.addr_binding || NetVCOptions::INTF_ADDR == opt.addr_binding) {
630 // Same for now, transparency for foreign addresses must be handled
631 // *after* the socket is created, and we need to do this calculation
632 // before the socket to get the IP family correct.
633 ink_release_assert(opt.local_ip.isValid());
634 local_addr.assign(opt.local_ip, htons(opt.local_port));
635 ink_assert(ats_ip_are_compatible(remote_addr, &local_addr.sa));
636 } else {
637 // No local address specified, so use family option if possible.
638 int family = ats_is_ip(opt.ip_family) ? opt.ip_family : AF_INET;
639 local_addr.setToAnyAddr(family);
640 is_any_address = true;
641 local_addr.port() = htons(opt.local_port);
642 }
643
644 *resfd = -1;
645 if ((res = socketManager.socket(remote_addr->sa_family, SOCK_DGRAM, 0)) < 0) {
646 goto HardError;
647 }
648
649 fd = res;
650 if (safe_fcntl(fd, F_SETFL, O_NONBLOCK) < 0) {
651 goto HardError;
652 }
653
654 if (opt.socket_recv_bufsize > 0) {
655 if (unlikely(socketManager.set_rcvbuf_size(fd, opt.socket_recv_bufsize))) {
656 Debug("udpnet", "set_dnsbuf_size(%d) failed", opt.socket_recv_bufsize);
657 }
658 }
659 if (opt.socket_send_bufsize > 0) {
660 if (unlikely(socketManager.set_sndbuf_size(fd, opt.socket_send_bufsize))) {
661 Debug("udpnet", "set_dnsbuf_size(%d) failed", opt.socket_send_bufsize);
662 }
663 }
664
665 if (opt.ip_family == AF_INET) {
666 bool succeeded = false;
667 int enable = 1;
668 #ifdef IP_PKTINFO
669 if (safe_setsockopt(fd, IPPROTO_IP, IP_PKTINFO, reinterpret_cast<char *>(&enable), sizeof(enable)) == 0) {
670 succeeded = true;
671 }
672 #endif
673 #ifdef IP_RECVDSTADDR
674 if (safe_setsockopt(fd, IPPROTO_IP, IP_RECVDSTADDR, reinterpret_cast<char *>(&enable), sizeof(enable)) == 0) {
675 succeeded = true;
676 }
677 #endif
678 if (!succeeded) {
679 Debug("udpnet", "setsockeopt for pktinfo failed");
680 goto HardError;
681 }
682 } else if (opt.ip_family == AF_INET6) {
683 bool succeeded = false;
684 int enable = 1;
685 #ifdef IPV6_PKTINFO
686 if (safe_setsockopt(fd, IPPROTO_IPV6, IPV6_PKTINFO, reinterpret_cast<char *>(&enable), sizeof(enable)) == 0) {
687 succeeded = true;
688 }
689 #endif
690 #ifdef IPV6_RECVPKTINFO
691 if (safe_setsockopt(fd, IPPROTO_IPV6, IPV6_RECVPKTINFO, reinterpret_cast<char *>(&enable), sizeof(enable)) == 0) {
692 succeeded = true;
693 }
694 #endif
695 if (!succeeded) {
696 Debug("udpnet", "setsockeopt for pktinfo failed");
697 goto HardError;
698 }
699 }
700
701 if (local_addr.port() || !is_any_address) {
702 if (-1 == socketManager.ink_bind(fd, &local_addr.sa, ats_ip_size(&local_addr.sa))) {
703 char buff[INET6_ADDRPORTSTRLEN];
704 Debug("udpnet", "ink bind failed on %s", ats_ip_nptop(local_addr, buff, sizeof(buff)));
705 goto SoftError;
706 }
707
708 if (safe_getsockname(fd, &local_addr.sa, &local_addr_len) < 0) {
709 Debug("udpnet", "CreateUdpsocket: getsockname didn't work");
710 goto HardError;
711 }
712 }
713
714 *resfd = fd;
715 *status = nullptr;
716 Debug("udpnet", "creating a udp socket port = %d, %d---success", ats_ip_port_host_order(remote_addr),
717 ats_ip_port_host_order(local_addr));
718 return true;
719 SoftError:
720 Debug("udpnet", "creating a udp socket port = %d---soft failure", ats_ip_port_host_order(local_addr));
721 if (fd != -1) {
722 socketManager.close(fd);
723 }
724 *resfd = -1;
725 *status = nullptr;
726 return false;
727 HardError:
728 Debug("udpnet", "creating a udp socket port = %d---hard failure", ats_ip_port_host_order(local_addr));
729 if (fd != -1) {
730 socketManager.close(fd);
731 }
732 *resfd = -1;
733 *status = ACTION_IO_ERROR;
734 return false;
735 }
736
/** Bind (or adopt) a UDP socket and attach it to @a cont as a UnixUDPConnection.
 *
 * When @a fd is -1, a new socket of @a addr's family is created and bound to @a addr
 * (IPv6 sockets are set IPV6_V6ONLY first). Otherwise @a fd is used as-is and no
 * bind is done. In both cases the socket is made non-blocking, packet-info
 * reception is enabled, multicast addresses get SO_REUSEADDR, and non-zero buffer
 * sizes are applied (failures there are only logged).
 *
 * On success @a cont receives NET_EVENT_DATAGRAM_OPEN with the new connection,
 * which is registered for reading on its thread's UDP poll descriptor, and
 * ACTION_RESULT_DONE is returned. On failure @a cont receives
 * NET_EVENT_DATAGRAM_ERROR and ACTION_IO_ERROR is returned.
 *
 * NOTE(review): the error path closes fd even when the caller supplied it
 * (need_bind == false); confirm that callers expect to lose their descriptor.
 */
Action *
UDPNetProcessor::UDPBind(Continuation *cont, sockaddr const *addr, int fd, int send_bufsize, int recv_bufsize)
{
  int res = 0;
  UnixUDPConnection *n = nullptr;
  IpEndpoint myaddr;
  int myaddr_len = sizeof(myaddr);
  PollCont *pc = nullptr;
  PollDescriptor *pd = nullptr;
  bool need_bind = true;

  if (fd == -1) {
    if ((res = socketManager.socket(addr->sa_family, SOCK_DGRAM, 0)) < 0) {
      goto Lerror;
    }
    fd = res;
  } else {
    // Caller-supplied socket: assume it is already bound.
    need_bind = false;
  }
  if (fcntl(fd, F_SETFL, O_NONBLOCK) < 0) {
    goto Lerror;
  }

  // Ask the kernel to report each datagram's destination address (read back in udp_read_from_net).
  if (addr->sa_family == AF_INET) {
    bool succeeded = false;
    int enable = 1;
#ifdef IP_PKTINFO
    if (safe_setsockopt(fd, IPPROTO_IP, IP_PKTINFO, reinterpret_cast<char *>(&enable), sizeof(enable)) == 0) {
      succeeded = true;
    }
#endif
#ifdef IP_RECVDSTADDR
    if (safe_setsockopt(fd, IPPROTO_IP, IP_RECVDSTADDR, reinterpret_cast<char *>(&enable), sizeof(enable)) == 0) {
      succeeded = true;
    }
#endif
    if (!succeeded) {
      Debug("udpnet", "setsockeopt for pktinfo failed");
      goto Lerror;
    }
  } else if (addr->sa_family == AF_INET6) {
    bool succeeded = false;
    int enable = 1;
#ifdef IPV6_PKTINFO
    if (safe_setsockopt(fd, IPPROTO_IPV6, IPV6_PKTINFO, reinterpret_cast<char *>(&enable), sizeof(enable)) == 0) {
      succeeded = true;
    }
#endif
#ifdef IPV6_RECVPKTINFO
    if (safe_setsockopt(fd, IPPROTO_IPV6, IPV6_RECVPKTINFO, reinterpret_cast<char *>(&enable), sizeof(enable)) == 0) {
      succeeded = true;
    }
#endif
    if (!succeeded) {
      Debug("udpnet", "setsockeopt for pktinfo failed");
      goto Lerror;
    }
  }

  // If this is a class D address (i.e. multicast address), use REUSEADDR.
  if (ats_is_ip_multicast(addr)) {
    int enable_reuseaddr = 1;

    if (safe_setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<char *>(&enable_reuseaddr), sizeof(enable_reuseaddr)) < 0) {
      goto Lerror;
    }
  }

  // IPv6 sockets we bind ourselves are IPv6-only.
  if (need_bind && ats_is_ip6(addr) && safe_setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, SOCKOPT_ON, sizeof(int)) < 0) {
    goto Lerror;
  }

  if (need_bind && (socketManager.ink_bind(fd, addr, ats_ip_size(addr)) < 0)) {
    Debug("udpnet", "ink_bind failed");
    goto Lerror;
  }

  // Buffer-size failures are logged but not fatal.
  if (recv_bufsize) {
    if (unlikely(socketManager.set_rcvbuf_size(fd, recv_bufsize))) {
      Debug("udpnet", "set_dnsbuf_size(%d) failed", recv_bufsize);
    }
  }
  if (send_bufsize) {
    if (unlikely(socketManager.set_sndbuf_size(fd, send_bufsize))) {
      Debug("udpnet", "set_dnsbuf_size(%d) failed", send_bufsize);
    }
  }
  // Record the address actually bound as the connection's binding.
  if (safe_getsockname(fd, &myaddr.sa, &myaddr_len) < 0) {
    goto Lerror;
  }
  n = new UnixUDPConnection(fd);

  Debug("udpnet", "UDPNetProcessor::UDPBind: %p fd=%d", n, fd);
  n->setBinding(&myaddr.sa);
  n->bindToThread(cont);

  // Register for reads on the UDP poll descriptor of the thread the connection was bound to.
  pc = get_UDPPollCont(n->ethread);
  pd = pc->pollDescriptor;

  n->ep.start(pd, n, EVENTIO_READ);

  cont->handleEvent(NET_EVENT_DATAGRAM_OPEN, n);
  return ACTION_RESULT_DONE;
Lerror:
  if (fd != NO_FD) {
    socketManager.close(fd);
  }
  // NOTE(review): errno is read after close(); a failing close could overwrite the original error.
  Debug("udpnet", "Error: %s (%d)", strerror(errno), errno);

  cont->handleEvent(NET_EVENT_DATAGRAM_ERROR, nullptr);
  return ACTION_IO_ERROR;
}
849
850 // send out all packets that need to be sent out as of time=now
UDPQueue()851 UDPQueue::UDPQueue() {}
852
~UDPQueue()853 UDPQueue::~UDPQueue() {}
854
855 /*
856 * Driver function that aggregates packets across cont's and sends them
857 */
858 void
service(UDPNetHandler * nh)859 UDPQueue::service(UDPNetHandler *nh)
860 {
861 (void)nh;
862 ink_hrtime now = Thread::get_hrtime_updated();
863 uint64_t timeSpent = 0;
864 uint64_t pktSendStartTime;
865 ink_hrtime pktSendTime;
866 UDPPacketInternal *p = nullptr;
867
868 SList(UDPPacketInternal, alink) aq(outQueue.popall());
869 Queue<UDPPacketInternal> stk;
870 while ((p = aq.pop())) {
871 stk.push(p);
872 }
873
874 // walk backwards down list since this is actually an atomic stack.
875 while ((p = stk.pop())) {
876 ink_assert(p->link.prev == nullptr);
877 ink_assert(p->link.next == nullptr);
878 // insert into our queue.
879 Debug("udp-send", "Adding %p", p);
880 if (p->conn->lastPktStartTime == 0) {
881 pktSendStartTime = std::max(now, p->delivery_time);
882 } else {
883 pktSendTime = p->delivery_time;
884 pktSendStartTime = std::max(std::max(now, pktSendTime), p->delivery_time);
885 }
886 p->conn->lastPktStartTime = pktSendStartTime;
887 p->delivery_time = pktSendStartTime;
888
889 pipeInfo.addPacket(p, now);
890 }
891
892 pipeInfo.advanceNow(now);
893 SendPackets();
894
895 timeSpent = ink_hrtime_to_msec(now - last_report);
896 if (timeSpent > 10000) {
897 last_report = now;
898 added = 0;
899 packets = 0;
900 }
901 last_service = now;
902 }
903
/** Send every packet in pipeInfo that is due by now + SLOT_TIME.
 *
 * Packets whose connection should be destroyed, or whose request generation no
 * longer matches the connection's send generation, are freed without being sent.
 * After a pass that handled at least one packet, the clock is refreshed and the
 * loop repeats (via the sendPackets label) while the byte budget stays positive.
 * Finally, cancelled packets are purged from pipeInfo when
 * g_udp_periodicFreeCancelledPkts seconds have passed since the last purge.
 */
void
UDPQueue::SendPackets()
{
  UDPPacketInternal *p;
  // NOTE(review): this function-local static is shared by every UDPQueue. If
  // several ET_UDP threads run SendPackets(), it is read and written without
  // synchronization. Consider making it per-queue — TODO confirm.
  static ink_hrtime lastCleanupTime = Thread::get_hrtime_updated();
  ink_hrtime now = Thread::get_hrtime_updated();
  // Computed once; later passes still use this threshold even though `now` is refreshed.
  ink_hrtime send_threshold_time = now + SLOT_TIME;
  int32_t bytesThisSlot = INT_MAX, bytesUsed = 0;
  int32_t bytesThisPipe, sentOne;
  int64_t pktLen;

  bytesThisSlot = INT_MAX;

sendPackets:
  // sentOne means "handled (sent or discarded) at least one packet in this pass".
  sentOne = false;
  bytesThisPipe = bytesThisSlot;

  while ((bytesThisPipe > 0) && (pipeInfo.firstPacket(send_threshold_time))) {
    p = pipeInfo.getFirstPacket();
    pktLen = p->getPktLength();

    // Stale packets are dropped without sending.
    if (p->conn->shouldDestroy()) {
      goto next_pkt;
    }
    if (p->conn->GetSendGenerationNumber() != p->reqGenerationNum) {
      goto next_pkt;
    }

    SendUDPPacket(p, pktLen);
    bytesUsed += pktLen;
    bytesThisPipe -= pktLen;
  next_pkt:
    sentOne = true;
    p->free();

    if (bytesThisPipe < 0) {
      break;
    }
  }

  // NOTE(review): bytesUsed is never reset between passes, so each pass subtracts
  // the running total rather than that pass's bytes. With the INT_MAX budget this
  // only matters after very large bursts; confirm intent.
  bytesThisSlot -= bytesUsed;

  if ((bytesThisSlot > 0) && sentOne) {
    // redistribute the slack...
    now = Thread::get_hrtime_updated();
    if (pipeInfo.firstPacket(now) == nullptr) {
      pipeInfo.advanceNow(now);
    }
    goto sendPackets;
  }

  // Periodic purge of cancelled packets; disabled when g_udp_periodicFreeCancelledPkts is 0.
  if ((g_udp_periodicFreeCancelledPkts) && (now - lastCleanupTime > ink_hrtime_from_sec(g_udp_periodicFreeCancelledPkts))) {
    pipeInfo.FreeCancelledPackets(g_udp_periodicCleanupSlots);
    lastCleanupTime = now;
  }
}
960
961 void
SendUDPPacket(UDPPacketInternal * p,int32_t)962 UDPQueue::SendUDPPacket(UDPPacketInternal *p, int32_t /* pktLen ATS_UNUSED */)
963 {
964 struct msghdr msg;
965 struct iovec iov[32];
966 int real_len = 0;
967 int n, count, iov_len = 0;
968
969 p->conn->lastSentPktStartTime = p->delivery_time;
970 Debug("udp-send", "Sending %p", p);
971
972 #if !defined(solaris)
973 msg.msg_control = nullptr;
974 msg.msg_controllen = 0;
975 msg.msg_flags = 0;
976 #endif
977 msg.msg_name = reinterpret_cast<caddr_t>(&p->to.sa);
978 msg.msg_namelen = ats_ip_size(p->to);
979 iov_len = 0;
980
981 for (IOBufferBlock *b = p->chain.get(); b != nullptr; b = b->next.get()) {
982 iov[iov_len].iov_base = static_cast<caddr_t>(b->start());
983 iov[iov_len].iov_len = b->size();
984 real_len += iov[iov_len].iov_len;
985 iov_len++;
986 }
987 msg.msg_iov = iov;
988 msg.msg_iovlen = iov_len;
989
990 count = 0;
991 while (true) {
992 // stupid Linux problem: sendmsg can return EAGAIN
993 n = ::sendmsg(p->conn->getFd(), &msg, 0);
994 if ((n >= 0) || ((n < 0) && (errno != EAGAIN))) {
995 // send succeeded or some random error happened.
996 if (n < 0) {
997 Debug("udp-send", "Error: %s (%d)", strerror(errno), errno);
998 }
999
1000 break;
1001 }
1002 if (errno == EAGAIN) {
1003 ++count;
1004 if ((g_udp_numSendRetries > 0) && (count >= g_udp_numSendRetries)) {
1005 // tried too many times; give up
1006 Debug("udpnet", "Send failed: too many retries");
1007 break;
1008 }
1009 }
1010 }
1011 }
1012
1013 void
send(UDPPacket * p)1014 UDPQueue::send(UDPPacket *p)
1015 {
1016 // XXX: maybe fastpath for immediate send?
1017 outQueue.push((UDPPacketInternal *)p);
1018 }
1019
1020 #undef LINK
1021
1022 static void
net_signal_hook_callback(EThread * thread)1023 net_signal_hook_callback(EThread *thread)
1024 {
1025 #if HAVE_EVENTFD
1026 uint64_t counter;
1027 ATS_UNUSED_RETURN(read(thread->evfd, &counter, sizeof(uint64_t)));
1028 #elif TS_USE_PORT
1029 /* Nothing to drain or do */
1030 #else
1031 char dummy[1024];
1032 ATS_UNUSED_RETURN(read(thread->evpipe[0], &dummy[0], 1024));
1033 #endif
1034 }
1035
UDPNetHandler()1036 UDPNetHandler::UDPNetHandler()
1037 {
1038 nextCheck = Thread::get_hrtime_updated() + HRTIME_MSECONDS(1000);
1039 lastCheck = 0;
1040 SET_HANDLER((UDPNetContHandler)&UDPNetHandler::startNetEvent);
1041 }
1042
1043 int
startNetEvent(int event,Event * e)1044 UDPNetHandler::startNetEvent(int event, Event *e)
1045 {
1046 (void)event;
1047 SET_HANDLER((UDPNetContHandler)&UDPNetHandler::mainNetEvent);
1048 trigger_event = e;
1049 e->schedule_every(-HRTIME_MSECONDS(UDP_NH_PERIOD));
1050 return EVENT_CONT;
1051 }
1052
1053 int
mainNetEvent(int event,Event * e)1054 UDPNetHandler::mainNetEvent(int event, Event *e)
1055 {
1056 ink_assert(trigger_event == e && event == EVENT_POLL);
1057 return this->waitForActivity(net_config_poll_timeout);
1058 }
1059
1060 int
waitForActivity(ink_hrtime timeout)1061 UDPNetHandler::waitForActivity(ink_hrtime timeout)
1062 {
1063 UnixUDPConnection *uc;
1064 PollCont *pc = get_UDPPollCont(this->thread);
1065 pc->do_poll(timeout);
1066
1067 /* Notice: the race between traversal of newconn_list and UDPBind()
1068 *
1069 * If the UDPBind() is called after the traversal of newconn_list,
1070 * the UDPConnection, the one from the pollDescriptor->result, did not push into the open_list.
1071 *
1072 * TODO:
1073 *
1074 * Take UnixNetVConnection::acceptEvent() as reference to create UnixUDPConnection::newconnEvent().
1075 */
1076
1077 // handle new UDP connection
1078 SList(UnixUDPConnection, newconn_alink) ncq(newconn_list.popall());
1079 while ((uc = ncq.pop())) {
1080 if (uc->shouldDestroy()) {
1081 open_list.remove(uc); // due to the above race
1082 uc->Release();
1083 } else {
1084 ink_assert(uc->mutex && uc->continuation);
1085 open_list.in_or_enqueue(uc); // due to the above race
1086 }
1087 }
1088
1089 // handle UDP outgoing engine
1090 udpOutQueue.service(this);
1091
1092 // handle UDP read operations
1093 int i = 0;
1094 EventIO *epd = nullptr;
1095 for (i = 0; i < pc->pollDescriptor->result; i++) {
1096 epd = static_cast<EventIO *> get_ev_data(pc->pollDescriptor, i);
1097 if (epd->type == EVENTIO_UDP_CONNECTION) {
1098 // TODO: handle EVENTIO_ERROR
1099 if (get_ev_events(pc->pollDescriptor, i) & EVENTIO_READ) {
1100 uc = epd->data.uc;
1101 ink_assert(uc && uc->mutex && uc->continuation);
1102 ink_assert(uc->refcount >= 1);
1103 open_list.in_or_enqueue(uc); // due to the above race
1104 if (uc->shouldDestroy()) {
1105 open_list.remove(uc);
1106 uc->Release();
1107 } else {
1108 udpNetInternal.udp_read_from_net(this, uc);
1109 }
1110 } else {
1111 Debug("iocore_udp_main", "Unhandled epoll event: 0x%04x", get_ev_events(pc->pollDescriptor, i));
1112 }
1113 } else if (epd->type == EVENTIO_DNS_CONNECTION) {
1114 // TODO: handle DNS conn if there is ET_UDP
1115 if (epd->data.dnscon != nullptr) {
1116 epd->data.dnscon->trigger();
1117 #if defined(USE_EDGE_TRIGGER)
1118 epd->refresh(EVENTIO_READ);
1119 #endif
1120 }
1121 } else if (epd->type == EVENTIO_ASYNC_SIGNAL) {
1122 net_signal_hook_callback(this->thread);
1123 }
1124 } // end for
1125
1126 // remove dead UDP connections
1127 ink_hrtime now = Thread::get_hrtime_updated();
1128 if (now >= nextCheck) {
1129 forl_LL(UnixUDPConnection, xuc, open_list)
1130 {
1131 ink_assert(xuc->mutex && xuc->continuation);
1132 ink_assert(xuc->refcount >= 1);
1133 if (xuc->shouldDestroy()) {
1134 open_list.remove(xuc);
1135 xuc->Release();
1136 }
1137 }
1138 nextCheck = Thread::get_hrtime_updated() + HRTIME_MSECONDS(1000);
1139 }
1140 // service UDPConnections with data ready for callback.
1141 Que(UnixUDPConnection, callback_link) q = udp_callbacks;
1142 udp_callbacks.clear();
1143 while ((uc = q.dequeue())) {
1144 ink_assert(uc->mutex && uc->continuation);
1145 if (udpNetInternal.udp_callback(this, uc, this->thread)) { // not successful
1146 // schedule on a thread of its own.
1147 ink_assert(uc->callback_link.next == nullptr);
1148 ink_assert(uc->callback_link.prev == nullptr);
1149 udp_callbacks.enqueue(uc);
1150 } else {
1151 ink_assert(uc->callback_link.next == nullptr);
1152 ink_assert(uc->callback_link.prev == nullptr);
1153 uc->onCallbackQueue = 0;
1154 uc->Release();
1155 }
1156 }
1157
1158 return EVENT_CONT;
1159 }
1160
1161 void
signalActivity()1162 UDPNetHandler::signalActivity()
1163 {
1164 #if HAVE_EVENTFD
1165 uint64_t counter = 1;
1166 ATS_UNUSED_RETURN(write(thread->evfd, &counter, sizeof(uint64_t)));
1167 #elif TS_USE_PORT
1168 PollDescriptor *pd = get_PollDescriptor(thread);
1169 ATS_UNUSED_RETURN(port_send(pd->port_fd, 0, thread->ep));
1170 #else
1171 char dummy = 1;
1172 ATS_UNUSED_RETURN(write(thread->evpipe[1], &dummy, 1));
1173 #endif
1174 }
1175