1 /* vim: set expandtab ts=4 sw=4: */
2 /*
3  * You may redistribute this program and/or modify it under the terms of
4  * the GNU General Public License as published by the Free Software Foundation,
5  * either version 3 of the License, or (at your option) any later version.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * along with this program.  If not, see <https://www.gnu.org/licenses/>.
14  */
15 #include "util/events/libuv/UvWrapper.h"
16 #include "exception/Except.h"
17 #include "interface/Iface.h"
18 #include "util/events/UDPAddrIface.h"
19 #include "memory/Allocator.h"
20 #include "util/events/libuv/EventBase_pvt.h"
21 #include "util/platform/Sockaddr.h"
22 #include "util/Assert.h"
23 #include "util/Identity.h"
24 #include "wire/Message.h"
25 #include "wire/Error.h"
26 #include "util/Hex.h"
27 
28 struct UDPAddrIface_pvt
29 {
30     struct UDPAddrIface pub;
31 
32     struct Allocator* allocator;
33 
34     struct Log* logger;
35 
36     /** Job to close the handle when the allocator is freed */
37     struct Allocator_OnFreeJob* closeHandleOnFree;
38 
39     /** Job which blocks the freeing until the callback completes */
40     struct Allocator_OnFreeJob* blockFreeInsideCallback;
41 
42     uv_udp_t uvHandle;
43     int queueLen;
44 
45     /** true if we are inside of the callback, used by blockFreeInsideCallback */
46     int inCallback;
47 
48     Identity
49 };
50 
51 struct UDPAddrIface_WriteRequest_pvt {
52     uv_udp_send_t uvReq;
53     int32_t length;
54     struct UDPAddrIface_pvt* udp;
55     struct Message* msg;
56     struct Allocator* alloc;
57     Identity
58 };
59 
ifaceForHandle(uv_udp_t * handle)60 static struct UDPAddrIface_pvt* ifaceForHandle(uv_udp_t* handle)
61 {
62     char* hp = ((char*)handle) - offsetof(struct UDPAddrIface_pvt, uvHandle);
63     return Identity_check((struct UDPAddrIface_pvt*) hp);
64 }
65 
sendComplete(uv_udp_send_t * uvReq,int error)66 static void sendComplete(uv_udp_send_t* uvReq, int error)
67 {
68     struct UDPAddrIface_WriteRequest_pvt* req =
69         Identity_check((struct UDPAddrIface_WriteRequest_pvt*) uvReq);
70     if (error) {
71         Log_debug(req->udp->logger, "DROP Failed to write to UDPAddrIface [%s]",
72                   uv_strerror(error) );
73     }
74     Assert_true(req->msg->length == req->length);
75     req->udp->queueLen -= req->msg->length;
76     Assert_true(req->udp->queueLen >= 0);
77     Allocator_free(req->alloc);
78 }
79 
80 
incomingFromIface(struct Message * m,struct Iface * iface)81 static Iface_DEFUN incomingFromIface(struct Message* m, struct Iface* iface)
82 {
83     struct UDPAddrIface_pvt* context = Identity_check((struct UDPAddrIface_pvt*) iface);
84 
85     Assert_true(m->length >= Sockaddr_OVERHEAD);
86     if (((struct Sockaddr*)m->bytes)->flags & Sockaddr_flags_BCAST) {
87         Log_debug(context->logger, "Attempted bcast, bcast unsupported");
88         // bcast not supported.
89         return NULL;
90     }
91 
92     if (context->queueLen > UDPAddrIface_MAX_QUEUE) {
93         Log_warn(context->logger, "DROP msg length [%d] to [%s] maximum queue length reached",
94             m->length, Sockaddr_print(context->pub.generic.addr, m->alloc));
95         return NULL;
96     }
97 
98     // This allocator will hold the message allocator in existance after it is freed.
99     struct Allocator* reqAlloc = Allocator_child(context->allocator);
100     if (m->alloc) {
101         Allocator_adopt(reqAlloc, m->alloc);
102     } else {
103         m = Message_clone(m, reqAlloc);
104     }
105 
106     struct UDPAddrIface_WriteRequest_pvt* req =
107         Allocator_clone(reqAlloc, (&(struct UDPAddrIface_WriteRequest_pvt) {
108             .udp = context,
109             .msg = m,
110             .alloc = reqAlloc
111         }));
112     Identity_set(req);
113 
114     struct Sockaddr_storage ss;
115     Er_assert(Message_epop(m, &ss, context->pub.generic.addr->addrLen));
116     Assert_true(ss.addr.addrLen == context->pub.generic.addr->addrLen);
117 
118     req->length = m->length;
119 
120     uv_buf_t buffers[] = {
121         { .base = (char*)m->bytes, .len = m->length }
122     };
123 
124     int ret = uv_udp_send(&req->uvReq, &context->uvHandle, buffers, 1,
125                 (const struct sockaddr*)ss.nativeAddr, (uv_udp_send_cb)&sendComplete);
126 
127     if (ret) {
128         Log_info(context->logger, "DROP Failed writing to UDPAddrIface [%s]",
129                  uv_strerror(ret));
130         Allocator_free(req->alloc);
131         return NULL;
132     }
133     context->queueLen += m->length;
134 
135     return NULL;
136 }
137 
138 #if UDPAddrIface_PADDING_AMOUNT < 8
139     #error
140 #endif
141 #define ALLOC(buff) (((struct Allocator**) &(buff[-(8 + (((uintptr_t)buff) % 8))]))[0])
142 
incoming(uv_udp_t * handle,ssize_t nread,const uv_buf_t * buf,const struct sockaddr * addr,unsigned flags)143 static void incoming(uv_udp_t* handle,
144                      ssize_t nread,
145                      const uv_buf_t* buf,
146                      const struct sockaddr* addr,
147                      unsigned flags)
148 {
149     struct UDPAddrIface_pvt* context = ifaceForHandle(handle);
150 
151     context->inCallback = 1;
152 
153     // Grab out the allocator which was placed there by allocate()
154     struct Allocator* alloc = buf->base ? ALLOC(buf->base) : NULL;
155 
156     // if nread < 0, we used to log uv_last_error, which doesn't exist anymore.
157     if (nread == 0) {
158         // Happens constantly
159         //Log_debug(context->logger, "0 length read");
160 
161     } else {
162         struct Message* m = Allocator_calloc(alloc, sizeof(struct Message), 1);
163         m->length = nread;
164         m->padding = UDPAddrIface_PADDING_AMOUNT + context->pub.generic.addr->addrLen;
165         m->capacity = buf->len;
166         m->bytes = (uint8_t*)buf->base;
167         m->alloc = alloc;
168         Er_assert(Message_epush(m, addr, context->pub.generic.addr->addrLen - Sockaddr_OVERHEAD));
169 
170         // make sure the sockaddr doesn't have crap in it which will
171         // prevent it from being used as a lookup key
172         Sockaddr_normalizeNative((struct sockaddr*) m->bytes);
173 
174         Er_assert(Message_epush(m, context->pub.generic.addr, Sockaddr_OVERHEAD));
175 
176         /*uint8_t buff[256] = {0};
177         Assert_true(Hex_encode(buff, 255, m->bytes, context->pub.generic.addr->addrLen));
178         Log_debug(context->logger, "Message from [%s]", buff);*/
179 
180         Iface_send(&context->pub.generic.iface, m);
181     }
182 
183     if (alloc) {
184         Allocator_free(alloc);
185     }
186 
187     context->inCallback = 0;
188     if (context->blockFreeInsideCallback) {
189         Allocator_onFreeComplete((struct Allocator_OnFreeJob*) context->blockFreeInsideCallback);
190     }
191 }
192 
allocate(uv_handle_t * handle,size_t size,uv_buf_t * buf)193 static void allocate(uv_handle_t* handle, size_t size, uv_buf_t* buf)
194 {
195     struct UDPAddrIface_pvt* context = ifaceForHandle((uv_udp_t*)handle);
196 
197     size = UDPAddrIface_BUFFER_CAP;
198     size_t fullSize = size + UDPAddrIface_PADDING_AMOUNT + context->pub.generic.addr->addrLen;
199 
200     struct Allocator* child = Allocator_child(context->allocator);
201     char* buff = Allocator_malloc(child, fullSize);
202     buff += UDPAddrIface_PADDING_AMOUNT + context->pub.generic.addr->addrLen;
203 
204     ALLOC(buff) = child;
205 
206     buf->base = buff;
207     buf->len = size;
208 }
209 
onClosed(uv_handle_t * wasClosed)210 static void onClosed(uv_handle_t* wasClosed)
211 {
212     struct UDPAddrIface_pvt* context =
213         Identity_check((struct UDPAddrIface_pvt*) wasClosed->data);
214     Allocator_onFreeComplete((struct Allocator_OnFreeJob*) context->closeHandleOnFree);
215 }
216 
closeHandleOnFree(struct Allocator_OnFreeJob * job)217 static int closeHandleOnFree(struct Allocator_OnFreeJob* job)
218 {
219     struct UDPAddrIface_pvt* context =
220         Identity_check((struct UDPAddrIface_pvt*) job->userData);
221     context->closeHandleOnFree = job;
222     uv_close((uv_handle_t*)&context->uvHandle, onClosed);
223     return Allocator_ONFREE_ASYNC;
224 }
225 
blockFreeInsideCallback(struct Allocator_OnFreeJob * job)226 static int blockFreeInsideCallback(struct Allocator_OnFreeJob* job)
227 {
228     struct UDPAddrIface_pvt* context =
229         Identity_check((struct UDPAddrIface_pvt*) job->userData);
230     if (!context->inCallback) {
231         return 0;
232     }
233     context->blockFreeInsideCallback = job;
234     return Allocator_ONFREE_ASYNC;
235 }
236 
UDPAddrIface_setDSCP(struct UDPAddrIface * iface,uint8_t dscp)237 int UDPAddrIface_setDSCP(struct UDPAddrIface* iface, uint8_t dscp)
238 {
239     int res = 0;
240     /* For win32 setsockopt is unable to mark the TOS field in IP header, do not support it now */
241     #ifndef win32
242         struct UDPAddrIface_pvt* context = Identity_check((struct UDPAddrIface_pvt*) iface);
243         /* 6-bit DSCP, 2-bit ENC(useless for UDP) */
244         int tos = dscp << 2;
245         if (Sockaddr_getFamily(context->pub.generic.addr) == Sockaddr_AF_INET) {
246             res = setsockopt(context->uvHandle.io_watcher.fd, IPPROTO_IP, IP_TOS,
247                            &tos, sizeof(tos));
248         } else if (Sockaddr_getFamily(context->pub.generic.addr) == Sockaddr_AF_INET6) {
249             res = setsockopt(context->uvHandle.io_watcher.fd, IPPROTO_IPV6, IPV6_TCLASS,
250                            &tos, sizeof(tos));
251         }
252     #endif
253     return res;
254 }
255 
UDPAddrIface_getFd(struct UDPAddrIface * iface)256 int UDPAddrIface_getFd(struct UDPAddrIface* iface)
257 {
258     int out = -1;
259     #ifndef win32
260         struct UDPAddrIface_pvt* context = Identity_check((struct UDPAddrIface_pvt*) iface);
261         out = context->uvHandle.io_watcher.fd;
262     #endif
263     return out;
264 }
265 
UDPAddrIface_setBroadcast(struct UDPAddrIface * iface,bool enable)266 int UDPAddrIface_setBroadcast(struct UDPAddrIface* iface, bool enable)
267 {
268     struct UDPAddrIface_pvt* context = Identity_check((struct UDPAddrIface_pvt*) iface);
269     return uv_udp_set_broadcast(&context->uvHandle, enable ? 1 : 0);
270 }
271 
Er_DEFUN(struct UDPAddrIface * UDPAddrIface_new (struct EventBase * eventBase,struct Sockaddr * addr,struct Allocator * alloc,struct Log * logger))272 Er_DEFUN(struct UDPAddrIface* UDPAddrIface_new(struct EventBase* eventBase,
273                                       struct Sockaddr* addr,
274                                       struct Allocator* alloc,
275                                       struct Log* logger))
276 {
277     struct EventBase_pvt* base = EventBase_privatize(eventBase);
278 
279     struct UDPAddrIface_pvt* context =
280         Allocator_clone(alloc, (&(struct UDPAddrIface_pvt) {
281             .logger = logger,
282             .allocator = alloc
283         }));
284     context->pub.generic.alloc = alloc;
285     context->pub.generic.iface.send = incomingFromIface;
286     Identity_set(context);
287 
288     if (addr) {
289         Log_debug(logger, "Binding to address [%s]", Sockaddr_print(addr, alloc));
290     }
291 
292     struct Sockaddr_storage ss;
293     if (!addr) {
294         Sockaddr_parse("0.0.0.0:0", &ss);
295         addr = &ss.addr;
296     }
297 
298     uv_udp_init(base->loop, &context->uvHandle);
299     context->uvHandle.data = context;
300 
301     int ret;
302     void* native = Sockaddr_asNative(addr);
303     ret = uv_udp_bind(&context->uvHandle, (const struct sockaddr*)native, 0);
304 
305     if (ret) {
306         Er_raise(alloc, "call to uv_udp_bind() failed [%s]", uv_strerror(ret));
307     }
308 
309     ret = uv_udp_recv_start(&context->uvHandle, allocate, incoming);
310     if (ret) {
311         const char* err = uv_strerror(ret);
312         uv_close((uv_handle_t*) &context->uvHandle, NULL);
313         Er_raise(alloc, "uv_udp_recv_start() failed [%s]", err);
314     }
315 
316     int nameLen = sizeof(struct Sockaddr_storage);
317     Bits_memset(&ss, 0, sizeof(struct Sockaddr_storage));
318     ret = uv_udp_getsockname(&context->uvHandle, (void*)ss.nativeAddr, &nameLen);
319     if (ret) {
320         const char* err = uv_strerror(ret);
321         uv_close((uv_handle_t*) &context->uvHandle, NULL);
322         Er_raise(alloc, "uv_udp_getsockname() failed [%s]", err);
323     }
324     ss.addr.addrLen = nameLen + 8;
325 
326     context->pub.generic.addr = Sockaddr_clone(&ss.addr, alloc);
327     Log_debug(logger, "Bound to address [%s]", Sockaddr_print(context->pub.generic.addr, alloc));
328 
329     Allocator_onFree(alloc, closeHandleOnFree, context);
330     Allocator_onFree(alloc, blockFreeInsideCallback, context);
331 
332     Er_ret(&context->pub);
333 }
334