1 /*
2  * A Random IP reputation service acting as a Stream Processing Offload Agent
3  *
 * This is a very simple service that implements a "random" IP reputation
 * service. It returns random scores for all checked IP addresses. It only
 * shows how to implement an IP reputation service, or similar kinds of
 * services, using the SPOE.
8  *
9  * Copyright 2016 HAProxy Technologies, Christopher Faulet <cfaulet@haproxy.com>
10  *
11  * This program is free software; you can redistribute it and/or
12  * modify it under the terms of the GNU General Public License
13  * as published by the Free Software Foundation; either version
14  * 2 of the License, or (at your option) any later version.
15  *
16  */
17 #include <unistd.h>
18 #include <stdlib.h>
19 #include <string.h>
20 #include <stdbool.h>
21 #include <errno.h>
22 #include <stdio.h>
23 #include <signal.h>
24 #include <arpa/inet.h>
25 #include <netinet/in.h>
26 #include <netinet/tcp.h>
27 #include <sys/socket.h>
28 #include <err.h>
29 #include <ctype.h>
30 
31 #include <pthread.h>
32 
33 #include <event2/util.h>
34 #include <event2/event.h>
35 #include <event2/event_struct.h>
36 #include <event2/thread.h>
37 
38 #include <mini-clist.h>
39 #include <spoe_types.h>
40 #include <spop_functions.h>
41 
/* Compile-time defaults. DEFAULT_PORT, NUM_WORKERS and MAX_FRAME_SIZE are the
 * initial values of the server_port, num_workers and max_frame_size globals.
 * CONNECTION_BACKLOG is the backlog passed to listen() on the server socket.
 * SPOP_VERSION is the version string sent in the agent HELLO frame. */
#define DEFAULT_PORT       12345
#define CONNECTION_BACKLOG 10
#define NUM_WORKERS        10
#define MAX_FRAME_SIZE     16384
#define SPOP_VERSION       "2.0"

/* Length of the string literal <str>, without its terminating NUL byte */
#define SLEN(str) (sizeof(str)-1)
49 
/* Print a message on stderr, prefixed by the current time (seconds and
 * microseconds from gettimeofday()) and the id of <worker>. <fmt> must be a
 * string literal because it is concatenated with the prefix format; a newline
 * is appended automatically. */
#define LOG(worker, fmt, args...)                                       \
	do {								\
		struct timeval  now;					\
                                                                        \
		gettimeofday(&now, NULL);				\
		fprintf(stderr, "%ld.%06ld [%02d] " fmt "\n",		\
			now.tv_sec, now.tv_usec, (worker)->id, ##args);	\
	} while (0)
58 
/* Same as LOG(), but the message is only emitted when the global <debug> flag
 * is set. The arguments are forwarded to LOG() unchanged. */
#define DEBUG(x...)				\
	do {					\
		if (debug)			\
			LOG(x);			\
	} while (0)
64 
65 
/* State of a client connection, stored in the <state> field of struct client.
 * NOTE(review): the transitions are driven by code outside this excerpt;
 * presumably CONNECTING until the HELLO exchange is done, then PROCESSING,
 * then DISCONNECTING - confirm against the read/write callbacks. */
enum spoa_state {
	SPOA_ST_CONNECTING = 0,
	SPOA_ST_PROCESSING,
	SPOA_ST_DISCONNECTING,
};
71 
/* Origin of a frame, stored in the <type> field of struct spoe_frame.
 * reset_frame() sets it to SPOA_FRM_T_UNKNOWN and the prepare_agent*()
 * encoders set it to SPOA_FRM_T_AGENT. */
enum spoa_frame_type {
	SPOA_FRM_T_UNKNOWN = 0,
	SPOA_FRM_T_HAPROXY,
	SPOA_FRM_T_AGENT,
};
77 
/* An SPOE engine known by a worker.
 * NOTE(review): the code filling this structure is outside this excerpt; the
 * field roles below are inferred from their names and types - confirm. */
struct spoe_engine {
	char       *id;                /* engine identifier string */

	struct list processing_frames; /* presumably frames being processed for this engine */
	struct list outgoing_frames;   /* presumably frames waiting to be sent for this engine */

	struct list clients;           /* presumably the clients attached to this engine */
	struct list list;              /* list linkage of the engine itself */
};
87 
/* A frame, either received from HAProxy or built by the agent. The raw bytes
 * are accessed through <buf>, which reset_frame() points at the trailing
 * <data> storage and acc_payload() may redirect to <frag_buf> once a
 * fragmented frame is complete. */
struct spoe_frame {
	enum spoa_frame_type type;
	char                *buf;        /* current frame bytes (<data> or <frag_buf>) */
	unsigned int         offset;     /* position in <buf> where the payload starts */
	unsigned int         len;        /* number of valid bytes in <buf> */

	unsigned int         stream_id;
	unsigned int         frame_id;
	unsigned int         flags;      /* frame flags in host byte order (SPOE_FRM_FL_*) */
	bool                 hcheck;     /* true is the CONNECT frame is a healthcheck */
	bool                 fragmented; /* true if the frame is fragmented */
	int                  ip_score;   /* -1 if unset, else between 0 and 100 */

	struct event         process_frame_event;
	struct worker       *worker;
	struct spoe_engine  *engine;
	struct client       *client;     /* may be NULL in async mode (see prepare_agentack) */
	struct list          list;

	char                *frag_buf; /* used to accumulate payload of a fragmented frame */
	unsigned int         frag_len; /* number of bytes accumulated in <frag_buf> */

	char                 data[0];  /* inline storage for the frame bytes */
};
112 
/* A connection opened by HAProxy. It is owned by one worker and released by
 * release_client(), which closes <fd>, frees <engine_id> and releases all
 * frames still attached to it. */
struct client {
	int                 fd;            /* connection socket, closed on release when >= 0 */
	unsigned long       id;            /* identifier printed in log messages */
	enum spoa_state     state;

	struct event        read_frame_event;
	struct event        write_frame_event;

	struct spoe_frame  *incoming_frame;
	struct spoe_frame  *outgoing_frame;

	struct list         processing_frames;
	struct list         outgoing_frames;

	unsigned int        max_frame_size; /* lowered to HAProxy's value by check_max_frame_size() */
	int                 status_code;    /* SPOE_FRM_ERR_* value reported in DISCONNECT frames */

	char               *engine_id;      /* heap copy of the "engine-id" HELLO item, or NULL */
	struct spoe_engine *engine;
	bool                pipelining;     /* capabilities negotiated during the HELLO exchange */
	bool                async;
	bool                fragmentation;

	struct worker      *worker;
	struct list         by_worker;      /* linkage removed by release_client() */
	struct list         by_engine;
};
140 
/* Per-thread state of a worker. <id> is printed by LOG(), <nbclients> is
 * decremented by release_client() and released frames are queued back into
 * <frames> by release_frame().
 * NOTE(review): <base>, <monitor_event>, <engines> and <nbframes> are managed
 * by code outside this excerpt. */
struct worker {
	pthread_t           thread;
	int                 id;
	struct event_base  *base;
	struct event       *monitor_event;

	struct list         engines;

	unsigned int        nbclients;
	struct list         clients;

	struct list         frames;
	unsigned int        nbframes;
};
155 
156 
/* Globals.
 * <null_worker> is a placeholder worker (id 0) used for log messages emitted
 * outside of any worker, e.g. by create_server_socket(). <pipelining>, <async>
 * and <fragmentation> tell which capabilities the agent itself enables; they
 * are combined with what HAProxy announces during the HELLO exchange.
 * NOTE(review): <processing_delay> is the only global without internal
 * linkage - confirm whether that is intended. */
static struct worker *workers          = NULL;
static struct worker  null_worker      = { .id = 0 };
static unsigned long  clicount         = 0;
static int            server_port      = DEFAULT_PORT;
static int            num_workers      = NUM_WORKERS;
static unsigned int   max_frame_size   = MAX_FRAME_SIZE;
struct timeval        processing_delay = {0, 0};
static bool           debug            = false;
static bool           pipelining       = false;
static bool           async            = false;
static bool           fragmentation    = false;
169 
170 
/* Human readable messages indexed by SPOE_FRM_ERR_* status code. The message
 * is sent in the "message" item of the agent DISCONNECT frame. Indexes that
 * have no designated initializer below are NULL. */
static const char *spoe_frm_err_reasons[SPOE_FRM_ERRS] = {
	[SPOE_FRM_ERR_NONE]               = "normal",
	[SPOE_FRM_ERR_IO]                 = "I/O error",
	[SPOE_FRM_ERR_TOUT]               = "a timeout occurred",
	[SPOE_FRM_ERR_TOO_BIG]            = "frame is too big",
	[SPOE_FRM_ERR_INVALID]            = "invalid frame received",
	[SPOE_FRM_ERR_NO_VSN]             = "version value not found",
	[SPOE_FRM_ERR_NO_FRAME_SIZE]      = "max-frame-size value not found",
	[SPOE_FRM_ERR_NO_CAP]             = "capabilities value not found",
	[SPOE_FRM_ERR_BAD_VSN]            = "unsupported version",
	[SPOE_FRM_ERR_BAD_FRAME_SIZE]     = "max-frame-size too big or too small",
	[SPOE_FRM_ERR_FRAG_NOT_SUPPORTED] = "fragmentation not supported",
	[SPOE_FRM_ERR_INTERLACED_FRAMES]  = "invalid interlaced frames",
	[SPOE_FRM_ERR_FRAMEID_NOTFOUND]   = "frame-id not found",
	[SPOE_FRM_ERR_RES]                = "resource allocation error",
	[SPOE_FRM_ERR_UNKNOWN]            = "an unknown error occurred",
};
188 
/* Event callbacks; all share the (fd, events, arg) callback signature */
static void signal_cb(evutil_socket_t, short, void *);
static void accept_cb(evutil_socket_t, short, void *);
static void worker_monitor_cb(evutil_socket_t, short, void *);
static void process_frame_cb(evutil_socket_t, short, void *);
static void read_frame_cb(evutil_socket_t, short, void *);
static void write_frame_cb(evutil_socket_t, short, void *);

/* Helpers that are used before their definition */
static void use_spoe_engine(struct client *);
static void unuse_spoe_engine(struct client *);
static void release_frame(struct spoe_frame *);
static void release_client(struct client *);
200 
201 static void
check_ipv4_reputation(struct spoe_frame * frame,struct in_addr * ipv4)202 check_ipv4_reputation(struct spoe_frame *frame, struct in_addr *ipv4)
203 {
204 	char str[INET_ADDRSTRLEN];
205 
206 	if (inet_ntop(AF_INET, ipv4, str, INET_ADDRSTRLEN) == NULL)
207 		return;
208 
209 	frame->ip_score = random() % 100;
210 
211 	DEBUG(frame->worker, "IP score for %.*s is %d",
212 	      INET_ADDRSTRLEN, str, frame->ip_score);
213 }
214 
215 static void
check_ipv6_reputation(struct spoe_frame * frame,struct in6_addr * ipv6)216 check_ipv6_reputation(struct spoe_frame *frame, struct in6_addr *ipv6)
217 {
218 	char str[INET6_ADDRSTRLEN];
219 
220 	if (inet_ntop(AF_INET6, ipv6, str, INET6_ADDRSTRLEN) == NULL)
221 		return;
222 
223 	frame->ip_score = random() % 100;
224 
225 	DEBUG(frame->worker, "IP score for %.*s is %d",
226 	      INET6_ADDRSTRLEN, str, frame->ip_score);
227 }
228 
229 
230 /* Check the protocol version. It returns -1 if an error occurred, the number of
231  * read bytes otherwise. */
232 static int
check_proto_version(struct spoe_frame * frame,char ** buf,char * end)233 check_proto_version(struct spoe_frame *frame, char **buf, char *end)
234 {
235 	char      *str, *p = *buf;
236 	uint64_t   sz;
237 	int        ret;
238 
239 	/* Get the list of all supported versions by HAProxy */
240 	if ((*p++ & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR)
241 		return -1;
242 	ret = spoe_decode_buffer(&p, end, &str, &sz);
243 	if (ret == -1 || !str)
244 		return -1;
245 
246 	DEBUG(frame->worker, "<%lu> Supported versions : %.*s",
247 	      frame->client->id, (int)sz, str);
248 
249 	/* TODO: Find the right verion in supported ones */
250 
251 	ret  = (p - *buf);
252 	*buf = p;
253 	return ret;
254 }
255 
256 /* Check max frame size value. It returns -1 if an error occurred, the number of
257  * read bytes otherwise. */
258 static int
check_max_frame_size(struct spoe_frame * frame,char ** buf,char * end)259 check_max_frame_size(struct spoe_frame *frame, char **buf, char *end)
260 {
261 	char    *p = *buf;
262 	uint64_t sz;
263 	int      type, ret;
264 
265 	/* Get the max-frame-size value of HAProxy */
266 	type =  *p++;
267 	if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT32  &&
268 	    (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT64  &&
269 	    (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT32 &&
270 	    (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT64)
271 		return -1;
272 	if (decode_varint(&p, end, &sz) == -1)
273 		return -1;
274 
275 	/* Keep the lower value */
276 	if (sz < frame->client->max_frame_size)
277 		frame->client->max_frame_size = sz;
278 
279 	DEBUG(frame->worker, "<%lu> HAProxy maximum frame size : %u",
280 	      frame->client->id, (unsigned int)sz);
281 
282 	ret  = (p - *buf);
283 	*buf = p;
284 	return ret;
285 }
286 
287 /* Check healthcheck value. It returns -1 if an error occurred, the number of
288  * read bytes otherwise. */
289 static int
check_healthcheck(struct spoe_frame * frame,char ** buf,char * end)290 check_healthcheck(struct spoe_frame *frame, char **buf, char *end)
291 {
292 	char *p = *buf;
293 	int   type, ret;
294 
295 	/* Get the "healthcheck" value */
296 	type = *p++;
297 	if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_BOOL)
298 		return -1;
299 	frame->hcheck = ((type & SPOE_DATA_FL_TRUE) == SPOE_DATA_FL_TRUE);
300 
301 	DEBUG(frame->worker, "<%lu> HELLO healthcheck : %s",
302 	      frame->client->id, (frame->hcheck ? "true" : "false"));
303 
304 	ret  = (p - *buf);
305 	*buf = p;
306 	return ret;
307 }
308 
309 /* Check capabilities value. It returns -1 if an error occurred, the number of
310  * read bytes otherwise. */
311 static int
check_capabilities(struct spoe_frame * frame,char ** buf,char * end)312 check_capabilities(struct spoe_frame *frame, char **buf, char *end)
313 {
314 	struct client *client = frame->client;
315 	char          *str, *p = *buf;
316 	uint64_t       sz;
317 	int            ret;
318 
319 	if ((*p++ & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR)
320 		return -1;
321 	if (spoe_decode_buffer(&p, end, &str, &sz) == -1)
322 		return -1;
323 	if (str == NULL) /* this is not an error */
324 		goto end;
325 
326 	DEBUG(frame->worker, "<%lu> HAProxy capabilities : %.*s",
327 	      client->id, (int)sz, str);
328 
329 	while (sz) {
330 		char *delim;
331 
332 		/* Skip leading spaces */
333 		for (; isspace(*str) && sz; sz--);
334 
335 		if (sz >= 10 && !strncmp(str, "pipelining", 10)) {
336 			str += 10; sz -= 10;
337 			if (!sz || isspace(*str) || *str == ',') {
338 				DEBUG(frame->worker,
339 				      "<%lu> HAProxy supports frame pipelining",
340 				      client->id);
341 				client->pipelining = true;
342 			}
343 		}
344 		else if (sz >= 5 && !strncmp(str, "async", 5)) {
345 			str += 5; sz -= 5;
346 			if (!sz || isspace(*str) || *str == ',') {
347 				DEBUG(frame->worker,
348 				      "<%lu> HAProxy supports asynchronous frame",
349 				      client->id);
350 				client->async = true;
351 			}
352 		}
353 		else if (sz >= 13 && !strncmp(str, "fragmentation", 13)) {
354 			str += 13; sz -= 13;
355 			if (!sz || isspace(*str) || *str == ',') {
356 				DEBUG(frame->worker,
357 				      "<%lu> HAProxy supports fragmented frame",
358 				      client->id);
359 				client->fragmentation = true;
360 			}
361 		}
362 
363 		if (!sz || (delim = memchr(str, ',', sz)) == NULL)
364 			break;
365 		delim++;
366 		sz -= (delim - str);
367 		str = delim;
368 	}
369   end:
370 	ret  = (p - *buf);
371 	*buf = p;
372 	return ret;
373 }
374 
375 /* Check engine-id value. It returns -1 if an error occurred, the number of
376  * read bytes otherwise. */
377 static int
check_engine_id(struct spoe_frame * frame,char ** buf,char * end)378 check_engine_id(struct spoe_frame *frame, char **buf, char *end)
379 {
380 	struct client *client = frame->client;
381 	char          *str, *p = *buf;
382 	uint64_t       sz;
383 	int            ret;
384 
385 	if ((*p++ & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR)
386 		return -1;
387 
388 	if (spoe_decode_buffer(&p, end, &str, &sz) == -1)
389 		return -1;
390 	if (str == NULL) /* this is not an error */
391 		goto end;
392 
393 	if (client->engine != NULL)
394 		goto end;
395 
396 	DEBUG(frame->worker, "<%lu> HAProxy engine id : %.*s",
397 	      client->id, (int)sz, str);
398 
399 	client->engine_id = strndup(str, (int)sz);
400   end:
401 	ret  = (p - *buf);
402 	*buf = p;
403 	return ret;
404 }
405 
406 static int
acc_payload(struct spoe_frame * frame)407 acc_payload(struct spoe_frame *frame)
408 {
409 	struct client *client = frame->client;
410 	char          *buf;
411 	size_t         len = frame->len - frame->offset;
412 	int            ret = frame->offset;
413 
414 	/* No need to accumulation payload */
415 	if (frame->fragmented == false)
416 		return ret;
417 
418 	buf = realloc(frame->frag_buf, frame->frag_len + len);
419 	if (buf == NULL) {
420 		client->status_code = SPOE_FRM_ERR_RES;
421 		return -1;
422 	}
423 	memcpy(buf + frame->frag_len, frame->buf + frame->offset, len);
424 	frame->frag_buf  = buf;
425 	frame->frag_len += len;
426 
427 	if (!(frame->flags & SPOE_FRM_FL_FIN)) {
428 		/* Wait for next parts */
429 		frame->buf    = (char *)(frame->data);
430 		frame->offset = 0;
431 		frame->len    = 0;
432 		frame->flags  = 0;
433 		return 1;
434 	}
435 
436 	frame->buf    = frame->frag_buf;
437 	frame->len    = frame->frag_len;
438 	frame->offset = 0;
439 	return ret;
440 }
441 
442 /* Check disconnect status code. It returns -1 if an error occurred, the number
443  * of read bytes otherwise. */
444 static int
check_discon_status_code(struct spoe_frame * frame,char ** buf,char * end)445 check_discon_status_code(struct spoe_frame *frame, char **buf, char *end)
446 {
447 	char    *p = *buf;
448 	uint64_t sz;
449 	int      type, ret;
450 
451 	/* Get the "status-code" value */
452 	type =  *p++;
453 	if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT32 &&
454 	    (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT64 &&
455 	    (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT32 &&
456 	    (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT64)
457 		return -1;
458 	if (decode_varint(&p, end, &sz) == -1)
459 		return -1;
460 
461 	frame->client->status_code = (unsigned int)sz;
462 
463 	DEBUG(frame->worker, "<%lu> Disconnect status code : %u",
464 	      frame->client->id, frame->client->status_code);
465 
466 	ret  = (p - *buf);
467 	*buf = p;
468 	return ret;
469 }
470 
471 /* Check the disconnect message. It returns -1 if an error occurred, the number
472  * of read bytes otherwise. */
473 static int
check_discon_message(struct spoe_frame * frame,char ** buf,char * end)474 check_discon_message(struct spoe_frame *frame, char **buf, char *end)
475 {
476 	char    *str, *p = *buf;
477 	uint64_t sz;
478 	int      ret;
479 
480 	/* Get the "message" value */
481 	if ((*p++ & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR)
482 		return -1;
483 	ret = spoe_decode_buffer(&p, end, &str, &sz);
484 	if (ret == -1 || !str)
485 		return -1;
486 
487 	DEBUG(frame->worker, "<%lu> Disconnect message : %.*s",
488 	      frame->client->id, (int)sz, str);
489 
490 	ret  = (p - *buf);
491 	*buf = p;
492 	return ret;
493 }
494 
495 
496 
497 /* Decode a HELLO frame received from HAProxy. It returns -1 if an error
498  * occurred, otherwise the number of read bytes. HELLO frame cannot be
499  * ignored and having another frame than a HELLO frame is an error. */
500 static int
handle_hahello(struct spoe_frame * frame)501 handle_hahello(struct spoe_frame *frame)
502 {
503 	struct client *client = frame->client;
504 	char          *p, *end;
505 
506 	p = frame->buf;
507 	end = frame->buf + frame->len;
508 
509 	/* Check frame type: we really want a HELLO frame */
510 	if (*p++ != SPOE_FRM_T_HAPROXY_HELLO)
511 		goto error;
512 
513 	DEBUG(frame->worker, "<%lu> Decode HAProxy HELLO frame", client->id);
514 
515 	/* Retrieve flags */
516 	memcpy((char *)&(frame->flags), p, 4);
517 	frame->flags = ntohl(frame->flags);
518 	p += 4;
519 
520 	/* Fragmentation is not supported for HELLO frame */
521 	if (!(frame->flags & SPOE_FRM_FL_FIN)) {
522 		client->status_code = SPOE_FRM_ERR_FRAG_NOT_SUPPORTED;
523 		goto error;
524 	}
525 
526 	/* stream-id and frame-id must be cleared */
527 	if (*p != 0 || *(p+1) != 0) {
528 		client->status_code = SPOE_FRM_ERR_INVALID;
529 		goto error;
530 	}
531 	p += 2;
532 
533 	/* Loop on K/V items */
534 	while (p < end) {
535 		char     *str;
536 		uint64_t  sz;
537 
538 		/* Decode the item name */
539 		spoe_decode_buffer(&p, end, &str, &sz);
540 		if (!str) {
541 			client->status_code = SPOE_FRM_ERR_INVALID;
542 			goto error;
543 		}
544 
545 		/* Check "supported-versions" K/V item */
546 		if (!memcmp(str, "supported-versions", sz)) {
547 			if (check_proto_version(frame, &p, end)  == -1) {
548 				client->status_code = SPOE_FRM_ERR_INVALID;
549 				goto error;
550 			}
551 		}
552 		/* Check "max-frame-size" K/V item */
553 		else if (!memcmp(str, "max-frame-size", sz)) {
554 			if (check_max_frame_size(frame, &p, end) == -1) {
555 				client->status_code = SPOE_FRM_ERR_INVALID;
556 				goto error;
557 			}
558 		}
559 		/* Check "healthcheck" K/V item */
560 		else if (!memcmp(str, "healthcheck", sz)) {
561 			if (check_healthcheck(frame, &p, end) == -1) {
562 				client->status_code = SPOE_FRM_ERR_INVALID;
563 				goto error;
564 			}
565 		}
566 		/* Check "capabilities" K/V item */
567 		else if (!memcmp(str, "capabilities", sz)) {
568 			if (check_capabilities(frame, &p, end) == -1) {
569 				client->status_code = SPOE_FRM_ERR_INVALID;
570 				goto error;
571 			}
572 		}
573 		/* Check "engine-id" K/V item */
574 		else if (!memcmp(str, "engine-id", sz)) {
575 			if (check_engine_id(frame, &p, end) == -1) {
576 				client->status_code = SPOE_FRM_ERR_INVALID;
577 				goto error;
578 			}
579 		}
580 		else {
581 			DEBUG(frame->worker, "<%lu> Skip K/V item : key=%.*s",
582 			      client->id, (int)sz, str);
583 
584 			/* Silently ignore unknown item */
585 			if (spoe_skip_data(&p, end) == -1) {
586 				client->status_code = SPOE_FRM_ERR_INVALID;
587 				goto error;
588 			}
589 		}
590 	}
591 
592 	if (async == false || client->engine_id == NULL)
593 		client->async = false;
594 	if (pipelining == false)
595 		client->pipelining = false;
596 
597 	if (client->async == true)
598 		use_spoe_engine(client);
599 
600 	return (p - frame->buf);
601   error:
602 	return -1;
603 }
604 
605 /* Decode a DISCONNECT frame received from HAProxy. It returns -1 if an error
606  * occurred, otherwise the number of read bytes. DISCONNECT frame cannot be
607  * ignored and having another frame than a DISCONNECT frame is an error.*/
608 static int
handle_hadiscon(struct spoe_frame * frame)609 handle_hadiscon(struct spoe_frame *frame)
610 {
611 	struct client *client = frame->client;
612 	char          *p, *end;
613 
614 	p = frame->buf;
615 	end = frame->buf + frame->len;
616 
617 	/* Check frame type: we really want a DISCONNECT frame */
618 	if (*p++ != SPOE_FRM_T_HAPROXY_DISCON)
619 		goto error;
620 
621 	DEBUG(frame->worker, "<%lu> Decode HAProxy DISCONNECT frame", client->id);
622 
623 	/* Retrieve flags */
624 	memcpy((char *)&(frame->flags), p, 4);
625 	frame->flags = ntohl(frame->flags);
626 	p += 4;
627 
628 	/* Fragmentation is not supported for DISCONNECT frame */
629 	if (!(frame->flags & SPOE_FRM_FL_FIN)) {
630 		client->status_code = SPOE_FRM_ERR_FRAG_NOT_SUPPORTED;
631 		goto error;
632 	}
633 
634 	/* stream-id and frame-id must be cleared */
635 	if (*p != 0 || *(p+1) != 0) {
636 		client->status_code = SPOE_FRM_ERR_INVALID;
637 		goto error;
638 	}
639 	p += 2;
640 
641 	client->status_code = SPOE_FRM_ERR_NONE;
642 
643 	/* Loop on K/V items */
644 	while (p < end) {
645 		char     *str;
646 		uint64_t  sz;
647 
648 		/* Decode item key */
649 		spoe_decode_buffer(&p, end, &str, &sz);
650 		if (!str) {
651 			client->status_code = SPOE_FRM_ERR_INVALID;
652 			goto error;
653 		}
654 
655 		/* Check "status-code" K/V item */
656 		if (!memcmp(str, "status-code", sz)) {
657 			if (check_discon_status_code(frame, &p, end) == -1) {
658 				client->status_code = SPOE_FRM_ERR_INVALID;
659 				goto error;
660 			}
661 		}
662 		/* Check "message" K/V item */
663 		else if (!memcmp(str, "message", sz)) {
664 			if (check_discon_message(frame, &p, end) == -1) {
665 				client->status_code = SPOE_FRM_ERR_INVALID;
666 				goto error;
667 			}
668 		}
669 		else {
670 			DEBUG(frame->worker, "<%lu> Skip K/V item : key=%.*s",
671 			      client->id, (int)sz, str);
672 
673 			/* Silently ignore unknown item */
674 			if (spoe_skip_data(&p, end) == -1) {
675 				client->status_code = SPOE_FRM_ERR_INVALID;
676 				goto error;
677 			}
678 		}
679 	}
680 
681 	return (p - frame->buf);
682   error:
683 	return -1;
684 }
685 
686 /* Decode a NOTIFY frame received from HAProxy. It returns -1 if an error
687  * occurred, 0 if it must be must be ignored, otherwise the number of read
688  * bytes. */
689 static int
handle_hanotify(struct spoe_frame * frame)690 handle_hanotify(struct spoe_frame *frame)
691 {
692 	struct client *client = frame->client;
693 	char          *p, *end;
694 	uint64_t       stream_id, frame_id;
695 
696 	p = frame->buf;
697 	end = frame->buf + frame->len;
698 
699 	/* Check frame type */
700 	if (*p++ != SPOE_FRM_T_HAPROXY_NOTIFY)
701 		goto ignore;
702 
703 	DEBUG(frame->worker, "<%lu> Decode HAProxy NOTIFY frame", client->id);
704 
705 	/* Retrieve flags */
706 	memcpy((char *)&(frame->flags), p, 4);
707 	frame->flags = ntohl(frame->flags);
708 	p += 4;
709 
710 	/* Fragmentation is not supported */
711 	if (!(frame->flags & SPOE_FRM_FL_FIN) && fragmentation == false) {
712 		client->status_code = SPOE_FRM_ERR_FRAG_NOT_SUPPORTED;
713 		goto error;
714 	}
715 
716 	/* Read the stream-id and frame-id */
717 	if (decode_varint(&p, end, &stream_id) == -1)
718 		goto ignore;
719 	if (decode_varint(&p, end, &frame_id) == -1)
720 		goto ignore;
721 
722 	frame->stream_id = (unsigned int)stream_id;
723 	frame->frame_id  = (unsigned int)frame_id;
724 
725 	DEBUG(frame->worker, "<%lu> STREAM-ID=%u - FRAME-ID=%u"
726 	      " - %s frame received"
727 	      " - frag_len=%u - len=%u - offset=%ld",
728 	      client->id, frame->stream_id, frame->frame_id,
729 	      (frame->flags & SPOE_FRM_FL_FIN) ? "unfragmented" : "fragmented",
730 	      frame->frag_len, frame->len, p - frame->buf);
731 
732 	frame->fragmented = !(frame->flags & SPOE_FRM_FL_FIN);
733 	frame->offset = (p - frame->buf);
734 	return acc_payload(frame);
735 
736   ignore:
737 	return 0;
738 
739   error:
740 	return -1;
741 }
742 
743 /* Decode next part of a fragmented frame received from HAProxy. It returns -1
744  * if an error occurred, 0 if it must be must be ignored, otherwise the number
745  * of read bytes. */
746 static int
handle_hafrag(struct spoe_frame * frame)747 handle_hafrag(struct spoe_frame *frame)
748 {
749 	struct client *client = frame->client;
750 	char          *p, *end;
751 	uint64_t       stream_id, frame_id;
752 
753 	p = frame->buf;
754 	end = frame->buf + frame->len;
755 
756 	/* Check frame type */
757 	if (*p++ != SPOE_FRM_T_UNSET)
758 		goto ignore;
759 
760 	DEBUG(frame->worker, "<%lu> Decode Next part of a fragmented frame", client->id);
761 
762 	/* Fragmentation is not supported */
763 	if (fragmentation == false) {
764 		client->status_code = SPOE_FRM_ERR_FRAG_NOT_SUPPORTED;
765 		goto error;
766 	}
767 
768 	/* Retrieve flags */
769 	memcpy((char *)&(frame->flags), p, 4);
770 	frame->flags = ntohl(frame->flags);
771 	p+= 4;
772 
773 	/* Read the stream-id and frame-id */
774 	if (decode_varint(&p, end, &stream_id) == -1)
775 		goto ignore;
776 	if (decode_varint(&p, end, &frame_id) == -1)
777 		goto ignore;
778 
779 	if (frame->fragmented == false                  ||
780 	    frame->stream_id != (unsigned int)stream_id ||
781 	    frame->frame_id  != (unsigned int)frame_id) {
782 		client->status_code = SPOE_FRM_ERR_INTERLACED_FRAMES;
783 		goto error;
784 	}
785 
786 	if (frame->flags & SPOE_FRM_FL_ABRT) {
787 		DEBUG(frame->worker, "<%lu> STREAM-ID=%u - FRAME-ID=%u"
788 		      " - Abort processing of a fragmented frame"
789 		      " - frag_len=%u - len=%u - offset=%ld",
790 		      client->id, frame->stream_id, frame->frame_id,
791 		      frame->frag_len, frame->len, p - frame->buf);
792 		goto ignore;
793 	}
794 
795 	DEBUG(frame->worker, "<%lu> STREAM-ID=%u - FRAME-ID=%u"
796 	      " - %s fragment of a fragmented frame received"
797 	      " - frag_len=%u - len=%u - offset=%ld",
798 	      client->id, frame->stream_id, frame->frame_id,
799 	      (frame->flags & SPOE_FRM_FL_FIN) ? "last" : "next",
800 	      frame->frag_len, frame->len, p - frame->buf);
801 
802 	frame->offset = (p - frame->buf);
803 	return acc_payload(frame);
804 
805   ignore:
806 	return 0;
807 
808   error:
809 	return -1;
810 }
811 
812 /* Encode a HELLO frame to send it to HAProxy. It returns the number of written
813  * bytes. */
814 static int
prepare_agenthello(struct spoe_frame * frame)815 prepare_agenthello(struct spoe_frame *frame)
816 {
817 	struct client *client = frame->client;
818 	char          *p, *end;
819 	char           capabilities[64];
820 	int            n;
821 	unsigned int   flags  = SPOE_FRM_FL_FIN;
822 
823 	DEBUG(frame->worker, "<%lu> Encode Agent HELLO frame", client->id);
824 	frame->type = SPOA_FRM_T_AGENT;
825 
826 	p   = frame->buf;
827 	end = frame->buf+max_frame_size;
828 
829 	/* Frame Type */
830 	*p++ = SPOE_FRM_T_AGENT_HELLO;
831 
832 	/* Set flags */
833 	flags = htonl(flags);
834 	memcpy(p, (char *)&flags, 4);
835 	p += 4;
836 
837 	/* No stream-id and frame-id for HELLO frames */
838 	*p++ = 0;
839 	*p++ = 0;
840 
841 	/* "version" K/V item */
842 	spoe_encode_buffer("version", 7, &p, end);
843 	*p++ = SPOE_DATA_T_STR;
844 	spoe_encode_buffer(SPOP_VERSION, SLEN(SPOP_VERSION), &p, end);
845 	DEBUG(frame->worker, "<%lu> Agent version : %s",
846 	      client->id, SPOP_VERSION);
847 
848 
849 	/* "max-frame-size" K/V item */
850 	spoe_encode_buffer("max-frame-size", 14, &p ,end);
851 	*p++ = SPOE_DATA_T_UINT32;
852 	encode_varint(client->max_frame_size, &p, end);
853 	DEBUG(frame->worker, "<%lu> Agent maximum frame size : %u",
854 	      client->id, client->max_frame_size);
855 
856 	/* "capabilities" K/V item */
857 	spoe_encode_buffer("capabilities", 12, &p, end);
858 	*p++ = SPOE_DATA_T_STR;
859 
860 	memset(capabilities, 0, sizeof(capabilities));
861 	n = 0;
862 
863 	/*     1. Fragmentation capability ? */
864 	if (fragmentation == true) {
865 		memcpy(capabilities, "fragmentation", 13);
866 		n += 13;
867 	}
868 	/*     2. Pipelining capability ? */
869 	if (client->pipelining == true) {
870 		if (n) capabilities[n++] = ',';
871 		memcpy(capabilities + n, "pipelining", 10);
872 		n += 10;
873 	}
874 	/*     3. Async capability ? */
875 	if (client->async == true) {
876 		if (n) capabilities[n++] = ',';
877 		memcpy(capabilities + n, "async", 5);
878 		n += 5;
879 	}
880 	spoe_encode_buffer(capabilities, n, &p, end);
881 
882 	DEBUG(frame->worker, "<%lu> Agent capabilities : %.*s",
883 	      client->id, n, capabilities);
884 
885 	frame->len = (p - frame->buf);
886 	return frame->len;
887 }
888 
889 /* Encode a DISCONNECT frame to send it to HAProxy. It returns the number of
890  * written bytes. */
891 static int
prepare_agentdicon(struct spoe_frame * frame)892 prepare_agentdicon(struct spoe_frame *frame)
893 {
894 	struct client *client = frame->client;
895 	char           *p, *end;
896 	const char     *reason;
897 	int             rlen;
898 	unsigned int    flags  = SPOE_FRM_FL_FIN;
899 
900 	DEBUG(frame->worker, "<%lu> Encode Agent DISCONNECT frame", client->id);
901 	frame->type = SPOA_FRM_T_AGENT;
902 
903 	p   = frame->buf;
904 	end = frame->buf+max_frame_size;
905 
906 	if (client->status_code >= SPOE_FRM_ERRS)
907 		client->status_code = SPOE_FRM_ERR_UNKNOWN;
908 	reason = spoe_frm_err_reasons[client->status_code];
909 	rlen   = strlen(reason);
910 
911 	/* Frame type */
912 	*p++ = SPOE_FRM_T_AGENT_DISCON;
913 
914 	/* Set flags */
915 	flags = htonl(flags);
916 	memcpy(p, (char *)&flags, 4);
917 	p += 4;
918 
919 	/* No stream-id and frame-id for DISCONNECT frames */
920 	*p++ = 0;
921 	*p++ = 0;
922 
923 	/* There are 2 mandatory items: "status-code" and "message" */
924 
925 	/* "status-code" K/V item */
926 	spoe_encode_buffer("status-code", 11, &p, end);
927 	*p++ = SPOE_DATA_T_UINT32;
928 	encode_varint(client->status_code, &p, end);
929 	DEBUG(frame->worker, "<%lu> Disconnect status code : %u",
930 	      client->id, client->status_code);
931 
932 	/* "message" K/V item */
933 	spoe_encode_buffer("message", 7, &p, end);
934 	*p++ = SPOE_DATA_T_STR;
935 	spoe_encode_buffer(reason, rlen, &p, end);
936 	DEBUG(frame->worker, "<%lu> Disconnect message : %s",
937 	      client->id, reason);
938 
939 	frame->len = (p - frame->buf);
940 	return frame->len;
941 }
942 
943 /* Encode a ACK frame to send it to HAProxy. It returns the number of written
944  * bytes. */
945 static int
prepare_agentack(struct spoe_frame * frame)946 prepare_agentack(struct spoe_frame *frame)
947 {
948 	char        *p, *end;
949 	unsigned int flags  = SPOE_FRM_FL_FIN;
950 
951 	/* Be careful here, in async mode, frame->client can be NULL */
952 
953 	DEBUG(frame->worker, "Encode Agent ACK frame");
954 	frame->type = SPOA_FRM_T_AGENT;
955 
956 	p   = frame->buf;
957 	end = frame->buf+max_frame_size;
958 
959 	/* Frame type */
960 	*p++ = SPOE_FRM_T_AGENT_ACK;
961 
962 	/* Set flags */
963 	flags = htonl(flags);
964 	memcpy(p, (char *)&flags, 4);
965 	p += 4;
966 
967 	/* Set stream-id and frame-id for ACK frames */
968 	encode_varint(frame->stream_id, &p, end);
969 	encode_varint(frame->frame_id, &p, end);
970 
971 	DEBUG(frame->worker, "STREAM-ID=%u - FRAME-ID=%u",
972 	      frame->stream_id, frame->frame_id);
973 
974 	frame->len = (p - frame->buf);
975 	return frame->len;
976 }
977 
978 static int
create_server_socket(void)979 create_server_socket(void)
980 {
981 	struct sockaddr_in listen_addr;
982 	int                fd, yes = 1;
983 
984 	fd = socket(AF_INET, SOCK_STREAM, 0);
985 	if (fd < 0) {
986 		LOG(&null_worker, "Failed to create service socket : %m");
987 		return -1;
988 	}
989 
990 	memset(&listen_addr, 0, sizeof(listen_addr));
991 	listen_addr.sin_family = AF_INET;
992 	listen_addr.sin_addr.s_addr = INADDR_ANY;
993 	listen_addr.sin_port = htons(server_port);
994 
995 	if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)) < 0 ||
996 	    setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes)) < 0) {
997 		LOG(&null_worker, "Failed to set option on server socket : %m");
998 		return -1;
999 	}
1000 
1001 	if (bind(fd, (struct sockaddr *) &listen_addr, sizeof(listen_addr)) < 0) {
1002 		LOG(&null_worker, "Failed to bind server socket : %m");
1003 		return -1;
1004 	}
1005 
1006 	if (listen(fd, CONNECTION_BACKLOG) < 0) {
1007 		LOG(&null_worker, "Failed to listen on server socket : %m");
1008 		return -1;
1009 	}
1010 
1011 	return fd;
1012 }
1013 
1014 static void
release_frame(struct spoe_frame * frame)1015 release_frame(struct spoe_frame *frame)
1016 {
1017 	struct worker *worker;
1018 
1019 	if (frame == NULL)
1020 		return;
1021 
1022 	if (event_pending(&frame->process_frame_event, EV_TIMEOUT, NULL))
1023 		event_del(&frame->process_frame_event);
1024 
1025 	worker = frame->worker;
1026 	LIST_DEL(&frame->list);
1027 	if (frame->frag_buf)
1028 		free(frame->frag_buf);
1029 	memset(frame, 0, sizeof(*frame)+max_frame_size+4);
1030 	LIST_ADDQ(&worker->frames, &frame->list);
1031 }
1032 
1033 static void
release_client(struct client * c)1034 release_client(struct client *c)
1035 {
1036 	struct spoe_frame *frame, *back;
1037 
1038 	if (c == NULL)
1039 		return;
1040 
1041 	DEBUG(c->worker, "<%lu> Release client", c->id);
1042 
1043 	LIST_DEL(&c->by_worker);
1044 	c->worker->nbclients--;
1045 
1046 	unuse_spoe_engine(c);
1047 	free(c->engine_id);
1048 
1049 	if (event_pending(&c->read_frame_event, EV_READ, NULL))
1050 		event_del(&c->read_frame_event);
1051 	if (event_pending(&c->write_frame_event, EV_WRITE, NULL))
1052 		event_del(&c->write_frame_event);
1053 
1054 	release_frame(c->incoming_frame);
1055 	release_frame(c->outgoing_frame);
1056 	list_for_each_entry_safe(frame, back, &c->processing_frames, list) {
1057 		release_frame(frame);
1058 	}
1059 	list_for_each_entry_safe(frame, back, &c->outgoing_frames, list) {
1060 		release_frame(frame);
1061 	}
1062 
1063 	if (c->fd >= 0)
1064 		close(c->fd);
1065 
1066 	free(c);
1067 }
1068 
1069 static void
reset_frame(struct spoe_frame * frame)1070 reset_frame(struct spoe_frame *frame)
1071 {
1072 	if (frame == NULL)
1073 		return;
1074 
1075 	if (frame->frag_buf)
1076 		free(frame->frag_buf);
1077 
1078 	frame->type       = SPOA_FRM_T_UNKNOWN;
1079 	frame->buf        = (char *)(frame->data);
1080 	frame->offset     = 0;
1081 	frame->len        = 0;
1082 	frame->stream_id  = 0;
1083 	frame->frame_id   = 0;
1084 	frame->flags      = 0;
1085 	frame->hcheck     = false;
1086 	frame->fragmented = false;
1087 	frame->ip_score   = -1;
1088 	frame->frag_buf   = NULL;
1089 	frame->frag_len   = 0;
1090 	LIST_INIT(&frame->list);
1091 }
1092 
1093 static void
use_spoe_engine(struct client * client)1094 use_spoe_engine(struct client *client)
1095 {
1096 	struct spoe_engine *eng;
1097 
1098 	if (client->engine_id == NULL)
1099 		return;
1100 
1101 	list_for_each_entry(eng, &client->worker->engines, list) {
1102 		if (!strcmp(eng->id, client->engine_id))
1103 			goto end;
1104 	}
1105 
1106 	if ((eng = malloc(sizeof(*eng))) == NULL) {
1107 		client->async = false;
1108 		return;
1109 	}
1110 
1111 	eng->id = strdup(client->engine_id);
1112 	LIST_INIT(&eng->clients);
1113 	LIST_INIT(&eng->processing_frames);
1114 	LIST_INIT(&eng->outgoing_frames);
1115 	LIST_ADDQ(&client->worker->engines, &eng->list);
1116 	LOG(client->worker, "Add new SPOE engine '%s'", eng->id);
1117 
1118   end:
1119 	client->engine = eng;
1120 	LIST_ADDQ(&eng->clients, &client->by_engine);
1121 }
1122 
1123 static void
unuse_spoe_engine(struct client * client)1124 unuse_spoe_engine(struct client *client)
1125 {
1126 	struct spoe_engine *eng;
1127 	struct spoe_frame  *frame, *back;
1128 
1129 	if (client == NULL || client->engine == NULL)
1130 		return;
1131 
1132 	eng = client->engine;
1133 	client->engine = NULL;
1134 	LIST_DEL(&client->by_engine);
1135 	if (!LIST_ISEMPTY(&eng->clients))
1136 		return;
1137 
1138 	LOG(client->worker, "Remove SPOE engine '%s'", eng->id);
1139 	LIST_DEL(&eng->list);
1140 
1141 	list_for_each_entry_safe(frame, back, &eng->processing_frames, list) {
1142 		release_frame(frame);
1143 	}
1144 	list_for_each_entry_safe(frame, back, &eng->outgoing_frames, list) {
1145 		release_frame(frame);
1146 	}
1147 	free(eng->id);
1148 	free(eng);
1149 }
1150 
1151 
1152 static struct spoe_frame *
acquire_incoming_frame(struct client * client)1153 acquire_incoming_frame(struct client *client)
1154 {
1155 	struct spoe_frame *frame;
1156 
1157 	frame = client->incoming_frame;
1158 	if (frame != NULL)
1159 		return frame;
1160 
1161 	if (LIST_ISEMPTY(&client->worker->frames)) {
1162 		if ((frame = calloc(1, sizeof(*frame)+max_frame_size+4)) == NULL) {
1163 			LOG(client->worker, "Failed to allocate new frame : %m");
1164 			return NULL;
1165 		}
1166 	}
1167 	else {
1168 		frame = LIST_NEXT(&client->worker->frames, typeof(frame), list);
1169 		LIST_DEL(&frame->list);
1170 	}
1171 
1172 	reset_frame(frame);
1173 	frame->worker = client->worker;
1174 	frame->engine = client->engine;
1175 	frame->client = client;
1176 
1177 	if (event_assign(&frame->process_frame_event, client->worker->base, -1,
1178 			 EV_TIMEOUT|EV_PERSIST, process_frame_cb, frame) < 0) {
1179 		LOG(client->worker, "Failed to create frame event");
1180 		return NULL;
1181 	}
1182 
1183 	client->incoming_frame = frame;
1184 	return frame;
1185 }
1186 
1187 static struct spoe_frame *
acquire_outgoing_frame(struct client * client)1188 acquire_outgoing_frame(struct client *client)
1189 {
1190 	struct spoe_engine *engine = client->engine;
1191 	struct spoe_frame  *frame = NULL;
1192 
1193 	if (client->outgoing_frame != NULL)
1194 		frame = client->outgoing_frame;
1195 	else if (!LIST_ISEMPTY(&client->outgoing_frames)) {
1196 		frame = LIST_NEXT(&client->outgoing_frames, typeof(frame), list);
1197 		LIST_DEL(&frame->list);
1198 		client->outgoing_frame = frame;
1199 	}
1200 	else if (engine!= NULL && !LIST_ISEMPTY(&engine->outgoing_frames)) {
1201 		frame = LIST_NEXT(&engine->outgoing_frames, typeof(frame), list);
1202 		LIST_DEL(&frame->list);
1203 		client->outgoing_frame = frame;
1204 	}
1205 	return frame;
1206 }
1207 
1208 static void
write_frame(struct client * client,struct spoe_frame * frame)1209 write_frame(struct client *client, struct spoe_frame *frame)
1210 {
1211 	uint32_t netint;
1212 
1213 	LIST_DEL(&frame->list);
1214 
1215 	frame->buf    = (char *)(frame->data);
1216 	frame->offset = 0;
1217 	netint        = htonl(frame->len);
1218 	memcpy(frame->buf, &netint, 4);
1219 
1220 	if (client != NULL) { /* HELLO or DISCONNECT frames */
1221 		event_add(&client->write_frame_event, NULL);
1222 
1223 		/* Try to process the frame as soon as possible, and always
1224 		 * attach it to the client */
1225 		if (client->async || client->pipelining) {
1226 			if (client->outgoing_frame == NULL)
1227 				client->outgoing_frame = frame;
1228 			else
1229 				LIST_ADD(&client->outgoing_frames, &frame->list);
1230 		}
1231 		else {
1232 			client->outgoing_frame = frame;
1233 			event_del(&client->read_frame_event);
1234 		}
1235 	}
1236 	else { /* for all other frames */
1237 		if (frame->client == NULL) { /* async mode ! */
1238 			LIST_ADDQ(&frame->engine->outgoing_frames, &frame->list);
1239 			list_for_each_entry(client, &frame->engine->clients, by_engine)
1240 				event_add(&client->write_frame_event, NULL);
1241 		}
1242 		else if (frame->client->pipelining) {
1243 			LIST_ADDQ(&frame->client->outgoing_frames, &frame->list);
1244 			event_add(&frame->client->write_frame_event, NULL);
1245 		}
1246 		else {
1247 			frame->client->outgoing_frame = frame;
1248 			event_add(&frame->client->write_frame_event, NULL);
1249 			event_del(&frame->client->read_frame_event);
1250 		}
1251 	}
1252 }
1253 
1254 static void
process_incoming_frame(struct spoe_frame * frame)1255 process_incoming_frame(struct spoe_frame *frame)
1256 {
1257 	struct client *client = frame->client;
1258 
1259 	if (event_add(&frame->process_frame_event, &processing_delay) < 0) {
1260 		LOG(client->worker, "Failed to process incoming frame");
1261 		release_frame(frame);
1262 		return;
1263 	}
1264 
1265 	if (client->async) {
1266 		frame->client = NULL;
1267 		LIST_ADDQ(&frame->engine->processing_frames, &frame->list);
1268 	}
1269 	else if (client->pipelining)
1270 		LIST_ADDQ(&client->processing_frames, &frame->list);
1271 	else
1272 		event_del(&client->read_frame_event);
1273 }
1274 
1275 static void
signal_cb(evutil_socket_t sig,short events,void * user_data)1276 signal_cb(evutil_socket_t sig, short events, void *user_data)
1277 {
1278 	struct event_base *base = user_data;
1279 	int                i;
1280 
1281 	DEBUG(&null_worker, "Stopping the server");
1282 
1283 	event_base_loopbreak(base);
1284 	DEBUG(&null_worker, "Main event loop stopped");
1285 
1286 	for (i = 0; i < num_workers; i++) {
1287 		event_base_loopbreak(workers[i].base);
1288 		DEBUG(&null_worker, "Event loop stopped for worker %02d",
1289 		      workers[i].id);
1290 	}
1291 }
1292 
1293 static void
worker_monitor_cb(evutil_socket_t fd,short events,void * arg)1294 worker_monitor_cb(evutil_socket_t fd, short events, void *arg)
1295 {
1296 	struct worker *worker = arg;
1297 
1298 	LOG(worker, "%u clients connected (%u frames)", worker->nbclients, worker->nbframes);
1299 }
1300 
1301 static void
process_frame_cb(evutil_socket_t fd,short events,void * arg)1302 process_frame_cb(evutil_socket_t fd, short events, void *arg)
1303 {
1304 	struct spoe_frame *frame  = arg;
1305 	char              *p, *end;
1306 	int                ret;
1307 
1308 	DEBUG(frame->worker,
1309 	      "Process frame messages : STREAM-ID=%u - FRAME-ID=%u - length=%u bytes",
1310 	      frame->stream_id, frame->frame_id, frame->len - frame->offset);
1311 
1312 	p   = frame->buf + frame->offset;
1313 	end = frame->buf + frame->len;
1314 
1315 	/* Loop on messages */
1316 	while (p < end) {
1317 		char    *str;
1318 		uint64_t sz;
1319 		int      nbargs;
1320 
1321 		/* Decode the message name */
1322 		spoe_decode_buffer(&p, end, &str, &sz);
1323 		if (!str)
1324 			goto stop_processing;
1325 
1326 		DEBUG(frame->worker, "Process SPOE Message '%.*s'", (int)sz, str);
1327 
1328 		nbargs = (unsigned char)*p++;      /* Get the number of arguments */
1329 		frame->offset = (p - frame->buf);  /* Save index to handle errors and skip args */
1330 		if (!memcmp(str, "check-client-ip", sz)) {
1331 			union spoe_data data;
1332 			enum spoe_data_type type;
1333 
1334 			if (nbargs != 1)
1335 				goto skip_message;
1336 
1337 			if (spoe_decode_buffer(&p, end, &str, &sz) == -1)
1338 				goto stop_processing;
1339 			if (spoe_decode_data(&p, end, &data, &type) == -1)
1340 				goto skip_message;
1341 			frame->worker->nbframes++;
1342 			if (type == SPOE_DATA_T_IPV4)
1343 				check_ipv4_reputation(frame, &data.ipv4);
1344 			if (type == SPOE_DATA_T_IPV6)
1345 				check_ipv6_reputation(frame, &data.ipv6);
1346 		}
1347 		else {
1348 		  skip_message:
1349 			p = frame->buf + frame->offset; /* Restore index */
1350 
1351 			while (nbargs-- > 0) {
1352 				/* Silently ignore argument: its name and its value */
1353 				if (spoe_decode_buffer(&p, end, &str, &sz) == -1)
1354 					goto stop_processing;
1355 				if (spoe_skip_data(&p, end) == -1)
1356 					goto stop_processing;
1357 			}
1358 		}
1359 	}
1360 
1361   stop_processing:
1362 	/* Prepare agent ACK frame */
1363 	frame->buf    = (char *)(frame->data) + 4;
1364 	frame->offset = 0;
1365 	frame->len    = 0;
1366 	frame->flags  = 0;
1367 
1368 	ret = prepare_agentack(frame);
1369 	p   = frame->buf + ret;
1370 	end = frame->buf+max_frame_size;
1371 
1372 	if (frame->ip_score != -1) {
1373 		DEBUG(frame->worker, "Add action : set variable ip_scode=%u",
1374 		      frame->ip_score);
1375 
1376 		*p++ = SPOE_ACT_T_SET_VAR;                     /* Action type */
1377 		*p++ = 3;                                      /* Number of args */
1378 		*p++ = SPOE_SCOPE_SESS;                        /* Arg 1: the scope */
1379 		spoe_encode_buffer("ip_score", 8, &p, end);    /* Arg 2: variable name */
1380 		*p++ = SPOE_DATA_T_UINT32;
1381 		encode_varint(frame->ip_score, &p, end); /* Arg 3: variable value */
1382 		frame->len = (p - frame->buf);
1383 	}
1384 	write_frame(NULL, frame);
1385 }
1386 
1387 static void
read_frame_cb(evutil_socket_t fd,short events,void * arg)1388 read_frame_cb(evutil_socket_t fd, short events, void *arg)
1389 {
1390 	struct client     *client = arg;
1391 	struct spoe_frame *frame;
1392 	uint32_t           netint;
1393 	int                n;
1394 
1395 	DEBUG(client->worker, "<%lu> %s", client->id, __FUNCTION__);
1396 	if ((frame = acquire_incoming_frame(client)) == NULL)
1397 		goto close;
1398 
1399 	frame->type = SPOA_FRM_T_HAPROXY;
1400 	if (frame->buf == (char *)(frame->data)) {
1401 		/* Read the frame length: frame->buf points on length part (frame->data) */
1402 		n = read(client->fd, frame->buf+frame->offset, 4-frame->offset);
1403 		if (n <= 0) {
1404 			if (n < 0)
1405 				LOG(client->worker, "Failed to read frame length : %m");
1406 			goto close;
1407 		}
1408 		frame->offset += n;
1409 		if (frame->offset != 4)
1410 			return;
1411 		memcpy(&netint, frame->buf, 4);
1412 		frame->buf   += 4;
1413 		frame->offset = 0;
1414 		frame->len    = ntohl(netint);
1415 	}
1416 
1417 	/* Read the frame: frame->buf points on frame part (frame->data+4)*/
1418 	n = read(client->fd, frame->buf + frame->offset,
1419 		 frame->len - frame->offset);
1420 	if (n <= 0) {
1421 		if (n < 0) {
1422 			LOG(client->worker, "Frame to read frame : %m");
1423 			goto close;
1424 		}
1425 		return;
1426 	}
1427 	frame->offset += n;
1428 	if (frame->offset != frame->len)
1429 		return;
1430 	frame->offset = 0;
1431 
1432 	DEBUG(client->worker, "<%lu> New Frame of %u bytes received",
1433 	      client->id, frame->len);
1434 
1435 	switch (client->state) {
1436 		case SPOA_ST_CONNECTING:
1437 			if (handle_hahello(frame) < 0) {
1438 				LOG(client->worker, "Failed to decode HELLO frame");
1439 				goto disconnect;
1440 			}
1441 			prepare_agenthello(frame);
1442 			goto write_frame;
1443 
1444 		case SPOA_ST_PROCESSING:
1445 			if (frame->buf[0] == SPOE_FRM_T_HAPROXY_DISCON) {
1446 				client->state = SPOA_ST_DISCONNECTING;
1447 				goto disconnecting;
1448 			}
1449 			if (frame->buf[0] == SPOE_FRM_T_UNSET)
1450 				n = handle_hafrag(frame);
1451 			else
1452 				n = handle_hanotify(frame);
1453 
1454 			if (n < 0) {
1455 				LOG(client->worker, "Failed to decode frame: %s",
1456 				    spoe_frm_err_reasons[client->status_code]);
1457 				goto disconnect;
1458 			}
1459 			else if (n == 0) {
1460 				LOG(client->worker, "Ignore invalid/unknown/aborted frame");
1461 				goto ignore_frame;
1462 			}
1463 			else if (n == 1)
1464 				goto noop;
1465 			else
1466 				goto process_frame;
1467 
1468 		case SPOA_ST_DISCONNECTING:
1469 		  disconnecting:
1470 			if (handle_hadiscon(frame) < 0) {
1471 				LOG(client->worker, "Failed to decode DISCONNECT frame");
1472 				goto disconnect;
1473 			}
1474 			if (client->status_code != SPOE_FRM_ERR_NONE)
1475 				LOG(client->worker, "<%lu> Peer closed connection: %s",
1476 				    client->id, spoe_frm_err_reasons[client->status_code]);
1477 			goto disconnect;
1478 	}
1479 
1480   noop:
1481 	return;
1482 
1483   ignore_frame:
1484 	reset_frame(frame);
1485 	return;
1486 
1487   process_frame:
1488 	process_incoming_frame(frame);
1489 	client->incoming_frame = NULL;
1490 	return;
1491 
1492   write_frame:
1493 	write_frame(client, frame);
1494 	client->incoming_frame = NULL;
1495 	return;
1496 
1497   disconnect:
1498 	client->state = SPOA_ST_DISCONNECTING;
1499 	if (prepare_agentdicon(frame) < 0) {
1500 		LOG(client->worker, "Failed to encode DISCONNECT frame");
1501 		goto close;
1502 	}
1503 	goto write_frame;
1504 
1505   close:
1506 	release_client(client);
1507 }
1508 
1509 static void
write_frame_cb(evutil_socket_t fd,short events,void * arg)1510 write_frame_cb(evutil_socket_t fd, short events, void *arg)
1511 {
1512 	struct client     *client = arg;
1513 	struct spoe_frame *frame;
1514 	int                n;
1515 
1516 	DEBUG(client->worker, "<%lu> %s", client->id, __FUNCTION__);
1517 	if ((frame = acquire_outgoing_frame(client)) == NULL) {
1518 		event_del(&client->write_frame_event);
1519 		return;
1520 	}
1521 
1522 	if (frame->buf == (char *)(frame->data)) {
1523 		/* Write the frame length: frame->buf points on length part (frame->data) */
1524 		n = write(client->fd, frame->buf+frame->offset, 4-frame->offset);
1525 		if (n <= 0) {
1526 			if (n < 0)
1527 				LOG(client->worker, "Failed to write frame length : %m");
1528 			goto close;
1529 		}
1530 		frame->offset += n;
1531 		if (frame->offset != 4)
1532 			return;
1533 		frame->buf   += 4;
1534 		frame->offset = 0;
1535 	}
1536 
1537 	/* Write the frame: frame->buf points on frame part (frame->data+4)*/
1538 	n = write(client->fd, frame->buf + frame->offset,
1539 		  frame->len - frame->offset);
1540 	if (n <= 0) {
1541 		if (n < 0) {
1542 			LOG(client->worker, "Failed to write frame : %m");
1543 			goto close;
1544 		}
1545 		return;
1546 	}
1547 	frame->offset += n;
1548 	if (frame->offset != frame->len)
1549 		return;
1550 
1551 	DEBUG(client->worker, "<%lu> Frame of %u bytes send",
1552 	      client->id, frame->len);
1553 
1554 	switch (client->state) {
1555 		case SPOA_ST_CONNECTING:
1556 			if (frame->hcheck == true) {
1557 				DEBUG(client->worker,
1558 				      "<%lu> Close client after healthcheck",
1559 				      client->id);
1560 				goto close;
1561 			}
1562 			client->state = SPOA_ST_PROCESSING;
1563 			break;
1564 
1565 		case SPOA_ST_PROCESSING:
1566 			break;
1567 
1568 		case SPOA_ST_DISCONNECTING:
1569 			goto close;
1570 	}
1571 
1572 	release_frame(frame);
1573 	client->outgoing_frame = NULL;
1574 	if (!client->async && !client->pipelining) {
1575 		event_del(&client->write_frame_event);
1576 		event_add(&client->read_frame_event, NULL);
1577 	}
1578 	return;
1579 
1580   close:
1581 	release_client(client);
1582 }
1583 
1584 static void
accept_cb(int listener,short event,void * arg)1585 accept_cb(int listener, short event, void *arg)
1586 {
1587 	struct worker     *worker;
1588 	struct client     *client;
1589 	int                fd;
1590 
1591 	worker = &workers[clicount++ % num_workers];
1592 
1593 	if ((fd = accept(listener, NULL, NULL)) < 0) {
1594 		if (errno != EAGAIN && errno != EWOULDBLOCK)
1595 			LOG(worker, "Failed to accept client connection : %m");
1596 		return;
1597 	}
1598 
1599 	DEBUG(&null_worker,
1600 	      "<%lu> New Client connection accepted and assigned to worker %02d",
1601 	      clicount, worker->id);
1602 
1603 	if (evutil_make_socket_nonblocking(fd) < 0) {
1604 		LOG(&null_worker, "Failed to set client socket to non-blocking : %m");
1605 		close(fd);
1606 		return;
1607 	}
1608 
1609 	if ((client = calloc(1, sizeof(*client))) == NULL) {
1610 		LOG(&null_worker, "Failed to allocate memory for client state : %m");
1611 		close(fd);
1612 		return;
1613 	}
1614 
1615 	client->id             = clicount;
1616 	client->fd             = fd;
1617 	client->worker         = worker;
1618 	client->state          = SPOA_ST_CONNECTING;
1619 	client->status_code    = SPOE_FRM_ERR_NONE;
1620 	client->max_frame_size = max_frame_size;
1621 	client->engine         = NULL;
1622 	client->pipelining     = false;
1623 	client->async          = false;
1624 	client->incoming_frame = NULL;
1625 	client->outgoing_frame = NULL;
1626 	LIST_INIT(&client->processing_frames);
1627 	LIST_INIT(&client->outgoing_frames);
1628 
1629 	LIST_ADDQ(&worker->clients, &client->by_worker);
1630 
1631 	worker->nbclients++;
1632 
1633 	if (event_assign(&client->read_frame_event, worker->base, fd,
1634 			 EV_READ|EV_PERSIST, read_frame_cb, client) < 0     ||
1635 	    event_assign(&client->write_frame_event, worker->base, fd,
1636 			 EV_WRITE|EV_PERSIST, write_frame_cb, client) < 0) {
1637 		LOG(&null_worker, "Failed to create client events");
1638 		release_client(client);
1639 		return;
1640 	}
1641 	event_add(&client->read_frame_event,  NULL);
1642 }
1643 
1644 static void *
worker_function(void * data)1645 worker_function(void *data)
1646 {
1647 	struct client     *client, *cback;
1648 	struct spoe_frame *frame, *fback;
1649 	struct worker     *worker = data;
1650 
1651 	DEBUG(worker, "Worker ready to process client messages");
1652 	event_base_dispatch(worker->base);
1653 
1654 	list_for_each_entry_safe(client, cback, &worker->clients, by_worker) {
1655 		release_client(client);
1656 	}
1657 
1658 	list_for_each_entry_safe(frame, fback, &worker->frames, list) {
1659 		LIST_DEL(&frame->list);
1660 		free(frame);
1661 	}
1662 
1663 	event_free(worker->monitor_event);
1664 	event_base_free(worker->base);
1665 	DEBUG(worker, "Worker is stopped");
1666 	pthread_exit(&null_worker);
1667 }
1668 
1669 
1670 static int
parse_processing_delay(const char * str)1671 parse_processing_delay(const char *str)
1672 {
1673         unsigned long value;
1674 
1675         value = 0;
1676         while (1) {
1677                 unsigned int j;
1678 
1679                 j = *str - '0';
1680                 if (j > 9)
1681                         break;
1682                 str++;
1683                 value *= 10;
1684                 value += j;
1685         }
1686 
1687         switch (*str) {
1688 		case '\0': /* no unit = millisecond */
1689 			value *= 1000;
1690 			break;
1691 		case 's': /* second */
1692 			value *= 1000000;
1693 			str++;
1694 			break;
1695 		case 'm': /* millisecond : "ms" */
1696 			if (str[1] != 's')
1697 				return -1;
1698 			value *= 1000;
1699 			str += 2;
1700 			break;
1701 		case 'u': /* microsecond : "us" */
1702 			if (str[1] != 's')
1703 				return -1;
1704 			str += 2;
1705 			break;
1706 		default:
1707 			return -1;
1708         }
1709 	if (*str)
1710 		return -1;
1711 
1712 	processing_delay.tv_sec = (time_t)(value / 1000000);
1713 	processing_delay.tv_usec = (suseconds_t)(value % 1000000);
1714         return 0;
1715 }
1716 
1717 
1718 static void
usage(char * prog)1719 usage(char *prog)
1720 {
1721 	fprintf(stderr,
1722 		"Usage : %s [OPTION]...\n"
1723 		"    -h                   Print this message\n"
1724 		"    -d                   Enable the debug mode\n"
1725 		"    -m <max-frame-size>  Specify the maximum frame size (default : %u)\n"
1726 		"    -p <port>            Specify the port to listen on (default : %d)\n"
1727 		"    -n <num-workers>     Specify the number of workers (default : %d)\n"
1728 		"    -c <capability>      Enable the support of the specified capability\n"
1729 		"    -t <time>            Set a delay to process a message (default: 0)\n"
1730 		"                           The value is specified in milliseconds by default,\n"
1731 		"                           but can be in any other unit if the number is suffixed\n"
1732 		"                           by a unit (us, ms, s)\n"
1733 		"\n"
1734 		"    Supported capabilities: fragmentation, pipelining, async\n",
1735 		prog, MAX_FRAME_SIZE, DEFAULT_PORT, NUM_WORKERS);
1736 }
1737 
1738 int
main(int argc,char ** argv)1739 main(int argc, char **argv)
1740 {
1741 	struct event_base *base = NULL;
1742 	struct event      *signal_event = NULL, *accept_event = NULL;
1743 	int                opt, i, fd = -1;
1744 
1745 	// TODO: add '-t <processing-time>' option
1746 	while ((opt = getopt(argc, argv, "hdm:n:p:c:t:")) != -1) {
1747 		switch (opt) {
1748 			case 'h':
1749 				usage(argv[0]);
1750 				return EXIT_SUCCESS;
1751 			case 'd':
1752 				debug = true;
1753 				break;
1754 			case 'm':
1755 				max_frame_size = atoi(optarg);
1756 				break;
1757 			case 'n':
1758 				num_workers = atoi(optarg);
1759 				break;
1760 			case 'p':
1761 				server_port = atoi(optarg);
1762 				break;
1763 			case 'c':
1764 				if (!strcmp(optarg, "pipelining"))
1765 					pipelining = true;
1766 				else if (!strcmp(optarg, "async"))
1767 					async = true;
1768 				else if (!strcmp(optarg, "fragmentation"))
1769 					fragmentation = true;
1770 				else
1771 					fprintf(stderr, "WARNING: unsupported capability '%s'\n", optarg);
1772 				break;
1773 			case 't':
1774 				if (!parse_processing_delay(optarg))
1775 					break;
1776 				fprintf(stderr, "%s: failed to parse time '%s'.\n", argv[0], optarg);
1777 				fprintf(stderr, "Try '%s -h' for more information.\n", argv[0]);
1778 				return EXIT_FAILURE;
1779 			default:
1780 				usage(argv[0]);
1781 				return EXIT_FAILURE;
1782 		}
1783 	}
1784 
1785 	if (num_workers <= 0) {
1786 		LOG(&null_worker, "%s : Invalid number of workers '%d'\n",
1787 		    argv[0], num_workers);
1788 		goto error;
1789 	}
1790 
1791 	if (server_port <= 0) {
1792 		LOG(&null_worker, "%s : Invalid port '%d'\n",
1793 		    argv[0], server_port);
1794 		goto error;
1795 	}
1796 
1797 
1798 	if (evthread_use_pthreads() < 0) {
1799 		LOG(&null_worker, "No pthreads support for libevent");
1800 		goto error;
1801 	}
1802 
1803 	if ((base = event_base_new()) == NULL) {
1804 		LOG(&null_worker, "Failed to initialize libevent : %m");
1805 		goto error;
1806 	}
1807 
1808 	signal(SIGPIPE, SIG_IGN);
1809 
1810 	if ((fd = create_server_socket()) < 0) {
1811 		LOG(&null_worker, "Failed to create server socket");
1812 		goto error;
1813 	}
1814 	if (evutil_make_socket_nonblocking(fd) < 0) {
1815 		LOG(&null_worker, "Failed to set server socket to non-blocking");
1816 		goto error;
1817 	}
1818 
1819 	if ((workers = calloc(num_workers, sizeof(*workers))) == NULL) {
1820 		LOG(&null_worker, "Failed to set allocate memory for workers");
1821 		goto error;
1822 	}
1823 
1824 	for (i = 0; i < num_workers; ++i) {
1825 		struct worker *w = &workers[i];
1826 
1827 		w->id        = i+1;
1828 		w->nbclients = 0;
1829 		w->nbframes  = 0;
1830 		LIST_INIT(&w->engines);
1831 		LIST_INIT(&w->clients);
1832 		LIST_INIT(&w->frames);
1833 
1834 		if ((w->base = event_base_new()) == NULL) {
1835 			LOG(&null_worker,
1836 			    "Failed to initialize libevent for worker %02d : %m",
1837 			    w->id);
1838 			goto error;
1839 		}
1840 
1841 		w->monitor_event = event_new(w->base, fd, EV_PERSIST,
1842 					     worker_monitor_cb, (void *)w);
1843 		if (w->monitor_event == NULL ||
1844 		    event_add(w->monitor_event, (struct timeval[]){{5,0}}) < 0) {
1845 			LOG(&null_worker,
1846 			    "Failed to create monitor event for worker %02d",
1847 			    w->id);
1848 			goto error;
1849 		}
1850 
1851 		if (pthread_create(&w->thread, NULL, worker_function, (void *)w)) {
1852 			LOG(&null_worker,
1853 			    "Failed to start thread for worker %02d : %m",
1854 			    w->id);
1855 		}
1856 		DEBUG(&null_worker, "Worker %02d initialized", w->id);
1857 	}
1858 
1859 	accept_event = event_new(base, fd, EV_READ|EV_PERSIST, accept_cb,
1860 				 (void *)base);
1861 	if (accept_event == NULL || event_add(accept_event, NULL) < 0) {
1862 		LOG(&null_worker, "Failed to create accept event : %m");
1863 	}
1864 
1865 	signal_event = evsignal_new(base, SIGINT, signal_cb, (void *)base);
1866 	if (signal_event == NULL || event_add(signal_event, NULL) < 0) {
1867 		LOG(&null_worker, "Failed to create signal event : %m");
1868 	}
1869 
1870 	DEBUG(&null_worker,
1871 	      "Server is ready"
1872 	      " [fragmentation=%s - pipelining=%s - async=%s - debug=%s - max-frame-size=%u]",
1873 	      (fragmentation?"true":"false"), (pipelining?"true":"false"), (async?"true":"false"),
1874 	      (debug?"true":"false"), max_frame_size);
1875 	event_base_dispatch(base);
1876 
1877 	for (i = 0; i < num_workers; i++) {
1878 		struct worker *w = &workers[i];
1879 
1880 		pthread_join(w->thread, NULL);
1881 		DEBUG(&null_worker, "Worker %02d terminated", w->id);
1882 	}
1883 
1884 	free(workers);
1885 	event_free(signal_event);
1886 	event_free(accept_event);
1887 	event_base_free(base);
1888 	close(fd);
1889 	return EXIT_SUCCESS;
1890 
1891   error:
1892 	if (workers != NULL)
1893 		free(workers);
1894 	if (signal_event != NULL)
1895 		event_free(signal_event);
1896 	if (accept_event != NULL)
1897 		event_free(accept_event);
1898 	if (base != NULL)
1899 		event_base_free(base);
1900 	if (fd != -1)
1901 		close(fd);
1902 	return EXIT_FAILURE;
1903 }
1904