1 /*
2  * Stream processing offload engine management.
3  *
4  * Copyright 2016 HAProxy Technologies, Christopher Faulet <cfaulet@haproxy.com>
5  *
6  * This program is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; either version
9  * 2 of the License, or (at your option) any later version.
10  *
11  */
12 #include <ctype.h>
13 #include <errno.h>
14 
15 #include <common/buffer.h>
16 #include <common/cfgparse.h>
17 #include <common/compat.h>
18 #include <common/config.h>
19 #include <common/debug.h>
20 #include <common/memory.h>
21 #include <common/time.h>
22 
23 #include <types/arg.h>
24 #include <types/filters.h>
25 #include <types/global.h>
26 #include <types/proxy.h>
27 #include <types/sample.h>
28 #include <types/stream.h>
29 
30 #include <proto/arg.h>
31 #include <proto/backend.h>
32 #include <proto/filters.h>
33 #include <proto/freq_ctr.h>
34 #include <proto/frontend.h>
35 #include <proto/log.h>
36 #include <proto/proto_http.h>
37 #include <proto/proxy.h>
38 #include <proto/sample.h>
39 #include <proto/session.h>
40 #include <proto/signal.h>
41 #include <proto/stream.h>
42 #include <proto/stream_interface.h>
43 #include <proto/task.h>
44 #include <proto/vars.h>
45 
46 #if defined(DEBUG_SPOE) || defined(DEBUG_FULL)
47 #define SPOE_PRINTF(x...) fprintf(x)
48 #else
49 #define SPOE_PRINTF(x...)
50 #endif
51 
52 /* Helper to get ctx inside an appctx */
53 #define APPCTX_SPOE(appctx) ((appctx)->ctx.spoe)
54 
55 /* Minimal size for a frame */
56 #define MIN_FRAME_SIZE 256
57 
58 /* Flags set on the SPOE agent */
59 #define SPOE_FL_CONT_ON_ERR       0x00000001 /* Do not stop events processing when an error occurred */
60 
61 /* Flags set on the SPOE context */
62 #define SPOE_CTX_FL_CLI_CONNECTED 0x00000001 /* Set after that on-client-session event was processed */
63 #define SPOE_CTX_FL_SRV_CONNECTED 0x00000002 /* Set after that on-server-session event was processed */
64 #define SPOE_CTX_FL_REQ_PROCESS   0x00000004 /* Set when SPOE is processing the request */
65 #define SPOE_CTX_FL_RSP_PROCESS   0x00000008 /* Set when SPOE is processing the response */
66 
67 #define SPOE_CTX_FL_PROCESS (SPOE_CTX_FL_REQ_PROCESS|SPOE_CTX_FL_RSP_PROCESS)
68 
69 #define SPOE_APPCTX_ERR_NONE    0x00000000 /* no error yet, leave it to zero */
70 #define SPOE_APPCTX_ERR_TOUT    0x00000001 /* SPOE applet timeout */
71 
72 /* All possible states for a SPOE context */
73 enum spoe_ctx_state {
74 	SPOE_CTX_ST_NONE = 0,
75 	SPOE_CTX_ST_READY,
76 	SPOE_CTX_ST_SENDING_MSGS,
77 	SPOE_CTX_ST_WAITING_ACK,
78 	SPOE_CTX_ST_DONE,
79 	SPOE_CTX_ST_ERROR,
80 };
81 
82 /* All possible states for a SPOE applet */
83 enum spoe_appctx_state {
84 	SPOE_APPCTX_ST_CONNECT = 0,
85 	SPOE_APPCTX_ST_CONNECTING,
86 	SPOE_APPCTX_ST_PROCESSING,
87 	SPOE_APPCTX_ST_DISCONNECT,
88 	SPOE_APPCTX_ST_DISCONNECTING,
89 	SPOE_APPCTX_ST_EXIT,
90 	SPOE_APPCTX_ST_END,
91 };
92 
93 /* All supported SPOE actions */
94 enum spoe_action_type {
95 	SPOE_ACT_T_SET_VAR = 1,
96 	SPOE_ACT_T_UNSET_VAR,
97 	SPOE_ACT_TYPES,
98 };
99 
100 /* All supported SPOE events */
101 enum spoe_event {
102 	SPOE_EV_NONE = 0,
103 
104 	/* Request events */
105 	SPOE_EV_ON_CLIENT_SESS = 1,
106 	SPOE_EV_ON_TCP_REQ_FE,
107 	SPOE_EV_ON_TCP_REQ_BE,
108 	SPOE_EV_ON_HTTP_REQ_FE,
109 	SPOE_EV_ON_HTTP_REQ_BE,
110 
111 	/* Response events */
112 	SPOE_EV_ON_SERVER_SESS,
113 	SPOE_EV_ON_TCP_RSP,
114 	SPOE_EV_ON_HTTP_RSP,
115 
116 	SPOE_EV_EVENTS
117 };
118 
119 /* Errors triggerd by SPOE applet */
120 enum spoe_frame_error {
121 	SPOE_FRM_ERR_NONE = 0,
122 	SPOE_FRM_ERR_IO,
123 	SPOE_FRM_ERR_TOUT,
124 	SPOE_FRM_ERR_TOO_BIG,
125 	SPOE_FRM_ERR_INVALID,
126 	SPOE_FRM_ERR_NO_VSN,
127 	SPOE_FRM_ERR_NO_FRAME_SIZE,
128 	SPOE_FRM_ERR_NO_CAP,
129 	SPOE_FRM_ERR_BAD_VSN,
130 	SPOE_FRM_ERR_BAD_FRAME_SIZE,
131 	SPOE_FRM_ERR_UNKNOWN = 99,
132 	SPOE_FRM_ERRS,
133 };
134 
135 /* Scopes used for variables set by agents. It is a way to be agnotic to vars
136  * scope. */
137 enum spoe_vars_scope {
138 	SPOE_SCOPE_PROC = 0, /* <=> SCOPE_PROC  */
139 	SPOE_SCOPE_SESS,     /* <=> SCOPE_SESS */
140 	SPOE_SCOPE_TXN,      /* <=> SCOPE_TXN  */
141 	SPOE_SCOPE_REQ,      /* <=> SCOPE_REQ  */
142 	SPOE_SCOPE_RES,      /* <=> SCOPE_RES  */
143 };
144 
145 
146 /* Describe an argument that will be linked to a message. It is a sample fetch,
147  * with an optional name. */
148 struct spoe_arg {
149 	char               *name;     /* Name of the argument, may be NULL */
150 	unsigned int        name_len; /* The name length, 0 if NULL */
151 	struct sample_expr *expr;     /* Sample expression */
152 	struct list         list;     /* Used to chain SPOE args */
153 };
154 
155 /* Used during the config parsing only because, when a SPOE agent section is
156  * parsed, messages can be undefined. */
157 struct spoe_msg_placeholder {
158 	char       *id;    /* SPOE message placeholder id */
159 	struct list list;  /* Use to chain SPOE message placeholders */
160 };
161 
162 /* Describe a message that will be sent in a NOTIFY frame. A message has a name,
163  * an argument list (see above) and it is linked to a specific event. */
164 struct spoe_message {
165 	char             *id;      /* SPOE message id */
166 	unsigned int      id_len;  /* The message id length */
167 	struct spoe_agent *agent;   /* SPOE agent owning this SPOE message */
168         struct {
169                 char     *file;    /* file where the SPOE message appears */
170                 int       line;    /* line where the SPOE message appears */
171         } conf;                    /* config information */
172 	struct list       args;    /* Arguments added when the SPOE messages is sent */
173 	struct list       list;    /* Used to chain SPOE messages */
174 
175 	enum spoe_event    event;   /* SPOE_EV_* */
176 };
177 
178 /* Describe a SPOE agent. */
179 struct spoe_agent {
180 	char                 *id;             /* SPOE agent id (name) */
181         struct {
182                 char         *file;           /* file where the SPOE agent appears */
183                 int           line;           /* line where the SPOE agent appears */
184         } conf;                               /* config information */
185 	union {
186 		struct proxy *be;             /* Backend used by this agent */
187 		char         *name;           /* Backend name used during conf parsing */
188 	} b;
189 	struct {
190 		unsigned int  hello;          /* Max time to receive AGENT-HELLO frame (in SPOE applet) */
191 		unsigned int  idle;           /* Max Idle timeout  (in SPOE applet) */
192 		unsigned int  processing;     /* Max time to process an event (in the main stream) */
193 	} timeout;
194 
195 	char                 *var_pfx;        /* Prefix used for vars set by the agent */
196 	char                 *var_on_error;   /* Variable to set when an error occured, in the TXN scope */
197 	unsigned int          flags;          /* SPOE_FL_* */
198 	unsigned int          cps_max;        /* Maximum number of connections per second */
199 	unsigned int          eps_max;        /* Maximum number of errors per second */
200 
201 	struct list           cache;          /* List used to cache SPOE streams. In
202 					       * fact, we cache the SPOE applect ctx */
203 
204 	struct list messages[SPOE_EV_EVENTS]; /* List of SPOE messages that will be sent
205 					       * for each supported events */
206 
207 	struct list        applet_wq;         /* List of streams waiting for a SPOE applet */
208 	struct freq_ctr    conn_per_sec;      /* connections per second */
209 	struct freq_ctr    err_per_sec;       /* connetion errors per second */
210 };
211 
212 /* SPOE filter configuration */
213 struct spoe_config {
214 	struct proxy      *proxy;       /* Proxy owning the filter */
215 	struct spoe_agent *agent;       /* Agent used by this filter */
216 	struct proxy       agent_fe;    /* Agent frontend */
217 };
218 
219 /* SPOE context attached to a stream. It is the main structure that handles the
220  * processing offload */
221 struct spoe_context {
222 	struct filter      *filter;       /* The SPOE filter */
223 	struct stream      *strm;         /* The stream that should be offloaded */
224 	struct appctx      *appctx;       /* The SPOE appctx */
225 	struct list        *messages;     /* List of messages that will be sent during the stream processing */
226 	struct buffer      *buffer;       /* Buffer used to store a NOTIFY or ACK frame */
227 	struct buffer_wait  buffer_wait;  /* position in the list of streams waiting for a buffer */
228 	struct list         applet_wait;  /* position in the list of streams waiting for a SPOE applet */
229 
230 	enum spoe_ctx_state state;        /* SPOE_CTX_ST_* */
231 	unsigned int        flags;        /* SPOE_CTX_FL_* */
232 
233 	unsigned int        stream_id;    /* stream_id and frame_id are used */
234 	unsigned int        frame_id;     /* to map NOTIFY and ACK frames */
235 	unsigned int        process_exp;  /* expiration date to process an event */
236 };
237 
238 /* SPOE filter id. Used to identify SPOE filters */
239 const char *spoe_filter_id = "SPOE filter";
240 
241 /* Set if the handle on SIGUSR1 is registered */
242 static int sighandler_registered = 0;
243 
244 /* proxy used during the parsing */
245 struct proxy *curproxy = NULL;
246 
247 /* The name of the SPOE engine, used during the parsing */
248 char *curengine = NULL;
249 
250 /* SPOE agent used during the parsing */
251 struct spoe_agent *curagent = NULL;
252 
253 /* SPOE message used during the parsing */
254 struct spoe_message *curmsg = NULL;
255 
256 /* list of SPOE messages and placeholders used during the parsing */
257 struct list curmsgs;
258 struct list curmps;
259 
260 /* Pool used to allocate new SPOE contexts */
261 static struct pool_head *pool2_spoe_ctx = NULL;
262 
263 /* Temporary variables used to ease error processing */
264 int  spoe_status_code = SPOE_FRM_ERR_NONE;
265 char spoe_reason[256];
266 
267 struct flt_ops spoe_ops;
268 
269 static void offer_spoe_appctx(struct spoe_agent *agent, struct appctx *appctx);
270 static void on_new_spoe_appctx_failure(struct spoe_agent *agent);
271 static void on_new_spoe_appctx_success(struct spoe_agent *agent, struct appctx *appctx);
272 
273 /********************************************************************
274  * helper functions/globals
275  ********************************************************************/
276 static void
release_spoe_msg_placeholder(struct spoe_msg_placeholder * mp)277 release_spoe_msg_placeholder(struct spoe_msg_placeholder *mp)
278 {
279 	if (!mp)
280 		return;
281 	free(mp->id);
282 	free(mp);
283 }
284 
285 
286 static void
release_spoe_message(struct spoe_message * msg)287 release_spoe_message(struct spoe_message *msg)
288 {
289 	struct spoe_arg *arg, *back;
290 
291 	if (!msg)
292 		return;
293 	free(msg->id);
294 	free(msg->conf.file);
295 	list_for_each_entry_safe(arg, back, &msg->args, list) {
296 		release_sample_expr(arg->expr);
297 		free(arg->name);
298 		LIST_DEL(&arg->list);
299 		free(arg);
300 	}
301 	free(msg);
302 }
303 
304 static void
release_spoe_agent(struct spoe_agent * agent)305 release_spoe_agent(struct spoe_agent *agent)
306 {
307 	struct spoe_message *msg, *back;
308 	int                  i;
309 
310 	if (!agent)
311 		return;
312 	free(agent->id);
313 	free(agent->conf.file);
314 	free(agent->var_pfx);
315 	free(agent->var_on_error);
316 	for (i = 0; i < SPOE_EV_EVENTS; ++i) {
317 		list_for_each_entry_safe(msg, back, &agent->messages[i], list) {
318 			LIST_DEL(&msg->list);
319 			release_spoe_message(msg);
320 		}
321 	}
322 	free(agent);
323 }
324 
325 static const char *spoe_frm_err_reasons[SPOE_FRM_ERRS] = {
326 	[SPOE_FRM_ERR_NONE]           = "normal",
327 	[SPOE_FRM_ERR_IO]             = "I/O error",
328 	[SPOE_FRM_ERR_TOUT]           = "a timeout occurred",
329 	[SPOE_FRM_ERR_TOO_BIG]        = "frame is too big",
330 	[SPOE_FRM_ERR_INVALID]        = "invalid frame received",
331 	[SPOE_FRM_ERR_NO_VSN]         = "version value not found",
332 	[SPOE_FRM_ERR_NO_FRAME_SIZE]  = "max-frame-size value not found",
333 	[SPOE_FRM_ERR_NO_CAP]         = "capabilities value not found",
334 	[SPOE_FRM_ERR_BAD_VSN]        = "unsupported version",
335 	[SPOE_FRM_ERR_BAD_FRAME_SIZE] = "max-frame-size too big or too small",
336 	[SPOE_FRM_ERR_UNKNOWN]        = "an unknown error occurred",
337 };
338 
339 static const char *spoe_event_str[SPOE_EV_EVENTS] = {
340 	[SPOE_EV_ON_CLIENT_SESS] = "on-client-session",
341 	[SPOE_EV_ON_TCP_REQ_FE]  = "on-frontend-tcp-request",
342 	[SPOE_EV_ON_TCP_REQ_BE]  = "on-backend-tcp-request",
343 	[SPOE_EV_ON_HTTP_REQ_FE] = "on-frontend-http-request",
344 	[SPOE_EV_ON_HTTP_REQ_BE] = "on-backend-http-request",
345 
346 	[SPOE_EV_ON_SERVER_SESS] = "on-server-session",
347 	[SPOE_EV_ON_TCP_RSP]     = "on-tcp-response",
348 	[SPOE_EV_ON_HTTP_RSP]    = "on-http-response",
349 };
350 
351 
352 #if defined(DEBUG_SPOE) || defined(DEBUG_FULL)
353 
354 static const char *spoe_ctx_state_str[SPOE_CTX_ST_ERROR+1] = {
355 	[SPOE_CTX_ST_NONE]         = "NONE",
356 	[SPOE_CTX_ST_READY]        = "READY",
357 	[SPOE_CTX_ST_SENDING_MSGS] = "SENDING_MSGS",
358 	[SPOE_CTX_ST_WAITING_ACK]  = "WAITING_ACK",
359 	[SPOE_CTX_ST_DONE]         = "DONE",
360 	[SPOE_CTX_ST_ERROR]        = "ERROR",
361 };
362 
363 static const char *spoe_appctx_state_str[SPOE_APPCTX_ST_END+1] = {
364 	[SPOE_APPCTX_ST_CONNECT]       = "CONNECT",
365 	[SPOE_APPCTX_ST_CONNECTING]    = "CONNECTING",
366 	[SPOE_APPCTX_ST_PROCESSING]    = "PROCESSING",
367 	[SPOE_APPCTX_ST_DISCONNECT]    = "DISCONNECT",
368 	[SPOE_APPCTX_ST_DISCONNECTING] = "DISCONNECTING",
369 	[SPOE_APPCTX_ST_EXIT]          = "EXIT",
370 	[SPOE_APPCTX_ST_END]           = "END",
371 };
372 
373 #endif
374 /********************************************************************
375  * Functions that encode/decode SPOE frames
376  ********************************************************************/
377 /* Frame Types sent by HAProxy and by agents */
378 enum spoe_frame_type {
379 	/* Frames sent by HAProxy */
380 	SPOE_FRM_T_HAPROXY_HELLO = 1,
381 	SPOE_FRM_T_HAPROXY_DISCON,
382 	SPOE_FRM_T_HAPROXY_NOTIFY,
383 
384 	/* Frames sent by the agents */
385 	SPOE_FRM_T_AGENT_HELLO = 101,
386 	SPOE_FRM_T_AGENT_DISCON,
387 	SPOE_FRM_T_AGENT_ACK
388 };
389 
390 /* All supported data types */
391 enum spoe_data_type {
392 	SPOE_DATA_T_NULL = 0,
393 	SPOE_DATA_T_BOOL,
394 	SPOE_DATA_T_INT32,
395 	SPOE_DATA_T_UINT32,
396 	SPOE_DATA_T_INT64,
397 	SPOE_DATA_T_UINT64,
398 	SPOE_DATA_T_IPV4,
399 	SPOE_DATA_T_IPV6,
400 	SPOE_DATA_T_STR,
401 	SPOE_DATA_T_BIN,
402 	SPOE_DATA_TYPES
403 };
404 
405 /* Masks to get data type or flags value */
406 #define SPOE_DATA_T_MASK  0x0F
407 #define SPOE_DATA_FL_MASK 0xF0
408 
409 /* Flags to set Boolean values */
410 #define SPOE_DATA_FL_FALSE 0x00
411 #define SPOE_DATA_FL_TRUE  0x10
412 
413 /* Helper to get static string length, excluding the terminating null byte */
414 #define SLEN(str) (sizeof(str)-1)
415 
416 /* Predefined key used in HELLO/DISCONNECT frames */
417 #define SUPPORTED_VERSIONS_KEY     "supported-versions"
418 #define VERSION_KEY                "version"
419 #define MAX_FRAME_SIZE_KEY         "max-frame-size"
420 #define CAPABILITIES_KEY           "capabilities"
421 #define HEALTHCHECK_KEY            "healthcheck"
422 #define STATUS_CODE_KEY            "status-code"
423 #define MSG_KEY                    "message"
424 
425 struct spoe_version {
426 	char *str;
427 	int   min;
428 	int   max;
429 };
430 
431 /* All supported versions */
432 static struct spoe_version supported_versions[] = {
433 	{"1.0", 1000, 1000},
434 	{NULL,  0, 0}
435 };
436 
437 /* Comma-separated list of supported versions */
438 #define SUPPORTED_VERSIONS_VAL  "1.0"
439 
440 /* Comma-separated list of supported capabilities (none for now) */
441 #define CAPABILITIES_VAL ""
442 
443 static int
decode_spoe_version(const char * str,size_t len)444 decode_spoe_version(const char *str, size_t len)
445 {
446 	char   tmp[len+1], *start, *end;
447 	double d;
448 	int    vsn = -1;
449 
450 	memset(tmp, 0, len+1);
451 	memcpy(tmp, str, len);
452 
453 	start = tmp;
454 	while (isspace(*start))
455 		start++;
456 
457 	d = strtod(start, &end);
458 	if (d == 0 || start == end)
459 		goto out;
460 
461 	if (*end) {
462 		while (isspace(*end))
463 			end++;
464 		if (*end)
465 			goto out;
466 	}
467 	vsn = (int)(d * 1000);
468   out:
469 	return vsn;
470 }
471 
472 /* Encode a variable-length integer. This function never fails and returns the
473  * number of written bytes. */
474 static int
encode_spoe_varint(uint64_t i,char * buf)475 encode_spoe_varint(uint64_t i, char *buf)
476 {
477 	int idx;
478 
479 	if (i < 240) {
480 		buf[0] = (unsigned char)i;
481 		return 1;
482 	}
483 
484 	buf[0] = (unsigned char)i | 240;
485 	i = (i - 240) >> 4;
486 	for (idx = 1; i >= 128; ++idx) {
487 		buf[idx] = (unsigned char)i | 128;
488 		i = (i - 128) >> 7;
489 	}
490 	buf[idx++] = (unsigned char)i;
491 	return idx;
492 }
493 
494 /* Decode a varable-length integer. If the decoding fails, -1 is returned. This
495  * happens when the buffer's end in reached. On success, the number of read
496  * bytes is returned. */
497 static int
decode_spoe_varint(const char * buf,const char * end,uint64_t * i)498 decode_spoe_varint(const char *buf, const char *end, uint64_t *i)
499 {
500 	unsigned char *msg = (unsigned char *)buf;
501 	int            idx = 0;
502 
503 	if (msg > (unsigned char *)end)
504 		return -1;
505 
506 	if (msg[0] < 240) {
507 		*i = msg[0];
508 		return 1;
509 	}
510 	*i = msg[0];
511 	do {
512 		++idx;
513 		if (msg+idx > (unsigned char *)end)
514 			return -1;
515 		*i += (uint64_t)msg[idx] <<  (4 + 7 * (idx-1));
516 	} while (msg[idx] >= 128);
517 	return (idx + 1);
518 }
519 
520 /* Encode a string. The string will be prefix by its length, encoded as a
521  * variable-length integer. This function never fails and returns the number of
522  * written bytes. */
523 static int
encode_spoe_string(const char * str,size_t len,char * dst)524 encode_spoe_string(const char *str, size_t len, char *dst)
525 {
526 	int idx = 0;
527 
528 	if (!len) {
529 		dst[0] = 0;
530 		return 1;
531 	}
532 
533 	idx += encode_spoe_varint(len, dst);
534 	memcpy(dst+idx, str, len);
535 	return (idx + len);
536 }
537 
538 /* Decode a string. Its length is decoded first as a variable-length integer. If
539  * it succeeds, and if the string length is valid, the begin of the string is
540  * saved in <*str>, its length is saved in <*len> and the total numbre of bytes
541  * read is returned. If an error occurred, -1 is returned and <*str> remains
542  * NULL. */
543 static int
decode_spoe_string(char * buf,char * end,char ** str,uint64_t * len)544 decode_spoe_string(char *buf, char *end, char **str, uint64_t *len)
545 {
546 	int i, idx = 0;
547 
548 	*str = NULL;
549 	*len = 0;
550 
551 	if ((i = decode_spoe_varint(buf, end, len)) == -1)
552 		goto error;
553 	idx += i;
554 	if (buf + idx + *len > end)
555 		goto error;
556 
557 	*str = buf+idx;
558 	return (idx + *len);
559 
560   error:
561 	return -1;
562 }
563 
564 /* Skip a typed data. If an error occurred, -1 is returned, otherwise the number
565  * of bytes read is returned. A types data is composed of a type (1 byte) and
566  * corresponding data:
567  *  - boolean: non additional data (0 bytes)
568  *  - integers: a variable-length integer (see decode_spoe_varint)
569  *  - ipv4: 4 bytes
570  *  - ipv6: 16 bytes
571  *  - binary and string: a buffer prefixed by its size, a variable-length
572  *    integer (see decode_spoe_string) */
573 static int
skip_spoe_data(char * frame,char * end)574 skip_spoe_data(char *frame, char *end)
575 {
576 	uint64_t sz = 0;
577 	int      i, idx = 0;
578 
579 	if (frame > end)
580 		return -1;
581 
582 	switch (frame[idx++] & SPOE_DATA_T_MASK) {
583 		case SPOE_DATA_T_BOOL:
584 			break;
585 		case SPOE_DATA_T_INT32:
586 		case SPOE_DATA_T_INT64:
587 		case SPOE_DATA_T_UINT32:
588 		case SPOE_DATA_T_UINT64:
589 			if ((i = decode_spoe_varint(frame+idx, end, &sz)) == -1)
590 				return -1;
591 			idx += i;
592 			break;
593 		case SPOE_DATA_T_IPV4:
594 			idx += 4;
595 			break;
596 		case SPOE_DATA_T_IPV6:
597 			idx += 16;
598 			break;
599 		case SPOE_DATA_T_STR:
600 		case SPOE_DATA_T_BIN:
601 			if ((i = decode_spoe_varint(frame+idx, end, &sz)) == -1)
602 				return -1;
603 			idx += i + sz;
604 			break;
605 	}
606 
607 	if (frame+idx > end)
608 		return -1;
609 	return idx;
610 }
611 
612 /* Decode a typed data. If an error occurred, -1 is returned, otherwise the
613  * number of read bytes is returned. See skip_spoe_data for details. */
614 static int
decode_spoe_data(char * frame,char * end,struct sample * smp)615 decode_spoe_data(char *frame, char *end, struct sample *smp)
616 {
617 	uint64_t sz = 0;
618 	int      type, i, idx = 0;
619 
620 	if (frame > end)
621 		return -1;
622 
623 	type = frame[idx++];
624 	switch (type & SPOE_DATA_T_MASK) {
625 		case SPOE_DATA_T_BOOL:
626 			smp->data.u.sint = ((type & SPOE_DATA_FL_TRUE) == SPOE_DATA_FL_TRUE);
627 			smp->data.type = SMP_T_BOOL;
628 			break;
629 		case SPOE_DATA_T_INT32:
630 		case SPOE_DATA_T_INT64:
631 		case SPOE_DATA_T_UINT32:
632 		case SPOE_DATA_T_UINT64:
633 			if ((i = decode_spoe_varint(frame+idx, end, (uint64_t *)&smp->data.u.sint)) == -1)
634 				return -1;
635 			idx += i;
636 			smp->data.type = SMP_T_SINT;
637 			break;
638 		case SPOE_DATA_T_IPV4:
639 			if (frame+idx+4 > end)
640 				return -1;
641 			memcpy(&smp->data.u.ipv4, frame+idx, 4);
642 			smp->data.type = SMP_T_IPV4;
643 			idx += 4;
644 			break;
645 		case SPOE_DATA_T_IPV6:
646 			if (frame+idx+16 > end)
647 				return -1;
648 			memcpy(&smp->data.u.ipv6, frame+idx, 16);
649 			smp->data.type = SMP_T_IPV6;
650 			idx += 16;
651 			break;
652 		case SPOE_DATA_T_STR:
653 			if ((i = decode_spoe_varint(frame+idx, end, &sz)) == -1)
654 				return -1;
655 			idx += i;
656 			if (frame+idx+sz > end)
657 				return -1;
658 			smp->data.u.str.str = frame+idx;
659 			smp->data.u.str.len = sz;
660 			smp->data.type = SMP_T_STR;
661 			idx += sz;
662 			break;
663 		case SPOE_DATA_T_BIN:
664 			if ((i = decode_spoe_varint(frame+idx, end, &sz)) == -1)
665 				return -1;
666 			idx += i;
667 			if (frame+idx+sz > end)
668 				return -1;
669 			smp->data.u.str.str = frame+idx;
670 			smp->data.u.str.len = sz;
671 			smp->data.type = SMP_T_BIN;
672 			idx += sz;
673 			break;
674 	}
675 
676 	if (frame+idx > end)
677 		return -1;
678 	return idx;
679 }
680 
681 /* Skip an action in a frame received from an agent. If an error occurred, -1 is
682  * returned, otherwise the number of read bytes is returned. An action is
683  * composed of the action type followed by a typed data. */
684 static int
skip_spoe_action(char * frame,char * end)685 skip_spoe_action(char *frame, char *end)
686 {
687 	int n, i, idx = 0;
688 
689 	if (frame+2 > end)
690 		return -1;
691 
692 	idx++; /* Skip the action type */
693 	n = frame[idx++];
694 	while (n-- > 0) {
695 		if ((i = skip_spoe_data(frame+idx, end)) == -1)
696 			return -1;
697 		idx += i;
698 	}
699 
700 	if (frame+idx > end)
701 		return -1;
702 	return idx;
703 }
704 
705 /* Encode HELLO frame sent by HAProxy to an agent. It returns the frame size on
706  * success, 0 if the frame can be ignored and -1 if an error occurred. */
707 static int
prepare_spoe_hahello_frame(struct appctx * appctx,char * frame,size_t size)708 prepare_spoe_hahello_frame(struct appctx *appctx, char *frame, size_t size)
709 {
710 	int      idx = 0;
711 	size_t   max = (7   /* TYPE + METADATA */
712 			+ 1 + SLEN(SUPPORTED_VERSIONS_KEY) + 1 + 1 + SLEN(SUPPORTED_VERSIONS_VAL)
713 			+ 1 + SLEN(MAX_FRAME_SIZE_KEY) + 1 + 4
714 			+ 1 + SLEN(CAPABILITIES_KEY) + 1 + 1 + SLEN(CAPABILITIES_VAL));
715 
716 	if (size < max)
717 		return -1;
718 
719 	/* Frame type */
720 	frame[idx++] = SPOE_FRM_T_HAPROXY_HELLO;
721 
722 	/* No flags for now */
723 	memset(frame+idx, 0, 4);
724 	idx += 4;
725 
726 	/* No stream-id and frame-id for HELLO frames */
727 	frame[idx++] = 0;
728 	frame[idx++] = 0;
729 
730 	/* There are 3 mandatory items: "supported-versions", "max-frame-size"
731 	 * and "capabilities" */
732 
733 	/* "supported-versions" K/V item */
734 	idx += encode_spoe_string(SUPPORTED_VERSIONS_KEY, SLEN(SUPPORTED_VERSIONS_KEY), frame+idx);
735 	frame[idx++] = SPOE_DATA_T_STR;
736 	idx += encode_spoe_string(SUPPORTED_VERSIONS_VAL, SLEN(SUPPORTED_VERSIONS_VAL), frame+idx);
737 
738 	/* "max-fram-size" K/V item */
739 	idx += encode_spoe_string(MAX_FRAME_SIZE_KEY, SLEN(MAX_FRAME_SIZE_KEY), frame+idx);
740 	frame[idx++] = SPOE_DATA_T_UINT32;
741 	idx += encode_spoe_varint(APPCTX_SPOE(appctx).max_frame_size, frame+idx);
742 
743 	/* "capabilities" K/V item */
744 	idx += encode_spoe_string(CAPABILITIES_KEY, SLEN(CAPABILITIES_KEY), frame+idx);
745 	frame[idx++] = SPOE_DATA_T_STR;
746 	idx += encode_spoe_string(CAPABILITIES_VAL, SLEN(CAPABILITIES_VAL), frame+idx);
747 
748 	return idx;
749 }
750 
751 /* Encode DISCONNECT frame sent by HAProxy to an agent. It returns the frame
752  * size on success, 0 if the frame can be ignored and -1 if an error
753  * occurred. */
754 static int
prepare_spoe_hadiscon_frame(struct appctx * appctx,char * frame,size_t size)755 prepare_spoe_hadiscon_frame(struct appctx *appctx, char *frame, size_t size)
756 {
757 	const char *reason;
758 	int         rlen, idx = 0;
759 	size_t      max = (7   /* TYPE + METADATA */
760 			   + 1 + SLEN(STATUS_CODE_KEY) + 1 + 2
761 			   + 1 + SLEN(MSG_KEY) + 1 + 2 + 255);
762 
763 	if (size < max)
764 		return -1;
765 
766 	/* Get the message corresponding to the status code */
767 	if (spoe_status_code >= SPOE_FRM_ERRS)
768 		spoe_status_code = SPOE_FRM_ERR_UNKNOWN;
769 	reason = spoe_frm_err_reasons[spoe_status_code];
770 	rlen   = strlen(reason);
771 
772 	 /* Frame type */
773 	frame[idx++] = SPOE_FRM_T_HAPROXY_DISCON;
774 
775 	/* No flags for now */
776 	memset(frame+idx, 0, 4);
777 	idx += 4;
778 
779 	/* No stream-id and frame-id for DISCONNECT frames */
780 	frame[idx++] = 0;
781 	frame[idx++] = 0;
782 
783 	/* There are 2 mandatory items: "status-code" and "message" */
784 
785 	/* "status-code" K/V item */
786 	idx += encode_spoe_string(STATUS_CODE_KEY, SLEN(STATUS_CODE_KEY), frame+idx);
787 	frame[idx++] = SPOE_DATA_T_UINT32;
788 	idx += encode_spoe_varint(spoe_status_code, frame+idx);
789 
790 	/* "message" K/V item */
791 	idx += encode_spoe_string(MSG_KEY, SLEN(MSG_KEY), frame+idx);
792 	frame[idx++] = SPOE_DATA_T_STR;
793 	idx += encode_spoe_string(reason, rlen, frame+idx);
794 
795 	return idx;
796 }
797 
798 /* Encode NOTIFY frame sent by HAProxy to an agent. It returns the frame size on
799  * success, 0 if the frame can be ignored and -1 if an error occurred. */
800 static int
prepare_spoe_hanotify_frame(struct appctx * appctx,char * frame,size_t size)801 prepare_spoe_hanotify_frame(struct appctx *appctx, char *frame, size_t size)
802 {
803 	struct spoe_context *ctx = APPCTX_SPOE(appctx).ctx;
804 	int                  idx = 0;
805 
806 	if (size < APPCTX_SPOE(appctx).max_frame_size)
807 		return -1;
808 
809 	frame[idx++] = SPOE_FRM_T_HAPROXY_NOTIFY;
810 
811 	/* No flags for now */
812 	memset(frame+idx, 0, 4);
813 	idx += 4;
814 
815 	/* Set stream-id and frame-id */
816 	idx += encode_spoe_varint(ctx->stream_id, frame+idx);
817 	idx += encode_spoe_varint(ctx->frame_id, frame+idx);
818 
819 	/* Copy encoded messages */
820 	memcpy(frame+idx, ctx->buffer->p, ctx->buffer->i);
821 	idx += ctx->buffer->i;
822 
823 	return idx;
824 }
825 
826 /* Decode HELLO frame sent by an agent. It returns the number of by read bytes
827  * on success, 0 if the frame can be ignored and -1 if an error occurred. */
828 static int
handle_spoe_agenthello_frame(struct appctx * appctx,char * frame,size_t size)829 handle_spoe_agenthello_frame(struct appctx *appctx, char *frame, size_t size)
830 {
831 	int    vsn, max_frame_size;
832 	int    i, idx = 0;
833 	size_t min_size = (7   /* TYPE + METADATA */
834 			   + 1 + SLEN(VERSION_KEY) + 1 + 1 + 3
835 			   + 1 + SLEN(MAX_FRAME_SIZE_KEY) + 1 + 1
836 			   + 1 + SLEN(CAPABILITIES_KEY) + 1 + 1 + 0);
837 
838 	/* Check frame type */
839 	if (frame[idx++] != SPOE_FRM_T_AGENT_HELLO)
840 		return 0;
841 
842 	if (size < min_size) {
843 		spoe_status_code = SPOE_FRM_ERR_INVALID;
844 		return -1;
845 	}
846 
847 	/* Skip flags: fragmentation is not supported for now */
848 	idx += 4;
849 
850 	/* stream-id and frame-id must be cleared */
851 	if (frame[idx] != 0 || frame[idx+1] != 0) {
852 		spoe_status_code = SPOE_FRM_ERR_INVALID;
853 		return -1;
854 	}
855 	idx += 2;
856 
857 	/* There are 3 mandatory items: "version", "max-frame-size" and
858 	 * "capabilities" */
859 
860 	/* Loop on K/V items */
861 	vsn = max_frame_size = 0;
862 	while (idx < size) {
863 		char     *str;
864 		uint64_t  sz;
865 
866 		/* Decode the item key */
867 		idx += decode_spoe_string(frame+idx, frame+size, &str, &sz);
868 		if (str == NULL) {
869 			spoe_status_code = SPOE_FRM_ERR_INVALID;
870 			return -1;
871 		}
872 		/* Check "version" K/V item */
873 		if (!memcmp(str, VERSION_KEY, sz)) {
874 			/* The value must be a string */
875 			if ((frame[idx++] & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR) {
876 				spoe_status_code = SPOE_FRM_ERR_INVALID;
877 				return -1;
878 			}
879 			idx += decode_spoe_string(frame+idx, frame+size, &str, &sz);
880 			if (str == NULL) {
881 				spoe_status_code = SPOE_FRM_ERR_INVALID;
882 				return -1;
883 			}
884 
885 			vsn = decode_spoe_version(str, sz);
886 			if (vsn == -1) {
887 				spoe_status_code = SPOE_FRM_ERR_BAD_VSN;
888 				return -1;
889 			}
890 			for (i = 0; supported_versions[i].str != NULL; ++i) {
891 				if (vsn >= supported_versions[i].min &&
892 				    vsn <= supported_versions[i].max)
893 					break;
894 			}
895 			if (supported_versions[i].str == NULL) {
896 				spoe_status_code = SPOE_FRM_ERR_BAD_VSN;
897 				return -1;
898 			}
899 		}
900 		/* Check "max-frame-size" K/V item */
901 		else if (!memcmp(str, MAX_FRAME_SIZE_KEY, sz)) {
902 			int type;
903 
904 			/* The value must be integer */
905 			type = frame[idx++];
906 			if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT32 &&
907 			    (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT64 &&
908 			    (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT32 &&
909 			    (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT64) {
910 				spoe_status_code = SPOE_FRM_ERR_INVALID;
911 				return -1;
912 			}
913 			if ((i = decode_spoe_varint(frame+idx, frame+size, &sz)) == -1) {
914 				spoe_status_code = SPOE_FRM_ERR_INVALID;
915 				return -1;
916 			}
917 			idx += i;
918 			if (sz < MIN_FRAME_SIZE || sz > APPCTX_SPOE(appctx).max_frame_size) {
919 				spoe_status_code = SPOE_FRM_ERR_BAD_FRAME_SIZE;
920 				return -1;
921 			}
922 			max_frame_size = sz;
923 		}
924 		/* Skip "capabilities" K/V item for now */
925 		else {
926 			/* Silently ignore unknown item */
927 			if ((i = skip_spoe_data(frame+idx, frame+size)) == -1) {
928 				spoe_status_code = SPOE_FRM_ERR_INVALID;
929 				return -1;
930 			}
931 			idx += i;
932 		}
933 	}
934 
935 	/* Final checks */
936 	if (!vsn) {
937 		spoe_status_code = SPOE_FRM_ERR_NO_VSN;
938 		return -1;
939 	}
940 	if (!max_frame_size) {
941 		spoe_status_code = SPOE_FRM_ERR_NO_FRAME_SIZE;
942 		return -1;
943 	}
944 
945 	APPCTX_SPOE(appctx).version        = (unsigned int)vsn;
946 	APPCTX_SPOE(appctx).max_frame_size = (unsigned int)max_frame_size;
947 	return idx;
948 }
949 
950 /* Decode DISCONNECT frame sent by an agent. It returns the number of by read
951  * bytes on success, 0 if the frame can be ignored and -1 if an error
952  * occurred. */
953 static int
handle_spoe_agentdiscon_frame(struct appctx * appctx,char * frame,size_t size)954 handle_spoe_agentdiscon_frame(struct appctx *appctx, char *frame, size_t size)
955 {
956 	int    i, idx = 0;
957 	size_t min_size = (7   /* TYPE + METADATA */
958 			   + 1 + SLEN(STATUS_CODE_KEY) + 1 + 1
959 			   + 1 + SLEN(MSG_KEY) + 1 + 1);
960 
961 	/* Check frame type */
962 	if (frame[idx++] != SPOE_FRM_T_AGENT_DISCON)
963 		return 0;
964 
965 	if (size < min_size) {
966 		spoe_status_code = SPOE_FRM_ERR_INVALID;
967 		return -1;
968 	}
969 
970 	/* Skip flags: fragmentation is not supported for now */
971 	idx += 4;
972 
973 	/* stream-id and frame-id must be cleared */
974 	if (frame[idx] != 0 || frame[idx+1] != 0) {
975 		spoe_status_code = SPOE_FRM_ERR_INVALID;
976 		return -1;
977 	}
978 	idx += 2;
979 
980 	/* There are 2 mandatory items: "status-code" and "message" */
981 
982 	/* Loop on K/V items */
983 	while (idx < size) {
984 		char     *str;
985 		uint64_t  sz;
986 
987 		/* Decode the item key */
988 		idx += decode_spoe_string(frame+idx, frame+size, &str, &sz);
989 		if (str == NULL) {
990 			spoe_status_code = SPOE_FRM_ERR_INVALID;
991 			return -1;
992 		}
993 
994 		/* Check "status-code" K/V item */
995 		if (!memcmp(str, STATUS_CODE_KEY, sz)) {
996 			int type;
997 
998 			/* The value must be an integer */
999 			type = frame[idx++];
1000 			if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT32 &&
1001 			    (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT64 &&
1002 			    (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT32 &&
1003 			    (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT64) {
1004 				spoe_status_code = SPOE_FRM_ERR_INVALID;
1005 				return -1;
1006 			}
1007 			if ((i = decode_spoe_varint(frame+idx, frame+size, &sz)) == -1) {
1008 				spoe_status_code = SPOE_FRM_ERR_INVALID;
1009 				return -1;
1010 			}
1011 			idx += i;
1012 			spoe_status_code = sz;
1013 		}
1014 
1015 		/* Check "message" K/V item */
1016 		else if (sz && !memcmp(str, MSG_KEY, sz)) {
1017 			/* The value must be a string */
1018 			if ((frame[idx++] & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR) {
1019 				spoe_status_code = SPOE_FRM_ERR_INVALID;
1020 				return -1;
1021 			}
1022 			idx += decode_spoe_string(frame+idx, frame+size, &str, &sz);
1023 			if (str == NULL || sz > 255) {
1024 				spoe_status_code = SPOE_FRM_ERR_INVALID;
1025 				return -1;
1026 			}
1027 			memcpy(spoe_reason, str, sz);
1028 			spoe_reason[sz] = 0;
1029 		}
1030 		else {
1031 			/* Silently ignore unknown item */
1032 			if ((i = skip_spoe_data(frame+idx, frame+size)) == -1) {
1033 				spoe_status_code = SPOE_FRM_ERR_INVALID;
1034 				return -1;
1035 			}
1036 			idx += i;
1037 		}
1038 	}
1039 
1040 	return idx;
1041 }
1042 
1043 
1044 /* Decode ACK frame sent by an agent. It returns the number of by read bytes on
1045  * success, 0 if the frame can be ignored and -1 if an error occurred. */
1046 static int
handle_spoe_agentack_frame(struct appctx * appctx,char * frame,size_t size)1047 handle_spoe_agentack_frame(struct appctx *appctx, char *frame, size_t size)
1048 {
1049 	struct spoe_context  *ctx = APPCTX_SPOE(appctx).ctx;
1050 	uint64_t              stream_id, frame_id;
1051 	int                   idx = 0, i;
1052 	size_t                min_size = (7  /* TYPE + METADATA */);
1053 
1054 	/* Check frame type */
1055 	if (frame[idx++] != SPOE_FRM_T_AGENT_ACK)
1056 		return 0;
1057 
1058 	if (size < min_size) {
1059 		spoe_status_code = SPOE_FRM_ERR_INVALID;
1060 		return -1;
1061 	}
1062 
1063 	/* Skip flags: fragmentation is not supported for now */
1064 	idx += 4;
1065 
1066 	/* Get the stream-id and the frame-id */
1067 	if ((i = decode_spoe_varint(frame+idx, frame+size, &stream_id)) == -1) {
1068 		spoe_status_code = SPOE_FRM_ERR_INVALID;
1069 		return -1;
1070 	}
1071 	idx += i;
1072 
1073 	if ((i = decode_spoe_varint(frame+idx, frame+size, &frame_id)) == -1) {
1074 		spoe_status_code = SPOE_FRM_ERR_INVALID;
1075 		return -1;
1076 	}
1077 	idx += i;
1078 
1079 	/* Check stream-id and frame-id */
1080 	if (ctx->stream_id != (unsigned int)stream_id ||
1081 	    ctx->frame_id  != (unsigned int)frame_id)
1082 		return 0;
1083 
1084 	/* Copy encoded actions */
1085 	b_reset(ctx->buffer);
1086 	memcpy(ctx->buffer->p, frame+idx, size-idx);
1087 	ctx->buffer->i = size-idx;
1088 
1089 	return idx;
1090 }
1091 
1092 /* This function is used in cfgparse.c and declared in proto/checks.h. It
1093  * prepare the request to send to agents during a healthcheck. It returns 0 on
1094  * success and -1 if an error occurred. */
1095 int
prepare_spoe_healthcheck_request(char ** req,int * len)1096 prepare_spoe_healthcheck_request(char **req, int *len)
1097 {
1098 	struct appctx a;
1099 	char          *frame, buf[global.tune.bufsize];
1100 	unsigned int  framesz;
1101 	int	      idx;
1102 
1103 	memset(&a, 0, sizeof(a));
1104 	memset(buf, 0, sizeof(buf));
1105 	APPCTX_SPOE(&a).max_frame_size = global.tune.bufsize;
1106 
1107 	frame = buf+4;
1108 	idx = prepare_spoe_hahello_frame(&a, frame, global.tune.bufsize-4);
1109 	if (idx <= 0)
1110 		return -1;
1111 	if (idx + SLEN(HEALTHCHECK_KEY) + 1 > global.tune.bufsize-4)
1112 		return -1;
1113 
1114 	/* "healthcheck" K/V item */
1115 	idx += encode_spoe_string(HEALTHCHECK_KEY, SLEN(HEALTHCHECK_KEY), frame+idx);
1116 	frame[idx++] = (SPOE_DATA_T_BOOL | SPOE_DATA_FL_TRUE);
1117 
1118 	framesz = htonl(idx);
1119 	memcpy(buf, (char *)&framesz, 4);
1120 
1121 	if ((*req = malloc(idx+4)) == NULL)
1122 		return -1;
1123 	memcpy(*req, buf, idx+4);
1124 	*len = idx+4;
1125 	return 0;
1126 }
1127 
1128 /* This function is used in checks.c and declared in proto/checks.h. It decode
1129  * the response received from an agent during a healthcheck. It returns 0 on
1130  * success and -1 if an error occurred. */
1131 int
handle_spoe_healthcheck_response(char * frame,size_t size,char * err,int errlen)1132 handle_spoe_healthcheck_response(char *frame, size_t size, char *err, int errlen)
1133 {
1134 	struct appctx a;
1135 	int           r;
1136 
1137 	memset(&a, 0, sizeof(a));
1138 	APPCTX_SPOE(&a).max_frame_size = global.tune.bufsize;
1139 
1140 	if (handle_spoe_agentdiscon_frame(&a, frame, size) != 0)
1141 		goto error;
1142 	if ((r = handle_spoe_agenthello_frame(&a, frame, size)) <= 0) {
1143 		if (r == 0)
1144 			spoe_status_code = SPOE_FRM_ERR_INVALID;
1145 		goto error;
1146 	}
1147 
1148 	return 0;
1149 
1150   error:
1151 	if (spoe_status_code >= SPOE_FRM_ERRS)
1152 		spoe_status_code = SPOE_FRM_ERR_UNKNOWN;
1153 	strncpy(err, spoe_frm_err_reasons[spoe_status_code], errlen);
1154 	return -1;
1155 }
1156 
1157 /********************************************************************
1158  * Functions that manage the SPOE applet
1159  ********************************************************************/
1160 /* Callback function that catches applet timeouts. If a timeout occurred, we set
1161  * <appctx->st1> flag and the SPOE applet is woken up. */
1162 static struct task *
process_spoe_applet(struct task * task)1163 process_spoe_applet(struct task * task)
1164 {
1165 	struct appctx *appctx = task->context;
1166 
1167 	appctx->st1 = SPOE_APPCTX_ERR_NONE;
1168 	if (tick_is_expired(task->expire, now_ms)) {
1169 		task->expire = TICK_ETERNITY;
1170 		appctx->st1 = SPOE_APPCTX_ERR_TOUT;
1171 	}
1172 	si_applet_want_get(appctx->owner);
1173 	appctx_wakeup(appctx);
1174 	return task;
1175 }
1176 
1177 /* Remove a SPOE applet from the agent cache */
1178 static void
remove_spoe_applet_from_cache(struct appctx * appctx)1179 remove_spoe_applet_from_cache(struct appctx *appctx)
1180 {
1181 	struct appctx     *a, *back;
1182 	struct spoe_agent *agent = APPCTX_SPOE(appctx).agent;
1183 
1184 	if (LIST_ISEMPTY(&agent->cache))
1185 		return;
1186 
1187 	list_for_each_entry_safe(a, back, &agent->cache, ctx.spoe.list) {
1188 		if (a == appctx) {
1189 			LIST_DEL(&APPCTX_SPOE(appctx).list);
1190 			break;
1191 		}
1192 	}
1193 }
1194 
1195 
1196 /* Callback function that releases a SPOE applet. This happens when the
1197  * connection with the agent is closed. */
1198 static void
release_spoe_applet(struct appctx * appctx)1199 release_spoe_applet(struct appctx *appctx)
1200 {
1201 	struct stream_interface *si    = appctx->owner;
1202 	struct spoe_agent       *agent = APPCTX_SPOE(appctx).agent;
1203 	struct spoe_context     *ctx   = APPCTX_SPOE(appctx).ctx;
1204 
1205 	if (appctx->st0 == SPOE_APPCTX_ST_CONNECT ||
1206 	    appctx->st0 == SPOE_APPCTX_ST_CONNECTING)
1207 		on_new_spoe_appctx_failure(agent);
1208 
1209 	if (appctx->st0 != SPOE_APPCTX_ST_END) {
1210 		si_shutw(si);
1211 		si_shutr(si);
1212 		si_ic(si)->flags |= CF_READ_NULL;
1213 		appctx->st0 = SPOE_APPCTX_ST_END;
1214 	}
1215 
1216 	if (ctx != NULL) {
1217 		task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
1218 		ctx->appctx = NULL;
1219 	}
1220 
1221 	SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p\n",
1222 		    (int)now.tv_sec, (int)now.tv_usec, agent->id,
1223 		    __FUNCTION__, appctx);
1224 
1225 	/* Release the task attached to the SPOE applet */
1226 	if (APPCTX_SPOE(appctx).task) {
1227 		task_delete(APPCTX_SPOE(appctx).task);
1228 		task_free(APPCTX_SPOE(appctx).task);
1229 	}
1230 
1231 	/* And remove it from the agent cache */
1232 	remove_spoe_applet_from_cache(appctx);
1233 	APPCTX_SPOE(appctx).ctx = NULL;
1234 }
1235 
1236 /* Send a SPOE frame to an agent. It return -2 when an error occurred, -1 when
1237  * the frame can be ignored, 0 to retry later and 1 on success. The frame is
1238  * encoded using the callback function <prepare>. */
1239 static int
send_spoe_frame(struct appctx * appctx,int (* prepare)(struct appctx *,char *,size_t))1240 send_spoe_frame(struct appctx *appctx,
1241 		int (*prepare)(struct appctx *, char *, size_t))
1242 {
1243 	struct stream_interface *si  = appctx->owner;
1244 	int                      framesz, ret;
1245 	uint32_t                 netint;
1246 
1247 	if (si_ic(si)->buf->size == 0)
1248 		return -1;
1249 
1250 	ret = prepare(appctx, trash.str, APPCTX_SPOE(appctx).max_frame_size);
1251 	if (ret <= 0)
1252 		goto skip_or_error;
1253 	framesz = ret;
1254 	netint  = htonl(framesz);
1255 	ret = bi_putblk(si_ic(si), (char *)&netint, sizeof(netint));
1256 	if (ret > 0)
1257 		ret = bi_putblk(si_ic(si), trash.str, framesz);
1258 	if (ret <= 0) {
1259 		if (ret == -1)
1260 			return -1;
1261 		return -2;
1262 	}
1263 	return 1;
1264 
1265  skip_or_error:
1266 	if (!ret)
1267 		return -1;
1268 	return -2;
1269 }
1270 
1271 /* Receive a SPOE frame from an agent. It return -2 when an error occurred, -1
1272  * when the frame can be ignored, 0 to retry later and 1 on success. The frame
1273  * is decoded using the callback function <handle>. */
1274 static int
recv_spoe_frame(struct appctx * appctx,int (* handle)(struct appctx *,char *,size_t))1275 recv_spoe_frame(struct appctx *appctx,
1276 		int (*handle)(struct appctx *, char *, size_t))
1277 {
1278 	struct stream_interface *si  = appctx->owner;
1279 	int                      framesz, ret;
1280 	uint32_t                 netint;
1281 
1282 	ret = bo_getblk(si_oc(si), (char *)&netint, sizeof(netint), 0);
1283 	if (ret <= 0)
1284 		goto empty_or_error;
1285 	framesz = ntohl(netint);
1286 	if (framesz > APPCTX_SPOE(appctx).max_frame_size) {
1287 		spoe_status_code = SPOE_FRM_ERR_TOO_BIG;
1288 		return -2;
1289 	}
1290 
1291 	ret = bo_getblk(si_oc(si), trash.str, framesz, sizeof(netint));
1292 	if (ret <= 0)
1293 		goto empty_or_error;
1294 	bo_skip(si_oc(si), ret+sizeof(netint));
1295 
1296 	/* First check if the received frame is a DISCONNECT frame */
1297 	ret = handle_spoe_agentdiscon_frame(appctx, trash.str, framesz);
1298 	if (ret != 0) {
1299 		if (ret > 0) {
1300 			SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
1301 				    " - disconnected by peer (%d): %s\n",
1302 				    (int)now.tv_sec, (int)now.tv_usec,
1303 				    ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
1304 				    __FUNCTION__, appctx, spoe_status_code,
1305 				    spoe_reason);
1306 			return 2;
1307 		}
1308 		SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
1309 			    " - error on frame (%s)\n",
1310 			    (int)now.tv_sec, (int)now.tv_usec,
1311 			    ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
1312 			    __FUNCTION__, appctx,
1313 			    spoe_frm_err_reasons[spoe_status_code]);
1314 		return -2;
1315 	}
1316 	if (handle == NULL)
1317 		goto out;
1318 
1319 	/* If not, try to decode it */
1320 	ret = handle(appctx, trash.str, framesz);
1321 	if (ret <= 0) {
1322 		if (!ret)
1323 			return -1;
1324 		SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
1325 			    " - error on frame (%s)\n",
1326 			    (int)now.tv_sec, (int)now.tv_usec,
1327 			    ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
1328 			    __FUNCTION__, appctx,
1329 			    spoe_frm_err_reasons[spoe_status_code]);
1330 		return -2;
1331 	}
1332   out:
1333 	return 1;
1334 
1335   empty_or_error:
1336 	if (!ret)
1337 		return 0;
1338 	spoe_status_code = SPOE_FRM_ERR_IO;
1339 	return -2;
1340 }
1341 
1342 /* I/O Handler processing messages exchanged with the agent */
1343 static void
handle_spoe_applet(struct appctx * appctx)1344 handle_spoe_applet(struct appctx *appctx)
1345 {
1346 	struct stream_interface *si    = appctx->owner;
1347 	struct stream           *s     = si_strm(si);
1348 	struct spoe_agent       *agent = APPCTX_SPOE(appctx).agent;
1349 	struct spoe_context     *ctx   = APPCTX_SPOE(appctx).ctx;
1350 	int                      ret;
1351 
1352  switchstate:
1353 	SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
1354 		    " - appctx-state=%s\n",
1355 		    (int)now.tv_sec, (int)now.tv_usec, agent->id,
1356 		    __FUNCTION__, appctx, spoe_appctx_state_str[appctx->st0]);
1357 
1358 	switch (appctx->st0) {
1359 		case SPOE_APPCTX_ST_CONNECT:
1360 			spoe_status_code = SPOE_FRM_ERR_NONE;
1361 			if (si->state <= SI_ST_CON) {
1362 				si_applet_want_put(si);
1363 				task_wakeup(s->task, TASK_WOKEN_MSG);
1364 				break;
1365 			}
1366 			else if (si->state != SI_ST_EST) {
1367 				appctx->st0 = SPOE_APPCTX_ST_EXIT;
1368 				on_new_spoe_appctx_failure(agent);
1369 				goto switchstate;
1370 			}
1371 			ret = send_spoe_frame(appctx, &prepare_spoe_hahello_frame);
1372 			if (ret < 0) {
1373 				appctx->st0 = SPOE_APPCTX_ST_EXIT;
1374 				on_new_spoe_appctx_failure(agent);
1375 				goto switchstate;
1376 			}
1377 			else if (!ret)
1378 				goto full;
1379 
1380 			/* Hello frame was sent. Set the hello timeout and
1381 			 * wait for the reply. */
1382 			APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.hello);
1383 			appctx->st0 = SPOE_APPCTX_ST_CONNECTING;
1384 			/* fall through */
1385 
1386 		case SPOE_APPCTX_ST_CONNECTING:
1387 			if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO) {
1388 				appctx->st0 = SPOE_APPCTX_ST_EXIT;
1389 				on_new_spoe_appctx_failure(agent);
1390 				goto switchstate;
1391 			}
1392 			if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) {
1393 				SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
1394 					    " - Connection timed out\n",
1395 					    (int)now.tv_sec, (int)now.tv_usec,
1396 					    ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
1397 					    __FUNCTION__, appctx);
1398 				appctx->st0 = SPOE_APPCTX_ST_EXIT;
1399 				on_new_spoe_appctx_failure(agent);
1400 				goto switchstate;
1401 			}
1402 			ret = recv_spoe_frame(appctx, &handle_spoe_agenthello_frame);
1403 			if (ret < 0) {
1404 				appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
1405 				on_new_spoe_appctx_failure(agent);
1406 				goto switchstate;
1407 			}
1408 			if (ret == 2) {
1409 				appctx->st0 = SPOE_APPCTX_ST_EXIT;
1410 				on_new_spoe_appctx_failure(agent);
1411 				goto switchstate;
1412 			}
1413 			if (!ret)
1414 				goto out;
1415 
1416 			/* hello handshake is finished, set the idle timeout,
1417 			 * Add the appctx in the agent cache, decrease the
1418 			 * number of new applets and wake up waiting streams. */
1419 			APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
1420 			appctx->st0 = SPOE_APPCTX_ST_PROCESSING;
1421 			on_new_spoe_appctx_success(agent, appctx);
1422 			break;
1423 
1424 		case SPOE_APPCTX_ST_PROCESSING:
1425 			if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO) {
1426 				appctx->st0 = SPOE_APPCTX_ST_EXIT;
1427 				goto switchstate;
1428 			}
1429 			if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) {
1430 				spoe_status_code = SPOE_FRM_ERR_TOUT;
1431 				appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
1432 				appctx->st1 = SPOE_APPCTX_ERR_NONE;
1433 				goto switchstate;
1434 			}
1435 			if (ctx != NULL && ctx->state == SPOE_CTX_ST_SENDING_MSGS) {
1436 				ret = send_spoe_frame(appctx, &prepare_spoe_hanotify_frame);
1437 				if (ret < 0) {
1438 					if (ret == -1) {
1439 						ctx->state = SPOE_CTX_ST_ERROR;
1440 						task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
1441 						goto skip_notify_frame;
1442 					}
1443 					appctx->st0 = SPOE_APPCTX_ST_EXIT;
1444 					goto switchstate;
1445 				}
1446 				else if (!ret)
1447 					goto full;
1448 				ctx->state = SPOE_CTX_ST_WAITING_ACK;
1449 				APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
1450 			}
1451 
1452 		  skip_notify_frame:
1453 			if (ctx != NULL && ctx->state == SPOE_CTX_ST_WAITING_ACK) {
1454 				ret = recv_spoe_frame(appctx, &handle_spoe_agentack_frame);
1455 				if (ret < 0) {
1456 					if (ret == -1)
1457 						goto skip_notify_frame;
1458 					ctx->state = SPOE_CTX_ST_ERROR;
1459 					task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
1460 					appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
1461 					goto switchstate;
1462 				}
1463 				if (!ret)
1464 					goto out;
1465 				if (ret == 2) {
1466 					ctx->state = SPOE_CTX_ST_ERROR;
1467 					task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
1468 					appctx->st0 = SPOE_APPCTX_ST_EXIT;
1469 					goto switchstate;
1470 				}
1471 				ctx->state = SPOE_CTX_ST_DONE;
1472 				task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
1473 				APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
1474 			}
1475 			else {
1476 				if (stopping) {
1477 					appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
1478 					goto switchstate;
1479 				}
1480 
1481 				ret = recv_spoe_frame(appctx, NULL);
1482 				if (ret < 0) {
1483 					if (ret == -1)
1484 						goto skip_notify_frame;
1485 					appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
1486 					goto switchstate;
1487 				}
1488 				if (!ret)
1489 					goto out;
1490 				if (ret == 2) {
1491 					appctx->st0 = SPOE_APPCTX_ST_EXIT;
1492 					goto switchstate;
1493 				}
1494 				APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
1495 			}
1496 			break;
1497 
1498 		case SPOE_APPCTX_ST_DISCONNECT:
1499 			ret = send_spoe_frame(appctx, &prepare_spoe_hadiscon_frame);
1500 			if (ret < 0) {
1501 				appctx->st0 = SPOE_APPCTX_ST_EXIT;
1502 				goto switchstate;
1503 			}
1504 			else if (!ret)
1505 				goto full;
1506 			SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
1507 				    " - disconnected by HAProxy (%d): %s\n",
1508 				    (int)now.tv_sec, (int)now.tv_usec,
1509 				    ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
1510 				    __FUNCTION__, appctx, spoe_status_code,
1511 				    spoe_frm_err_reasons[spoe_status_code]);
1512 
1513 			APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
1514 			appctx->st0 = SPOE_APPCTX_ST_DISCONNECTING;
1515 			/* fall through */
1516 
1517 		case SPOE_APPCTX_ST_DISCONNECTING:
1518 			if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO) {
1519 				appctx->st0 = SPOE_APPCTX_ST_EXIT;
1520 				goto switchstate;
1521 			}
1522 			if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) {
1523 				appctx->st0 = SPOE_APPCTX_ST_EXIT;
1524 				goto switchstate;
1525 			}
1526 			ret = recv_spoe_frame(appctx, NULL);
1527 			if (ret < 0 || ret == 2) {
1528 				appctx->st0 = SPOE_APPCTX_ST_EXIT;
1529 				goto switchstate;
1530 			}
1531 			break;
1532 
1533 		case SPOE_APPCTX_ST_EXIT:
1534 			si_shutw(si);
1535 			si_shutr(si);
1536 			si_ic(si)->flags |= CF_READ_NULL;
1537 			appctx->st0 = SPOE_APPCTX_ST_END;
1538 			APPCTX_SPOE(appctx).task->expire = TICK_ETERNITY;
1539 			/* fall through */
1540 
1541 		case SPOE_APPCTX_ST_END:
1542 			return;
1543 	}
1544 
1545  out:
1546 	if (APPCTX_SPOE(appctx).task->expire != TICK_ETERNITY)
1547 		task_queue(APPCTX_SPOE(appctx).task);
1548 	si_oc(si)->flags |= CF_READ_DONTWAIT;
1549 	task_wakeup(si_strm(si)->task, TASK_WOKEN_IO);
1550 	return;
1551  full:
1552 	si_applet_cant_put(si);
1553 	goto out;
1554 }
1555 
1556 struct applet spoe_applet = {
1557 	.obj_type = OBJ_TYPE_APPLET,
1558 	.name = "<SPOE>", /* used for logging */
1559 	.fct = handle_spoe_applet,
1560 	.release = release_spoe_applet,
1561 };
1562 
1563 /* Create a SPOE applet. On success, the created applet is returned, else
1564  * NULL. */
1565 static struct appctx *
create_spoe_appctx(struct spoe_config * conf)1566 create_spoe_appctx(struct spoe_config *conf)
1567 {
1568 	struct appctx      *appctx;
1569 	struct session     *sess;
1570 	struct task        *task;
1571 	struct stream      *strm;
1572 	struct listener    *l = LIST_NEXT(&conf->agent_fe.conf.listeners,
1573 					  struct listener *, by_fe);
1574 
1575 	if ((appctx = appctx_new(&spoe_applet)) == NULL)
1576 		goto out_error;
1577 
1578 	appctx->st0 = SPOE_APPCTX_ST_CONNECT;
1579 	if ((APPCTX_SPOE(appctx).task = task_new()) == NULL)
1580 		goto out_free_appctx;
1581 	APPCTX_SPOE(appctx).task->process   = process_spoe_applet;
1582 	APPCTX_SPOE(appctx).task->expire    = TICK_ETERNITY;
1583 	APPCTX_SPOE(appctx).task->context   = appctx;
1584 	APPCTX_SPOE(appctx).agent           = conf->agent;
1585 	APPCTX_SPOE(appctx).ctx             = NULL;
1586 	APPCTX_SPOE(appctx).version         = 0;
1587 	APPCTX_SPOE(appctx).max_frame_size  = global.tune.bufsize;
1588 	task_wakeup(APPCTX_SPOE(appctx).task, TASK_WOKEN_INIT);
1589 
1590 	sess = session_new(&conf->agent_fe, l, &appctx->obj_type);
1591 	if (!sess)
1592 		goto out_free_spoe;
1593 
1594 	if ((task = task_new()) == NULL)
1595 		goto out_free_sess;
1596 
1597 	if ((strm = stream_new(sess, task, &appctx->obj_type)) == NULL)
1598 		goto out_free_task;
1599 
1600 	strm->target         = sess->listener->default_target;
1601 	strm->req.analysers |= sess->listener->analysers;
1602 	stream_set_backend(strm, conf->agent->b.be);
1603 
1604 	/* applet is waiting for data */
1605 	si_applet_cant_get(&strm->si[0]);
1606 	appctx_wakeup(appctx);
1607 
1608 	/* Increase the per-process number of cumulated connections */
1609 	if (conf->agent->cps_max > 0)
1610 		update_freq_ctr(&conf->agent->conn_per_sec, 1);
1611 
1612 	strm->do_log = NULL;
1613 	strm->res.flags |= CF_READ_DONTWAIT;
1614 
1615 	conf->agent_fe.feconn++;
1616 	jobs++;
1617 	totalconn++;
1618 
1619 	return appctx;
1620 
1621 	/* Error unrolling */
1622  out_free_task:
1623 	task_free(task);
1624  out_free_sess:
1625 	session_free(sess);
1626  out_free_spoe:
1627 	task_free(APPCTX_SPOE(appctx).task);
1628  out_free_appctx:
1629 	appctx_free(appctx);
1630  out_error:
1631 	return NULL;
1632 }
1633 
1634 /* Wake up a SPOE applet attached to a SPOE context. */
1635 static void
wakeup_spoe_appctx(struct spoe_context * ctx)1636 wakeup_spoe_appctx(struct spoe_context *ctx)
1637 {
1638 	if (ctx->appctx == NULL)
1639 		return;
1640 	if (ctx->appctx->st0 < SPOE_APPCTX_ST_EXIT) {
1641 		si_applet_want_get(ctx->appctx->owner);
1642 		si_applet_want_put(ctx->appctx->owner);
1643 		appctx_wakeup(ctx->appctx);
1644 	}
1645 }
1646 
1647 
1648 /* Run across the list of pending streams waiting for a SPOE applet and wake the
1649  * first. */
1650 static void
offer_spoe_appctx(struct spoe_agent * agent,struct appctx * appctx)1651 offer_spoe_appctx(struct spoe_agent *agent, struct appctx *appctx)
1652 {
1653 	struct spoe_context *ctx;
1654 
1655 	if  (!appctx || appctx->st0 > SPOE_APPCTX_ST_PROCESSING)
1656 		return;
1657 
1658 	if (LIST_ISEMPTY(&agent->applet_wq))
1659 		LIST_ADD(&agent->cache, &APPCTX_SPOE(appctx).list);
1660 	else {
1661 		ctx = LIST_NEXT(&agent->applet_wq, typeof(ctx), applet_wait);
1662 		APPCTX_SPOE(appctx).ctx = ctx;
1663 		ctx->appctx = appctx;
1664 		LIST_DEL(&ctx->applet_wait);
1665 		LIST_INIT(&ctx->applet_wait);
1666 		task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
1667 		SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
1668 			    " - wake up stream to get available SPOE applet\n",
1669 			    (int)now.tv_sec, (int)now.tv_usec, agent->id,
1670 			    __FUNCTION__, ctx->strm);
1671 	}
1672 }
1673 
1674 /* A failure occurred during SPOE applet creation. */
1675 static void
on_new_spoe_appctx_failure(struct spoe_agent * agent)1676 on_new_spoe_appctx_failure(struct spoe_agent *agent)
1677 {
1678 	struct spoe_context *ctx;
1679 
1680 	list_for_each_entry(ctx, &agent->applet_wq, applet_wait) {
1681 		task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
1682 		SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
1683 			    " - wake up stream because to SPOE applet connection failed\n",
1684 			    (int)now.tv_sec, (int)now.tv_usec, agent->id,
1685 			    __FUNCTION__, ctx->strm);
1686 	}
1687 }
1688 
1689 static void
on_new_spoe_appctx_success(struct spoe_agent * agent,struct appctx * appctx)1690 on_new_spoe_appctx_success(struct spoe_agent *agent, struct appctx *appctx)
1691 {
1692 	offer_spoe_appctx(agent, appctx);
1693 }
1694 /* Retrieve a SPOE applet from the agent cache if possible, else create it. It
1695  * returns 1 on success, 0 to retry later and -1 if an error occurred. */
1696 static int
acquire_spoe_appctx(struct spoe_context * ctx,int dir)1697 acquire_spoe_appctx(struct spoe_context *ctx, int dir)
1698 {
1699 	struct spoe_config *conf = FLT_CONF(ctx->filter);
1700 	struct spoe_agent  *agent = conf->agent;
1701 	struct appctx      *appctx;
1702 
1703 	/* If a process is already started for this SPOE context, retry
1704 	 * later. */
1705 	if (ctx->flags & SPOE_CTX_FL_PROCESS)
1706 		goto wait;
1707 
1708 	/* If needed, initialize the buffer that will be used to encode messages
1709 	 * and decode actions. */
1710 	if (ctx->buffer == &buf_empty) {
1711 		if (!LIST_ISEMPTY(&ctx->buffer_wait.list)) {
1712 			LIST_DEL(&ctx->buffer_wait.list);
1713 			LIST_INIT(&ctx->buffer_wait.list);
1714 		}
1715 
1716 		if (!b_alloc_margin(&ctx->buffer, global.tune.reserved_bufs)) {
1717 			LIST_ADDQ(&buffer_wq, &ctx->buffer_wait.list);
1718 			goto wait;
1719 		}
1720 	}
1721 
1722 	/* If the SPOE applet was already set, all is done. */
1723 	if (ctx->appctx)
1724 		goto success;
1725 
1726 	/* Else try to retrieve it from the agent cache */
1727 	if (!LIST_ISEMPTY(&agent->cache)) {
1728 		appctx = LIST_NEXT(&agent->cache, typeof(appctx), ctx.spoe.list);
1729 		LIST_DEL(&APPCTX_SPOE(appctx).list);
1730 		APPCTX_SPOE(appctx).ctx = ctx;
1731 		ctx->appctx = appctx;
1732 		goto success;
1733 	}
1734 
1735 	/* If there is no server up for the agent's backend, this is an
1736 	 * error. */
1737 	if (!agent->b.be->srv_act && !agent->b.be->srv_bck)
1738 		goto error;
1739 
1740 	SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
1741 		    " - waiting for available SPOE appctx\n",
1742 		    (int)now.tv_sec, (int)now.tv_usec, agent->id, __FUNCTION__,
1743 		    ctx->strm);
1744 
1745 	/* Else add the stream in the waiting queue. */
1746 	if (LIST_ISEMPTY(&ctx->applet_wait))
1747 		LIST_ADDQ(&agent->applet_wq, &ctx->applet_wait);
1748 
1749 	/* Finally, create new SPOE applet if we can */
1750 	if (agent->cps_max > 0) {
1751 		if (!freq_ctr_remain(&agent->conn_per_sec, agent->cps_max, 0))
1752 			goto wait;
1753 	}
1754 	if (create_spoe_appctx(conf) == NULL)
1755 		goto error;
1756 
1757   wait:
1758 	return 0;
1759 
1760   success:
1761 	/* Remove the stream from the waiting queue */
1762 	if (!LIST_ISEMPTY(&ctx->applet_wait)) {
1763 		LIST_DEL(&ctx->applet_wait);
1764 		LIST_INIT(&ctx->applet_wait);
1765 	}
1766 
1767 	/* Set the right flag to prevent request and response processing
1768 	 * in same time. */
1769 	ctx->flags |= ((dir == SMP_OPT_DIR_REQ)
1770 		       ? SPOE_CTX_FL_REQ_PROCESS
1771 		       : SPOE_CTX_FL_RSP_PROCESS);
1772 
1773 	SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
1774 		    " - acquire SPOE appctx %p from cache\n",
1775 		    (int)now.tv_sec, (int)now.tv_usec, agent->id,
1776 		    __FUNCTION__, ctx->strm, ctx->appctx);
1777 	return 1;
1778 
1779   error:
1780 	/* Remove the stream from the waiting queue */
1781 	if (!LIST_ISEMPTY(&ctx->applet_wait)) {
1782 		LIST_DEL(&ctx->applet_wait);
1783 		LIST_INIT(&ctx->applet_wait);
1784 	}
1785 
1786 	SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
1787 		    " - failed to acquire SPOE appctx\n",
1788 		    (int)now.tv_sec, (int)now.tv_usec, agent->id,
1789 		    __FUNCTION__, ctx->strm);
1790 	send_log(ctx->strm->be, LOG_WARNING, "failed to acquire SPOE applet.\n");
1791 
1792 	return -1;
1793 }
1794 
1795 /* Release a SPOE applet and push it in the agent cache. */
1796 static void
release_spoe_appctx(struct spoe_context * ctx)1797 release_spoe_appctx(struct spoe_context *ctx)
1798 {
1799 	struct spoe_config *conf = FLT_CONF(ctx->filter);
1800 	struct spoe_agent  *agent = conf->agent;
1801 	struct appctx      *appctx = ctx->appctx;
1802 
1803 	/* Reset the flag to allow next processing */
1804 	ctx->flags &= ~SPOE_CTX_FL_PROCESS;
1805 
1806 	/* Reset processing timer */
1807 	ctx->process_exp = TICK_ETERNITY;
1808 
1809 	/* Release the buffer if needed */
1810 	if (ctx->buffer != &buf_empty) {
1811 		b_free(&ctx->buffer);
1812 		offer_buffers(ctx, tasks_run_queue + applets_active_queue);
1813 	}
1814 
1815 	/* If there is no SPOE applet, all is done */
1816 	if (!appctx)
1817 		return;
1818 
1819 	/* Else, reassign it or push it in the agent cache */
1820 	SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
1821 		    " - release SPOE appctx %p\n",
1822 		    (int)now.tv_sec, (int)now.tv_usec, agent->id,
1823 		    __FUNCTION__, ctx->strm, appctx);
1824 
1825 	APPCTX_SPOE(appctx).ctx = NULL;
1826 	ctx->appctx = NULL;
1827 	offer_spoe_appctx(agent, appctx);
1828 }
1829 
1830 /***************************************************************************
1831  * Functions that process SPOE messages and actions
1832  **************************************************************************/
1833 /* Process SPOE messages for a specific event. During the processing, it returns
1834  * 0 and it returns 1 when the processing is finished. If an error occurred, -1
1835  * is returned. */
1836 static int
process_spoe_messages(struct stream * s,struct spoe_context * ctx,struct list * messages,int dir)1837 process_spoe_messages(struct stream *s, struct spoe_context *ctx,
1838 		      struct list *messages, int dir)
1839 {
1840 	struct spoe_message *msg;
1841 	struct sample       *smp;
1842 	struct spoe_arg     *arg;
1843 	char    *p;
1844 	size_t  max_size;
1845 	int     off, flag, idx = 0;
1846 
1847 	/* Reserve 32 bytes from the frame Metadata */
1848 	max_size = APPCTX_SPOE(ctx->appctx).max_frame_size - 32;
1849 
1850 	b_reset(ctx->buffer);
1851 	p = ctx->buffer->p;
1852 
1853 	/* Loop on messages */
1854 	list_for_each_entry(msg, messages, list) {
1855 		if (idx + msg->id_len + 1 > max_size)
1856 			goto skip;
1857 
1858 		/* Set the message name */
1859 		idx += encode_spoe_string(msg->id, msg->id_len, p+idx);
1860 
1861 		/* Save offset where to store the number of arguments for this
1862 		 * message */
1863 		off = idx++;
1864 		p[off] = 0;
1865 
1866 		/* Loop on arguments */
1867 		list_for_each_entry(arg, &msg->args, list) {
1868 			p[off]++; /* Increment the number of arguments */
1869 
1870 			if (idx + arg->name_len + 1 > max_size)
1871 				goto skip;
1872 
1873 			/* Encode the arguement name as a string. It can by NULL */
1874 			idx += encode_spoe_string(arg->name, arg->name_len, p+idx);
1875 
1876 			/* Fetch the arguement value */
1877 			smp = sample_process(s->be, s->sess, s, dir|SMP_OPT_FINAL, arg->expr, NULL);
1878 			if (!smp) {
1879 				/* If no value is available, set it to NULL */
1880 				p[idx++] = SPOE_DATA_T_NULL;
1881 				continue;
1882 			}
1883 
1884 			/* Else, encode the arguement value */
1885 			switch (smp->data.type) {
1886 				case SMP_T_BOOL:
1887 					flag = ((!smp->data.u.sint) ? SPOE_DATA_FL_FALSE : SPOE_DATA_FL_TRUE);
1888 					p[idx++] = (SPOE_DATA_T_BOOL | flag);
1889 					break;
1890 				case SMP_T_SINT:
1891 					p[idx++] = SPOE_DATA_T_INT64;
1892 					if (idx + 8 > max_size)
1893 						goto skip;
1894 					idx += encode_spoe_varint(smp->data.u.sint, p+idx);
1895 					break;
1896 				case SMP_T_IPV4:
1897 					p[idx++] = SPOE_DATA_T_IPV4;
1898 					if (idx + 4 > max_size)
1899 						goto skip;
1900 					memcpy(p+idx, &smp->data.u.ipv4, 4);
1901 					idx += 4;
1902 					break;
1903 				case SMP_T_IPV6:
1904 					p[idx++] = SPOE_DATA_T_IPV6;
1905 					if (idx + 16 > max_size)
1906 						goto skip;
1907 					memcpy(p+idx, &smp->data.u.ipv6, 16);
1908 					idx += 16;
1909 					break;
1910 				case SMP_T_STR:
1911 					p[idx++] = SPOE_DATA_T_STR;
1912 					if (idx + smp->data.u.str.len > max_size)
1913 						goto skip;
1914 					idx += encode_spoe_string(smp->data.u.str.str,
1915 								  smp->data.u.str.len,
1916 								  p+idx);
1917 					break;
1918 				case SMP_T_BIN:
1919 					p[idx++] = SPOE_DATA_T_BIN;
1920 					if (idx + smp->data.u.str.len > max_size)
1921 						goto skip;
1922 					idx += encode_spoe_string(smp->data.u.str.str,
1923 								  smp->data.u.str.len,
1924 								  p+idx);
1925 					break;
1926 				case SMP_T_METH:
1927 					if (smp->data.u.meth.meth == HTTP_METH_OTHER) {
1928 						p[idx++] = SPOE_DATA_T_STR;
1929 						if (idx + http_known_methods[smp->data.u.meth.meth].len > max_size)
1930 							goto skip;
1931 						idx += encode_spoe_string(http_known_methods[smp->data.u.meth.meth].name,
1932 									  http_known_methods[smp->data.u.meth.meth].len,
1933 									  p+idx);
1934 					}
1935 					else {
1936 						p[idx++] = SPOE_DATA_T_STR;
1937 						if (idx + smp->data.u.str.len > max_size)
1938 							goto skip;
1939 						idx += encode_spoe_string(smp->data.u.meth.str.str,
1940 									  smp->data.u.meth.str.len,
1941 									  p+idx);
1942 					}
1943 					break;
1944 				default:
1945 					p[idx++] = SPOE_DATA_T_NULL;
1946 			}
1947 		}
1948 	}
1949 	ctx->buffer->i = idx;
1950 	return 1;
1951 
1952   skip:
1953 	b_reset(ctx->buffer);
1954 	return 0;
1955 }
1956 
1957 /* Helper function to set a variable */
1958 static void
set_spoe_var(struct spoe_context * ctx,char * scope,char * name,int len,struct sample * smp)1959 set_spoe_var(struct spoe_context *ctx, char *scope, char *name, int len,
1960 	     struct sample *smp)
1961 {
1962 	struct spoe_config *conf = FLT_CONF(ctx->filter);
1963 	struct spoe_agent  *agent = conf->agent;
1964 	char                varname[64];
1965 
1966 	memset(varname, 0, sizeof(varname));
1967 	len = snprintf(varname, sizeof(varname), "%s.%s.%.*s",
1968 		       scope, agent->var_pfx, len, name);
1969 	vars_set_by_name_ifexist(varname, len, smp);
1970 }
1971 
1972 /* Helper function to unset a variable */
1973 static void
unset_spoe_var(struct spoe_context * ctx,char * scope,char * name,int len,struct sample * smp)1974 unset_spoe_var(struct spoe_context *ctx, char *scope, char *name, int len,
1975 	       struct sample *smp)
1976 {
1977 	struct spoe_config *conf = FLT_CONF(ctx->filter);
1978 	struct spoe_agent  *agent = conf->agent;
1979 	char                varname[64];
1980 
1981 	memset(varname, 0, sizeof(varname));
1982 	len = snprintf(varname, sizeof(varname), "%s.%s.%.*s",
1983 		       scope, agent->var_pfx, len, name);
1984 	vars_unset_by_name_ifexist(varname, len, smp);
1985 }
1986 
1987 
1988 /* Process SPOE actions for a specific event. During the processing, it returns
1989  * 0 and it returns 1 when the processing is finished. If an error occurred, -1
1990  * is returned. */
1991 static int
process_spoe_actions(struct stream * s,struct spoe_context * ctx,enum spoe_event ev,int dir)1992 process_spoe_actions(struct stream *s, struct spoe_context *ctx,
1993 		     enum spoe_event ev, int dir)
1994 {
1995 	char  *p;
1996 	size_t size;
1997 	int    off, i, idx = 0;
1998 
1999 	p    = ctx->buffer->p;
2000 	size = ctx->buffer->i;
2001 
2002 	while (idx < size)  {
2003 		char                 *str;
2004 		uint64_t              sz;
2005 		struct sample         smp;
2006 		enum spoe_action_type type;
2007 
2008 		off = idx;
2009 		if (idx+2 > size)
2010 			goto skip;
2011 
2012 		type = p[idx++];
2013 		switch (type) {
2014 			case SPOE_ACT_T_SET_VAR: {
2015 				char *scope;
2016 
2017 				if (p[idx++] != 3)
2018 					goto skip_action;
2019 
2020 				switch (p[idx++]) {
2021 					case SPOE_SCOPE_PROC: scope = "proc"; break;
2022 					case SPOE_SCOPE_SESS: scope = "sess"; break;
2023 					case SPOE_SCOPE_TXN : scope = "txn";  break;
2024 					case SPOE_SCOPE_REQ : scope = "req";  break;
2025 					case SPOE_SCOPE_RES : scope = "res";  break;
2026 					default: goto skip;
2027 				}
2028 
2029 				idx += decode_spoe_string(p+idx, p+size, &str, &sz);
2030 				if (str == NULL)
2031 					goto skip;
2032 				memset(&smp, 0, sizeof(smp));
2033 				smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL);
2034 
2035 				if ((i = decode_spoe_data(p+idx, p+size, &smp)) == -1)
2036 					goto skip;
2037 				idx += i;
2038 
2039 				SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
2040 					    " - set-var '%s.%s.%.*s'\n",
2041 					    (int)now.tv_sec, (int)now.tv_usec,
2042 					    ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->id,
2043 					    __FUNCTION__, s, scope,
2044 					    ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->var_pfx,
2045 					    (int)sz, str);
2046 
2047 				set_spoe_var(ctx, scope, str, sz, &smp);
2048 				break;
2049 			}
2050 
2051 			case SPOE_ACT_T_UNSET_VAR: {
2052 				char *scope;
2053 
2054 				if (p[idx++] != 2)
2055 					goto skip_action;
2056 
2057 				switch (p[idx++]) {
2058 					case SPOE_SCOPE_PROC: scope = "proc"; break;
2059 					case SPOE_SCOPE_SESS: scope = "sess"; break;
2060 					case SPOE_SCOPE_TXN : scope = "txn";  break;
2061 					case SPOE_SCOPE_REQ : scope = "req";  break;
2062 					case SPOE_SCOPE_RES : scope = "res";  break;
2063 					default: goto skip;
2064 				}
2065 
2066 				idx += decode_spoe_string(p+idx, p+size, &str, &sz);
2067 				if (str == NULL)
2068 					goto skip;
2069 				memset(&smp, 0, sizeof(smp));
2070 				smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL);
2071 
2072 				SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
2073 					    " - unset-var '%s.%s.%.*s'\n",
2074 					    (int)now.tv_sec, (int)now.tv_usec,
2075 					    ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->id,
2076 					    __FUNCTION__, s, scope,
2077 					    ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->var_pfx,
2078 					    (int)sz, str);
2079 
2080 				unset_spoe_var(ctx, scope, str, sz, &smp);
2081 				break;
2082 			}
2083 
2084 			default:
2085 			  skip_action:
2086 				if ((i = skip_spoe_action(p+off, p+size)) == -1)
2087 					goto skip;
2088 				idx += i;
2089 		}
2090 	}
2091 
2092 	return 1;
2093   skip:
2094 	return 0;
2095 }
2096 
2097 
2098 /* Process a SPOE event. First, this functions will process messages attached to
2099  * this event and send them to an agent in a NOTIFY frame. Then, it will wait a
2100  * ACK frame to process corresponding actions. During all the processing, it
2101  * returns 0 and it returns 1 when the processing is finished. If an error
2102  * occurred, -1 is returned. */
2103 static int
process_spoe_event(struct stream * s,struct spoe_context * ctx,enum spoe_event ev)2104 process_spoe_event(struct stream *s, struct spoe_context *ctx,
2105 		   enum spoe_event ev)
2106 {
2107 	struct spoe_config *conf = FLT_CONF(ctx->filter);
2108 	struct spoe_agent  *agent = conf->agent;
2109 	int                 dir, ret = 1;
2110 
2111 	SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
2112 		    " - ctx-state=%s - event=%s\n",
2113 		    (int)now.tv_sec, (int)now.tv_usec,
2114 		    agent->id, __FUNCTION__, s, spoe_ctx_state_str[ctx->state],
2115 		    spoe_event_str[ev]);
2116 
2117 	if (agent->eps_max > 0) {
2118 		if (!freq_ctr_remain(&agent->err_per_sec, agent->eps_max, 0)) {
2119 			SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
2120 				    " - skip event '%s': max EPS reached\n",
2121 				    (int)now.tv_sec, (int)now.tv_usec,
2122 				    agent->id, __FUNCTION__, s, spoe_event_str[ev]);
2123 			goto skip;
2124 		}
2125 	}
2126 
2127 	dir = ((ev < SPOE_EV_ON_SERVER_SESS) ? SMP_OPT_DIR_REQ : SMP_OPT_DIR_RES);
2128 
2129 	if (LIST_ISEMPTY(&(ctx->messages[ev])))
2130 		goto out;
2131 
2132 	if (ctx->state == SPOE_CTX_ST_ERROR)
2133 		goto error;
2134 
2135 	if (tick_is_expired(ctx->process_exp, now_ms) && ctx->state != SPOE_CTX_ST_DONE) {
2136 		SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
2137 			    " - failed to process event '%s': timeout\n",
2138 			    (int)now.tv_sec, (int)now.tv_usec,
2139 			    agent->id, __FUNCTION__, s, spoe_event_str[ev]);
2140 		send_log(ctx->strm->be, LOG_WARNING,
2141 			 "failed to process event '%s': timeout.\n",
2142 			 spoe_event_str[ev]);
2143 		goto error;
2144 	}
2145 
2146 	if (ctx->state == SPOE_CTX_ST_READY) {
2147 		if (!tick_isset(ctx->process_exp)) {
2148 			ctx->process_exp = tick_add_ifset(now_ms, agent->timeout.processing);
2149 			s->task->expire  = tick_first((tick_is_expired(s->task->expire, now_ms) ? 0 : s->task->expire),
2150 						      ctx->process_exp);
2151 		}
2152 
2153 		ret = acquire_spoe_appctx(ctx, dir);
2154 		if (ret <= 0) {
2155 			if (!ret)
2156 				goto out;
2157 			goto error;
2158 		}
2159 		ctx->state = SPOE_CTX_ST_SENDING_MSGS;
2160 	}
2161 
2162 	if (ctx->appctx == NULL)
2163 		goto error;
2164 
2165 	if (ctx->state == SPOE_CTX_ST_SENDING_MSGS) {
2166 		ret = process_spoe_messages(s, ctx, &(ctx->messages[ev]), dir);
2167 		if (ret <= 0) {
2168 			if (!ret)
2169 				goto skip;
2170 			goto error;
2171 		}
2172 		wakeup_spoe_appctx(ctx);
2173 		ret = 0;
2174 		goto out;
2175 	}
2176 
2177 	if (ctx->state == SPOE_CTX_ST_WAITING_ACK) {
2178 		wakeup_spoe_appctx(ctx);
2179 		ret = 0;
2180 		goto out;
2181 	}
2182 
2183 	if (ctx->state == SPOE_CTX_ST_DONE) {
2184 		ret = process_spoe_actions(s, ctx, ev, dir);
2185 		if (ret <= 0) {
2186 			if (!ret)
2187 				goto skip;
2188 			goto error;
2189 		}
2190 		ctx->frame_id++;
2191 		release_spoe_appctx(ctx);
2192 		ctx->state = SPOE_CTX_ST_READY;
2193 	}
2194 
2195   out:
2196 	return ret;
2197 
2198   skip:
2199 	release_spoe_appctx(ctx);
2200 	ctx->state = SPOE_CTX_ST_READY;
2201 	return 1;
2202 
2203   error:
2204 	if (agent->eps_max > 0)
2205 		update_freq_ctr(&agent->err_per_sec, 1);
2206 
2207 	if (agent->var_on_error) {
2208 		struct sample smp;
2209 
2210 		memset(&smp, 0, sizeof(smp));
2211 		smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL);
2212 		smp.data.u.sint = 1;
2213 		smp.data.type   = SMP_T_BOOL;
2214 
2215 		set_spoe_var(ctx, "txn", agent->var_on_error,
2216 			     strlen(agent->var_on_error), &smp);
2217 	}
2218 
2219 	release_spoe_appctx(ctx);
2220 	ctx->state = ((agent->flags & SPOE_FL_CONT_ON_ERR)
2221 		      ? SPOE_CTX_ST_READY
2222 		      : SPOE_CTX_ST_ERROR);
2223 	return 1;
2224 }
2225 
2226 
2227 /***************************************************************************
2228  * Functions that create/destroy SPOE contexts
2229  **************************************************************************/
wakeup_spoe_context(struct spoe_context * ctx)2230 static int wakeup_spoe_context(struct spoe_context *ctx)
2231 {
2232 	task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
2233 	return 1;
2234 }
2235 
2236 static struct spoe_context *
create_spoe_context(struct filter * filter)2237 create_spoe_context(struct filter *filter)
2238 {
2239 	struct spoe_config  *conf = FLT_CONF(filter);
2240 	struct spoe_context *ctx;
2241 
2242 	ctx = pool_alloc_dirty(pool2_spoe_ctx);
2243 	if (ctx == NULL) {
2244 		return NULL;
2245 	}
2246 	memset(ctx, 0, sizeof(*ctx));
2247 	ctx->filter   = filter;
2248 	ctx->state    = SPOE_CTX_ST_NONE;
2249 	ctx->flags    = 0;
2250 	ctx->messages = conf->agent->messages;
2251 	ctx->buffer   = &buf_empty;
2252 	LIST_INIT(&ctx->buffer_wait.list);
2253 	ctx->buffer_wait.target = ctx;
2254 	ctx->buffer_wait.wakeup_cb = (int (*)(void *))wakeup_spoe_context;
2255 	LIST_INIT(&ctx->applet_wait);
2256 
2257 	ctx->stream_id   = 0;
2258 	ctx->frame_id    = 1;
2259 	ctx->process_exp = TICK_ETERNITY;
2260 
2261 	return ctx;
2262 }
2263 
2264 static void
destroy_spoe_context(struct spoe_context * ctx)2265 destroy_spoe_context(struct spoe_context *ctx)
2266 {
2267 	if (!ctx)
2268 		return;
2269 
2270 	if (ctx->appctx)
2271 		APPCTX_SPOE(ctx->appctx).ctx = NULL;
2272 	if (!LIST_ISEMPTY(&ctx->buffer_wait.list))
2273 		LIST_DEL(&ctx->buffer_wait.list);
2274 	if (!LIST_ISEMPTY(&ctx->applet_wait))
2275 		LIST_DEL(&ctx->applet_wait);
2276 	pool_free2(pool2_spoe_ctx, ctx);
2277 }
2278 
2279 static void
reset_spoe_context(struct spoe_context * ctx)2280 reset_spoe_context(struct spoe_context *ctx)
2281 {
2282 	ctx->state  = SPOE_CTX_ST_READY;
2283 	ctx->flags &= ~SPOE_CTX_FL_PROCESS;
2284 }
2285 
2286 
2287 /***************************************************************************
2288  * Hooks that manage the filter lifecycle (init/check/deinit)
2289  **************************************************************************/
2290 /* Signal handler: Do a soft stop, wakeup SPOE applet */
2291 static void
sig_stop_spoe(struct sig_handler * sh)2292 sig_stop_spoe(struct sig_handler *sh)
2293 {
2294 	struct proxy *p;
2295 
2296 	p = proxy;
2297 	while (p) {
2298 		struct flt_conf *fconf;
2299 
2300 		list_for_each_entry(fconf, &p->filter_configs, list) {
2301 			struct spoe_config *conf;
2302 			struct spoe_agent  *agent;
2303 			struct appctx      *appctx;
2304 
2305 			if (fconf->id != spoe_filter_id)
2306 				continue;
2307 
2308 			conf  = fconf->conf;
2309 			agent = conf->agent;
2310 
2311 			list_for_each_entry(appctx, &agent->cache, ctx.spoe.list) {
2312 				si_applet_want_get(appctx->owner);
2313 				si_applet_want_put(appctx->owner);
2314 				appctx_wakeup(appctx);
2315 			}
2316 		}
2317 		p = p->next;
2318 	}
2319 }
2320 
2321 
2322 /* Initialize the SPOE filter. Returns -1 on error, else 0. */
2323 static int
spoe_init(struct proxy * px,struct flt_conf * fconf)2324 spoe_init(struct proxy *px, struct flt_conf *fconf)
2325 {
2326 	struct spoe_config *conf = fconf->conf;
2327 	struct listener   *l;
2328 
2329         memset(&conf->agent_fe, 0, sizeof(conf->agent_fe));
2330         init_new_proxy(&conf->agent_fe);
2331         conf->agent_fe.parent = conf->agent;
2332         conf->agent_fe.last_change = now.tv_sec;
2333         conf->agent_fe.id = conf->agent->id;
2334         conf->agent_fe.cap = PR_CAP_FE;
2335         conf->agent_fe.mode = PR_MODE_TCP;
2336         conf->agent_fe.maxconn = 0;
2337         conf->agent_fe.options2 |= PR_O2_INDEPSTR;
2338         conf->agent_fe.conn_retries = CONN_RETRIES;
2339         conf->agent_fe.accept = frontend_accept;
2340         conf->agent_fe.srv = NULL;
2341         conf->agent_fe.timeout.client = TICK_ETERNITY;
2342 	conf->agent_fe.default_target = &spoe_applet.obj_type;
2343 	conf->agent_fe.fe_req_ana = AN_REQ_SWITCHING_RULES;
2344 
2345 	if ((l = calloc(1, sizeof(*l))) == NULL) {
2346 		Alert("spoe_init : out of memory.\n");
2347 		goto out_error;
2348 	}
2349 	l->obj_type = OBJ_TYPE_LISTENER;
2350         l->obj_type  = OBJ_TYPE_LISTENER;
2351         l->frontend  = &conf->agent_fe;
2352         l->state     = LI_READY;
2353         l->analysers = conf->agent_fe.fe_req_ana;
2354 	LIST_ADDQ(&conf->agent_fe.conf.listeners, &l->by_fe);
2355 
2356 	if (!sighandler_registered) {
2357 		signal_register_fct(0, sig_stop_spoe, 0);
2358 		sighandler_registered = 1;
2359 	}
2360 
2361 	return 0;
2362 
2363  out_error:
2364 	return -1;
2365 }
2366 
2367 /* Free ressources allocated by the SPOE filter. */
2368 static void
spoe_deinit(struct proxy * px,struct flt_conf * fconf)2369 spoe_deinit(struct proxy *px, struct flt_conf *fconf)
2370 {
2371 	struct spoe_config *conf = fconf->conf;
2372 
2373 	if (conf) {
2374 		struct spoe_agent *agent = conf->agent;
2375 		struct listener   *l = LIST_NEXT(&conf->agent_fe.conf.listeners,
2376 						 struct listener *, by_fe);
2377 
2378 		free(l);
2379 		release_spoe_agent(agent);
2380 		free(conf);
2381 	}
2382 	fconf->conf = NULL;
2383 }
2384 
2385 /* Check configuration of a SPOE filter for a specified proxy.
2386  * Return 1 on error, else 0. */
2387 static int
spoe_check(struct proxy * px,struct flt_conf * fconf)2388 spoe_check(struct proxy *px, struct flt_conf *fconf)
2389 {
2390 	struct spoe_config *conf = fconf->conf;
2391 	struct proxy       *target;
2392 
2393 	target = proxy_be_by_name(conf->agent->b.name);
2394 	if (target == NULL) {
2395 		Alert("Proxy %s : unknown backend '%s' used by SPOE agent '%s'"
2396 		      " declared at %s:%d.\n",
2397 		      px->id, conf->agent->b.name, conf->agent->id,
2398 		      conf->agent->conf.file, conf->agent->conf.line);
2399 		return 1;
2400 	}
2401 	if (target->mode != PR_MODE_TCP) {
2402 		Alert("Proxy %s : backend '%s' used by SPOE agent '%s' declared"
2403 		      " at %s:%d does not support HTTP mode.\n",
2404 		      px->id, target->id, conf->agent->id,
2405 		      conf->agent->conf.file, conf->agent->conf.line);
2406 		return 1;
2407 	}
2408 
2409 	if (px->bind_proc & ~target->bind_proc) {
2410 		Alert("Proxy %s : backend '%s' used by SPOE agent '%s' declared"
2411 		      " at %s:%d does not cover all of its processes.\n",
2412 		      px->id, target->id, conf->agent->id,
2413 		      conf->agent->conf.file, conf->agent->conf.line);
2414 		return 1;
2415 	}
2416 
2417 	free(conf->agent->b.name);
2418 	conf->agent->b.name = NULL;
2419 	conf->agent->b.be = target;
2420 	return 0;
2421 }
2422 
2423 /**************************************************************************
2424  * Hooks attached to a stream
2425  *************************************************************************/
2426 /* Called when a filter instance is created and attach to a stream. It creates
2427  * the context that will be used to process this stream. */
2428 static int
spoe_start(struct stream * s,struct filter * filter)2429 spoe_start(struct stream *s, struct filter *filter)
2430 {
2431 	struct spoe_context *ctx;
2432 
2433 	SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p\n",
2434 		    (int)now.tv_sec, (int)now.tv_usec,
2435 		    ((struct spoe_config *)FLT_CONF(filter))->agent->id,
2436 		    __FUNCTION__, s);
2437 
2438 	ctx = create_spoe_context(filter);
2439 	if (ctx == NULL) {
2440 		send_log(s->be, LOG_EMERG,
2441 			 "failed to create SPOE context for proxy %s\n",
2442 			 s->be->id);
2443 		return 0;
2444 	}
2445 
2446 	ctx->strm   = s;
2447 	ctx->state  = SPOE_CTX_ST_READY;
2448 	filter->ctx = ctx;
2449 
2450 	if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_TCP_REQ_FE]))
2451 		filter->pre_analyzers |= AN_REQ_INSPECT_FE;
2452 
2453 	if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_TCP_REQ_BE]))
2454 		filter->pre_analyzers |= AN_REQ_INSPECT_BE;
2455 
2456 	if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_TCP_RSP]))
2457 		filter->pre_analyzers |= AN_RES_INSPECT;
2458 
2459 	if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_HTTP_REQ_FE]))
2460 		filter->pre_analyzers |= AN_REQ_HTTP_PROCESS_FE;
2461 
2462 	if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_HTTP_REQ_BE]))
2463 		filter->pre_analyzers |= AN_REQ_HTTP_PROCESS_BE;
2464 
2465 	if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_HTTP_RSP]))
2466 		filter->pre_analyzers |= AN_RES_HTTP_PROCESS_FE;
2467 
2468 	return 1;
2469 }
2470 
2471 /* Called when a filter instance is detached from a stream. It release the
2472  * attached SPOE context. */
2473 static void
spoe_stop(struct stream * s,struct filter * filter)2474 spoe_stop(struct stream *s, struct filter *filter)
2475 {
2476 	struct spoe_context *ctx = filter->ctx;
2477 
2478 	SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p\n",
2479 		    (int)now.tv_sec, (int)now.tv_usec,
2480 		    ((struct spoe_config *)FLT_CONF(filter))->agent->id,
2481 		    __FUNCTION__, s);
2482 
2483 	if (ctx) {
2484 		release_spoe_appctx(ctx);
2485 		destroy_spoe_context(ctx);
2486 	}
2487 }
2488 
2489 
2490 /*
2491  * Called when the stream is woken up because of expired timer.
2492  */
2493 static void
spoe_check_timeouts(struct stream * s,struct filter * filter)2494 spoe_check_timeouts(struct stream *s, struct filter *filter)
2495 {
2496 	struct spoe_context *ctx = filter->ctx;
2497 
2498 	if (tick_is_expired(ctx->process_exp, now_ms)) {
2499 		s->pending_events |= TASK_WOKEN_MSG;
2500 		if (ctx->buffer != &buf_empty) {
2501 			b_free(&ctx->buffer);
2502 			offer_buffers(ctx, tasks_run_queue + applets_active_queue);
2503 		}
2504 	}
2505 }
2506 
2507 /* Called when we are ready to filter data on a channel */
2508 static int
spoe_start_analyze(struct stream * s,struct filter * filter,struct channel * chn)2509 spoe_start_analyze(struct stream *s, struct filter *filter, struct channel *chn)
2510 {
2511 	struct spoe_context *ctx = filter->ctx;
2512 	int                  ret = 1;
2513 
2514 	SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p - ctx-state=%s"
2515 		    " - ctx-flags=0x%08x\n",
2516 		    (int)now.tv_sec, (int)now.tv_usec,
2517 		    ((struct spoe_config *)FLT_CONF(filter))->agent->id,
2518 		    __FUNCTION__, s, spoe_ctx_state_str[ctx->state], ctx->flags);
2519 
2520 	if (!(chn->flags & CF_ISRESP)) {
2521 		if (filter->pre_analyzers & AN_REQ_INSPECT_FE)
2522 			chn->analysers |= AN_REQ_INSPECT_FE;
2523 		if (filter->pre_analyzers & AN_REQ_INSPECT_BE)
2524 			chn->analysers |= AN_REQ_INSPECT_BE;
2525 
2526 		if (ctx->flags & SPOE_CTX_FL_CLI_CONNECTED)
2527 			goto out;
2528 
2529 		ctx->stream_id = s->uniq_id;
2530 		if (ctx->state != SPOE_CTX_ST_NONE && ctx->state != SPOE_CTX_ST_ERROR) {
2531 			ret = process_spoe_event(s, ctx, SPOE_EV_ON_CLIENT_SESS);
2532 			if (ret != 1)
2533 				goto out;
2534 		}
2535 		ctx->flags |= SPOE_CTX_FL_CLI_CONNECTED;
2536 	}
2537 	else {
2538 		if (filter->pre_analyzers & AN_RES_INSPECT)
2539 			chn->analysers |= AN_RES_INSPECT;
2540 
2541 		if (ctx->flags & SPOE_CTX_FL_SRV_CONNECTED)
2542 			goto out;
2543 
2544 		if (ctx->state != SPOE_CTX_ST_NONE && ctx->state != SPOE_CTX_ST_ERROR) {
2545 			ret = process_spoe_event(s, ctx, SPOE_EV_ON_SERVER_SESS);
2546 			if (ret != 1)
2547 				goto out;
2548 		}
2549 		ctx->flags |= SPOE_CTX_FL_SRV_CONNECTED;
2550 	}
2551 
2552   out:
2553 	if (!ret) {
2554                 channel_dont_read(chn);
2555                 channel_dont_close(chn);
2556 	}
2557 	return ret;
2558 }
2559 
2560 /* Called before a processing happens on a given channel */
2561 static int
spoe_chn_pre_analyze(struct stream * s,struct filter * filter,struct channel * chn,unsigned an_bit)2562 spoe_chn_pre_analyze(struct stream *s, struct filter *filter,
2563 		     struct channel *chn, unsigned an_bit)
2564 {
2565 	struct spoe_context *ctx = filter->ctx;
2566 	int                  ret = 1;
2567 
2568 	SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p - ctx-state=%s"
2569 		    " - ctx-flags=0x%08x - ana=0x%08x\n",
2570 		    (int)now.tv_sec, (int)now.tv_usec,
2571 		    ((struct spoe_config *)FLT_CONF(filter))->agent->id,
2572 		    __FUNCTION__, s, spoe_ctx_state_str[ctx->state],
2573 		    ctx->flags, an_bit);
2574 
2575 	if (ctx->state == SPOE_CTX_ST_NONE || ctx->state == SPOE_CTX_ST_ERROR)
2576 		goto out;
2577 
2578 	switch (an_bit) {
2579 		case AN_REQ_INSPECT_FE:
2580 			ret = process_spoe_event(s, ctx, SPOE_EV_ON_TCP_REQ_FE);
2581 			break;
2582 		case AN_REQ_INSPECT_BE:
2583 			ret = process_spoe_event(s, ctx, SPOE_EV_ON_TCP_REQ_BE);
2584 			break;
2585 		case AN_RES_INSPECT:
2586 			ret = process_spoe_event(s, ctx, SPOE_EV_ON_TCP_RSP);
2587 			break;
2588 		case AN_REQ_HTTP_PROCESS_FE:
2589 			ret = process_spoe_event(s, ctx, SPOE_EV_ON_HTTP_REQ_FE);
2590 			break;
2591 		case AN_REQ_HTTP_PROCESS_BE:
2592 			ret = process_spoe_event(s, ctx, SPOE_EV_ON_HTTP_REQ_BE);
2593 			break;
2594 		case AN_RES_HTTP_PROCESS_FE:
2595 			ret = process_spoe_event(s, ctx, SPOE_EV_ON_HTTP_RSP);
2596 			break;
2597 	}
2598 
2599   out:
2600 	if (!ret) {
2601                 channel_dont_read(chn);
2602                 channel_dont_close(chn);
2603 	}
2604 	return ret;
2605 }
2606 
2607 /* Called when the filtering on the channel ends. */
2608 static int
spoe_end_analyze(struct stream * s,struct filter * filter,struct channel * chn)2609 spoe_end_analyze(struct stream *s, struct filter *filter, struct channel *chn)
2610 {
2611 	struct spoe_context *ctx = filter->ctx;
2612 
2613 	SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p - ctx-state=%s"
2614 		    " - ctx-flags=0x%08x\n",
2615 		    (int)now.tv_sec, (int)now.tv_usec,
2616 		    ((struct spoe_config *)FLT_CONF(filter))->agent->id,
2617 		    __FUNCTION__, s, spoe_ctx_state_str[ctx->state], ctx->flags);
2618 
2619 	if (!(ctx->flags & SPOE_CTX_FL_PROCESS)) {
2620 		reset_spoe_context(ctx);
2621 	}
2622 
2623 	return 1;
2624 }
2625 
2626 /********************************************************************
2627  * Functions that manage the filter initialization
2628  ********************************************************************/
2629 struct flt_ops spoe_ops = {
2630 	/* Manage SPOE filter, called for each filter declaration */
2631 	.init   = spoe_init,
2632 	.deinit = spoe_deinit,
2633 	.check  = spoe_check,
2634 
2635 	/* Handle start/stop of SPOE */
2636 	.attach         = spoe_start,
2637 	.detach         = spoe_stop,
2638 	.check_timeouts = spoe_check_timeouts,
2639 
2640 	/* Handle channels activity */
2641 	.channel_start_analyze = spoe_start_analyze,
2642 	.channel_pre_analyze   = spoe_chn_pre_analyze,
2643 	.channel_end_analyze   = spoe_end_analyze,
2644 };
2645 
2646 
2647 static int
cfg_parse_spoe_agent(const char * file,int linenum,char ** args,int kwm)2648 cfg_parse_spoe_agent(const char *file, int linenum, char **args, int kwm)
2649 {
2650 	const char *err;
2651 	int         i, err_code = 0;
2652 
2653 	if ((cfg_scope == NULL && curengine != NULL) ||
2654 	    (cfg_scope != NULL && curengine == NULL) ||
2655 	    (curengine != NULL && cfg_scope != NULL && strcmp(curengine, cfg_scope)))
2656 		goto out;
2657 
2658 	if (!strcmp(args[0], "spoe-agent")) { /* new spoe-agent section */
2659 		if (!*args[1]) {
2660 			Alert("parsing [%s:%d] : missing name for spoe-agent section.\n",
2661 			      file, linenum);
2662 			err_code |= ERR_ALERT | ERR_ABORT;
2663 			goto out;
2664 		}
2665 		if (*args[2]) {
2666 			Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
2667 			      file, linenum, args[2]);
2668 			err_code |= ERR_ALERT | ERR_ABORT;
2669 			goto out;
2670 		}
2671 
2672 		err = invalid_char(args[1]);
2673 		if (err) {
2674 			Alert("parsing [%s:%d] : character '%c' is not permitted in '%s' name '%s'.\n",
2675 			      file, linenum, *err, args[0], args[1]);
2676 			err_code |= ERR_ALERT | ERR_ABORT;
2677 			goto out;
2678 		}
2679 
2680 		if (curagent != NULL) {
2681 			Alert("parsing [%s:%d] : another spoe-agent section previously defined.\n",
2682 			      file, linenum);
2683 			err_code |= ERR_ALERT | ERR_ABORT;
2684 			goto out;
2685 		}
2686 		if ((curagent = calloc(1, sizeof(*curagent))) == NULL) {
2687 			Alert("parsing [%s:%d] : out of memory.\n", file, linenum);
2688 			err_code |= ERR_ALERT | ERR_ABORT;
2689 			goto out;
2690 		}
2691 
2692 		curagent->id              = strdup(args[1]);
2693 		curagent->conf.file       = strdup(file);
2694 		curagent->conf.line       = linenum;
2695 		curagent->timeout.hello   = TICK_ETERNITY;
2696 		curagent->timeout.idle    = TICK_ETERNITY;
2697 		curagent->timeout.processing = TICK_ETERNITY;
2698 		curagent->var_pfx         = NULL;
2699 		curagent->var_on_error    = NULL;
2700 		curagent->flags           = 0;
2701 		curagent->cps_max         = 0;
2702 		curagent->eps_max         = 0;
2703 
2704 		for (i = 0; i < SPOE_EV_EVENTS; ++i)
2705 			LIST_INIT(&curagent->messages[i]);
2706 		LIST_INIT(&curagent->cache);
2707 		LIST_INIT(&curagent->applet_wq);
2708 	}
2709 	else if (!strcmp(args[0], "use-backend")) {
2710 		if (!*args[1]) {
2711 			Alert("parsing [%s:%d] : '%s' expects a backend name.\n",
2712 			      file, linenum, args[0]);
2713 			err_code |= ERR_ALERT | ERR_FATAL;
2714 			goto out;
2715 		}
2716 		if (*args[2]) {
2717 			Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
2718 			      file, linenum, args[2]);
2719 			err_code |= ERR_ALERT | ERR_ABORT;
2720 			goto out;
2721 		}
2722 		free(curagent->b.name);
2723 		curagent->b.name = strdup(args[1]);
2724 	}
2725 	else if (!strcmp(args[0], "messages")) {
2726 		int cur_arg = 1;
2727 		while (*args[cur_arg]) {
2728 			struct spoe_msg_placeholder *mp = NULL;
2729 
2730 			list_for_each_entry(mp, &curmps, list) {
2731 				if (!strcmp(mp->id, args[cur_arg])) {
2732 					Alert("parsing [%s:%d]: spoe-message message '%s' already declared.\n",
2733 					      file, linenum, args[cur_arg]);
2734 					err_code |= ERR_ALERT | ERR_FATAL;
2735 					goto out;
2736 				}
2737 			}
2738 
2739 			if ((mp = calloc(1, sizeof(*mp))) == NULL) {
2740 				Alert("parsing [%s:%d] : out of memory.\n", file, linenum);
2741 				err_code |= ERR_ALERT | ERR_ABORT;
2742 				goto out;
2743 			}
2744 			mp->id = strdup(args[cur_arg]);
2745 			LIST_ADDQ(&curmps, &mp->list);
2746 			cur_arg++;
2747 		}
2748 	}
2749 	else if (!strcmp(args[0], "timeout")) {
2750 		unsigned int *tv = NULL;
2751 		const char   *res;
2752 		unsigned      timeout;
2753 
2754 		if (!*args[1]) {
2755 			Alert("parsing [%s:%d] : 'timeout' expects 'connect', 'idle' and 'ack'.\n",
2756 			      file, linenum);
2757 			err_code |= ERR_ALERT | ERR_FATAL;
2758 			goto out;
2759 		}
2760 		if (!strcmp(args[1], "hello"))
2761 			tv = &curagent->timeout.hello;
2762 		else if (!strcmp(args[1], "idle"))
2763 			tv = &curagent->timeout.idle;
2764 		else if (!strcmp(args[1], "processing"))
2765 			tv = &curagent->timeout.processing;
2766 		else {
2767 			Alert("parsing [%s:%d] : 'timeout' supports 'connect', 'idle' or 'processing' (got %s).\n",
2768 			      file, linenum, args[1]);
2769 			err_code |= ERR_ALERT | ERR_FATAL;
2770 			goto out;
2771 		}
2772 		if (!*args[2]) {
2773 			Alert("parsing [%s:%d] : 'timeout %s' expects an integer value (in milliseconds).\n",
2774 			      file, linenum, args[1]);
2775 			err_code |= ERR_ALERT | ERR_FATAL;
2776 			goto out;
2777 		}
2778 		res = parse_time_err(args[2], &timeout, TIME_UNIT_MS);
2779 		if (res) {
2780 			Alert("parsing [%s:%d] : unexpected character '%c' in 'timeout %s'.\n",
2781 			      file, linenum, *res, args[1]);
2782 			err_code |= ERR_ALERT | ERR_ABORT;
2783 			goto out;
2784 		}
2785 		if (*args[3]) {
2786 			Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
2787 			      file, linenum, args[3]);
2788 			err_code |= ERR_ALERT | ERR_ABORT;
2789 			goto out;
2790 		}
2791 		*tv = MS_TO_TICKS(timeout);
2792 	}
2793 	else if (!strcmp(args[0], "option")) {
2794 		if (!*args[1]) {
2795                         Alert("parsing [%s:%d]: '%s' expects an option name.\n",
2796                               file, linenum, args[0]);
2797                         err_code |= ERR_ALERT | ERR_FATAL;
2798                         goto out;
2799                 }
2800 		if (!strcmp(args[1], "var-prefix")) {
2801 			char *tmp;
2802 
2803 			if (!*args[2]) {
2804 				Alert("parsing [%s:%d]: '%s %s' expects a value.\n",
2805 				      file, linenum, args[0],
2806 				      args[1]);
2807 				err_code |= ERR_ALERT | ERR_FATAL;
2808 				goto out;
2809 			}
2810 			tmp = args[2];
2811 			while (*tmp) {
2812 				if (!isalnum(*tmp) && *tmp != '_' && *tmp != '.') {
2813 					Alert("parsing [%s:%d]: '%s %s' only supports [a-zA-Z0-9_.] chars.\n",
2814 						 file, linenum, args[0], args[1]);
2815 					err_code |= ERR_ALERT | ERR_FATAL;
2816 					goto out;
2817 				}
2818 				tmp++;
2819 			}
2820 			curagent->var_pfx = strdup(args[2]);
2821 		}
2822 		else if (!strcmp(args[1], "continue-on-error")) {
2823 			if (*args[2]) {
2824 				Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
2825 				      file, linenum, args[2]);
2826 				err_code |= ERR_ALERT | ERR_ABORT;
2827 				goto out;
2828 			}
2829 			curagent->flags |= SPOE_FL_CONT_ON_ERR;
2830 		}
2831 		else if (!strcmp(args[1], "set-on-error")) {
2832 			char *tmp;
2833 
2834 			if (!*args[2]) {
2835 				Alert("parsing [%s:%d]: '%s %s' expects a value.\n",
2836 				      file, linenum, args[0],
2837 				      args[1]);
2838 				err_code |= ERR_ALERT | ERR_FATAL;
2839 				goto out;
2840 			}
2841 			tmp = args[2];
2842 			while (*tmp) {
2843 				if (!isalnum(*tmp) && *tmp != '_' && *tmp != '.') {
2844 					Alert("parsing [%s:%d]: '%s %s' only supports [a-zA-Z0-9_.] chars.\n",
2845 						 file, linenum, args[0], args[1]);
2846 					err_code |= ERR_ALERT | ERR_FATAL;
2847 					goto out;
2848 				}
2849 				tmp++;
2850 			}
2851 			curagent->var_on_error = strdup(args[2]);
2852 		}
2853 		else {
2854 			Alert("parsing [%s:%d]: option '%s' is not supported.\n",
2855 			      file, linenum, args[1]);
2856 			err_code |= ERR_ALERT | ERR_FATAL;
2857 			goto out;
2858 		}
2859 	}
2860 	else if (!strcmp(args[0], "maxconnrate")) {
2861 		if (!*args[1]) {
2862 			Alert("parsing [%s:%d] : '%s' expects an integer argument.\n",
2863                               file, linenum, args[0]);
2864                         err_code |= ERR_ALERT | ERR_FATAL;
2865                         goto out;
2866                 }
2867 		if (*args[2]) {
2868 			Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
2869 			      file, linenum, args[2]);
2870 			err_code |= ERR_ALERT | ERR_ABORT;
2871 			goto out;
2872 		}
2873 		curagent->cps_max = atol(args[1]);
2874 	}
2875 	else if (!strcmp(args[0], "maxerrrate")) {
2876 		if (!*args[1]) {
2877 			Alert("parsing [%s:%d] : '%s' expects an integer argument.\n",
2878                               file, linenum, args[0]);
2879                         err_code |= ERR_ALERT | ERR_FATAL;
2880                         goto out;
2881                 }
2882 		if (*args[2]) {
2883 			Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
2884 			      file, linenum, args[2]);
2885 			err_code |= ERR_ALERT | ERR_ABORT;
2886 			goto out;
2887 		}
2888 		curagent->eps_max = atol(args[1]);
2889 	}
2890 	else if (*args[0]) {
2891 		Alert("parsing [%s:%d] : unknown keyword '%s' in spoe-agent section.\n",
2892 		      file, linenum, args[0]);
2893 		err_code |= ERR_ALERT | ERR_FATAL;
2894 		goto out;
2895 	}
2896  out:
2897 	return err_code;
2898 }
2899 
2900 static int
cfg_parse_spoe_message(const char * file,int linenum,char ** args,int kwm)2901 cfg_parse_spoe_message(const char *file, int linenum, char **args, int kwm)
2902 {
2903 	struct spoe_message *msg;
2904 	struct spoe_arg     *arg;
2905 	const char          *err;
2906 	char                *errmsg   = NULL;
2907 	int                  err_code = 0;
2908 
2909 	if ((cfg_scope == NULL && curengine != NULL) ||
2910 	    (cfg_scope != NULL && curengine == NULL) ||
2911 	    (curengine != NULL && cfg_scope != NULL && strcmp(curengine, cfg_scope)))
2912 		goto out;
2913 
2914 	if (!strcmp(args[0], "spoe-message")) { /* new spoe-message section */
2915 		if (!*args[1]) {
2916 			Alert("parsing [%s:%d] : missing name for spoe-message section.\n",
2917 			      file, linenum);
2918 			err_code |= ERR_ALERT | ERR_ABORT;
2919 			goto out;
2920 		}
2921 		if (*args[2]) {
2922 			Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
2923 			      file, linenum, args[2]);
2924 			err_code |= ERR_ALERT | ERR_ABORT;
2925 			goto out;
2926 		}
2927 
2928 		err = invalid_char(args[1]);
2929 		if (err) {
2930 			Alert("parsing [%s:%d] : character '%c' is not permitted in '%s' name '%s'.\n",
2931 			      file, linenum, *err, args[0], args[1]);
2932 			err_code |= ERR_ALERT | ERR_ABORT;
2933 			goto out;
2934 		}
2935 
2936 		list_for_each_entry(msg, &curmsgs, list) {
2937 			if (!strcmp(msg->id, args[1])) {
2938 				Alert("parsing [%s:%d]: spoe-message section '%s' has the same"
2939 				      " name as another one declared at %s:%d.\n",
2940 				      file, linenum, args[1], msg->conf.file, msg->conf.line);
2941 				err_code |= ERR_ALERT | ERR_FATAL;
2942 				goto out;
2943 			}
2944 		}
2945 
2946 		if ((curmsg = calloc(1, sizeof(*curmsg))) == NULL) {
2947 			Alert("parsing [%s:%d] : out of memory.\n", file, linenum);
2948 			err_code |= ERR_ALERT | ERR_ABORT;
2949 			goto out;
2950 		}
2951 
2952 		curmsg->id = strdup(args[1]);
2953 		curmsg->id_len = strlen(curmsg->id);
2954 		curmsg->event  = SPOE_EV_NONE;
2955 		curmsg->conf.file = strdup(file);
2956 		curmsg->conf.line = linenum;
2957 		LIST_INIT(&curmsg->args);
2958 		LIST_ADDQ(&curmsgs, &curmsg->list);
2959 	}
2960 	else if (!strcmp(args[0], "args")) {
2961 		int cur_arg = 1;
2962 
2963 		curproxy->conf.args.ctx  = ARGC_SPOE;
2964 		curproxy->conf.args.file = file;
2965 		curproxy->conf.args.line = linenum;
2966 		while (*args[cur_arg]) {
2967 			char *delim = strchr(args[cur_arg], '=');
2968 			int   idx = 0;
2969 
2970 			if ((arg = calloc(1, sizeof(*arg))) == NULL) {
2971 				Alert("parsing [%s:%d] : out of memory.\n", file, linenum);
2972 				err_code |= ERR_ALERT | ERR_ABORT;
2973 				goto out;
2974 			}
2975 
2976 			if (!delim) {
2977 				arg->name = NULL;
2978 				arg->name_len  = 0;
2979 				delim = args[cur_arg];
2980 			}
2981 			else {
2982 				arg->name = my_strndup(args[cur_arg], delim - args[cur_arg]);
2983 				arg->name_len = delim - args[cur_arg];
2984 				delim++;
2985 			}
2986 			arg->expr = sample_parse_expr((char*[]){delim, NULL},
2987 						      &idx, file, linenum, &errmsg,
2988 						      &curproxy->conf.args);
2989 			if (arg->expr == NULL) {
2990 				Alert("parsing [%s:%d] : '%s': %s.\n", file, linenum, args[0], errmsg);
2991 				err_code |= ERR_ALERT | ERR_FATAL;
2992 				free(arg->name);
2993 				free(arg);
2994 				goto out;
2995 			}
2996 			LIST_ADDQ(&curmsg->args, &arg->list);
2997 			cur_arg++;
2998 		}
2999 		curproxy->conf.args.file = NULL;
3000 		curproxy->conf.args.line = 0;
3001 	}
3002 	else if (!strcmp(args[0], "event")) {
3003 		if (!*args[1]) {
3004 			Alert("parsing [%s:%d] : missing event name.\n", file, linenum);
3005 			err_code |= ERR_ALERT | ERR_ABORT;
3006 			goto out;
3007 		}
3008 		if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_CLIENT_SESS]))
3009 			curmsg->event = SPOE_EV_ON_CLIENT_SESS;
3010 		else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_SERVER_SESS]))
3011 			curmsg->event = SPOE_EV_ON_SERVER_SESS;
3012 
3013 		else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_TCP_REQ_FE]))
3014 			curmsg->event = SPOE_EV_ON_TCP_REQ_FE;
3015 		else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_TCP_REQ_BE]))
3016 			curmsg->event = SPOE_EV_ON_TCP_REQ_BE;
3017 		else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_TCP_RSP]))
3018 			curmsg->event = SPOE_EV_ON_TCP_RSP;
3019 
3020 		else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_HTTP_REQ_FE]))
3021 			curmsg->event = SPOE_EV_ON_HTTP_REQ_FE;
3022 		else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_HTTP_REQ_BE]))
3023 			curmsg->event = SPOE_EV_ON_HTTP_REQ_BE;
3024 		else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_HTTP_RSP]))
3025 			curmsg->event = SPOE_EV_ON_HTTP_RSP;
3026 		else {
3027 			Alert("parsing [%s:%d] : unkown event '%s'.\n",
3028 			      file, linenum, args[1]);
3029 			err_code |= ERR_ALERT | ERR_ABORT;
3030 			goto out;
3031 		}
3032 	}
3033 	else if (!*args[0]) {
3034 		Alert("parsing [%s:%d] : unknown keyword '%s' in spoe-message section.\n",
3035 		      file, linenum, args[0]);
3036 		err_code |= ERR_ALERT | ERR_FATAL;
3037 		goto out;
3038 	}
3039  out:
3040 	free(errmsg);
3041 	return err_code;
3042 }
3043 
3044 /* Return -1 on error, else 0 */
3045 static int
parse_spoe_flt(char ** args,int * cur_arg,struct proxy * px,struct flt_conf * fconf,char ** err,void * private)3046 parse_spoe_flt(char **args, int *cur_arg, struct proxy *px,
3047                 struct flt_conf *fconf, char **err, void *private)
3048 {
3049 	struct list backup_sections;
3050 	struct spoe_config          *conf;
3051 	struct spoe_message         *msg, *msgback;
3052 	struct spoe_msg_placeholder *mp, *mpback;
3053 	char                        *file = NULL, *engine = NULL;
3054 	int                          ret, pos = *cur_arg + 1;
3055 
3056 	conf = calloc(1, sizeof(*conf));
3057 	if (conf == NULL) {
3058 		memprintf(err, "%s: out of memory", args[*cur_arg]);
3059 		goto error;
3060 	}
3061 	conf->proxy = px;
3062 
3063 	while (*args[pos]) {
3064 		if (!strcmp(args[pos], "config")) {
3065 			if (!*args[pos+1]) {
3066 				memprintf(err, "'%s' : '%s' option without value",
3067 					  args[*cur_arg], args[pos]);
3068 				goto error;
3069 			}
3070 			file = args[pos+1];
3071 			pos += 2;
3072 		}
3073 		else if (!strcmp(args[pos], "engine")) {
3074 			if (!*args[pos+1]) {
3075 				memprintf(err, "'%s' : '%s' option without value",
3076 					  args[*cur_arg], args[pos]);
3077 				goto error;
3078 			}
3079 			engine = args[pos+1];
3080 			pos += 2;
3081 		}
3082 		else {
3083 			memprintf(err, "unknown keyword '%s'", args[pos]);
3084 			goto error;
3085 		}
3086 	}
3087 	if (file == NULL) {
3088 		memprintf(err, "'%s' : missing config file", args[*cur_arg]);
3089 		goto error;
3090 	}
3091 
3092 	/* backup sections and register SPOE sections */
3093 	LIST_INIT(&backup_sections);
3094 	cfg_backup_sections(&backup_sections);
3095 	cfg_register_section("spoe-agent",   cfg_parse_spoe_agent);
3096 	cfg_register_section("spoe-message", cfg_parse_spoe_message);
3097 
3098 	/* Parse SPOE filter configuration file */
3099 	curengine = engine;
3100 	curproxy  = px;
3101 	curagent  = NULL;
3102 	curmsg    = NULL;
3103 	ret = readcfgfile(file);
3104 	curproxy = NULL;
3105 
3106 	/* unregister SPOE sections and restore previous sections */
3107 	cfg_unregister_sections();
3108 	cfg_restore_sections(&backup_sections);
3109 
3110 	if (ret == -1) {
3111 		memprintf(err, "Could not open configuration file %s : %s",
3112 			  file, strerror(errno));
3113 		goto error;
3114 	}
3115 	if (ret & (ERR_ABORT|ERR_FATAL)) {
3116 		memprintf(err, "Error(s) found in configuration file %s", file);
3117 		goto error;
3118 	}
3119 
3120 	/* Check SPOE agent */
3121 	if (curagent == NULL) {
3122 		memprintf(err, "No SPOE agent found in file %s", file);
3123 		goto error;
3124 	}
3125 	if (curagent->b.name == NULL) {
3126 		memprintf(err, "No backend declared for SPOE agent '%s' declared at %s:%d",
3127 			  curagent->id, curagent->conf.file, curagent->conf.line);
3128 		goto error;
3129 	}
3130 	if (curagent->timeout.hello      == TICK_ETERNITY ||
3131 	    curagent->timeout.idle       == TICK_ETERNITY ||
3132 	    curagent->timeout.processing == TICK_ETERNITY) {
3133 		Warning("Proxy '%s': missing timeouts for SPOE agent '%s' declare at %s:%d.\n"
3134 			"   | While not properly invalid, you will certainly encounter various problems\n"
3135 			"   | with such a configuration. To fix this, please ensure that all following\n"
3136 			"   | timeouts are set to a non-zero value: 'hello', 'idle', 'processing'.\n",
3137 			px->id, curagent->id, curagent->conf.file, curagent->conf.line);
3138 	}
3139 	if (curagent->var_pfx == NULL) {
3140 		char *tmp = curagent->id;
3141 
3142 		while (*tmp) {
3143 			if (!isalnum(*tmp) && *tmp != '_' && *tmp != '.') {
3144 				memprintf(err, "Invalid variable prefix '%s' for SPOE agent '%s' declared at %s:%d. "
3145 					  "Use 'option var-prefix' to set it. Only [a-zA-Z0-9_.] chars are supported.\n",
3146 					  curagent->id, curagent->id, curagent->conf.file, curagent->conf.line);
3147 				goto error;
3148 			}
3149 			tmp++;
3150 		}
3151 		curagent->var_pfx = strdup(curagent->id);
3152 	}
3153 
3154 	if (LIST_ISEMPTY(&curmps)) {
3155 		Warning("Proxy '%s': No message used by SPOE agent '%s' declared at %s:%d.\n",
3156 			px->id, curagent->id, curagent->conf.file, curagent->conf.line);
3157 		goto finish;
3158 	}
3159 
3160 	list_for_each_entry_safe(mp, mpback, &curmps, list) {
3161 		list_for_each_entry_safe(msg, msgback, &curmsgs, list) {
3162 			if (!strcmp(msg->id, mp->id)) {
3163 				if ((px->cap & (PR_CAP_FE|PR_CAP_BE)) == (PR_CAP_FE|PR_CAP_BE)) {
3164 					if (msg->event == SPOE_EV_ON_TCP_REQ_BE)
3165 						msg->event = SPOE_EV_ON_TCP_REQ_FE;
3166 					if (msg->event == SPOE_EV_ON_HTTP_REQ_BE)
3167 						msg->event = SPOE_EV_ON_HTTP_REQ_FE;
3168 				}
3169 				if (!(px->cap & PR_CAP_FE) && (msg->event == SPOE_EV_ON_CLIENT_SESS ||
3170 							       msg->event == SPOE_EV_ON_TCP_REQ_FE ||
3171 							       msg->event == SPOE_EV_ON_HTTP_REQ_FE)) {
3172 					Warning("Proxy '%s': frontend event used on a backend proxy at %s:%d.\n",
3173 						px->id, msg->conf.file, msg->conf.line);
3174 					goto next;
3175 				}
3176 				if (msg->event == SPOE_EV_NONE) {
3177 					Warning("Proxy '%s': Ignore SPOE message without event at %s:%d.\n",
3178 						px->id, msg->conf.file, msg->conf.line);
3179 					goto next;
3180 				}
3181 				msg->agent = curagent;
3182 				LIST_DEL(&msg->list);
3183 				LIST_ADDQ(&curagent->messages[msg->event], &msg->list);
3184 				goto next;
3185 			}
3186 		}
3187 		memprintf(err, "SPOE agent '%s' try to use undefined SPOE message '%s' at %s:%d",
3188 			  curagent->id, mp->id, curagent->conf.file, curagent->conf.line);
3189 		goto error;
3190 	  next:
3191 		continue;
3192 	}
3193 
3194  finish:
3195 	conf->agent = curagent;
3196 	list_for_each_entry_safe(mp, mpback, &curmps, list) {
3197 		LIST_DEL(&mp->list);
3198 		release_spoe_msg_placeholder(mp);
3199 	}
3200 	list_for_each_entry_safe(msg, msgback, &curmsgs, list) {
3201 		Warning("Proxy '%s': Ignore unused SPOE messages '%s' declared at %s:%d.\n",
3202 			px->id, msg->id, msg->conf.file, msg->conf.line);
3203 		LIST_DEL(&msg->list);
3204 		release_spoe_message(msg);
3205 	}
3206 
3207 	*cur_arg    = pos;
3208 	fconf->id   = spoe_filter_id;
3209 	fconf->ops  = &spoe_ops;
3210 	fconf->conf = conf;
3211 	return 0;
3212 
3213  error:
3214 	release_spoe_agent(curagent);
3215 	list_for_each_entry_safe(mp, mpback, &curmps, list) {
3216 		LIST_DEL(&mp->list);
3217 		release_spoe_msg_placeholder(mp);
3218 	}
3219 	list_for_each_entry_safe(msg, msgback, &curmsgs, list) {
3220 		LIST_DEL(&msg->list);
3221 		release_spoe_message(msg);
3222 	}
3223 	free(conf);
3224 	return -1;
3225 }
3226 
3227 
3228 /* Declare the filter parser for "spoe" keyword */
3229 static struct flt_kw_list flt_kws = { "SPOE", { }, {
3230 		{ "spoe", parse_spoe_flt, NULL },
3231 		{ NULL, NULL, NULL },
3232 	}
3233 };
3234 
3235 __attribute__((constructor))
__spoe_init(void)3236 static void __spoe_init(void)
3237 {
3238 	flt_register_keywords(&flt_kws);
3239 
3240 	LIST_INIT(&curmsgs);
3241 	LIST_INIT(&curmps);
3242 	pool2_spoe_ctx = create_pool("spoe_ctx", sizeof(struct spoe_context), MEM_F_SHARED);
3243 }
3244 
3245 __attribute__((destructor))
3246 static void
__spoe_deinit(void)3247 __spoe_deinit(void)
3248 {
3249 	pool_destroy2(pool2_spoe_ctx);
3250 }
3251