1 /*
2 * Stream processing offload engine management.
3 *
4 * Copyright 2016 HAProxy Technologies, Christopher Faulet <cfaulet@haproxy.com>
5 *
6 * This program is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public License
8 * as published by the Free Software Foundation; either version
9 * 2 of the License, or (at your option) any later version.
10 *
11 */
12 #include <ctype.h>
13 #include <errno.h>
14
15 #include <common/buffer.h>
16 #include <common/cfgparse.h>
17 #include <common/compat.h>
18 #include <common/config.h>
19 #include <common/debug.h>
20 #include <common/memory.h>
21 #include <common/time.h>
22
23 #include <types/arg.h>
24 #include <types/filters.h>
25 #include <types/global.h>
26 #include <types/proxy.h>
27 #include <types/sample.h>
28 #include <types/stream.h>
29
30 #include <proto/arg.h>
31 #include <proto/backend.h>
32 #include <proto/filters.h>
33 #include <proto/freq_ctr.h>
34 #include <proto/frontend.h>
35 #include <proto/log.h>
36 #include <proto/proto_http.h>
37 #include <proto/proxy.h>
38 #include <proto/sample.h>
39 #include <proto/session.h>
40 #include <proto/signal.h>
41 #include <proto/stream.h>
42 #include <proto/stream_interface.h>
43 #include <proto/task.h>
44 #include <proto/vars.h>
45
/* Debug trace helper: it expands to fprintf() when DEBUG_SPOE or DEBUG_FULL is
 * defined at build time, and to nothing otherwise. */
#if defined(DEBUG_SPOE) || defined(DEBUG_FULL)
#define SPOE_PRINTF(x...) fprintf(x)
#else
#define SPOE_PRINTF(x...)
#endif

/* Helper to get the SPOE-specific part of the context inside an appctx */
#define APPCTX_SPOE(appctx) ((appctx)->ctx.spoe)

/* Minimal size for a frame. An agent announcing a "max-frame-size" lower than
 * this value in its HELLO frame is rejected. */
#define MIN_FRAME_SIZE 256
57
/* Flags set on the SPOE agent */
#define SPOE_FL_CONT_ON_ERR       0x00000001 /* Do not stop events processing when an error occurred */

/* Flags set on the SPOE context */
#define SPOE_CTX_FL_CLI_CONNECTED 0x00000001 /* Set once the on-client-session event has been processed */
#define SPOE_CTX_FL_SRV_CONNECTED 0x00000002 /* Set once the on-server-session event has been processed */
#define SPOE_CTX_FL_REQ_PROCESS   0x00000004 /* Set when SPOE is processing the request */
#define SPOE_CTX_FL_RSP_PROCESS   0x00000008 /* Set when SPOE is processing the response */

/* Mask matching a processing in progress, on the request or on the response */
#define SPOE_CTX_FL_PROCESS (SPOE_CTX_FL_REQ_PROCESS|SPOE_CTX_FL_RSP_PROCESS)

/* Error codes for the SPOE applet */
#define SPOE_APPCTX_ERR_NONE    0x00000000 /* no error yet, leave it to zero */
#define SPOE_APPCTX_ERR_TOUT    0x00000001 /* SPOE applet timeout */
71
/* All possible states for a SPOE context. In debug builds, these values are
 * also used as indexes in spoe_ctx_state_str[], so both must stay in sync. */
enum spoe_ctx_state {
	SPOE_CTX_ST_NONE = 0,
	SPOE_CTX_ST_READY,
	SPOE_CTX_ST_SENDING_MSGS,
	SPOE_CTX_ST_WAITING_ACK,
	SPOE_CTX_ST_DONE,
	SPOE_CTX_ST_ERROR,
};
81
/* All possible states for a SPOE applet. In debug builds, these values are
 * also used as indexes in spoe_appctx_state_str[], so both must stay in
 * sync. */
enum spoe_appctx_state {
	SPOE_APPCTX_ST_CONNECT = 0,
	SPOE_APPCTX_ST_CONNECTING,
	SPOE_APPCTX_ST_PROCESSING,
	SPOE_APPCTX_ST_DISCONNECT,
	SPOE_APPCTX_ST_DISCONNECTING,
	SPOE_APPCTX_ST_EXIT,
	SPOE_APPCTX_ST_END,
};
92
/* All supported SPOE actions. Numbering starts at 1. SPOE_ACT_TYPES is not an
 * action, it only marks the end of the list. */
enum spoe_action_type {
	SPOE_ACT_T_SET_VAR = 1,
	SPOE_ACT_T_UNSET_VAR,
	SPOE_ACT_TYPES,
};
99
/* All supported SPOE events. SPOE_EV_EVENTS is not an event: it is the number
 * of entries and is used to size per-event arrays (the agent's message lists
 * and spoe_event_str[]). */
enum spoe_event {
	SPOE_EV_NONE = 0,

	/* Request events */
	SPOE_EV_ON_CLIENT_SESS = 1,
	SPOE_EV_ON_TCP_REQ_FE,
	SPOE_EV_ON_TCP_REQ_BE,
	SPOE_EV_ON_HTTP_REQ_FE,
	SPOE_EV_ON_HTTP_REQ_BE,

	/* Response events */
	SPOE_EV_ON_SERVER_SESS,
	SPOE_EV_ON_TCP_RSP,
	SPOE_EV_ON_HTTP_RSP,

	SPOE_EV_EVENTS
};
118
/* Errors triggered by the SPOE applet. These values are the status codes
 * exchanged in DISCONNECT frames and the indexes of spoe_frm_err_reasons[].
 * The numbering is sparse: no code is defined between
 * SPOE_FRM_ERR_BAD_FRAME_SIZE (9) and SPOE_FRM_ERR_UNKNOWN (99).
 * SPOE_FRM_ERRS (100) is not an error, it is used to size the reasons
 * table. */
enum spoe_frame_error {
	SPOE_FRM_ERR_NONE = 0,
	SPOE_FRM_ERR_IO,
	SPOE_FRM_ERR_TOUT,
	SPOE_FRM_ERR_TOO_BIG,
	SPOE_FRM_ERR_INVALID,
	SPOE_FRM_ERR_NO_VSN,
	SPOE_FRM_ERR_NO_FRAME_SIZE,
	SPOE_FRM_ERR_NO_CAP,
	SPOE_FRM_ERR_BAD_VSN,
	SPOE_FRM_ERR_BAD_FRAME_SIZE,
	SPOE_FRM_ERR_UNKNOWN = 99,
	SPOE_FRM_ERRS,
};
134
/* Scopes used for variables set by agents. It is a way to be agnostic to the
 * internal vars scopes: each value below is paired with the SCOPE_* value
 * named in its comment. */
enum spoe_vars_scope {
	SPOE_SCOPE_PROC = 0, /* <=> SCOPE_PROC  */
	SPOE_SCOPE_SESS,     /* <=> SCOPE_SESS */
	SPOE_SCOPE_TXN,      /* <=> SCOPE_TXN  */
	SPOE_SCOPE_REQ,      /* <=> SCOPE_REQ  */
	SPOE_SCOPE_RES,      /* <=> SCOPE_RES  */
};
144
145
/* Describe an argument that will be linked to a message. It is a sample fetch,
 * with an optional name. The name and the expression are released together
 * with the message owning the argument (see release_spoe_message()). */
struct spoe_arg {
	char               *name;     /* Name of the argument, may be NULL */
	unsigned int        name_len; /* The name length, 0 if NULL */
	struct sample_expr *expr;     /* Sample expression */
	struct list         list;     /* Used to chain SPOE args */
};
154
/* Used during the config parsing only because, when a SPOE agent section is
 * parsed, the messages it refers to may not be defined yet. The placeholder
 * only remembers the id of the expected message. */
struct spoe_msg_placeholder {
	char        *id;    /* SPOE message placeholder id */
	struct list  list;  /* Used to chain SPOE message placeholders */
};
161
/* Describe a message that will be sent in a NOTIFY frame. A message has a name,
 * an argument list (see struct spoe_arg) and it is linked to a specific
 * event. */
struct spoe_message {
	char              *id;      /* SPOE message id */
	unsigned int       id_len;  /* The message id length */
	struct spoe_agent *agent;   /* SPOE agent owning this SPOE message */
	struct {
		char      *file;    /* file where the SPOE message appears */
		int        line;    /* line where the SPOE message appears */
	} conf;                     /* config information */
	struct list        args;    /* Arguments added when the SPOE message is sent */
	struct list        list;    /* Used to chain SPOE messages */

	enum spoe_event    event;   /* SPOE_EV_* */
};
177
/* Describe a SPOE agent. */
struct spoe_agent {
	char                 *id;             /* SPOE agent id (name) */
	struct {
		char         *file;           /* file where the SPOE agent appears */
		int           line;           /* line where the SPOE agent appears */
	} conf;                               /* config information */
	union {
		struct proxy *be;             /* Backend used by this agent */
		char         *name;           /* Backend name used during conf parsing */
	} b;                                  /* Only one of the two members is valid at a time */
	struct {
		unsigned int  hello;          /* Max time to receive AGENT-HELLO frame (in SPOE applet) */
		unsigned int  idle;           /* Max Idle timeout  (in SPOE applet) */
		unsigned int  processing;     /* Max time to process an event (in the main stream) */
	} timeout;

	char                 *var_pfx;        /* Prefix used for vars set by the agent */
	char                 *var_on_error;   /* Variable to set when an error occurred, in the TXN scope */
	unsigned int          flags;          /* SPOE_FL_* */
	unsigned int          cps_max;        /* Maximum number of connections per second */
	unsigned int          eps_max;        /* Maximum number of errors per second */

	struct list           cache;          /* List used to cache SPOE streams. In
					       * fact, we cache the SPOE applet ctx */

	struct list           messages[SPOE_EV_EVENTS]; /* List of SPOE messages that will be sent
							 * for each supported event, indexed by
							 * enum spoe_event */

	struct list           applet_wq;      /* List of streams waiting for a SPOE applet */
	struct freq_ctr       conn_per_sec;   /* connections per second */
	struct freq_ctr       err_per_sec;    /* connection errors per second */
};
211
/* SPOE filter configuration. Note that the agent frontend is embedded in the
 * structure (not a pointer), unlike the owning proxy. */
struct spoe_config {
	struct proxy      *proxy;       /* Proxy owning the filter */
	struct spoe_agent *agent;       /* Agent used by this filter */
	struct proxy       agent_fe;    /* Agent frontend */
};
218
/* SPOE context attached to a stream. It is the main structure that handles the
 * processing offload */
struct spoe_context {
	struct filter      *filter;       /* The SPOE filter */
	struct stream      *strm;         /* The stream that should be offloaded */
	struct appctx      *appctx;       /* The SPOE appctx */
	struct list        *messages;     /* List of messages that will be sent during the stream processing */
	struct buffer      *buffer;       /* Buffer used to store a NOTIFY or ACK frame */
	struct buffer_wait  buffer_wait;  /* position in the list of streams waiting for a buffer */
	struct list         applet_wait;  /* position in the list of streams waiting for a SPOE applet */

	enum spoe_ctx_state state;        /* SPOE_CTX_ST_* */
	unsigned int        flags;        /* SPOE_CTX_FL_* */

	unsigned int        stream_id;    /* stream_id and frame_id are used */
	unsigned int        frame_id;     /* to map NOTIFY and ACK frames */
	unsigned int        process_exp;  /* expiration date to process an event */
};
237
/* SPOE filter id. Used to identify SPOE filters */
const char *spoe_filter_id = "SPOE filter";

/* Set if the handler on SIGUSR1 is registered */
static int sighandler_registered = 0;

/* proxy used during the parsing */
struct proxy *curproxy = NULL;

/* The name of the SPOE engine, used during the parsing */
char *curengine = NULL;

/* SPOE agent used during the parsing */
struct spoe_agent *curagent = NULL;

/* SPOE message used during the parsing */
struct spoe_message *curmsg = NULL;

/* list of SPOE messages and placeholders used during the parsing */
struct list curmsgs;
struct list curmps;

/* Pool used to allocate new SPOE contexts */
static struct pool_head *pool2_spoe_ctx = NULL;

/* Temporary variables used to ease error processing. <spoe_status_code> is set
 * by the frame decoders when they reject a frame, or with the status code
 * found in an agent's DISCONNECT frame. <spoe_reason> receives the message
 * found in an agent's DISCONNECT frame: at most 255 bytes plus the trailing
 * null byte. Both are plain globals shared by all the functions below. */
int  spoe_status_code = SPOE_FRM_ERR_NONE;
char spoe_reason[256];

/* Filter callbacks; the variable is only declared here, without initializer */
struct flt_ops spoe_ops;

/* Forward declarations of static functions that are not defined in this part
 * of the file */
static void offer_spoe_appctx(struct spoe_agent *agent, struct appctx *appctx);
static void on_new_spoe_appctx_failure(struct spoe_agent *agent);
static void on_new_spoe_appctx_success(struct spoe_agent *agent, struct appctx *appctx);
272
273 /********************************************************************
274 * helper functions/globals
275 ********************************************************************/
276 static void
release_spoe_msg_placeholder(struct spoe_msg_placeholder * mp)277 release_spoe_msg_placeholder(struct spoe_msg_placeholder *mp)
278 {
279 if (!mp)
280 return;
281 free(mp->id);
282 free(mp);
283 }
284
285
286 static void
release_spoe_message(struct spoe_message * msg)287 release_spoe_message(struct spoe_message *msg)
288 {
289 struct spoe_arg *arg, *back;
290
291 if (!msg)
292 return;
293 free(msg->id);
294 free(msg->conf.file);
295 list_for_each_entry_safe(arg, back, &msg->args, list) {
296 release_sample_expr(arg->expr);
297 free(arg->name);
298 LIST_DEL(&arg->list);
299 free(arg);
300 }
301 free(msg);
302 }
303
304 static void
release_spoe_agent(struct spoe_agent * agent)305 release_spoe_agent(struct spoe_agent *agent)
306 {
307 struct spoe_message *msg, *back;
308 int i;
309
310 if (!agent)
311 return;
312 free(agent->id);
313 free(agent->conf.file);
314 free(agent->var_pfx);
315 free(agent->var_on_error);
316 for (i = 0; i < SPOE_EV_EVENTS; ++i) {
317 list_for_each_entry_safe(msg, back, &agent->messages[i], list) {
318 LIST_DEL(&msg->list);
319 release_spoe_message(msg);
320 }
321 }
322 free(agent);
323 }
324
/* Human-readable reason for each status code, indexed by enum
 * spoe_frame_error. WARNING: the table is sparse. Only the codes listed below
 * have a message; every other index below SPOE_FRM_ERRS (i.e. 10 to 98) holds
 * a NULL pointer, so the entry must be checked before being used as a
 * string. */
static const char *spoe_frm_err_reasons[SPOE_FRM_ERRS] = {
	[SPOE_FRM_ERR_NONE]           = "normal",
	[SPOE_FRM_ERR_IO]             = "I/O error",
	[SPOE_FRM_ERR_TOUT]           = "a timeout occurred",
	[SPOE_FRM_ERR_TOO_BIG]        = "frame is too big",
	[SPOE_FRM_ERR_INVALID]        = "invalid frame received",
	[SPOE_FRM_ERR_NO_VSN]         = "version value not found",
	[SPOE_FRM_ERR_NO_FRAME_SIZE]  = "max-frame-size value not found",
	[SPOE_FRM_ERR_NO_CAP]         = "capabilities value not found",
	[SPOE_FRM_ERR_BAD_VSN]        = "unsupported version",
	[SPOE_FRM_ERR_BAD_FRAME_SIZE] = "max-frame-size too big or too small",
	[SPOE_FRM_ERR_UNKNOWN]        = "an unknown error occurred",
};
338
/* Name of each event, indexed by enum spoe_event. There is no entry for
 * SPOE_EV_NONE, so index 0 holds a NULL pointer. */
static const char *spoe_event_str[SPOE_EV_EVENTS] = {
	[SPOE_EV_ON_CLIENT_SESS] = "on-client-session",
	[SPOE_EV_ON_TCP_REQ_FE]  = "on-frontend-tcp-request",
	[SPOE_EV_ON_TCP_REQ_BE]  = "on-backend-tcp-request",
	[SPOE_EV_ON_HTTP_REQ_FE] = "on-frontend-http-request",
	[SPOE_EV_ON_HTTP_REQ_BE] = "on-backend-http-request",

	[SPOE_EV_ON_SERVER_SESS] = "on-server-session",
	[SPOE_EV_ON_TCP_RSP]     = "on-tcp-response",
	[SPOE_EV_ON_HTTP_RSP]    = "on-http-response",
};
350
351
/* State names, only built when debug traces are enabled (same condition as
 * SPOE_PRINTF) */
#if defined(DEBUG_SPOE) || defined(DEBUG_FULL)

/* Name of each SPOE context state, indexed by enum spoe_ctx_state */
static const char *spoe_ctx_state_str[SPOE_CTX_ST_ERROR+1] = {
	[SPOE_CTX_ST_NONE]         = "NONE",
	[SPOE_CTX_ST_READY]        = "READY",
	[SPOE_CTX_ST_SENDING_MSGS] = "SENDING_MSGS",
	[SPOE_CTX_ST_WAITING_ACK]  = "WAITING_ACK",
	[SPOE_CTX_ST_DONE]         = "DONE",
	[SPOE_CTX_ST_ERROR]        = "ERROR",
};

/* Name of each SPOE applet state, indexed by enum spoe_appctx_state */
static const char *spoe_appctx_state_str[SPOE_APPCTX_ST_END+1] = {
	[SPOE_APPCTX_ST_CONNECT]       = "CONNECT",
	[SPOE_APPCTX_ST_CONNECTING]    = "CONNECTING",
	[SPOE_APPCTX_ST_PROCESSING]    = "PROCESSING",
	[SPOE_APPCTX_ST_DISCONNECT]    = "DISCONNECT",
	[SPOE_APPCTX_ST_DISCONNECTING] = "DISCONNECTING",
	[SPOE_APPCTX_ST_EXIT]          = "EXIT",
	[SPOE_APPCTX_ST_END]           = "END",
};

#endif
374 /********************************************************************
375 * Functions that encode/decode SPOE frames
376 ********************************************************************/
/* Frame types sent by HAProxy and by agents. The type is the first byte of a
 * frame. Types sent by HAProxy start at 1, those sent by agents start at
 * 101. */
enum spoe_frame_type {
	/* Frames sent by HAProxy */
	SPOE_FRM_T_HAPROXY_HELLO = 1,
	SPOE_FRM_T_HAPROXY_DISCON,
	SPOE_FRM_T_HAPROXY_NOTIFY,

	/* Frames sent by the agents */
	SPOE_FRM_T_AGENT_HELLO = 101,
	SPOE_FRM_T_AGENT_DISCON,
	SPOE_FRM_T_AGENT_ACK
};
389
/* All supported data types. The type is stored in the low 4 bits of the byte
 * preceding a typed data (see SPOE_DATA_T_MASK). SPOE_DATA_TYPES is not a
 * type, it only marks the end of the list. */
enum spoe_data_type {
	SPOE_DATA_T_NULL = 0,
	SPOE_DATA_T_BOOL,
	SPOE_DATA_T_INT32,
	SPOE_DATA_T_UINT32,
	SPOE_DATA_T_INT64,
	SPOE_DATA_T_UINT64,
	SPOE_DATA_T_IPV4,
	SPOE_DATA_T_IPV6,
	SPOE_DATA_T_STR,
	SPOE_DATA_T_BIN,
	SPOE_DATA_TYPES
};
404
/* Masks to get data type or flags value from the type byte of a typed data:
 * the low 4 bits hold the type, the high 4 bits hold the flags */
#define SPOE_DATA_T_MASK  0x0F
#define SPOE_DATA_FL_MASK 0xF0

/* Flags to set Boolean values */
#define SPOE_DATA_FL_FALSE 0x00
#define SPOE_DATA_FL_TRUE  0x10

/* Helper to get static string length, excluding the terminating null byte.
 * It relies on sizeof, so it must only be used with string literals or
 * arrays, never with pointers. */
#define SLEN(str) (sizeof(str)-1)

/* Predefined keys used in HELLO/DISCONNECT frames */
#define SUPPORTED_VERSIONS_KEY "supported-versions"
#define VERSION_KEY            "version"
#define MAX_FRAME_SIZE_KEY     "max-frame-size"
#define CAPABILITIES_KEY       "capabilities"
#define HEALTHCHECK_KEY        "healthcheck"
#define STATUS_CODE_KEY        "status-code"
#define MSG_KEY                "message"
424
/* Describe a range of supported versions. <min> and <max> are compared to the
 * value returned by decode_spoe_version(), i.e. the version multiplied by
 * 1000. */
struct spoe_version {
	char *str; /* the version, as a string */
	int   min; /* lowest accepted value, inclusive */
	int   max; /* highest accepted value, inclusive */
};

/* All supported versions. The list is terminated by an entry whose <str> is
 * NULL. */
static struct spoe_version supported_versions[] = {
	{"1.0", 1000, 1000},
	{NULL,  0, 0}
};

/* Comma-separated list of supported versions */
#define SUPPORTED_VERSIONS_VAL  "1.0"

/* Comma-separated list of supported capabilities (none for now) */
#define CAPABILITIES_VAL ""
442
/* Convert the version string <str>, made of <len> bytes and not necessarily
 * null-terminated, into an integer equal to the version multiplied by 1000
 * ("1.0" gives 1000). Leading and trailing white spaces are ignored. It
 * returns -1 when the string does not contain exactly one number, or when
 * this number is not strictly positive or is too large to be represented.
 *
 * The value is rounded to the nearest integer instead of being truncated:
 * most decimal fractions have no exact binary representation, so with a
 * truncation "1.001" would be decoded as 1000 and wrongly match "1.0". */
static int
decode_spoe_version(const char *str, size_t len)
{
	char   tmp[len+1], *start, *end;
	double d;

	/* strtod() needs a null-terminated string, so work on a copy */
	memcpy(tmp, str, len);
	tmp[len] = 0;

	/* <ctype.h> functions must not be called with a negative value, so
	 * chars are always converted to unsigned char first */
	start = tmp;
	while (isspace((unsigned char)*start))
		start++;

	d = strtod(start, &end);
	if (start == end)
		return -1;

	while (isspace((unsigned char)*end))
		end++;
	if (*end)
		return -1;

	/* Written this way to also reject NaN. The upper bound guarantees
	 * that the conversion to int below cannot overflow. */
	if (!(d > 0 && d <= 2000000.0))
		return -1;

	return (int)(d * 1000 + 0.5);
}
471
/* Encode <i> as a variable-length integer into <buf>. Values lower than 240
 * are stored as is, on a single byte. Otherwise, the first byte holds the 4
 * lowest bits of the value with its 4 highest bits set, and the remainder is
 * stored by groups of 7 bits, every byte but the last one having its highest
 * bit set. No bound is checked: the caller must provide enough room (10 bytes
 * for the largest 64-bit value). This function never fails and returns the
 * number of written bytes. */
static int
encode_spoe_varint(uint64_t i, char *buf)
{
	char *out = buf;

	if (i < 240) {
		*out = (unsigned char)i;
		return 1;
	}

	*out++ = (unsigned char)i | 240;
	for (i = (i - 240) >> 4; i >= 128; i = (i - 128) >> 7)
		*out++ = (unsigned char)i | 128;
	*out++ = (unsigned char)i;

	return (int)(out - buf);
}
493
/* Decode a variable-length integer starting at <buf>. <end> points to the
 * first byte after the buffer, so no byte at or beyond <end> is ever read. On
 * success, the value is stored in <*i> and the number of read bytes is
 * returned. -1 is returned if the end of the buffer is reached before the end
 * of the integer, or if the encoding is too long to fit in 64 bits. On error,
 * <*i> may have been partially updated. */
static int
decode_spoe_varint(const char *buf, const char *end, uint64_t *i)
{
	const unsigned char *msg  = (const unsigned char *)buf;
	const unsigned char *stop = (const unsigned char *)end;
	unsigned int shift = 4;
	int idx = 0;

	if (msg >= stop)
		return -1;

	*i = msg[0];
	if (msg[0] < 240)
		return 1;

	do {
		++idx;
		/* A 64-bit value never needs more than 9 continuation bytes:
		 * shifting by 64 bits or more would be undefined behavior. */
		if (msg+idx >= stop || shift >= 64)
			return -1;
		*i += (uint64_t)msg[idx] << shift;
		shift += 7;
	} while (msg[idx] >= 128);

	return (idx + 1);
}
519
/* Encode the string <str>, made of <len> bytes, into <dst>. The string is
 * prefixed by its length, encoded as a variable-length integer (see
 * encode_spoe_varint). An empty string is encoded as a single null byte and
 * <str> is not read in this case. No bound is checked on <dst>. This function
 * never fails and returns the number of written bytes. */
static int
encode_spoe_string(const char *str, size_t len, char *dst)
{
	int hdr;

	if (len == 0) {
		*dst = 0;
		return 1;
	}

	hdr = encode_spoe_varint(len, dst);
	memcpy(dst + hdr, str, len);
	return (hdr + len);
}
537
/* Decode a string. Its length is decoded first as a variable-length integer.
 * If it succeeds, and if the whole string lies before <end> (the first byte
 * after the buffer), the beginning of the string is saved in <*str>, its
 * length is saved in <*len> and the total number of bytes read is returned.
 * The string is not copied and is not null-terminated. If an error occurred,
 * -1 is returned and <*str> remains NULL. */
static int
decode_spoe_string(char *buf, char *end, char **str, uint64_t *len)
{
	int hdr;

	*str = NULL;
	*len = 0;

	if ((hdr = decode_spoe_varint(buf, end, len)) == -1)
		return -1;

	/* The length comes from the peer and may be huge. It is compared to
	 * the room left in the buffer rather than added to a pointer, to be
	 * immune to pointer overflows. */
	if (end - buf < hdr || *len > (uint64_t)(end - buf - hdr))
		return -1;

	*str = buf + hdr;
	return (hdr + *len);
}
563
564 /* Skip a typed data. If an error occurred, -1 is returned, otherwise the number
565 * of bytes read is returned. A types data is composed of a type (1 byte) and
566 * corresponding data:
567 * - boolean: non additional data (0 bytes)
568 * - integers: a variable-length integer (see decode_spoe_varint)
569 * - ipv4: 4 bytes
570 * - ipv6: 16 bytes
571 * - binary and string: a buffer prefixed by its size, a variable-length
572 * integer (see decode_spoe_string) */
573 static int
skip_spoe_data(char * frame,char * end)574 skip_spoe_data(char *frame, char *end)
575 {
576 uint64_t sz = 0;
577 int i, idx = 0;
578
579 if (frame > end)
580 return -1;
581
582 switch (frame[idx++] & SPOE_DATA_T_MASK) {
583 case SPOE_DATA_T_BOOL:
584 break;
585 case SPOE_DATA_T_INT32:
586 case SPOE_DATA_T_INT64:
587 case SPOE_DATA_T_UINT32:
588 case SPOE_DATA_T_UINT64:
589 if ((i = decode_spoe_varint(frame+idx, end, &sz)) == -1)
590 return -1;
591 idx += i;
592 break;
593 case SPOE_DATA_T_IPV4:
594 idx += 4;
595 break;
596 case SPOE_DATA_T_IPV6:
597 idx += 16;
598 break;
599 case SPOE_DATA_T_STR:
600 case SPOE_DATA_T_BIN:
601 if ((i = decode_spoe_varint(frame+idx, end, &sz)) == -1)
602 return -1;
603 idx += i + sz;
604 break;
605 }
606
607 if (frame+idx > end)
608 return -1;
609 return idx;
610 }
611
612 /* Decode a typed data. If an error occurred, -1 is returned, otherwise the
613 * number of read bytes is returned. See skip_spoe_data for details. */
614 static int
decode_spoe_data(char * frame,char * end,struct sample * smp)615 decode_spoe_data(char *frame, char *end, struct sample *smp)
616 {
617 uint64_t sz = 0;
618 int type, i, idx = 0;
619
620 if (frame > end)
621 return -1;
622
623 type = frame[idx++];
624 switch (type & SPOE_DATA_T_MASK) {
625 case SPOE_DATA_T_BOOL:
626 smp->data.u.sint = ((type & SPOE_DATA_FL_TRUE) == SPOE_DATA_FL_TRUE);
627 smp->data.type = SMP_T_BOOL;
628 break;
629 case SPOE_DATA_T_INT32:
630 case SPOE_DATA_T_INT64:
631 case SPOE_DATA_T_UINT32:
632 case SPOE_DATA_T_UINT64:
633 if ((i = decode_spoe_varint(frame+idx, end, (uint64_t *)&smp->data.u.sint)) == -1)
634 return -1;
635 idx += i;
636 smp->data.type = SMP_T_SINT;
637 break;
638 case SPOE_DATA_T_IPV4:
639 if (frame+idx+4 > end)
640 return -1;
641 memcpy(&smp->data.u.ipv4, frame+idx, 4);
642 smp->data.type = SMP_T_IPV4;
643 idx += 4;
644 break;
645 case SPOE_DATA_T_IPV6:
646 if (frame+idx+16 > end)
647 return -1;
648 memcpy(&smp->data.u.ipv6, frame+idx, 16);
649 smp->data.type = SMP_T_IPV6;
650 idx += 16;
651 break;
652 case SPOE_DATA_T_STR:
653 if ((i = decode_spoe_varint(frame+idx, end, &sz)) == -1)
654 return -1;
655 idx += i;
656 if (frame+idx+sz > end)
657 return -1;
658 smp->data.u.str.str = frame+idx;
659 smp->data.u.str.len = sz;
660 smp->data.type = SMP_T_STR;
661 idx += sz;
662 break;
663 case SPOE_DATA_T_BIN:
664 if ((i = decode_spoe_varint(frame+idx, end, &sz)) == -1)
665 return -1;
666 idx += i;
667 if (frame+idx+sz > end)
668 return -1;
669 smp->data.u.str.str = frame+idx;
670 smp->data.u.str.len = sz;
671 smp->data.type = SMP_T_BIN;
672 idx += sz;
673 break;
674 }
675
676 if (frame+idx > end)
677 return -1;
678 return idx;
679 }
680
/* Skip an action in a frame received from an agent. <end> points to the first
 * byte after the buffer. If an error occurred, -1 is returned, otherwise the
 * number of read bytes is returned. An action is composed of the action type
 * (1 byte), the number of arguments (1 byte) and then one typed data per
 * argument (see skip_spoe_data). */
static int
skip_spoe_action(char *frame, char *end)
{
	char *p = frame;
	int   nbargs, ret;

	if (frame+2 > end)
		return -1;

	p++;            /* Skip the action type */
	nbargs = *p++;  /* Number of typed data to skip */
	for (; nbargs > 0; nbargs--) {
		if ((ret = skip_spoe_data(p, end)) == -1)
			return -1;
		p += ret;
	}

	if (p > end)
		return -1;
	return (int)(p - frame);
}
704
705 /* Encode HELLO frame sent by HAProxy to an agent. It returns the frame size on
706 * success, 0 if the frame can be ignored and -1 if an error occurred. */
707 static int
prepare_spoe_hahello_frame(struct appctx * appctx,char * frame,size_t size)708 prepare_spoe_hahello_frame(struct appctx *appctx, char *frame, size_t size)
709 {
710 int idx = 0;
711 size_t max = (7 /* TYPE + METADATA */
712 + 1 + SLEN(SUPPORTED_VERSIONS_KEY) + 1 + 1 + SLEN(SUPPORTED_VERSIONS_VAL)
713 + 1 + SLEN(MAX_FRAME_SIZE_KEY) + 1 + 4
714 + 1 + SLEN(CAPABILITIES_KEY) + 1 + 1 + SLEN(CAPABILITIES_VAL));
715
716 if (size < max)
717 return -1;
718
719 /* Frame type */
720 frame[idx++] = SPOE_FRM_T_HAPROXY_HELLO;
721
722 /* No flags for now */
723 memset(frame+idx, 0, 4);
724 idx += 4;
725
726 /* No stream-id and frame-id for HELLO frames */
727 frame[idx++] = 0;
728 frame[idx++] = 0;
729
730 /* There are 3 mandatory items: "supported-versions", "max-frame-size"
731 * and "capabilities" */
732
733 /* "supported-versions" K/V item */
734 idx += encode_spoe_string(SUPPORTED_VERSIONS_KEY, SLEN(SUPPORTED_VERSIONS_KEY), frame+idx);
735 frame[idx++] = SPOE_DATA_T_STR;
736 idx += encode_spoe_string(SUPPORTED_VERSIONS_VAL, SLEN(SUPPORTED_VERSIONS_VAL), frame+idx);
737
738 /* "max-fram-size" K/V item */
739 idx += encode_spoe_string(MAX_FRAME_SIZE_KEY, SLEN(MAX_FRAME_SIZE_KEY), frame+idx);
740 frame[idx++] = SPOE_DATA_T_UINT32;
741 idx += encode_spoe_varint(APPCTX_SPOE(appctx).max_frame_size, frame+idx);
742
743 /* "capabilities" K/V item */
744 idx += encode_spoe_string(CAPABILITIES_KEY, SLEN(CAPABILITIES_KEY), frame+idx);
745 frame[idx++] = SPOE_DATA_T_STR;
746 idx += encode_spoe_string(CAPABILITIES_VAL, SLEN(CAPABILITIES_VAL), frame+idx);
747
748 return idx;
749 }
750
751 /* Encode DISCONNECT frame sent by HAProxy to an agent. It returns the frame
752 * size on success, 0 if the frame can be ignored and -1 if an error
753 * occurred. */
754 static int
prepare_spoe_hadiscon_frame(struct appctx * appctx,char * frame,size_t size)755 prepare_spoe_hadiscon_frame(struct appctx *appctx, char *frame, size_t size)
756 {
757 const char *reason;
758 int rlen, idx = 0;
759 size_t max = (7 /* TYPE + METADATA */
760 + 1 + SLEN(STATUS_CODE_KEY) + 1 + 2
761 + 1 + SLEN(MSG_KEY) + 1 + 2 + 255);
762
763 if (size < max)
764 return -1;
765
766 /* Get the message corresponding to the status code */
767 if (spoe_status_code >= SPOE_FRM_ERRS)
768 spoe_status_code = SPOE_FRM_ERR_UNKNOWN;
769 reason = spoe_frm_err_reasons[spoe_status_code];
770 rlen = strlen(reason);
771
772 /* Frame type */
773 frame[idx++] = SPOE_FRM_T_HAPROXY_DISCON;
774
775 /* No flags for now */
776 memset(frame+idx, 0, 4);
777 idx += 4;
778
779 /* No stream-id and frame-id for DISCONNECT frames */
780 frame[idx++] = 0;
781 frame[idx++] = 0;
782
783 /* There are 2 mandatory items: "status-code" and "message" */
784
785 /* "status-code" K/V item */
786 idx += encode_spoe_string(STATUS_CODE_KEY, SLEN(STATUS_CODE_KEY), frame+idx);
787 frame[idx++] = SPOE_DATA_T_UINT32;
788 idx += encode_spoe_varint(spoe_status_code, frame+idx);
789
790 /* "message" K/V item */
791 idx += encode_spoe_string(MSG_KEY, SLEN(MSG_KEY), frame+idx);
792 frame[idx++] = SPOE_DATA_T_STR;
793 idx += encode_spoe_string(reason, rlen, frame+idx);
794
795 return idx;
796 }
797
798 /* Encode NOTIFY frame sent by HAProxy to an agent. It returns the frame size on
799 * success, 0 if the frame can be ignored and -1 if an error occurred. */
800 static int
prepare_spoe_hanotify_frame(struct appctx * appctx,char * frame,size_t size)801 prepare_spoe_hanotify_frame(struct appctx *appctx, char *frame, size_t size)
802 {
803 struct spoe_context *ctx = APPCTX_SPOE(appctx).ctx;
804 int idx = 0;
805
806 if (size < APPCTX_SPOE(appctx).max_frame_size)
807 return -1;
808
809 frame[idx++] = SPOE_FRM_T_HAPROXY_NOTIFY;
810
811 /* No flags for now */
812 memset(frame+idx, 0, 4);
813 idx += 4;
814
815 /* Set stream-id and frame-id */
816 idx += encode_spoe_varint(ctx->stream_id, frame+idx);
817 idx += encode_spoe_varint(ctx->frame_id, frame+idx);
818
819 /* Copy encoded messages */
820 memcpy(frame+idx, ctx->buffer->p, ctx->buffer->i);
821 idx += ctx->buffer->i;
822
823 return idx;
824 }
825
826 /* Decode HELLO frame sent by an agent. It returns the number of by read bytes
827 * on success, 0 if the frame can be ignored and -1 if an error occurred. */
828 static int
handle_spoe_agenthello_frame(struct appctx * appctx,char * frame,size_t size)829 handle_spoe_agenthello_frame(struct appctx *appctx, char *frame, size_t size)
830 {
831 int vsn, max_frame_size;
832 int i, idx = 0;
833 size_t min_size = (7 /* TYPE + METADATA */
834 + 1 + SLEN(VERSION_KEY) + 1 + 1 + 3
835 + 1 + SLEN(MAX_FRAME_SIZE_KEY) + 1 + 1
836 + 1 + SLEN(CAPABILITIES_KEY) + 1 + 1 + 0);
837
838 /* Check frame type */
839 if (frame[idx++] != SPOE_FRM_T_AGENT_HELLO)
840 return 0;
841
842 if (size < min_size) {
843 spoe_status_code = SPOE_FRM_ERR_INVALID;
844 return -1;
845 }
846
847 /* Skip flags: fragmentation is not supported for now */
848 idx += 4;
849
850 /* stream-id and frame-id must be cleared */
851 if (frame[idx] != 0 || frame[idx+1] != 0) {
852 spoe_status_code = SPOE_FRM_ERR_INVALID;
853 return -1;
854 }
855 idx += 2;
856
857 /* There are 3 mandatory items: "version", "max-frame-size" and
858 * "capabilities" */
859
860 /* Loop on K/V items */
861 vsn = max_frame_size = 0;
862 while (idx < size) {
863 char *str;
864 uint64_t sz;
865
866 /* Decode the item key */
867 idx += decode_spoe_string(frame+idx, frame+size, &str, &sz);
868 if (str == NULL) {
869 spoe_status_code = SPOE_FRM_ERR_INVALID;
870 return -1;
871 }
872 /* Check "version" K/V item */
873 if (!memcmp(str, VERSION_KEY, sz)) {
874 /* The value must be a string */
875 if ((frame[idx++] & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR) {
876 spoe_status_code = SPOE_FRM_ERR_INVALID;
877 return -1;
878 }
879 idx += decode_spoe_string(frame+idx, frame+size, &str, &sz);
880 if (str == NULL) {
881 spoe_status_code = SPOE_FRM_ERR_INVALID;
882 return -1;
883 }
884
885 vsn = decode_spoe_version(str, sz);
886 if (vsn == -1) {
887 spoe_status_code = SPOE_FRM_ERR_BAD_VSN;
888 return -1;
889 }
890 for (i = 0; supported_versions[i].str != NULL; ++i) {
891 if (vsn >= supported_versions[i].min &&
892 vsn <= supported_versions[i].max)
893 break;
894 }
895 if (supported_versions[i].str == NULL) {
896 spoe_status_code = SPOE_FRM_ERR_BAD_VSN;
897 return -1;
898 }
899 }
900 /* Check "max-frame-size" K/V item */
901 else if (!memcmp(str, MAX_FRAME_SIZE_KEY, sz)) {
902 int type;
903
904 /* The value must be integer */
905 type = frame[idx++];
906 if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT32 &&
907 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT64 &&
908 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT32 &&
909 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT64) {
910 spoe_status_code = SPOE_FRM_ERR_INVALID;
911 return -1;
912 }
913 if ((i = decode_spoe_varint(frame+idx, frame+size, &sz)) == -1) {
914 spoe_status_code = SPOE_FRM_ERR_INVALID;
915 return -1;
916 }
917 idx += i;
918 if (sz < MIN_FRAME_SIZE || sz > APPCTX_SPOE(appctx).max_frame_size) {
919 spoe_status_code = SPOE_FRM_ERR_BAD_FRAME_SIZE;
920 return -1;
921 }
922 max_frame_size = sz;
923 }
924 /* Skip "capabilities" K/V item for now */
925 else {
926 /* Silently ignore unknown item */
927 if ((i = skip_spoe_data(frame+idx, frame+size)) == -1) {
928 spoe_status_code = SPOE_FRM_ERR_INVALID;
929 return -1;
930 }
931 idx += i;
932 }
933 }
934
935 /* Final checks */
936 if (!vsn) {
937 spoe_status_code = SPOE_FRM_ERR_NO_VSN;
938 return -1;
939 }
940 if (!max_frame_size) {
941 spoe_status_code = SPOE_FRM_ERR_NO_FRAME_SIZE;
942 return -1;
943 }
944
945 APPCTX_SPOE(appctx).version = (unsigned int)vsn;
946 APPCTX_SPOE(appctx).max_frame_size = (unsigned int)max_frame_size;
947 return idx;
948 }
949
950 /* Decode DISCONNECT frame sent by an agent. It returns the number of by read
951 * bytes on success, 0 if the frame can be ignored and -1 if an error
952 * occurred. */
953 static int
handle_spoe_agentdiscon_frame(struct appctx * appctx,char * frame,size_t size)954 handle_spoe_agentdiscon_frame(struct appctx *appctx, char *frame, size_t size)
955 {
956 int i, idx = 0;
957 size_t min_size = (7 /* TYPE + METADATA */
958 + 1 + SLEN(STATUS_CODE_KEY) + 1 + 1
959 + 1 + SLEN(MSG_KEY) + 1 + 1);
960
961 /* Check frame type */
962 if (frame[idx++] != SPOE_FRM_T_AGENT_DISCON)
963 return 0;
964
965 if (size < min_size) {
966 spoe_status_code = SPOE_FRM_ERR_INVALID;
967 return -1;
968 }
969
970 /* Skip flags: fragmentation is not supported for now */
971 idx += 4;
972
973 /* stream-id and frame-id must be cleared */
974 if (frame[idx] != 0 || frame[idx+1] != 0) {
975 spoe_status_code = SPOE_FRM_ERR_INVALID;
976 return -1;
977 }
978 idx += 2;
979
980 /* There are 2 mandatory items: "status-code" and "message" */
981
982 /* Loop on K/V items */
983 while (idx < size) {
984 char *str;
985 uint64_t sz;
986
987 /* Decode the item key */
988 idx += decode_spoe_string(frame+idx, frame+size, &str, &sz);
989 if (str == NULL) {
990 spoe_status_code = SPOE_FRM_ERR_INVALID;
991 return -1;
992 }
993
994 /* Check "status-code" K/V item */
995 if (!memcmp(str, STATUS_CODE_KEY, sz)) {
996 int type;
997
998 /* The value must be an integer */
999 type = frame[idx++];
1000 if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT32 &&
1001 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT64 &&
1002 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT32 &&
1003 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT64) {
1004 spoe_status_code = SPOE_FRM_ERR_INVALID;
1005 return -1;
1006 }
1007 if ((i = decode_spoe_varint(frame+idx, frame+size, &sz)) == -1) {
1008 spoe_status_code = SPOE_FRM_ERR_INVALID;
1009 return -1;
1010 }
1011 idx += i;
1012 spoe_status_code = sz;
1013 }
1014
1015 /* Check "message" K/V item */
1016 else if (sz && !memcmp(str, MSG_KEY, sz)) {
1017 /* The value must be a string */
1018 if ((frame[idx++] & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR) {
1019 spoe_status_code = SPOE_FRM_ERR_INVALID;
1020 return -1;
1021 }
1022 idx += decode_spoe_string(frame+idx, frame+size, &str, &sz);
1023 if (str == NULL || sz > 255) {
1024 spoe_status_code = SPOE_FRM_ERR_INVALID;
1025 return -1;
1026 }
1027 memcpy(spoe_reason, str, sz);
1028 spoe_reason[sz] = 0;
1029 }
1030 else {
1031 /* Silently ignore unknown item */
1032 if ((i = skip_spoe_data(frame+idx, frame+size)) == -1) {
1033 spoe_status_code = SPOE_FRM_ERR_INVALID;
1034 return -1;
1035 }
1036 idx += i;
1037 }
1038 }
1039
1040 return idx;
1041 }
1042
1043
1044 /* Decode ACK frame sent by an agent. It returns the number of by read bytes on
1045 * success, 0 if the frame can be ignored and -1 if an error occurred. */
1046 static int
handle_spoe_agentack_frame(struct appctx * appctx,char * frame,size_t size)1047 handle_spoe_agentack_frame(struct appctx *appctx, char *frame, size_t size)
1048 {
1049 struct spoe_context *ctx = APPCTX_SPOE(appctx).ctx;
1050 uint64_t stream_id, frame_id;
1051 int idx = 0, i;
1052 size_t min_size = (7 /* TYPE + METADATA */);
1053
1054 /* Check frame type */
1055 if (frame[idx++] != SPOE_FRM_T_AGENT_ACK)
1056 return 0;
1057
1058 if (size < min_size) {
1059 spoe_status_code = SPOE_FRM_ERR_INVALID;
1060 return -1;
1061 }
1062
1063 /* Skip flags: fragmentation is not supported for now */
1064 idx += 4;
1065
1066 /* Get the stream-id and the frame-id */
1067 if ((i = decode_spoe_varint(frame+idx, frame+size, &stream_id)) == -1) {
1068 spoe_status_code = SPOE_FRM_ERR_INVALID;
1069 return -1;
1070 }
1071 idx += i;
1072
1073 if ((i = decode_spoe_varint(frame+idx, frame+size, &frame_id)) == -1) {
1074 spoe_status_code = SPOE_FRM_ERR_INVALID;
1075 return -1;
1076 }
1077 idx += i;
1078
1079 /* Check stream-id and frame-id */
1080 if (ctx->stream_id != (unsigned int)stream_id ||
1081 ctx->frame_id != (unsigned int)frame_id)
1082 return 0;
1083
1084 /* Copy encoded actions */
1085 b_reset(ctx->buffer);
1086 memcpy(ctx->buffer->p, frame+idx, size-idx);
1087 ctx->buffer->i = size-idx;
1088
1089 return idx;
1090 }
1091
1092 /* This function is used in cfgparse.c and declared in proto/checks.h. It
1093 * prepare the request to send to agents during a healthcheck. It returns 0 on
1094 * success and -1 if an error occurred. */
1095 int
prepare_spoe_healthcheck_request(char ** req,int * len)1096 prepare_spoe_healthcheck_request(char **req, int *len)
1097 {
1098 struct appctx a;
1099 char *frame, buf[global.tune.bufsize];
1100 unsigned int framesz;
1101 int idx;
1102
1103 memset(&a, 0, sizeof(a));
1104 memset(buf, 0, sizeof(buf));
1105 APPCTX_SPOE(&a).max_frame_size = global.tune.bufsize;
1106
1107 frame = buf+4;
1108 idx = prepare_spoe_hahello_frame(&a, frame, global.tune.bufsize-4);
1109 if (idx <= 0)
1110 return -1;
1111 if (idx + SLEN(HEALTHCHECK_KEY) + 1 > global.tune.bufsize-4)
1112 return -1;
1113
1114 /* "healthcheck" K/V item */
1115 idx += encode_spoe_string(HEALTHCHECK_KEY, SLEN(HEALTHCHECK_KEY), frame+idx);
1116 frame[idx++] = (SPOE_DATA_T_BOOL | SPOE_DATA_FL_TRUE);
1117
1118 framesz = htonl(idx);
1119 memcpy(buf, (char *)&framesz, 4);
1120
1121 if ((*req = malloc(idx+4)) == NULL)
1122 return -1;
1123 memcpy(*req, buf, idx+4);
1124 *len = idx+4;
1125 return 0;
1126 }
1127
1128 /* This function is used in checks.c and declared in proto/checks.h. It decode
1129 * the response received from an agent during a healthcheck. It returns 0 on
1130 * success and -1 if an error occurred. */
1131 int
handle_spoe_healthcheck_response(char * frame,size_t size,char * err,int errlen)1132 handle_spoe_healthcheck_response(char *frame, size_t size, char *err, int errlen)
1133 {
1134 struct appctx a;
1135 int r;
1136
1137 memset(&a, 0, sizeof(a));
1138 APPCTX_SPOE(&a).max_frame_size = global.tune.bufsize;
1139
1140 if (handle_spoe_agentdiscon_frame(&a, frame, size) != 0)
1141 goto error;
1142 if ((r = handle_spoe_agenthello_frame(&a, frame, size)) <= 0) {
1143 if (r == 0)
1144 spoe_status_code = SPOE_FRM_ERR_INVALID;
1145 goto error;
1146 }
1147
1148 return 0;
1149
1150 error:
1151 if (spoe_status_code >= SPOE_FRM_ERRS)
1152 spoe_status_code = SPOE_FRM_ERR_UNKNOWN;
1153 strncpy(err, spoe_frm_err_reasons[spoe_status_code], errlen);
1154 return -1;
1155 }
1156
1157 /********************************************************************
1158 * Functions that manage the SPOE applet
1159 ********************************************************************/
1160 /* Callback function that catches applet timeouts. If a timeout occurred, we set
1161 * <appctx->st1> flag and the SPOE applet is woken up. */
1162 static struct task *
process_spoe_applet(struct task * task)1163 process_spoe_applet(struct task * task)
1164 {
1165 struct appctx *appctx = task->context;
1166
1167 appctx->st1 = SPOE_APPCTX_ERR_NONE;
1168 if (tick_is_expired(task->expire, now_ms)) {
1169 task->expire = TICK_ETERNITY;
1170 appctx->st1 = SPOE_APPCTX_ERR_TOUT;
1171 }
1172 si_applet_want_get(appctx->owner);
1173 appctx_wakeup(appctx);
1174 return task;
1175 }
1176
1177 /* Remove a SPOE applet from the agent cache */
1178 static void
remove_spoe_applet_from_cache(struct appctx * appctx)1179 remove_spoe_applet_from_cache(struct appctx *appctx)
1180 {
1181 struct appctx *a, *back;
1182 struct spoe_agent *agent = APPCTX_SPOE(appctx).agent;
1183
1184 if (LIST_ISEMPTY(&agent->cache))
1185 return;
1186
1187 list_for_each_entry_safe(a, back, &agent->cache, ctx.spoe.list) {
1188 if (a == appctx) {
1189 LIST_DEL(&APPCTX_SPOE(appctx).list);
1190 break;
1191 }
1192 }
1193 }
1194
1195
1196 /* Callback function that releases a SPOE applet. This happens when the
1197 * connection with the agent is closed. */
1198 static void
release_spoe_applet(struct appctx * appctx)1199 release_spoe_applet(struct appctx *appctx)
1200 {
1201 struct stream_interface *si = appctx->owner;
1202 struct spoe_agent *agent = APPCTX_SPOE(appctx).agent;
1203 struct spoe_context *ctx = APPCTX_SPOE(appctx).ctx;
1204
1205 if (appctx->st0 == SPOE_APPCTX_ST_CONNECT ||
1206 appctx->st0 == SPOE_APPCTX_ST_CONNECTING)
1207 on_new_spoe_appctx_failure(agent);
1208
1209 if (appctx->st0 != SPOE_APPCTX_ST_END) {
1210 si_shutw(si);
1211 si_shutr(si);
1212 si_ic(si)->flags |= CF_READ_NULL;
1213 appctx->st0 = SPOE_APPCTX_ST_END;
1214 }
1215
1216 if (ctx != NULL) {
1217 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
1218 ctx->appctx = NULL;
1219 }
1220
1221 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p\n",
1222 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1223 __FUNCTION__, appctx);
1224
1225 /* Release the task attached to the SPOE applet */
1226 if (APPCTX_SPOE(appctx).task) {
1227 task_delete(APPCTX_SPOE(appctx).task);
1228 task_free(APPCTX_SPOE(appctx).task);
1229 }
1230
1231 /* And remove it from the agent cache */
1232 remove_spoe_applet_from_cache(appctx);
1233 APPCTX_SPOE(appctx).ctx = NULL;
1234 }
1235
1236 /* Send a SPOE frame to an agent. It return -2 when an error occurred, -1 when
1237 * the frame can be ignored, 0 to retry later and 1 on success. The frame is
1238 * encoded using the callback function <prepare>. */
1239 static int
send_spoe_frame(struct appctx * appctx,int (* prepare)(struct appctx *,char *,size_t))1240 send_spoe_frame(struct appctx *appctx,
1241 int (*prepare)(struct appctx *, char *, size_t))
1242 {
1243 struct stream_interface *si = appctx->owner;
1244 int framesz, ret;
1245 uint32_t netint;
1246
1247 if (si_ic(si)->buf->size == 0)
1248 return -1;
1249
1250 ret = prepare(appctx, trash.str, APPCTX_SPOE(appctx).max_frame_size);
1251 if (ret <= 0)
1252 goto skip_or_error;
1253 framesz = ret;
1254 netint = htonl(framesz);
1255 ret = bi_putblk(si_ic(si), (char *)&netint, sizeof(netint));
1256 if (ret > 0)
1257 ret = bi_putblk(si_ic(si), trash.str, framesz);
1258 if (ret <= 0) {
1259 if (ret == -1)
1260 return -1;
1261 return -2;
1262 }
1263 return 1;
1264
1265 skip_or_error:
1266 if (!ret)
1267 return -1;
1268 return -2;
1269 }
1270
1271 /* Receive a SPOE frame from an agent. It return -2 when an error occurred, -1
1272 * when the frame can be ignored, 0 to retry later and 1 on success. The frame
1273 * is decoded using the callback function <handle>. */
1274 static int
recv_spoe_frame(struct appctx * appctx,int (* handle)(struct appctx *,char *,size_t))1275 recv_spoe_frame(struct appctx *appctx,
1276 int (*handle)(struct appctx *, char *, size_t))
1277 {
1278 struct stream_interface *si = appctx->owner;
1279 int framesz, ret;
1280 uint32_t netint;
1281
1282 ret = bo_getblk(si_oc(si), (char *)&netint, sizeof(netint), 0);
1283 if (ret <= 0)
1284 goto empty_or_error;
1285 framesz = ntohl(netint);
1286 if (framesz > APPCTX_SPOE(appctx).max_frame_size) {
1287 spoe_status_code = SPOE_FRM_ERR_TOO_BIG;
1288 return -2;
1289 }
1290
1291 ret = bo_getblk(si_oc(si), trash.str, framesz, sizeof(netint));
1292 if (ret <= 0)
1293 goto empty_or_error;
1294 bo_skip(si_oc(si), ret+sizeof(netint));
1295
1296 /* First check if the received frame is a DISCONNECT frame */
1297 ret = handle_spoe_agentdiscon_frame(appctx, trash.str, framesz);
1298 if (ret != 0) {
1299 if (ret > 0) {
1300 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
1301 " - disconnected by peer (%d): %s\n",
1302 (int)now.tv_sec, (int)now.tv_usec,
1303 ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
1304 __FUNCTION__, appctx, spoe_status_code,
1305 spoe_reason);
1306 return 2;
1307 }
1308 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
1309 " - error on frame (%s)\n",
1310 (int)now.tv_sec, (int)now.tv_usec,
1311 ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
1312 __FUNCTION__, appctx,
1313 spoe_frm_err_reasons[spoe_status_code]);
1314 return -2;
1315 }
1316 if (handle == NULL)
1317 goto out;
1318
1319 /* If not, try to decode it */
1320 ret = handle(appctx, trash.str, framesz);
1321 if (ret <= 0) {
1322 if (!ret)
1323 return -1;
1324 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
1325 " - error on frame (%s)\n",
1326 (int)now.tv_sec, (int)now.tv_usec,
1327 ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
1328 __FUNCTION__, appctx,
1329 spoe_frm_err_reasons[spoe_status_code]);
1330 return -2;
1331 }
1332 out:
1333 return 1;
1334
1335 empty_or_error:
1336 if (!ret)
1337 return 0;
1338 spoe_status_code = SPOE_FRM_ERR_IO;
1339 return -2;
1340 }
1341
/* I/O Handler processing messages exchanged with the agent.
 *
 * It is a state machine driven by <appctx->st0>:
 *   - CONNECT:       send the HAProxy HELLO frame once the stream interface is
 *                    established, then arm the hello timeout;
 *   - CONNECTING:    wait for the agent HELLO frame;
 *   - PROCESSING:    send the NOTIFY frame of the attached SPOE context and
 *                    wait for its ACK, or just read incoming frames when no
 *                    context is waiting;
 *   - DISCONNECT:    send the HAProxy DISCONNECT frame;
 *   - DISCONNECTING: wait for the agent's answer, a timeout or a close;
 *   - EXIT:          shut the stream interface down;
 *   - END:           nothing more to do.
 *
 * <appctx->st1> is the timeout indicator set by process_spoe_applet(). Note
 * that <ctx> is read once when the function is entered. */
static void
handle_spoe_applet(struct appctx *appctx)
{
	struct stream_interface *si = appctx->owner;
	struct stream *s = si_strm(si);
	struct spoe_agent *agent = APPCTX_SPOE(appctx).agent;
	struct spoe_context *ctx = APPCTX_SPOE(appctx).ctx;
	int ret;

	/* Re-evaluated each time the applet state changes */
 switchstate:
	SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
		    " - appctx-state=%s\n",
		    (int)now.tv_sec, (int)now.tv_usec, agent->id,
		    __FUNCTION__, appctx, spoe_appctx_state_str[appctx->st0]);

	switch (appctx->st0) {
		case SPOE_APPCTX_ST_CONNECT:
			spoe_status_code = SPOE_FRM_ERR_NONE;
			/* Connection not established yet: wake the stream
			 * up and come back later */
			if (si->state <= SI_ST_CON) {
				si_applet_want_put(si);
				task_wakeup(s->task, TASK_WOKEN_MSG);
				break;
			}
			/* Any state other than "established" is a failure */
			else if (si->state != SI_ST_EST) {
				appctx->st0 = SPOE_APPCTX_ST_EXIT;
				on_new_spoe_appctx_failure(agent);
				goto switchstate;
			}
			ret = send_spoe_frame(appctx, &prepare_spoe_hahello_frame);
			if (ret < 0) {
				appctx->st0 = SPOE_APPCTX_ST_EXIT;
				on_new_spoe_appctx_failure(agent);
				goto switchstate;
			}
			else if (!ret)
				goto full;

			/* Hello frame was sent. Set the hello timeout and
			 * wait for the reply. */
			APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.hello);
			appctx->st0 = SPOE_APPCTX_ST_CONNECTING;
			/* fall through */

		case SPOE_APPCTX_ST_CONNECTING:
			/* One side of the stream was closed */
			if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO) {
				appctx->st0 = SPOE_APPCTX_ST_EXIT;
				on_new_spoe_appctx_failure(agent);
				goto switchstate;
			}
			/* The hello timeout fired */
			if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) {
				SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
					    " - Connection timed out\n",
					    (int)now.tv_sec, (int)now.tv_usec,
					    ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
					    __FUNCTION__, appctx);
				appctx->st0 = SPOE_APPCTX_ST_EXIT;
				on_new_spoe_appctx_failure(agent);
				goto switchstate;
			}
			ret = recv_spoe_frame(appctx, &handle_spoe_agenthello_frame);
			/* Error or ignorable frame: send a DISCONNECT frame */
			if (ret < 0) {
				appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
				on_new_spoe_appctx_failure(agent);
				goto switchstate;
			}
			/* The agent sent a DISCONNECT frame: just leave */
			if (ret == 2) {
				appctx->st0 = SPOE_APPCTX_ST_EXIT;
				on_new_spoe_appctx_failure(agent);
				goto switchstate;
			}
			/* Nothing received yet */
			if (!ret)
				goto out;

			/* hello handshake is finished, set the idle timeout,
			 * Add the appctx in the agent cache, decrease the
			 * number of new applets and wake up waiting streams. */
			APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
			appctx->st0 = SPOE_APPCTX_ST_PROCESSING;
			on_new_spoe_appctx_success(agent, appctx);
			break;

		case SPOE_APPCTX_ST_PROCESSING:
			if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO) {
				appctx->st0 = SPOE_APPCTX_ST_EXIT;
				goto switchstate;
			}
			/* Idle timeout: tell it to the agent and disconnect */
			if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) {
				spoe_status_code = SPOE_FRM_ERR_TOUT;
				appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
				appctx->st1 = SPOE_APPCTX_ERR_NONE;
				goto switchstate;
			}
			/* A SPOE context has messages to send */
			if (ctx != NULL && ctx->state == SPOE_CTX_ST_SENDING_MSGS) {
				ret = send_spoe_frame(appctx, &prepare_spoe_hanotify_frame);
				if (ret < 0) {
					/* -1: the frame is dropped, only the
					 * SPOE context is in error */
					if (ret == -1) {
						ctx->state = SPOE_CTX_ST_ERROR;
						task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
						goto skip_notify_frame;
					}
					appctx->st0 = SPOE_APPCTX_ST_EXIT;
					goto switchstate;
				}
				else if (!ret)
					goto full;
				ctx->state = SPOE_CTX_ST_WAITING_ACK;
				APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
			}

		  skip_notify_frame:
			/* A SPOE context is waiting for the agent's ACK */
			if (ctx != NULL && ctx->state == SPOE_CTX_ST_WAITING_ACK) {
				ret = recv_spoe_frame(appctx, &handle_spoe_agentack_frame);
				if (ret < 0) {
					/* -1: ignorable frame, try the next one */
					if (ret == -1)
						goto skip_notify_frame;
					ctx->state = SPOE_CTX_ST_ERROR;
					task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
					appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
					goto switchstate;
				}
				if (!ret)
					goto out;
				/* The agent sent a DISCONNECT frame */
				if (ret == 2) {
					ctx->state = SPOE_CTX_ST_ERROR;
					task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
					appctx->st0 = SPOE_APPCTX_ST_EXIT;
					goto switchstate;
				}
				/* ACK received: the stream can process actions */
				ctx->state = SPOE_CTX_ST_DONE;
				task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
				APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
			}
			else {
				/* Nothing is expected from the agent. When
				 * HAProxy is stopping, close the connection */
				if (stopping) {
					appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
					goto switchstate;
				}

				/* Otherwise read frames without decoding them;
				 * only DISCONNECT frames are recognized */
				ret = recv_spoe_frame(appctx, NULL);
				if (ret < 0) {
					if (ret == -1)
						goto skip_notify_frame;
					appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
					goto switchstate;
				}
				if (!ret)
					goto out;
				if (ret == 2) {
					appctx->st0 = SPOE_APPCTX_ST_EXIT;
					goto switchstate;
				}
				APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
			}
			break;

		case SPOE_APPCTX_ST_DISCONNECT:
			ret = send_spoe_frame(appctx, &prepare_spoe_hadiscon_frame);
			if (ret < 0) {
				appctx->st0 = SPOE_APPCTX_ST_EXIT;
				goto switchstate;
			}
			else if (!ret)
				goto full;
			SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
				    " - disconnected by HAProxy (%d): %s\n",
				    (int)now.tv_sec, (int)now.tv_usec,
				    ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
				    __FUNCTION__, appctx, spoe_status_code,
				    spoe_frm_err_reasons[spoe_status_code]);

			/* The idle timeout bounds the wait for the agent's reply */
			APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
			appctx->st0 = SPOE_APPCTX_ST_DISCONNECTING;
			/* fall through */

		case SPOE_APPCTX_ST_DISCONNECTING:
			if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO) {
				appctx->st0 = SPOE_APPCTX_ST_EXIT;
				goto switchstate;
			}
			if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) {
				appctx->st0 = SPOE_APPCTX_ST_EXIT;
				goto switchstate;
			}
			/* Leave on error or when the agent's DISCONNECT frame
			 * is received; other frames are dropped */
			ret = recv_spoe_frame(appctx, NULL);
			if (ret < 0 || ret == 2) {
				appctx->st0 = SPOE_APPCTX_ST_EXIT;
				goto switchstate;
			}
			break;

		case SPOE_APPCTX_ST_EXIT:
			/* Close both directions and disarm the timeout */
			si_shutw(si);
			si_shutr(si);
			si_ic(si)->flags |= CF_READ_NULL;
			appctx->st0 = SPOE_APPCTX_ST_END;
			APPCTX_SPOE(appctx).task->expire = TICK_ETERNITY;
			/* fall through */

		case SPOE_APPCTX_ST_END:
			return;
	}

 out:
	/* Requeue the timeout task if a timeout is armed, then wake the
	 * applet's own stream up */
	if (APPCTX_SPOE(appctx).task->expire != TICK_ETERNITY)
		task_queue(APPCTX_SPOE(appctx).task);
	si_oc(si)->flags |= CF_READ_DONTWAIT;
	task_wakeup(si_strm(si)->task, TASK_WOKEN_IO);
	return;
 full:
	/* Reached when send_spoe_frame() returns 0 (retry later) */
	si_applet_cant_put(si);
	goto out;
}
1555
/* Descriptor of the SPOE applet: handle_spoe_applet() is its I/O handler and
 * release_spoe_applet() its release callback. It is passed to appctx_new() by
 * create_spoe_appctx(). */
struct applet spoe_applet = {
	.obj_type = OBJ_TYPE_APPLET,
	.name = "<SPOE>", /* used for logging */
	.fct = handle_spoe_applet,
	.release = release_spoe_applet,
};
1562
1563 /* Create a SPOE applet. On success, the created applet is returned, else
1564 * NULL. */
1565 static struct appctx *
create_spoe_appctx(struct spoe_config * conf)1566 create_spoe_appctx(struct spoe_config *conf)
1567 {
1568 struct appctx *appctx;
1569 struct session *sess;
1570 struct task *task;
1571 struct stream *strm;
1572 struct listener *l = LIST_NEXT(&conf->agent_fe.conf.listeners,
1573 struct listener *, by_fe);
1574
1575 if ((appctx = appctx_new(&spoe_applet)) == NULL)
1576 goto out_error;
1577
1578 appctx->st0 = SPOE_APPCTX_ST_CONNECT;
1579 if ((APPCTX_SPOE(appctx).task = task_new()) == NULL)
1580 goto out_free_appctx;
1581 APPCTX_SPOE(appctx).task->process = process_spoe_applet;
1582 APPCTX_SPOE(appctx).task->expire = TICK_ETERNITY;
1583 APPCTX_SPOE(appctx).task->context = appctx;
1584 APPCTX_SPOE(appctx).agent = conf->agent;
1585 APPCTX_SPOE(appctx).ctx = NULL;
1586 APPCTX_SPOE(appctx).version = 0;
1587 APPCTX_SPOE(appctx).max_frame_size = global.tune.bufsize;
1588 task_wakeup(APPCTX_SPOE(appctx).task, TASK_WOKEN_INIT);
1589
1590 sess = session_new(&conf->agent_fe, l, &appctx->obj_type);
1591 if (!sess)
1592 goto out_free_spoe;
1593
1594 if ((task = task_new()) == NULL)
1595 goto out_free_sess;
1596
1597 if ((strm = stream_new(sess, task, &appctx->obj_type)) == NULL)
1598 goto out_free_task;
1599
1600 strm->target = sess->listener->default_target;
1601 strm->req.analysers |= sess->listener->analysers;
1602 stream_set_backend(strm, conf->agent->b.be);
1603
1604 /* applet is waiting for data */
1605 si_applet_cant_get(&strm->si[0]);
1606 appctx_wakeup(appctx);
1607
1608 /* Increase the per-process number of cumulated connections */
1609 if (conf->agent->cps_max > 0)
1610 update_freq_ctr(&conf->agent->conn_per_sec, 1);
1611
1612 strm->do_log = NULL;
1613 strm->res.flags |= CF_READ_DONTWAIT;
1614
1615 conf->agent_fe.feconn++;
1616 jobs++;
1617 totalconn++;
1618
1619 return appctx;
1620
1621 /* Error unrolling */
1622 out_free_task:
1623 task_free(task);
1624 out_free_sess:
1625 session_free(sess);
1626 out_free_spoe:
1627 task_free(APPCTX_SPOE(appctx).task);
1628 out_free_appctx:
1629 appctx_free(appctx);
1630 out_error:
1631 return NULL;
1632 }
1633
1634 /* Wake up a SPOE applet attached to a SPOE context. */
1635 static void
wakeup_spoe_appctx(struct spoe_context * ctx)1636 wakeup_spoe_appctx(struct spoe_context *ctx)
1637 {
1638 if (ctx->appctx == NULL)
1639 return;
1640 if (ctx->appctx->st0 < SPOE_APPCTX_ST_EXIT) {
1641 si_applet_want_get(ctx->appctx->owner);
1642 si_applet_want_put(ctx->appctx->owner);
1643 appctx_wakeup(ctx->appctx);
1644 }
1645 }
1646
1647
1648 /* Run across the list of pending streams waiting for a SPOE applet and wake the
1649 * first. */
1650 static void
offer_spoe_appctx(struct spoe_agent * agent,struct appctx * appctx)1651 offer_spoe_appctx(struct spoe_agent *agent, struct appctx *appctx)
1652 {
1653 struct spoe_context *ctx;
1654
1655 if (!appctx || appctx->st0 > SPOE_APPCTX_ST_PROCESSING)
1656 return;
1657
1658 if (LIST_ISEMPTY(&agent->applet_wq))
1659 LIST_ADD(&agent->cache, &APPCTX_SPOE(appctx).list);
1660 else {
1661 ctx = LIST_NEXT(&agent->applet_wq, typeof(ctx), applet_wait);
1662 APPCTX_SPOE(appctx).ctx = ctx;
1663 ctx->appctx = appctx;
1664 LIST_DEL(&ctx->applet_wait);
1665 LIST_INIT(&ctx->applet_wait);
1666 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
1667 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
1668 " - wake up stream to get available SPOE applet\n",
1669 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1670 __FUNCTION__, ctx->strm);
1671 }
1672 }
1673
1674 /* A failure occurred during SPOE applet creation. */
1675 static void
on_new_spoe_appctx_failure(struct spoe_agent * agent)1676 on_new_spoe_appctx_failure(struct spoe_agent *agent)
1677 {
1678 struct spoe_context *ctx;
1679
1680 list_for_each_entry(ctx, &agent->applet_wq, applet_wait) {
1681 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
1682 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
1683 " - wake up stream because to SPOE applet connection failed\n",
1684 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1685 __FUNCTION__, ctx->strm);
1686 }
1687 }
1688
/* Called when the hello handshake of a new SPOE applet succeeded. The applet
 * is handed to offer_spoe_appctx(). */
static void
on_new_spoe_appctx_success(struct spoe_agent *agent, struct appctx *appctx)
{
	offer_spoe_appctx(agent, appctx);
}
1694 /* Retrieve a SPOE applet from the agent cache if possible, else create it. It
1695 * returns 1 on success, 0 to retry later and -1 if an error occurred. */
1696 static int
acquire_spoe_appctx(struct spoe_context * ctx,int dir)1697 acquire_spoe_appctx(struct spoe_context *ctx, int dir)
1698 {
1699 struct spoe_config *conf = FLT_CONF(ctx->filter);
1700 struct spoe_agent *agent = conf->agent;
1701 struct appctx *appctx;
1702
1703 /* If a process is already started for this SPOE context, retry
1704 * later. */
1705 if (ctx->flags & SPOE_CTX_FL_PROCESS)
1706 goto wait;
1707
1708 /* If needed, initialize the buffer that will be used to encode messages
1709 * and decode actions. */
1710 if (ctx->buffer == &buf_empty) {
1711 if (!LIST_ISEMPTY(&ctx->buffer_wait.list)) {
1712 LIST_DEL(&ctx->buffer_wait.list);
1713 LIST_INIT(&ctx->buffer_wait.list);
1714 }
1715
1716 if (!b_alloc_margin(&ctx->buffer, global.tune.reserved_bufs)) {
1717 LIST_ADDQ(&buffer_wq, &ctx->buffer_wait.list);
1718 goto wait;
1719 }
1720 }
1721
1722 /* If the SPOE applet was already set, all is done. */
1723 if (ctx->appctx)
1724 goto success;
1725
1726 /* Else try to retrieve it from the agent cache */
1727 if (!LIST_ISEMPTY(&agent->cache)) {
1728 appctx = LIST_NEXT(&agent->cache, typeof(appctx), ctx.spoe.list);
1729 LIST_DEL(&APPCTX_SPOE(appctx).list);
1730 APPCTX_SPOE(appctx).ctx = ctx;
1731 ctx->appctx = appctx;
1732 goto success;
1733 }
1734
1735 /* If there is no server up for the agent's backend, this is an
1736 * error. */
1737 if (!agent->b.be->srv_act && !agent->b.be->srv_bck)
1738 goto error;
1739
1740 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
1741 " - waiting for available SPOE appctx\n",
1742 (int)now.tv_sec, (int)now.tv_usec, agent->id, __FUNCTION__,
1743 ctx->strm);
1744
1745 /* Else add the stream in the waiting queue. */
1746 if (LIST_ISEMPTY(&ctx->applet_wait))
1747 LIST_ADDQ(&agent->applet_wq, &ctx->applet_wait);
1748
1749 /* Finally, create new SPOE applet if we can */
1750 if (agent->cps_max > 0) {
1751 if (!freq_ctr_remain(&agent->conn_per_sec, agent->cps_max, 0))
1752 goto wait;
1753 }
1754 if (create_spoe_appctx(conf) == NULL)
1755 goto error;
1756
1757 wait:
1758 return 0;
1759
1760 success:
1761 /* Remove the stream from the waiting queue */
1762 if (!LIST_ISEMPTY(&ctx->applet_wait)) {
1763 LIST_DEL(&ctx->applet_wait);
1764 LIST_INIT(&ctx->applet_wait);
1765 }
1766
1767 /* Set the right flag to prevent request and response processing
1768 * in same time. */
1769 ctx->flags |= ((dir == SMP_OPT_DIR_REQ)
1770 ? SPOE_CTX_FL_REQ_PROCESS
1771 : SPOE_CTX_FL_RSP_PROCESS);
1772
1773 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
1774 " - acquire SPOE appctx %p from cache\n",
1775 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1776 __FUNCTION__, ctx->strm, ctx->appctx);
1777 return 1;
1778
1779 error:
1780 /* Remove the stream from the waiting queue */
1781 if (!LIST_ISEMPTY(&ctx->applet_wait)) {
1782 LIST_DEL(&ctx->applet_wait);
1783 LIST_INIT(&ctx->applet_wait);
1784 }
1785
1786 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
1787 " - failed to acquire SPOE appctx\n",
1788 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1789 __FUNCTION__, ctx->strm);
1790 send_log(ctx->strm->be, LOG_WARNING, "failed to acquire SPOE applet.\n");
1791
1792 return -1;
1793 }
1794
1795 /* Release a SPOE applet and push it in the agent cache. */
1796 static void
release_spoe_appctx(struct spoe_context * ctx)1797 release_spoe_appctx(struct spoe_context *ctx)
1798 {
1799 struct spoe_config *conf = FLT_CONF(ctx->filter);
1800 struct spoe_agent *agent = conf->agent;
1801 struct appctx *appctx = ctx->appctx;
1802
1803 /* Reset the flag to allow next processing */
1804 ctx->flags &= ~SPOE_CTX_FL_PROCESS;
1805
1806 /* Reset processing timer */
1807 ctx->process_exp = TICK_ETERNITY;
1808
1809 /* Release the buffer if needed */
1810 if (ctx->buffer != &buf_empty) {
1811 b_free(&ctx->buffer);
1812 offer_buffers(ctx, tasks_run_queue + applets_active_queue);
1813 }
1814
1815 /* If there is no SPOE applet, all is done */
1816 if (!appctx)
1817 return;
1818
1819 /* Else, reassign it or push it in the agent cache */
1820 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
1821 " - release SPOE appctx %p\n",
1822 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1823 __FUNCTION__, ctx->strm, appctx);
1824
1825 APPCTX_SPOE(appctx).ctx = NULL;
1826 ctx->appctx = NULL;
1827 offer_spoe_appctx(agent, appctx);
1828 }
1829
1830 /***************************************************************************
1831 * Functions that process SPOE messages and actions
1832 **************************************************************************/
1833 /* Process SPOE messages for a specific event. During the processing, it returns
1834 * 0 and it returns 1 when the processing is finished. If an error occurred, -1
1835 * is returned. */
1836 static int
process_spoe_messages(struct stream * s,struct spoe_context * ctx,struct list * messages,int dir)1837 process_spoe_messages(struct stream *s, struct spoe_context *ctx,
1838 struct list *messages, int dir)
1839 {
1840 struct spoe_message *msg;
1841 struct sample *smp;
1842 struct spoe_arg *arg;
1843 char *p;
1844 size_t max_size;
1845 int off, flag, idx = 0;
1846
1847 /* Reserve 32 bytes from the frame Metadata */
1848 max_size = APPCTX_SPOE(ctx->appctx).max_frame_size - 32;
1849
1850 b_reset(ctx->buffer);
1851 p = ctx->buffer->p;
1852
1853 /* Loop on messages */
1854 list_for_each_entry(msg, messages, list) {
1855 if (idx + msg->id_len + 1 > max_size)
1856 goto skip;
1857
1858 /* Set the message name */
1859 idx += encode_spoe_string(msg->id, msg->id_len, p+idx);
1860
1861 /* Save offset where to store the number of arguments for this
1862 * message */
1863 off = idx++;
1864 p[off] = 0;
1865
1866 /* Loop on arguments */
1867 list_for_each_entry(arg, &msg->args, list) {
1868 p[off]++; /* Increment the number of arguments */
1869
1870 if (idx + arg->name_len + 1 > max_size)
1871 goto skip;
1872
1873 /* Encode the arguement name as a string. It can by NULL */
1874 idx += encode_spoe_string(arg->name, arg->name_len, p+idx);
1875
1876 /* Fetch the arguement value */
1877 smp = sample_process(s->be, s->sess, s, dir|SMP_OPT_FINAL, arg->expr, NULL);
1878 if (!smp) {
1879 /* If no value is available, set it to NULL */
1880 p[idx++] = SPOE_DATA_T_NULL;
1881 continue;
1882 }
1883
1884 /* Else, encode the arguement value */
1885 switch (smp->data.type) {
1886 case SMP_T_BOOL:
1887 flag = ((!smp->data.u.sint) ? SPOE_DATA_FL_FALSE : SPOE_DATA_FL_TRUE);
1888 p[idx++] = (SPOE_DATA_T_BOOL | flag);
1889 break;
1890 case SMP_T_SINT:
1891 p[idx++] = SPOE_DATA_T_INT64;
1892 if (idx + 8 > max_size)
1893 goto skip;
1894 idx += encode_spoe_varint(smp->data.u.sint, p+idx);
1895 break;
1896 case SMP_T_IPV4:
1897 p[idx++] = SPOE_DATA_T_IPV4;
1898 if (idx + 4 > max_size)
1899 goto skip;
1900 memcpy(p+idx, &smp->data.u.ipv4, 4);
1901 idx += 4;
1902 break;
1903 case SMP_T_IPV6:
1904 p[idx++] = SPOE_DATA_T_IPV6;
1905 if (idx + 16 > max_size)
1906 goto skip;
1907 memcpy(p+idx, &smp->data.u.ipv6, 16);
1908 idx += 16;
1909 break;
1910 case SMP_T_STR:
1911 p[idx++] = SPOE_DATA_T_STR;
1912 if (idx + smp->data.u.str.len > max_size)
1913 goto skip;
1914 idx += encode_spoe_string(smp->data.u.str.str,
1915 smp->data.u.str.len,
1916 p+idx);
1917 break;
1918 case SMP_T_BIN:
1919 p[idx++] = SPOE_DATA_T_BIN;
1920 if (idx + smp->data.u.str.len > max_size)
1921 goto skip;
1922 idx += encode_spoe_string(smp->data.u.str.str,
1923 smp->data.u.str.len,
1924 p+idx);
1925 break;
1926 case SMP_T_METH:
1927 if (smp->data.u.meth.meth == HTTP_METH_OTHER) {
1928 p[idx++] = SPOE_DATA_T_STR;
1929 if (idx + http_known_methods[smp->data.u.meth.meth].len > max_size)
1930 goto skip;
1931 idx += encode_spoe_string(http_known_methods[smp->data.u.meth.meth].name,
1932 http_known_methods[smp->data.u.meth.meth].len,
1933 p+idx);
1934 }
1935 else {
1936 p[idx++] = SPOE_DATA_T_STR;
1937 if (idx + smp->data.u.str.len > max_size)
1938 goto skip;
1939 idx += encode_spoe_string(smp->data.u.meth.str.str,
1940 smp->data.u.meth.str.len,
1941 p+idx);
1942 }
1943 break;
1944 default:
1945 p[idx++] = SPOE_DATA_T_NULL;
1946 }
1947 }
1948 }
1949 ctx->buffer->i = idx;
1950 return 1;
1951
1952 skip:
1953 b_reset(ctx->buffer);
1954 return 0;
1955 }
1956
1957 /* Helper function to set a variable */
1958 static void
set_spoe_var(struct spoe_context * ctx,char * scope,char * name,int len,struct sample * smp)1959 set_spoe_var(struct spoe_context *ctx, char *scope, char *name, int len,
1960 struct sample *smp)
1961 {
1962 struct spoe_config *conf = FLT_CONF(ctx->filter);
1963 struct spoe_agent *agent = conf->agent;
1964 char varname[64];
1965
1966 memset(varname, 0, sizeof(varname));
1967 len = snprintf(varname, sizeof(varname), "%s.%s.%.*s",
1968 scope, agent->var_pfx, len, name);
1969 vars_set_by_name_ifexist(varname, len, smp);
1970 }
1971
1972 /* Helper function to unset a variable */
1973 static void
unset_spoe_var(struct spoe_context * ctx,char * scope,char * name,int len,struct sample * smp)1974 unset_spoe_var(struct spoe_context *ctx, char *scope, char *name, int len,
1975 struct sample *smp)
1976 {
1977 struct spoe_config *conf = FLT_CONF(ctx->filter);
1978 struct spoe_agent *agent = conf->agent;
1979 char varname[64];
1980
1981 memset(varname, 0, sizeof(varname));
1982 len = snprintf(varname, sizeof(varname), "%s.%s.%.*s",
1983 scope, agent->var_pfx, len, name);
1984 vars_unset_by_name_ifexist(varname, len, smp);
1985 }
1986
1987
1988 /* Process SPOE actions for a specific event. During the processing, it returns
1989 * 0 and it returns 1 when the processing is finished. If an error occurred, -1
1990 * is returned. */
1991 static int
process_spoe_actions(struct stream * s,struct spoe_context * ctx,enum spoe_event ev,int dir)1992 process_spoe_actions(struct stream *s, struct spoe_context *ctx,
1993 enum spoe_event ev, int dir)
1994 {
1995 char *p;
1996 size_t size;
1997 int off, i, idx = 0;
1998
1999 p = ctx->buffer->p;
2000 size = ctx->buffer->i;
2001
2002 while (idx < size) {
2003 char *str;
2004 uint64_t sz;
2005 struct sample smp;
2006 enum spoe_action_type type;
2007
2008 off = idx;
2009 if (idx+2 > size)
2010 goto skip;
2011
2012 type = p[idx++];
2013 switch (type) {
2014 case SPOE_ACT_T_SET_VAR: {
2015 char *scope;
2016
2017 if (p[idx++] != 3)
2018 goto skip_action;
2019
2020 switch (p[idx++]) {
2021 case SPOE_SCOPE_PROC: scope = "proc"; break;
2022 case SPOE_SCOPE_SESS: scope = "sess"; break;
2023 case SPOE_SCOPE_TXN : scope = "txn"; break;
2024 case SPOE_SCOPE_REQ : scope = "req"; break;
2025 case SPOE_SCOPE_RES : scope = "res"; break;
2026 default: goto skip;
2027 }
2028
2029 idx += decode_spoe_string(p+idx, p+size, &str, &sz);
2030 if (str == NULL)
2031 goto skip;
2032 memset(&smp, 0, sizeof(smp));
2033 smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL);
2034
2035 if ((i = decode_spoe_data(p+idx, p+size, &smp)) == -1)
2036 goto skip;
2037 idx += i;
2038
2039 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
2040 " - set-var '%s.%s.%.*s'\n",
2041 (int)now.tv_sec, (int)now.tv_usec,
2042 ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->id,
2043 __FUNCTION__, s, scope,
2044 ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->var_pfx,
2045 (int)sz, str);
2046
2047 set_spoe_var(ctx, scope, str, sz, &smp);
2048 break;
2049 }
2050
2051 case SPOE_ACT_T_UNSET_VAR: {
2052 char *scope;
2053
2054 if (p[idx++] != 2)
2055 goto skip_action;
2056
2057 switch (p[idx++]) {
2058 case SPOE_SCOPE_PROC: scope = "proc"; break;
2059 case SPOE_SCOPE_SESS: scope = "sess"; break;
2060 case SPOE_SCOPE_TXN : scope = "txn"; break;
2061 case SPOE_SCOPE_REQ : scope = "req"; break;
2062 case SPOE_SCOPE_RES : scope = "res"; break;
2063 default: goto skip;
2064 }
2065
2066 idx += decode_spoe_string(p+idx, p+size, &str, &sz);
2067 if (str == NULL)
2068 goto skip;
2069 memset(&smp, 0, sizeof(smp));
2070 smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL);
2071
2072 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
2073 " - unset-var '%s.%s.%.*s'\n",
2074 (int)now.tv_sec, (int)now.tv_usec,
2075 ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->id,
2076 __FUNCTION__, s, scope,
2077 ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->var_pfx,
2078 (int)sz, str);
2079
2080 unset_spoe_var(ctx, scope, str, sz, &smp);
2081 break;
2082 }
2083
2084 default:
2085 skip_action:
2086 if ((i = skip_spoe_action(p+off, p+size)) == -1)
2087 goto skip;
2088 idx += i;
2089 }
2090 }
2091
2092 return 1;
2093 skip:
2094 return 0;
2095 }
2096
2097
2098 /* Process a SPOE event. First, this functions will process messages attached to
2099 * this event and send them to an agent in a NOTIFY frame. Then, it will wait a
2100 * ACK frame to process corresponding actions. During all the processing, it
2101 * returns 0 and it returns 1 when the processing is finished. If an error
2102 * occurred, -1 is returned. */
2103 static int
process_spoe_event(struct stream * s,struct spoe_context * ctx,enum spoe_event ev)2104 process_spoe_event(struct stream *s, struct spoe_context *ctx,
2105 enum spoe_event ev)
2106 {
2107 struct spoe_config *conf = FLT_CONF(ctx->filter);
2108 struct spoe_agent *agent = conf->agent;
2109 int dir, ret = 1;
2110
2111 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
2112 " - ctx-state=%s - event=%s\n",
2113 (int)now.tv_sec, (int)now.tv_usec,
2114 agent->id, __FUNCTION__, s, spoe_ctx_state_str[ctx->state],
2115 spoe_event_str[ev]);
2116
2117 if (agent->eps_max > 0) {
2118 if (!freq_ctr_remain(&agent->err_per_sec, agent->eps_max, 0)) {
2119 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
2120 " - skip event '%s': max EPS reached\n",
2121 (int)now.tv_sec, (int)now.tv_usec,
2122 agent->id, __FUNCTION__, s, spoe_event_str[ev]);
2123 goto skip;
2124 }
2125 }
2126
2127 dir = ((ev < SPOE_EV_ON_SERVER_SESS) ? SMP_OPT_DIR_REQ : SMP_OPT_DIR_RES);
2128
2129 if (LIST_ISEMPTY(&(ctx->messages[ev])))
2130 goto out;
2131
2132 if (ctx->state == SPOE_CTX_ST_ERROR)
2133 goto error;
2134
2135 if (tick_is_expired(ctx->process_exp, now_ms) && ctx->state != SPOE_CTX_ST_DONE) {
2136 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
2137 " - failed to process event '%s': timeout\n",
2138 (int)now.tv_sec, (int)now.tv_usec,
2139 agent->id, __FUNCTION__, s, spoe_event_str[ev]);
2140 send_log(ctx->strm->be, LOG_WARNING,
2141 "failed to process event '%s': timeout.\n",
2142 spoe_event_str[ev]);
2143 goto error;
2144 }
2145
2146 if (ctx->state == SPOE_CTX_ST_READY) {
2147 if (!tick_isset(ctx->process_exp)) {
2148 ctx->process_exp = tick_add_ifset(now_ms, agent->timeout.processing);
2149 s->task->expire = tick_first((tick_is_expired(s->task->expire, now_ms) ? 0 : s->task->expire),
2150 ctx->process_exp);
2151 }
2152
2153 ret = acquire_spoe_appctx(ctx, dir);
2154 if (ret <= 0) {
2155 if (!ret)
2156 goto out;
2157 goto error;
2158 }
2159 ctx->state = SPOE_CTX_ST_SENDING_MSGS;
2160 }
2161
2162 if (ctx->appctx == NULL)
2163 goto error;
2164
2165 if (ctx->state == SPOE_CTX_ST_SENDING_MSGS) {
2166 ret = process_spoe_messages(s, ctx, &(ctx->messages[ev]), dir);
2167 if (ret <= 0) {
2168 if (!ret)
2169 goto skip;
2170 goto error;
2171 }
2172 wakeup_spoe_appctx(ctx);
2173 ret = 0;
2174 goto out;
2175 }
2176
2177 if (ctx->state == SPOE_CTX_ST_WAITING_ACK) {
2178 wakeup_spoe_appctx(ctx);
2179 ret = 0;
2180 goto out;
2181 }
2182
2183 if (ctx->state == SPOE_CTX_ST_DONE) {
2184 ret = process_spoe_actions(s, ctx, ev, dir);
2185 if (ret <= 0) {
2186 if (!ret)
2187 goto skip;
2188 goto error;
2189 }
2190 ctx->frame_id++;
2191 release_spoe_appctx(ctx);
2192 ctx->state = SPOE_CTX_ST_READY;
2193 }
2194
2195 out:
2196 return ret;
2197
2198 skip:
2199 release_spoe_appctx(ctx);
2200 ctx->state = SPOE_CTX_ST_READY;
2201 return 1;
2202
2203 error:
2204 if (agent->eps_max > 0)
2205 update_freq_ctr(&agent->err_per_sec, 1);
2206
2207 if (agent->var_on_error) {
2208 struct sample smp;
2209
2210 memset(&smp, 0, sizeof(smp));
2211 smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL);
2212 smp.data.u.sint = 1;
2213 smp.data.type = SMP_T_BOOL;
2214
2215 set_spoe_var(ctx, "txn", agent->var_on_error,
2216 strlen(agent->var_on_error), &smp);
2217 }
2218
2219 release_spoe_appctx(ctx);
2220 ctx->state = ((agent->flags & SPOE_FL_CONT_ON_ERR)
2221 ? SPOE_CTX_ST_READY
2222 : SPOE_CTX_ST_ERROR);
2223 return 1;
2224 }
2225
2226
2227 /***************************************************************************
2228 * Functions that create/destroy SPOE contexts
2229 **************************************************************************/
wakeup_spoe_context(struct spoe_context * ctx)2230 static int wakeup_spoe_context(struct spoe_context *ctx)
2231 {
2232 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
2233 return 1;
2234 }
2235
2236 static struct spoe_context *
create_spoe_context(struct filter * filter)2237 create_spoe_context(struct filter *filter)
2238 {
2239 struct spoe_config *conf = FLT_CONF(filter);
2240 struct spoe_context *ctx;
2241
2242 ctx = pool_alloc_dirty(pool2_spoe_ctx);
2243 if (ctx == NULL) {
2244 return NULL;
2245 }
2246 memset(ctx, 0, sizeof(*ctx));
2247 ctx->filter = filter;
2248 ctx->state = SPOE_CTX_ST_NONE;
2249 ctx->flags = 0;
2250 ctx->messages = conf->agent->messages;
2251 ctx->buffer = &buf_empty;
2252 LIST_INIT(&ctx->buffer_wait.list);
2253 ctx->buffer_wait.target = ctx;
2254 ctx->buffer_wait.wakeup_cb = (int (*)(void *))wakeup_spoe_context;
2255 LIST_INIT(&ctx->applet_wait);
2256
2257 ctx->stream_id = 0;
2258 ctx->frame_id = 1;
2259 ctx->process_exp = TICK_ETERNITY;
2260
2261 return ctx;
2262 }
2263
2264 static void
destroy_spoe_context(struct spoe_context * ctx)2265 destroy_spoe_context(struct spoe_context *ctx)
2266 {
2267 if (!ctx)
2268 return;
2269
2270 if (ctx->appctx)
2271 APPCTX_SPOE(ctx->appctx).ctx = NULL;
2272 if (!LIST_ISEMPTY(&ctx->buffer_wait.list))
2273 LIST_DEL(&ctx->buffer_wait.list);
2274 if (!LIST_ISEMPTY(&ctx->applet_wait))
2275 LIST_DEL(&ctx->applet_wait);
2276 pool_free2(pool2_spoe_ctx, ctx);
2277 }
2278
2279 static void
reset_spoe_context(struct spoe_context * ctx)2280 reset_spoe_context(struct spoe_context *ctx)
2281 {
2282 ctx->state = SPOE_CTX_ST_READY;
2283 ctx->flags &= ~SPOE_CTX_FL_PROCESS;
2284 }
2285
2286
2287 /***************************************************************************
2288 * Hooks that manage the filter lifecycle (init/check/deinit)
2289 **************************************************************************/
2290 /* Signal handler: Do a soft stop, wakeup SPOE applet */
2291 static void
sig_stop_spoe(struct sig_handler * sh)2292 sig_stop_spoe(struct sig_handler *sh)
2293 {
2294 struct proxy *p;
2295
2296 p = proxy;
2297 while (p) {
2298 struct flt_conf *fconf;
2299
2300 list_for_each_entry(fconf, &p->filter_configs, list) {
2301 struct spoe_config *conf;
2302 struct spoe_agent *agent;
2303 struct appctx *appctx;
2304
2305 if (fconf->id != spoe_filter_id)
2306 continue;
2307
2308 conf = fconf->conf;
2309 agent = conf->agent;
2310
2311 list_for_each_entry(appctx, &agent->cache, ctx.spoe.list) {
2312 si_applet_want_get(appctx->owner);
2313 si_applet_want_put(appctx->owner);
2314 appctx_wakeup(appctx);
2315 }
2316 }
2317 p = p->next;
2318 }
2319 }
2320
2321
2322 /* Initialize the SPOE filter. Returns -1 on error, else 0. */
2323 static int
spoe_init(struct proxy * px,struct flt_conf * fconf)2324 spoe_init(struct proxy *px, struct flt_conf *fconf)
2325 {
2326 struct spoe_config *conf = fconf->conf;
2327 struct listener *l;
2328
2329 memset(&conf->agent_fe, 0, sizeof(conf->agent_fe));
2330 init_new_proxy(&conf->agent_fe);
2331 conf->agent_fe.parent = conf->agent;
2332 conf->agent_fe.last_change = now.tv_sec;
2333 conf->agent_fe.id = conf->agent->id;
2334 conf->agent_fe.cap = PR_CAP_FE;
2335 conf->agent_fe.mode = PR_MODE_TCP;
2336 conf->agent_fe.maxconn = 0;
2337 conf->agent_fe.options2 |= PR_O2_INDEPSTR;
2338 conf->agent_fe.conn_retries = CONN_RETRIES;
2339 conf->agent_fe.accept = frontend_accept;
2340 conf->agent_fe.srv = NULL;
2341 conf->agent_fe.timeout.client = TICK_ETERNITY;
2342 conf->agent_fe.default_target = &spoe_applet.obj_type;
2343 conf->agent_fe.fe_req_ana = AN_REQ_SWITCHING_RULES;
2344
2345 if ((l = calloc(1, sizeof(*l))) == NULL) {
2346 Alert("spoe_init : out of memory.\n");
2347 goto out_error;
2348 }
2349 l->obj_type = OBJ_TYPE_LISTENER;
2350 l->obj_type = OBJ_TYPE_LISTENER;
2351 l->frontend = &conf->agent_fe;
2352 l->state = LI_READY;
2353 l->analysers = conf->agent_fe.fe_req_ana;
2354 LIST_ADDQ(&conf->agent_fe.conf.listeners, &l->by_fe);
2355
2356 if (!sighandler_registered) {
2357 signal_register_fct(0, sig_stop_spoe, 0);
2358 sighandler_registered = 1;
2359 }
2360
2361 return 0;
2362
2363 out_error:
2364 return -1;
2365 }
2366
2367 /* Free ressources allocated by the SPOE filter. */
2368 static void
spoe_deinit(struct proxy * px,struct flt_conf * fconf)2369 spoe_deinit(struct proxy *px, struct flt_conf *fconf)
2370 {
2371 struct spoe_config *conf = fconf->conf;
2372
2373 if (conf) {
2374 struct spoe_agent *agent = conf->agent;
2375 struct listener *l = LIST_NEXT(&conf->agent_fe.conf.listeners,
2376 struct listener *, by_fe);
2377
2378 free(l);
2379 release_spoe_agent(agent);
2380 free(conf);
2381 }
2382 fconf->conf = NULL;
2383 }
2384
2385 /* Check configuration of a SPOE filter for a specified proxy.
2386 * Return 1 on error, else 0. */
2387 static int
spoe_check(struct proxy * px,struct flt_conf * fconf)2388 spoe_check(struct proxy *px, struct flt_conf *fconf)
2389 {
2390 struct spoe_config *conf = fconf->conf;
2391 struct proxy *target;
2392
2393 target = proxy_be_by_name(conf->agent->b.name);
2394 if (target == NULL) {
2395 Alert("Proxy %s : unknown backend '%s' used by SPOE agent '%s'"
2396 " declared at %s:%d.\n",
2397 px->id, conf->agent->b.name, conf->agent->id,
2398 conf->agent->conf.file, conf->agent->conf.line);
2399 return 1;
2400 }
2401 if (target->mode != PR_MODE_TCP) {
2402 Alert("Proxy %s : backend '%s' used by SPOE agent '%s' declared"
2403 " at %s:%d does not support HTTP mode.\n",
2404 px->id, target->id, conf->agent->id,
2405 conf->agent->conf.file, conf->agent->conf.line);
2406 return 1;
2407 }
2408
2409 if (px->bind_proc & ~target->bind_proc) {
2410 Alert("Proxy %s : backend '%s' used by SPOE agent '%s' declared"
2411 " at %s:%d does not cover all of its processes.\n",
2412 px->id, target->id, conf->agent->id,
2413 conf->agent->conf.file, conf->agent->conf.line);
2414 return 1;
2415 }
2416
2417 free(conf->agent->b.name);
2418 conf->agent->b.name = NULL;
2419 conf->agent->b.be = target;
2420 return 0;
2421 }
2422
2423 /**************************************************************************
2424 * Hooks attached to a stream
2425 *************************************************************************/
2426 /* Called when a filter instance is created and attach to a stream. It creates
2427 * the context that will be used to process this stream. */
2428 static int
spoe_start(struct stream * s,struct filter * filter)2429 spoe_start(struct stream *s, struct filter *filter)
2430 {
2431 struct spoe_context *ctx;
2432
2433 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p\n",
2434 (int)now.tv_sec, (int)now.tv_usec,
2435 ((struct spoe_config *)FLT_CONF(filter))->agent->id,
2436 __FUNCTION__, s);
2437
2438 ctx = create_spoe_context(filter);
2439 if (ctx == NULL) {
2440 send_log(s->be, LOG_EMERG,
2441 "failed to create SPOE context for proxy %s\n",
2442 s->be->id);
2443 return 0;
2444 }
2445
2446 ctx->strm = s;
2447 ctx->state = SPOE_CTX_ST_READY;
2448 filter->ctx = ctx;
2449
2450 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_TCP_REQ_FE]))
2451 filter->pre_analyzers |= AN_REQ_INSPECT_FE;
2452
2453 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_TCP_REQ_BE]))
2454 filter->pre_analyzers |= AN_REQ_INSPECT_BE;
2455
2456 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_TCP_RSP]))
2457 filter->pre_analyzers |= AN_RES_INSPECT;
2458
2459 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_HTTP_REQ_FE]))
2460 filter->pre_analyzers |= AN_REQ_HTTP_PROCESS_FE;
2461
2462 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_HTTP_REQ_BE]))
2463 filter->pre_analyzers |= AN_REQ_HTTP_PROCESS_BE;
2464
2465 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_HTTP_RSP]))
2466 filter->pre_analyzers |= AN_RES_HTTP_PROCESS_FE;
2467
2468 return 1;
2469 }
2470
2471 /* Called when a filter instance is detached from a stream. It release the
2472 * attached SPOE context. */
2473 static void
spoe_stop(struct stream * s,struct filter * filter)2474 spoe_stop(struct stream *s, struct filter *filter)
2475 {
2476 struct spoe_context *ctx = filter->ctx;
2477
2478 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p\n",
2479 (int)now.tv_sec, (int)now.tv_usec,
2480 ((struct spoe_config *)FLT_CONF(filter))->agent->id,
2481 __FUNCTION__, s);
2482
2483 if (ctx) {
2484 release_spoe_appctx(ctx);
2485 destroy_spoe_context(ctx);
2486 }
2487 }
2488
2489
2490 /*
2491 * Called when the stream is woken up because of expired timer.
2492 */
2493 static void
spoe_check_timeouts(struct stream * s,struct filter * filter)2494 spoe_check_timeouts(struct stream *s, struct filter *filter)
2495 {
2496 struct spoe_context *ctx = filter->ctx;
2497
2498 if (tick_is_expired(ctx->process_exp, now_ms)) {
2499 s->pending_events |= TASK_WOKEN_MSG;
2500 if (ctx->buffer != &buf_empty) {
2501 b_free(&ctx->buffer);
2502 offer_buffers(ctx, tasks_run_queue + applets_active_queue);
2503 }
2504 }
2505 }
2506
2507 /* Called when we are ready to filter data on a channel */
2508 static int
spoe_start_analyze(struct stream * s,struct filter * filter,struct channel * chn)2509 spoe_start_analyze(struct stream *s, struct filter *filter, struct channel *chn)
2510 {
2511 struct spoe_context *ctx = filter->ctx;
2512 int ret = 1;
2513
2514 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p - ctx-state=%s"
2515 " - ctx-flags=0x%08x\n",
2516 (int)now.tv_sec, (int)now.tv_usec,
2517 ((struct spoe_config *)FLT_CONF(filter))->agent->id,
2518 __FUNCTION__, s, spoe_ctx_state_str[ctx->state], ctx->flags);
2519
2520 if (!(chn->flags & CF_ISRESP)) {
2521 if (filter->pre_analyzers & AN_REQ_INSPECT_FE)
2522 chn->analysers |= AN_REQ_INSPECT_FE;
2523 if (filter->pre_analyzers & AN_REQ_INSPECT_BE)
2524 chn->analysers |= AN_REQ_INSPECT_BE;
2525
2526 if (ctx->flags & SPOE_CTX_FL_CLI_CONNECTED)
2527 goto out;
2528
2529 ctx->stream_id = s->uniq_id;
2530 if (ctx->state != SPOE_CTX_ST_NONE && ctx->state != SPOE_CTX_ST_ERROR) {
2531 ret = process_spoe_event(s, ctx, SPOE_EV_ON_CLIENT_SESS);
2532 if (ret != 1)
2533 goto out;
2534 }
2535 ctx->flags |= SPOE_CTX_FL_CLI_CONNECTED;
2536 }
2537 else {
2538 if (filter->pre_analyzers & AN_RES_INSPECT)
2539 chn->analysers |= AN_RES_INSPECT;
2540
2541 if (ctx->flags & SPOE_CTX_FL_SRV_CONNECTED)
2542 goto out;
2543
2544 if (ctx->state != SPOE_CTX_ST_NONE && ctx->state != SPOE_CTX_ST_ERROR) {
2545 ret = process_spoe_event(s, ctx, SPOE_EV_ON_SERVER_SESS);
2546 if (ret != 1)
2547 goto out;
2548 }
2549 ctx->flags |= SPOE_CTX_FL_SRV_CONNECTED;
2550 }
2551
2552 out:
2553 if (!ret) {
2554 channel_dont_read(chn);
2555 channel_dont_close(chn);
2556 }
2557 return ret;
2558 }
2559
2560 /* Called before a processing happens on a given channel */
2561 static int
spoe_chn_pre_analyze(struct stream * s,struct filter * filter,struct channel * chn,unsigned an_bit)2562 spoe_chn_pre_analyze(struct stream *s, struct filter *filter,
2563 struct channel *chn, unsigned an_bit)
2564 {
2565 struct spoe_context *ctx = filter->ctx;
2566 int ret = 1;
2567
2568 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p - ctx-state=%s"
2569 " - ctx-flags=0x%08x - ana=0x%08x\n",
2570 (int)now.tv_sec, (int)now.tv_usec,
2571 ((struct spoe_config *)FLT_CONF(filter))->agent->id,
2572 __FUNCTION__, s, spoe_ctx_state_str[ctx->state],
2573 ctx->flags, an_bit);
2574
2575 if (ctx->state == SPOE_CTX_ST_NONE || ctx->state == SPOE_CTX_ST_ERROR)
2576 goto out;
2577
2578 switch (an_bit) {
2579 case AN_REQ_INSPECT_FE:
2580 ret = process_spoe_event(s, ctx, SPOE_EV_ON_TCP_REQ_FE);
2581 break;
2582 case AN_REQ_INSPECT_BE:
2583 ret = process_spoe_event(s, ctx, SPOE_EV_ON_TCP_REQ_BE);
2584 break;
2585 case AN_RES_INSPECT:
2586 ret = process_spoe_event(s, ctx, SPOE_EV_ON_TCP_RSP);
2587 break;
2588 case AN_REQ_HTTP_PROCESS_FE:
2589 ret = process_spoe_event(s, ctx, SPOE_EV_ON_HTTP_REQ_FE);
2590 break;
2591 case AN_REQ_HTTP_PROCESS_BE:
2592 ret = process_spoe_event(s, ctx, SPOE_EV_ON_HTTP_REQ_BE);
2593 break;
2594 case AN_RES_HTTP_PROCESS_FE:
2595 ret = process_spoe_event(s, ctx, SPOE_EV_ON_HTTP_RSP);
2596 break;
2597 }
2598
2599 out:
2600 if (!ret) {
2601 channel_dont_read(chn);
2602 channel_dont_close(chn);
2603 }
2604 return ret;
2605 }
2606
2607 /* Called when the filtering on the channel ends. */
2608 static int
spoe_end_analyze(struct stream * s,struct filter * filter,struct channel * chn)2609 spoe_end_analyze(struct stream *s, struct filter *filter, struct channel *chn)
2610 {
2611 struct spoe_context *ctx = filter->ctx;
2612
2613 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p - ctx-state=%s"
2614 " - ctx-flags=0x%08x\n",
2615 (int)now.tv_sec, (int)now.tv_usec,
2616 ((struct spoe_config *)FLT_CONF(filter))->agent->id,
2617 __FUNCTION__, s, spoe_ctx_state_str[ctx->state], ctx->flags);
2618
2619 if (!(ctx->flags & SPOE_CTX_FL_PROCESS)) {
2620 reset_spoe_context(ctx);
2621 }
2622
2623 return 1;
2624 }
2625
2626 /********************************************************************
2627 * Functions that manage the filter initialization
2628 ********************************************************************/
2629 struct flt_ops spoe_ops = {
2630 /* Manage SPOE filter, called for each filter declaration */
2631 .init = spoe_init,
2632 .deinit = spoe_deinit,
2633 .check = spoe_check,
2634
2635 /* Handle start/stop of SPOE */
2636 .attach = spoe_start,
2637 .detach = spoe_stop,
2638 .check_timeouts = spoe_check_timeouts,
2639
2640 /* Handle channels activity */
2641 .channel_start_analyze = spoe_start_analyze,
2642 .channel_pre_analyze = spoe_chn_pre_analyze,
2643 .channel_end_analyze = spoe_end_analyze,
2644 };
2645
2646
2647 static int
cfg_parse_spoe_agent(const char * file,int linenum,char ** args,int kwm)2648 cfg_parse_spoe_agent(const char *file, int linenum, char **args, int kwm)
2649 {
2650 const char *err;
2651 int i, err_code = 0;
2652
2653 if ((cfg_scope == NULL && curengine != NULL) ||
2654 (cfg_scope != NULL && curengine == NULL) ||
2655 (curengine != NULL && cfg_scope != NULL && strcmp(curengine, cfg_scope)))
2656 goto out;
2657
2658 if (!strcmp(args[0], "spoe-agent")) { /* new spoe-agent section */
2659 if (!*args[1]) {
2660 Alert("parsing [%s:%d] : missing name for spoe-agent section.\n",
2661 file, linenum);
2662 err_code |= ERR_ALERT | ERR_ABORT;
2663 goto out;
2664 }
2665 if (*args[2]) {
2666 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
2667 file, linenum, args[2]);
2668 err_code |= ERR_ALERT | ERR_ABORT;
2669 goto out;
2670 }
2671
2672 err = invalid_char(args[1]);
2673 if (err) {
2674 Alert("parsing [%s:%d] : character '%c' is not permitted in '%s' name '%s'.\n",
2675 file, linenum, *err, args[0], args[1]);
2676 err_code |= ERR_ALERT | ERR_ABORT;
2677 goto out;
2678 }
2679
2680 if (curagent != NULL) {
2681 Alert("parsing [%s:%d] : another spoe-agent section previously defined.\n",
2682 file, linenum);
2683 err_code |= ERR_ALERT | ERR_ABORT;
2684 goto out;
2685 }
2686 if ((curagent = calloc(1, sizeof(*curagent))) == NULL) {
2687 Alert("parsing [%s:%d] : out of memory.\n", file, linenum);
2688 err_code |= ERR_ALERT | ERR_ABORT;
2689 goto out;
2690 }
2691
2692 curagent->id = strdup(args[1]);
2693 curagent->conf.file = strdup(file);
2694 curagent->conf.line = linenum;
2695 curagent->timeout.hello = TICK_ETERNITY;
2696 curagent->timeout.idle = TICK_ETERNITY;
2697 curagent->timeout.processing = TICK_ETERNITY;
2698 curagent->var_pfx = NULL;
2699 curagent->var_on_error = NULL;
2700 curagent->flags = 0;
2701 curagent->cps_max = 0;
2702 curagent->eps_max = 0;
2703
2704 for (i = 0; i < SPOE_EV_EVENTS; ++i)
2705 LIST_INIT(&curagent->messages[i]);
2706 LIST_INIT(&curagent->cache);
2707 LIST_INIT(&curagent->applet_wq);
2708 }
2709 else if (!strcmp(args[0], "use-backend")) {
2710 if (!*args[1]) {
2711 Alert("parsing [%s:%d] : '%s' expects a backend name.\n",
2712 file, linenum, args[0]);
2713 err_code |= ERR_ALERT | ERR_FATAL;
2714 goto out;
2715 }
2716 if (*args[2]) {
2717 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
2718 file, linenum, args[2]);
2719 err_code |= ERR_ALERT | ERR_ABORT;
2720 goto out;
2721 }
2722 free(curagent->b.name);
2723 curagent->b.name = strdup(args[1]);
2724 }
2725 else if (!strcmp(args[0], "messages")) {
2726 int cur_arg = 1;
2727 while (*args[cur_arg]) {
2728 struct spoe_msg_placeholder *mp = NULL;
2729
2730 list_for_each_entry(mp, &curmps, list) {
2731 if (!strcmp(mp->id, args[cur_arg])) {
2732 Alert("parsing [%s:%d]: spoe-message message '%s' already declared.\n",
2733 file, linenum, args[cur_arg]);
2734 err_code |= ERR_ALERT | ERR_FATAL;
2735 goto out;
2736 }
2737 }
2738
2739 if ((mp = calloc(1, sizeof(*mp))) == NULL) {
2740 Alert("parsing [%s:%d] : out of memory.\n", file, linenum);
2741 err_code |= ERR_ALERT | ERR_ABORT;
2742 goto out;
2743 }
2744 mp->id = strdup(args[cur_arg]);
2745 LIST_ADDQ(&curmps, &mp->list);
2746 cur_arg++;
2747 }
2748 }
2749 else if (!strcmp(args[0], "timeout")) {
2750 unsigned int *tv = NULL;
2751 const char *res;
2752 unsigned timeout;
2753
2754 if (!*args[1]) {
2755 Alert("parsing [%s:%d] : 'timeout' expects 'connect', 'idle' and 'ack'.\n",
2756 file, linenum);
2757 err_code |= ERR_ALERT | ERR_FATAL;
2758 goto out;
2759 }
2760 if (!strcmp(args[1], "hello"))
2761 tv = &curagent->timeout.hello;
2762 else if (!strcmp(args[1], "idle"))
2763 tv = &curagent->timeout.idle;
2764 else if (!strcmp(args[1], "processing"))
2765 tv = &curagent->timeout.processing;
2766 else {
2767 Alert("parsing [%s:%d] : 'timeout' supports 'connect', 'idle' or 'processing' (got %s).\n",
2768 file, linenum, args[1]);
2769 err_code |= ERR_ALERT | ERR_FATAL;
2770 goto out;
2771 }
2772 if (!*args[2]) {
2773 Alert("parsing [%s:%d] : 'timeout %s' expects an integer value (in milliseconds).\n",
2774 file, linenum, args[1]);
2775 err_code |= ERR_ALERT | ERR_FATAL;
2776 goto out;
2777 }
2778 res = parse_time_err(args[2], &timeout, TIME_UNIT_MS);
2779 if (res) {
2780 Alert("parsing [%s:%d] : unexpected character '%c' in 'timeout %s'.\n",
2781 file, linenum, *res, args[1]);
2782 err_code |= ERR_ALERT | ERR_ABORT;
2783 goto out;
2784 }
2785 if (*args[3]) {
2786 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
2787 file, linenum, args[3]);
2788 err_code |= ERR_ALERT | ERR_ABORT;
2789 goto out;
2790 }
2791 *tv = MS_TO_TICKS(timeout);
2792 }
2793 else if (!strcmp(args[0], "option")) {
2794 if (!*args[1]) {
2795 Alert("parsing [%s:%d]: '%s' expects an option name.\n",
2796 file, linenum, args[0]);
2797 err_code |= ERR_ALERT | ERR_FATAL;
2798 goto out;
2799 }
2800 if (!strcmp(args[1], "var-prefix")) {
2801 char *tmp;
2802
2803 if (!*args[2]) {
2804 Alert("parsing [%s:%d]: '%s %s' expects a value.\n",
2805 file, linenum, args[0],
2806 args[1]);
2807 err_code |= ERR_ALERT | ERR_FATAL;
2808 goto out;
2809 }
2810 tmp = args[2];
2811 while (*tmp) {
2812 if (!isalnum(*tmp) && *tmp != '_' && *tmp != '.') {
2813 Alert("parsing [%s:%d]: '%s %s' only supports [a-zA-Z0-9_.] chars.\n",
2814 file, linenum, args[0], args[1]);
2815 err_code |= ERR_ALERT | ERR_FATAL;
2816 goto out;
2817 }
2818 tmp++;
2819 }
2820 curagent->var_pfx = strdup(args[2]);
2821 }
2822 else if (!strcmp(args[1], "continue-on-error")) {
2823 if (*args[2]) {
2824 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
2825 file, linenum, args[2]);
2826 err_code |= ERR_ALERT | ERR_ABORT;
2827 goto out;
2828 }
2829 curagent->flags |= SPOE_FL_CONT_ON_ERR;
2830 }
2831 else if (!strcmp(args[1], "set-on-error")) {
2832 char *tmp;
2833
2834 if (!*args[2]) {
2835 Alert("parsing [%s:%d]: '%s %s' expects a value.\n",
2836 file, linenum, args[0],
2837 args[1]);
2838 err_code |= ERR_ALERT | ERR_FATAL;
2839 goto out;
2840 }
2841 tmp = args[2];
2842 while (*tmp) {
2843 if (!isalnum(*tmp) && *tmp != '_' && *tmp != '.') {
2844 Alert("parsing [%s:%d]: '%s %s' only supports [a-zA-Z0-9_.] chars.\n",
2845 file, linenum, args[0], args[1]);
2846 err_code |= ERR_ALERT | ERR_FATAL;
2847 goto out;
2848 }
2849 tmp++;
2850 }
2851 curagent->var_on_error = strdup(args[2]);
2852 }
2853 else {
2854 Alert("parsing [%s:%d]: option '%s' is not supported.\n",
2855 file, linenum, args[1]);
2856 err_code |= ERR_ALERT | ERR_FATAL;
2857 goto out;
2858 }
2859 }
2860 else if (!strcmp(args[0], "maxconnrate")) {
2861 if (!*args[1]) {
2862 Alert("parsing [%s:%d] : '%s' expects an integer argument.\n",
2863 file, linenum, args[0]);
2864 err_code |= ERR_ALERT | ERR_FATAL;
2865 goto out;
2866 }
2867 if (*args[2]) {
2868 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
2869 file, linenum, args[2]);
2870 err_code |= ERR_ALERT | ERR_ABORT;
2871 goto out;
2872 }
2873 curagent->cps_max = atol(args[1]);
2874 }
2875 else if (!strcmp(args[0], "maxerrrate")) {
2876 if (!*args[1]) {
2877 Alert("parsing [%s:%d] : '%s' expects an integer argument.\n",
2878 file, linenum, args[0]);
2879 err_code |= ERR_ALERT | ERR_FATAL;
2880 goto out;
2881 }
2882 if (*args[2]) {
2883 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
2884 file, linenum, args[2]);
2885 err_code |= ERR_ALERT | ERR_ABORT;
2886 goto out;
2887 }
2888 curagent->eps_max = atol(args[1]);
2889 }
2890 else if (*args[0]) {
2891 Alert("parsing [%s:%d] : unknown keyword '%s' in spoe-agent section.\n",
2892 file, linenum, args[0]);
2893 err_code |= ERR_ALERT | ERR_FATAL;
2894 goto out;
2895 }
2896 out:
2897 return err_code;
2898 }
2899
2900 static int
cfg_parse_spoe_message(const char * file,int linenum,char ** args,int kwm)2901 cfg_parse_spoe_message(const char *file, int linenum, char **args, int kwm)
2902 {
2903 struct spoe_message *msg;
2904 struct spoe_arg *arg;
2905 const char *err;
2906 char *errmsg = NULL;
2907 int err_code = 0;
2908
2909 if ((cfg_scope == NULL && curengine != NULL) ||
2910 (cfg_scope != NULL && curengine == NULL) ||
2911 (curengine != NULL && cfg_scope != NULL && strcmp(curengine, cfg_scope)))
2912 goto out;
2913
2914 if (!strcmp(args[0], "spoe-message")) { /* new spoe-message section */
2915 if (!*args[1]) {
2916 Alert("parsing [%s:%d] : missing name for spoe-message section.\n",
2917 file, linenum);
2918 err_code |= ERR_ALERT | ERR_ABORT;
2919 goto out;
2920 }
2921 if (*args[2]) {
2922 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
2923 file, linenum, args[2]);
2924 err_code |= ERR_ALERT | ERR_ABORT;
2925 goto out;
2926 }
2927
2928 err = invalid_char(args[1]);
2929 if (err) {
2930 Alert("parsing [%s:%d] : character '%c' is not permitted in '%s' name '%s'.\n",
2931 file, linenum, *err, args[0], args[1]);
2932 err_code |= ERR_ALERT | ERR_ABORT;
2933 goto out;
2934 }
2935
2936 list_for_each_entry(msg, &curmsgs, list) {
2937 if (!strcmp(msg->id, args[1])) {
2938 Alert("parsing [%s:%d]: spoe-message section '%s' has the same"
2939 " name as another one declared at %s:%d.\n",
2940 file, linenum, args[1], msg->conf.file, msg->conf.line);
2941 err_code |= ERR_ALERT | ERR_FATAL;
2942 goto out;
2943 }
2944 }
2945
2946 if ((curmsg = calloc(1, sizeof(*curmsg))) == NULL) {
2947 Alert("parsing [%s:%d] : out of memory.\n", file, linenum);
2948 err_code |= ERR_ALERT | ERR_ABORT;
2949 goto out;
2950 }
2951
2952 curmsg->id = strdup(args[1]);
2953 curmsg->id_len = strlen(curmsg->id);
2954 curmsg->event = SPOE_EV_NONE;
2955 curmsg->conf.file = strdup(file);
2956 curmsg->conf.line = linenum;
2957 LIST_INIT(&curmsg->args);
2958 LIST_ADDQ(&curmsgs, &curmsg->list);
2959 }
2960 else if (!strcmp(args[0], "args")) {
2961 int cur_arg = 1;
2962
2963 curproxy->conf.args.ctx = ARGC_SPOE;
2964 curproxy->conf.args.file = file;
2965 curproxy->conf.args.line = linenum;
2966 while (*args[cur_arg]) {
2967 char *delim = strchr(args[cur_arg], '=');
2968 int idx = 0;
2969
2970 if ((arg = calloc(1, sizeof(*arg))) == NULL) {
2971 Alert("parsing [%s:%d] : out of memory.\n", file, linenum);
2972 err_code |= ERR_ALERT | ERR_ABORT;
2973 goto out;
2974 }
2975
2976 if (!delim) {
2977 arg->name = NULL;
2978 arg->name_len = 0;
2979 delim = args[cur_arg];
2980 }
2981 else {
2982 arg->name = my_strndup(args[cur_arg], delim - args[cur_arg]);
2983 arg->name_len = delim - args[cur_arg];
2984 delim++;
2985 }
2986 arg->expr = sample_parse_expr((char*[]){delim, NULL},
2987 &idx, file, linenum, &errmsg,
2988 &curproxy->conf.args);
2989 if (arg->expr == NULL) {
2990 Alert("parsing [%s:%d] : '%s': %s.\n", file, linenum, args[0], errmsg);
2991 err_code |= ERR_ALERT | ERR_FATAL;
2992 free(arg->name);
2993 free(arg);
2994 goto out;
2995 }
2996 LIST_ADDQ(&curmsg->args, &arg->list);
2997 cur_arg++;
2998 }
2999 curproxy->conf.args.file = NULL;
3000 curproxy->conf.args.line = 0;
3001 }
3002 else if (!strcmp(args[0], "event")) {
3003 if (!*args[1]) {
3004 Alert("parsing [%s:%d] : missing event name.\n", file, linenum);
3005 err_code |= ERR_ALERT | ERR_ABORT;
3006 goto out;
3007 }
3008 if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_CLIENT_SESS]))
3009 curmsg->event = SPOE_EV_ON_CLIENT_SESS;
3010 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_SERVER_SESS]))
3011 curmsg->event = SPOE_EV_ON_SERVER_SESS;
3012
3013 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_TCP_REQ_FE]))
3014 curmsg->event = SPOE_EV_ON_TCP_REQ_FE;
3015 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_TCP_REQ_BE]))
3016 curmsg->event = SPOE_EV_ON_TCP_REQ_BE;
3017 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_TCP_RSP]))
3018 curmsg->event = SPOE_EV_ON_TCP_RSP;
3019
3020 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_HTTP_REQ_FE]))
3021 curmsg->event = SPOE_EV_ON_HTTP_REQ_FE;
3022 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_HTTP_REQ_BE]))
3023 curmsg->event = SPOE_EV_ON_HTTP_REQ_BE;
3024 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_HTTP_RSP]))
3025 curmsg->event = SPOE_EV_ON_HTTP_RSP;
3026 else {
3027 Alert("parsing [%s:%d] : unkown event '%s'.\n",
3028 file, linenum, args[1]);
3029 err_code |= ERR_ALERT | ERR_ABORT;
3030 goto out;
3031 }
3032 }
3033 else if (!*args[0]) {
3034 Alert("parsing [%s:%d] : unknown keyword '%s' in spoe-message section.\n",
3035 file, linenum, args[0]);
3036 err_code |= ERR_ALERT | ERR_FATAL;
3037 goto out;
3038 }
3039 out:
3040 free(errmsg);
3041 return err_code;
3042 }
3043
3044 /* Return -1 on error, else 0 */
3045 static int
parse_spoe_flt(char ** args,int * cur_arg,struct proxy * px,struct flt_conf * fconf,char ** err,void * private)3046 parse_spoe_flt(char **args, int *cur_arg, struct proxy *px,
3047 struct flt_conf *fconf, char **err, void *private)
3048 {
3049 struct list backup_sections;
3050 struct spoe_config *conf;
3051 struct spoe_message *msg, *msgback;
3052 struct spoe_msg_placeholder *mp, *mpback;
3053 char *file = NULL, *engine = NULL;
3054 int ret, pos = *cur_arg + 1;
3055
3056 conf = calloc(1, sizeof(*conf));
3057 if (conf == NULL) {
3058 memprintf(err, "%s: out of memory", args[*cur_arg]);
3059 goto error;
3060 }
3061 conf->proxy = px;
3062
3063 while (*args[pos]) {
3064 if (!strcmp(args[pos], "config")) {
3065 if (!*args[pos+1]) {
3066 memprintf(err, "'%s' : '%s' option without value",
3067 args[*cur_arg], args[pos]);
3068 goto error;
3069 }
3070 file = args[pos+1];
3071 pos += 2;
3072 }
3073 else if (!strcmp(args[pos], "engine")) {
3074 if (!*args[pos+1]) {
3075 memprintf(err, "'%s' : '%s' option without value",
3076 args[*cur_arg], args[pos]);
3077 goto error;
3078 }
3079 engine = args[pos+1];
3080 pos += 2;
3081 }
3082 else {
3083 memprintf(err, "unknown keyword '%s'", args[pos]);
3084 goto error;
3085 }
3086 }
3087 if (file == NULL) {
3088 memprintf(err, "'%s' : missing config file", args[*cur_arg]);
3089 goto error;
3090 }
3091
3092 /* backup sections and register SPOE sections */
3093 LIST_INIT(&backup_sections);
3094 cfg_backup_sections(&backup_sections);
3095 cfg_register_section("spoe-agent", cfg_parse_spoe_agent);
3096 cfg_register_section("spoe-message", cfg_parse_spoe_message);
3097
3098 /* Parse SPOE filter configuration file */
3099 curengine = engine;
3100 curproxy = px;
3101 curagent = NULL;
3102 curmsg = NULL;
3103 ret = readcfgfile(file);
3104 curproxy = NULL;
3105
3106 /* unregister SPOE sections and restore previous sections */
3107 cfg_unregister_sections();
3108 cfg_restore_sections(&backup_sections);
3109
3110 if (ret == -1) {
3111 memprintf(err, "Could not open configuration file %s : %s",
3112 file, strerror(errno));
3113 goto error;
3114 }
3115 if (ret & (ERR_ABORT|ERR_FATAL)) {
3116 memprintf(err, "Error(s) found in configuration file %s", file);
3117 goto error;
3118 }
3119
3120 /* Check SPOE agent */
3121 if (curagent == NULL) {
3122 memprintf(err, "No SPOE agent found in file %s", file);
3123 goto error;
3124 }
3125 if (curagent->b.name == NULL) {
3126 memprintf(err, "No backend declared for SPOE agent '%s' declared at %s:%d",
3127 curagent->id, curagent->conf.file, curagent->conf.line);
3128 goto error;
3129 }
3130 if (curagent->timeout.hello == TICK_ETERNITY ||
3131 curagent->timeout.idle == TICK_ETERNITY ||
3132 curagent->timeout.processing == TICK_ETERNITY) {
3133 Warning("Proxy '%s': missing timeouts for SPOE agent '%s' declare at %s:%d.\n"
3134 " | While not properly invalid, you will certainly encounter various problems\n"
3135 " | with such a configuration. To fix this, please ensure that all following\n"
3136 " | timeouts are set to a non-zero value: 'hello', 'idle', 'processing'.\n",
3137 px->id, curagent->id, curagent->conf.file, curagent->conf.line);
3138 }
3139 if (curagent->var_pfx == NULL) {
3140 char *tmp = curagent->id;
3141
3142 while (*tmp) {
3143 if (!isalnum(*tmp) && *tmp != '_' && *tmp != '.') {
3144 memprintf(err, "Invalid variable prefix '%s' for SPOE agent '%s' declared at %s:%d. "
3145 "Use 'option var-prefix' to set it. Only [a-zA-Z0-9_.] chars are supported.\n",
3146 curagent->id, curagent->id, curagent->conf.file, curagent->conf.line);
3147 goto error;
3148 }
3149 tmp++;
3150 }
3151 curagent->var_pfx = strdup(curagent->id);
3152 }
3153
3154 if (LIST_ISEMPTY(&curmps)) {
3155 Warning("Proxy '%s': No message used by SPOE agent '%s' declared at %s:%d.\n",
3156 px->id, curagent->id, curagent->conf.file, curagent->conf.line);
3157 goto finish;
3158 }
3159
3160 list_for_each_entry_safe(mp, mpback, &curmps, list) {
3161 list_for_each_entry_safe(msg, msgback, &curmsgs, list) {
3162 if (!strcmp(msg->id, mp->id)) {
3163 if ((px->cap & (PR_CAP_FE|PR_CAP_BE)) == (PR_CAP_FE|PR_CAP_BE)) {
3164 if (msg->event == SPOE_EV_ON_TCP_REQ_BE)
3165 msg->event = SPOE_EV_ON_TCP_REQ_FE;
3166 if (msg->event == SPOE_EV_ON_HTTP_REQ_BE)
3167 msg->event = SPOE_EV_ON_HTTP_REQ_FE;
3168 }
3169 if (!(px->cap & PR_CAP_FE) && (msg->event == SPOE_EV_ON_CLIENT_SESS ||
3170 msg->event == SPOE_EV_ON_TCP_REQ_FE ||
3171 msg->event == SPOE_EV_ON_HTTP_REQ_FE)) {
3172 Warning("Proxy '%s': frontend event used on a backend proxy at %s:%d.\n",
3173 px->id, msg->conf.file, msg->conf.line);
3174 goto next;
3175 }
3176 if (msg->event == SPOE_EV_NONE) {
3177 Warning("Proxy '%s': Ignore SPOE message without event at %s:%d.\n",
3178 px->id, msg->conf.file, msg->conf.line);
3179 goto next;
3180 }
3181 msg->agent = curagent;
3182 LIST_DEL(&msg->list);
3183 LIST_ADDQ(&curagent->messages[msg->event], &msg->list);
3184 goto next;
3185 }
3186 }
3187 memprintf(err, "SPOE agent '%s' try to use undefined SPOE message '%s' at %s:%d",
3188 curagent->id, mp->id, curagent->conf.file, curagent->conf.line);
3189 goto error;
3190 next:
3191 continue;
3192 }
3193
3194 finish:
3195 conf->agent = curagent;
3196 list_for_each_entry_safe(mp, mpback, &curmps, list) {
3197 LIST_DEL(&mp->list);
3198 release_spoe_msg_placeholder(mp);
3199 }
3200 list_for_each_entry_safe(msg, msgback, &curmsgs, list) {
3201 Warning("Proxy '%s': Ignore unused SPOE messages '%s' declared at %s:%d.\n",
3202 px->id, msg->id, msg->conf.file, msg->conf.line);
3203 LIST_DEL(&msg->list);
3204 release_spoe_message(msg);
3205 }
3206
3207 *cur_arg = pos;
3208 fconf->id = spoe_filter_id;
3209 fconf->ops = &spoe_ops;
3210 fconf->conf = conf;
3211 return 0;
3212
3213 error:
3214 release_spoe_agent(curagent);
3215 list_for_each_entry_safe(mp, mpback, &curmps, list) {
3216 LIST_DEL(&mp->list);
3217 release_spoe_msg_placeholder(mp);
3218 }
3219 list_for_each_entry_safe(msg, msgback, &curmsgs, list) {
3220 LIST_DEL(&msg->list);
3221 release_spoe_message(msg);
3222 }
3223 free(conf);
3224 return -1;
3225 }
3226
3227
3228 /* Declare the filter parser for "spoe" keyword */
3229 static struct flt_kw_list flt_kws = { "SPOE", { }, {
3230 { "spoe", parse_spoe_flt, NULL },
3231 { NULL, NULL, NULL },
3232 }
3233 };
3234
3235 __attribute__((constructor))
__spoe_init(void)3236 static void __spoe_init(void)
3237 {
3238 flt_register_keywords(&flt_kws);
3239
3240 LIST_INIT(&curmsgs);
3241 LIST_INIT(&curmps);
3242 pool2_spoe_ctx = create_pool("spoe_ctx", sizeof(struct spoe_context), MEM_F_SHARED);
3243 }
3244
3245 __attribute__((destructor))
3246 static void
__spoe_deinit(void)3247 __spoe_deinit(void)
3248 {
3249 pool_destroy2(pool2_spoe_ctx);
3250 }
3251