1 /* vim: set expandtab ts=4 sw=4: */
2 /*
3 * You may redistribute this program and/or modify it under the terms of
4 * the GNU General Public License as published by the Free Software Foundation,
5 * either version 3 of the License, or (at your option) any later version.
6 *
7 * This program is distributed in the hope that it will be useful,
8 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 * GNU General Public License for more details.
11 *
12 * You should have received a copy of the GNU General Public License
13 * along with this program. If not, see <https://www.gnu.org/licenses/>.
14 */
15 #include "util/events/libuv/UvWrapper.h"
16 #include "exception/Except.h"
17 #include "interface/Iface.h"
18 #include "util/events/UDPAddrIface.h"
19 #include "memory/Allocator.h"
20 #include "util/events/libuv/EventBase_pvt.h"
21 #include "util/platform/Sockaddr.h"
22 #include "util/Assert.h"
23 #include "util/Identity.h"
24 #include "wire/Message.h"
25 #include "wire/Error.h"
26 #include "util/Hex.h"
27
28 struct UDPAddrIface_pvt
29 {
30 struct UDPAddrIface pub;
31
32 struct Allocator* allocator;
33
34 struct Log* logger;
35
36 /** Job to close the handle when the allocator is freed */
37 struct Allocator_OnFreeJob* closeHandleOnFree;
38
39 /** Job which blocks the freeing until the callback completes */
40 struct Allocator_OnFreeJob* blockFreeInsideCallback;
41
42 uv_udp_t uvHandle;
43 int queueLen;
44
45 /** true if we are inside of the callback, used by blockFreeInsideCallback */
46 int inCallback;
47
48 Identity
49 };
50
51 struct UDPAddrIface_WriteRequest_pvt {
52 uv_udp_send_t uvReq;
53 int32_t length;
54 struct UDPAddrIface_pvt* udp;
55 struct Message* msg;
56 struct Allocator* alloc;
57 Identity
58 };
59
/**
 * Recover the private interface context from a pointer to its embedded libuv handle.
 *
 * The handle is the uvHandle member of a struct UDPAddrIface_pvt, so stepping
 * back by that member's offset yields the start of the enclosing struct.
 *
 * @param handle address of the uvHandle member of a struct UDPAddrIface_pvt.
 * @return the enclosing context, after it has been passed through Identity_check().
 */
static struct UDPAddrIface_pvt* ifaceForHandle(uv_udp_t* handle)
{
    const size_t handleOffset = offsetof(struct UDPAddrIface_pvt, uvHandle);
    struct UDPAddrIface_pvt* owner =
        (struct UDPAddrIface_pvt*) (((char*) handle) - handleOffset);
    return Identity_check(owner);
}
65
sendComplete(uv_udp_send_t * uvReq,int error)66 static void sendComplete(uv_udp_send_t* uvReq, int error)
67 {
68 struct UDPAddrIface_WriteRequest_pvt* req =
69 Identity_check((struct UDPAddrIface_WriteRequest_pvt*) uvReq);
70 if (error) {
71 Log_debug(req->udp->logger, "DROP Failed to write to UDPAddrIface [%s]",
72 uv_strerror(error) );
73 }
74 Assert_true(req->msg->length == req->length);
75 req->udp->queueLen -= req->msg->length;
76 Assert_true(req->udp->queueLen >= 0);
77 Allocator_free(req->alloc);
78 }
79
80
incomingFromIface(struct Message * m,struct Iface * iface)81 static Iface_DEFUN incomingFromIface(struct Message* m, struct Iface* iface)
82 {
83 struct UDPAddrIface_pvt* context = Identity_check((struct UDPAddrIface_pvt*) iface);
84
85 Assert_true(m->length >= Sockaddr_OVERHEAD);
86 if (((struct Sockaddr*)m->bytes)->flags & Sockaddr_flags_BCAST) {
87 Log_debug(context->logger, "Attempted bcast, bcast unsupported");
88 // bcast not supported.
89 return NULL;
90 }
91
92 if (context->queueLen > UDPAddrIface_MAX_QUEUE) {
93 Log_warn(context->logger, "DROP msg length [%d] to [%s] maximum queue length reached",
94 m->length, Sockaddr_print(context->pub.generic.addr, m->alloc));
95 return NULL;
96 }
97
98 // This allocator will hold the message allocator in existance after it is freed.
99 struct Allocator* reqAlloc = Allocator_child(context->allocator);
100 if (m->alloc) {
101 Allocator_adopt(reqAlloc, m->alloc);
102 } else {
103 m = Message_clone(m, reqAlloc);
104 }
105
106 struct UDPAddrIface_WriteRequest_pvt* req =
107 Allocator_clone(reqAlloc, (&(struct UDPAddrIface_WriteRequest_pvt) {
108 .udp = context,
109 .msg = m,
110 .alloc = reqAlloc
111 }));
112 Identity_set(req);
113
114 struct Sockaddr_storage ss;
115 Er_assert(Message_epop(m, &ss, context->pub.generic.addr->addrLen));
116 Assert_true(ss.addr.addrLen == context->pub.generic.addr->addrLen);
117
118 req->length = m->length;
119
120 uv_buf_t buffers[] = {
121 { .base = (char*)m->bytes, .len = m->length }
122 };
123
124 int ret = uv_udp_send(&req->uvReq, &context->uvHandle, buffers, 1,
125 (const struct sockaddr*)ss.nativeAddr, (uv_udp_send_cb)&sendComplete);
126
127 if (ret) {
128 Log_info(context->logger, "DROP Failed writing to UDPAddrIface [%s]",
129 uv_strerror(ret));
130 Allocator_free(req->alloc);
131 return NULL;
132 }
133 context->queueLen += m->length;
134
135 return NULL;
136 }
137
138 #if UDPAddrIface_PADDING_AMOUNT < 8
139 #error
140 #endif
141 #define ALLOC(buff) (((struct Allocator**) &(buff[-(8 + (((uintptr_t)buff) % 8))]))[0])
142
/**
 * libuv receive callback: wrap a received datagram in a Message and pass it
 * to the interface.
 *
 * The source address is pushed in front of the payload as a struct Sockaddr
 * (the header copied from this interface's own address followed by the
 * normalized native address) before the message is sent on.
 *
 * A negative nread signals a transport error; in that case libuv may hand
 * over a null buffer and addr carries no usable address, so the error is
 * logged and no message is built. A zero nread is ignored.
 *
 * @param handle the UDP handle embedded in the interface context.
 * @param nread number of bytes received, 0 if nothing was read, negative on error.
 * @param buf the buffer produced by allocate(), its base may be NULL.
 * @param addr source address of the datagram.
 * @param flags libuv receive flags, unused.
 */
static void incoming(uv_udp_t* handle,
                     ssize_t nread,
                     const uv_buf_t* buf,
                     const struct sockaddr* addr,
                     unsigned flags)
{
    struct UDPAddrIface_pvt* context = ifaceForHandle(handle);

    // Tells blockFreeInsideCallback() to defer any free requested while we are in here.
    context->inCallback = 1;

    // Grab out the allocator which was placed there by allocate()
    struct Allocator* alloc = buf->base ? ALLOC(buf->base) : NULL;

    if (nread < 0) {
        // Transport error, there is no datagram to deliver.
        Log_debug(context->logger, "DROP Failed to read from UDPAddrIface [%s]",
                  uv_strerror((int) nread));

    } else if (nread > 0) {
        struct Message* m = Allocator_calloc(alloc, sizeof(struct Message), 1);
        m->length = nread;
        m->padding = UDPAddrIface_PADDING_AMOUNT + context->pub.generic.addr->addrLen;
        m->capacity = buf->len;
        m->bytes = (uint8_t*)buf->base;
        m->alloc = alloc;
        Er_assert(Message_epush(m, addr, context->pub.generic.addr->addrLen - Sockaddr_OVERHEAD));

        // make sure the sockaddr doesn't have crap in it which will
        // prevent it from being used as a lookup key
        Sockaddr_normalizeNative((struct sockaddr*) m->bytes);

        Er_assert(Message_epush(m, context->pub.generic.addr, Sockaddr_OVERHEAD));

        Iface_send(&context->pub.generic.iface, m);
    }
    // nread == 0 happens constantly and needs no handling.

    // The buffer and the message both live in this allocator.
    if (alloc) {
        Allocator_free(alloc);
    }

    context->inCallback = 0;
    if (context->blockFreeInsideCallback) {
        // A free was requested during the callback, let it proceed now.
        Allocator_onFreeComplete((struct Allocator_OnFreeJob*) context->blockFreeInsideCallback);
    }
}
192
/**
 * libuv allocation callback: provide the buffer for the next received datagram.
 *
 * The size suggested by libuv is ignored and UDPAddrIface_BUFFER_CAP is used
 * instead. The buffer is carved out of a fresh child allocator and preceded by
 * headroom of UDPAddrIface_PADDING_AMOUNT plus the interface's address length;
 * the child allocator's pointer is stashed in that headroom via ALLOC() so that
 * incoming() can find it again.
 *
 * @param handle the UDP handle embedded in the interface context.
 * @param size size suggested by libuv, not used.
 * @param buf filled in with the buffer's base and length.
 */
static void allocate(uv_handle_t* handle, size_t size, uv_buf_t* buf)
{
    struct UDPAddrIface_pvt* context = ifaceForHandle((uv_udp_t*)handle);

    const size_t headroom = UDPAddrIface_PADDING_AMOUNT + context->pub.generic.addr->addrLen;
    const size_t capacity = UDPAddrIface_BUFFER_CAP;

    struct Allocator* child = Allocator_child(context->allocator);
    char* region = Allocator_malloc(child, capacity + headroom);
    char* buff = region + headroom;

    ALLOC(buff) = child;

    buf->len = capacity;
    buf->base = buff;
}
209
/**
 * Close callback passed to uv_close() by closeHandleOnFree().
 *
 * Completes the on-free job which closeHandleOnFree() left pending.
 *
 * @param wasClosed the handle which was closed, its data field points at the
 *                  interface context.
 */
static void onClosed(uv_handle_t* wasClosed)
{
    void* userData = wasClosed->data;
    struct UDPAddrIface_pvt* context = Identity_check((struct UDPAddrIface_pvt*) userData);
    struct Allocator_OnFreeJob* pending = context->closeHandleOnFree;
    Allocator_onFreeComplete(pending);
}
216
closeHandleOnFree(struct Allocator_OnFreeJob * job)217 static int closeHandleOnFree(struct Allocator_OnFreeJob* job)
218 {
219 struct UDPAddrIface_pvt* context =
220 Identity_check((struct UDPAddrIface_pvt*) job->userData);
221 context->closeHandleOnFree = job;
222 uv_close((uv_handle_t*)&context->uvHandle, onClosed);
223 return Allocator_ONFREE_ASYNC;
224 }
225
blockFreeInsideCallback(struct Allocator_OnFreeJob * job)226 static int blockFreeInsideCallback(struct Allocator_OnFreeJob* job)
227 {
228 struct UDPAddrIface_pvt* context =
229 Identity_check((struct UDPAddrIface_pvt*) job->userData);
230 if (!context->inCallback) {
231 return 0;
232 }
233 context->blockFreeInsideCallback = job;
234 return Allocator_ONFREE_ASYNC;
235 }
236
UDPAddrIface_setDSCP(struct UDPAddrIface * iface,uint8_t dscp)237 int UDPAddrIface_setDSCP(struct UDPAddrIface* iface, uint8_t dscp)
238 {
239 int res = 0;
240 /* For win32 setsockopt is unable to mark the TOS field in IP header, do not support it now */
241 #ifndef win32
242 struct UDPAddrIface_pvt* context = Identity_check((struct UDPAddrIface_pvt*) iface);
243 /* 6-bit DSCP, 2-bit ENC(useless for UDP) */
244 int tos = dscp << 2;
245 if (Sockaddr_getFamily(context->pub.generic.addr) == Sockaddr_AF_INET) {
246 res = setsockopt(context->uvHandle.io_watcher.fd, IPPROTO_IP, IP_TOS,
247 &tos, sizeof(tos));
248 } else if (Sockaddr_getFamily(context->pub.generic.addr) == Sockaddr_AF_INET6) {
249 res = setsockopt(context->uvHandle.io_watcher.fd, IPPROTO_IPV6, IPV6_TCLASS,
250 &tos, sizeof(tos));
251 }
252 #endif
253 return res;
254 }
255
UDPAddrIface_getFd(struct UDPAddrIface * iface)256 int UDPAddrIface_getFd(struct UDPAddrIface* iface)
257 {
258 int out = -1;
259 #ifndef win32
260 struct UDPAddrIface_pvt* context = Identity_check((struct UDPAddrIface_pvt*) iface);
261 out = context->uvHandle.io_watcher.fd;
262 #endif
263 return out;
264 }
265
UDPAddrIface_setBroadcast(struct UDPAddrIface * iface,bool enable)266 int UDPAddrIface_setBroadcast(struct UDPAddrIface* iface, bool enable)
267 {
268 struct UDPAddrIface_pvt* context = Identity_check((struct UDPAddrIface_pvt*) iface);
269 return uv_udp_set_broadcast(&context->uvHandle, enable ? 1 : 0);
270 }
271
Er_DEFUN(struct UDPAddrIface * UDPAddrIface_new (struct EventBase * eventBase,struct Sockaddr * addr,struct Allocator * alloc,struct Log * logger))272 Er_DEFUN(struct UDPAddrIface* UDPAddrIface_new(struct EventBase* eventBase,
273 struct Sockaddr* addr,
274 struct Allocator* alloc,
275 struct Log* logger))
276 {
277 struct EventBase_pvt* base = EventBase_privatize(eventBase);
278
279 struct UDPAddrIface_pvt* context =
280 Allocator_clone(alloc, (&(struct UDPAddrIface_pvt) {
281 .logger = logger,
282 .allocator = alloc
283 }));
284 context->pub.generic.alloc = alloc;
285 context->pub.generic.iface.send = incomingFromIface;
286 Identity_set(context);
287
288 if (addr) {
289 Log_debug(logger, "Binding to address [%s]", Sockaddr_print(addr, alloc));
290 }
291
292 struct Sockaddr_storage ss;
293 if (!addr) {
294 Sockaddr_parse("0.0.0.0:0", &ss);
295 addr = &ss.addr;
296 }
297
298 uv_udp_init(base->loop, &context->uvHandle);
299 context->uvHandle.data = context;
300
301 int ret;
302 void* native = Sockaddr_asNative(addr);
303 ret = uv_udp_bind(&context->uvHandle, (const struct sockaddr*)native, 0);
304
305 if (ret) {
306 Er_raise(alloc, "call to uv_udp_bind() failed [%s]", uv_strerror(ret));
307 }
308
309 ret = uv_udp_recv_start(&context->uvHandle, allocate, incoming);
310 if (ret) {
311 const char* err = uv_strerror(ret);
312 uv_close((uv_handle_t*) &context->uvHandle, NULL);
313 Er_raise(alloc, "uv_udp_recv_start() failed [%s]", err);
314 }
315
316 int nameLen = sizeof(struct Sockaddr_storage);
317 Bits_memset(&ss, 0, sizeof(struct Sockaddr_storage));
318 ret = uv_udp_getsockname(&context->uvHandle, (void*)ss.nativeAddr, &nameLen);
319 if (ret) {
320 const char* err = uv_strerror(ret);
321 uv_close((uv_handle_t*) &context->uvHandle, NULL);
322 Er_raise(alloc, "uv_udp_getsockname() failed [%s]", err);
323 }
324 ss.addr.addrLen = nameLen + 8;
325
326 context->pub.generic.addr = Sockaddr_clone(&ss.addr, alloc);
327 Log_debug(logger, "Bound to address [%s]", Sockaddr_print(context->pub.generic.addr, alloc));
328
329 Allocator_onFree(alloc, closeHandleOnFree, context);
330 Allocator_onFree(alloc, blockFreeInsideCallback, context);
331
332 Er_ret(&context->pub);
333 }
334