1 /*
2 * Event sink management
3 *
4 * Copyright (C) 2000-2019 Willy Tarreau - w@1wt.eu
5 *
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Lesser General Public
8 * License as published by the Free Software Foundation, version 2.1
9 * exclusively.
10 *
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Lesser General Public License for more details.
15 *
16 * You should have received a copy of the GNU Lesser General Public
17 * License along with this library; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
19 */
20
21 #include <import/ist.h>
22 #include <haproxy/api.h>
23 #include <haproxy/cfgparse.h>
24 #include <haproxy/cli.h>
25 #include <haproxy/errors.h>
26 #include <haproxy/list.h>
27 #include <haproxy/log.h>
28 #include <haproxy/ring.h>
29 #include <haproxy/signal.h>
30 #include <haproxy/sink.h>
31 #include <haproxy/stream_interface.h>
32 #include <haproxy/time.h>
33
/* Head of the list of all known sinks. Sinks created by __sink_new() are
 * linked here through their <sink_list> member and looked up by name with
 * sink_find().
 */
struct list sink_list = LIST_HEAD_INIT(sink_list);

/* Sink of the "ring" section currently being parsed. It is set when a "ring"
 * line is parsed by cfg_parse_ring() and reset to NULL at the end of
 * cfg_post_parse_ring().
 */
struct sink *cfg_sink;
37
sink_find(const char * name)38 struct sink *sink_find(const char *name)
39 {
40 struct sink *sink;
41
42 list_for_each_entry(sink, &sink_list, sink_list)
43 if (strcmp(sink->name, name) == 0)
44 return sink;
45 return NULL;
46 }
47
48 /* creates a new sink and adds it to the list, it's still generic and not fully
49 * initialized. Returns NULL on allocation failure. If another one already
50 * exists with the same name, it will be returned. The caller can detect it as
51 * a newly created one has type SINK_TYPE_NEW.
52 */
__sink_new(const char * name,const char * desc,enum sink_fmt fmt)53 static struct sink *__sink_new(const char *name, const char *desc, enum sink_fmt fmt)
54 {
55 struct sink *sink;
56
57 sink = sink_find(name);
58 if (sink)
59 goto end;
60
61 sink = calloc(1, sizeof(*sink));
62 if (!sink)
63 goto end;
64
65 sink->name = strdup(name);
66 if (!sink->name)
67 goto err;
68
69 sink->desc = strdup(desc);
70 if (!sink->desc)
71 goto err;
72
73 sink->fmt = fmt;
74 sink->type = SINK_TYPE_NEW;
75 sink->maxlen = BUFSIZE;
76 /* address will be filled by the caller if needed */
77 sink->ctx.fd = -1;
78 sink->ctx.dropped = 0;
79 HA_RWLOCK_INIT(&sink->ctx.lock);
80 LIST_ADDQ(&sink_list, &sink->sink_list);
81 end:
82 return sink;
83
84 err:
85 free(sink->name); sink->name = NULL;
86 free(sink->desc); sink->desc = NULL;
87 free(sink); sink = NULL;
88
89 return NULL;
90 }
91
92 /* creates a sink called <name> of type FD associated to fd <fd>, format <fmt>,
93 * and description <desc>. Returns NULL on allocation failure or conflict.
94 * Perfect duplicates are merged (same type, fd, and name).
95 */
sink_new_fd(const char * name,const char * desc,enum sink_fmt fmt,int fd)96 struct sink *sink_new_fd(const char *name, const char *desc, enum sink_fmt fmt, int fd)
97 {
98 struct sink *sink;
99
100 sink = __sink_new(name, desc, fmt);
101 if (!sink || (sink->type == SINK_TYPE_FD && sink->ctx.fd == fd))
102 goto end;
103
104 if (sink->type != SINK_TYPE_NEW) {
105 sink = NULL;
106 goto end;
107 }
108
109 sink->type = SINK_TYPE_FD;
110 sink->ctx.fd = fd;
111 end:
112 return sink;
113 }
114
115 /* creates a sink called <name> of type BUF of size <size>, format <fmt>,
116 * and description <desc>. Returns NULL on allocation failure or conflict.
117 * Perfect duplicates are merged (same type and name). If sizes differ, the
118 * largest one is kept.
119 */
sink_new_buf(const char * name,const char * desc,enum sink_fmt fmt,size_t size)120 struct sink *sink_new_buf(const char *name, const char *desc, enum sink_fmt fmt, size_t size)
121 {
122 struct sink *sink;
123
124 sink = __sink_new(name, desc, fmt);
125 if (!sink)
126 goto fail;
127
128 if (sink->type == SINK_TYPE_BUFFER) {
129 /* such a buffer already exists, we may have to resize it */
130 if (!ring_resize(sink->ctx.ring, size))
131 goto fail;
132 goto end;
133 }
134
135 if (sink->type != SINK_TYPE_NEW) {
136 /* already exists of another type */
137 goto fail;
138 }
139
140 sink->ctx.ring = ring_new(size);
141 if (!sink->ctx.ring) {
142 LIST_DEL(&sink->sink_list);
143 free(sink->name);
144 free(sink->desc);
145 free(sink);
146 goto fail;
147 }
148
149 sink->type = SINK_TYPE_BUFFER;
150 end:
151 return sink;
152 fail:
153 return NULL;
154 }
155
/* tries to send <nmsg> message parts (up to 8, ignored above) from message
 * array <msg> to sink <sink>. Formatting according to the sink's preference is
 * done here. Lost messages are NOT accounted for. It is preferable to call
 * sink_write() instead which will also try to emit the number of dropped
 * messages when there are any. It returns >0 if it could write anything,
 * <=0 otherwise.
 *
 * The prefix placed in front of the message depends on sink->fmt:
 *   - SINK_FMT_RAW   : no prefix at all
 *   - SINK_FMT_SHORT : "<level>"
 *   - SINK_FMT_ISO   : the timestamp returned by timeofday_as_iso_us(1)
 *   - SINK_FMT_TIMED : "<level>" followed by that same timestamp
 *   - SINK_FMT_RFC5424 : the RFC5424 log header starting with
 *     "<facility*8+level>", then <tag>, <pid> and <sd>
 *   - any other value : the default log header (handled as RFC3164), same as
 *     above except that <sd> is ignored
 * <tag>, <pid> and <sd> may be NULL or empty, in which case they are not
 * emitted. The separators around <pid> come from get_format_pid_sep1() and
 * get_format_pid_sep2(). A sink which is neither of type FD nor BUFFER
 * receives nothing and 0 is returned.
 */
ssize_t __sink_write(struct sink *sink, const struct ist msg[], size_t nmsg,
                     int level, int facility, struct ist *tag,
                     struct ist *pid, struct ist *sd)
{
	int log_format;
	char short_hdr[4];   /* only the first 3 bytes are used: '<', level, '>' */
	struct ist pfx[6];   /* worst case: header, tag, sep1, pid, sep2, sd */
	size_t npfx = 0;
	char *hdr_ptr;
	int fac_level;

	/* raw format: the message is sent without any prefix */
	if (sink->fmt == SINK_FMT_RAW)
		goto send;

	if (sink->fmt == SINK_FMT_SHORT || sink->fmt == SINK_FMT_TIMED) {
		/* NOTE(review): a single character is emitted for the level,
		 * which assumes <level> is within 0..9 — confirm with callers.
		 */
		short_hdr[0] = '<';
		short_hdr[1] = '0' + level;
		short_hdr[2] = '>';

		pfx[npfx].ptr = short_hdr;
		pfx[npfx].len = 3;
		npfx++;
		if (sink->fmt == SINK_FMT_SHORT)
			goto send;
	}


	if (sink->fmt == SINK_FMT_ISO || sink->fmt == SINK_FMT_TIMED) {
		/* NOTE(review): the length is hard-coded to 33, presumably the
		 * size of the string returned by timeofday_as_iso_us(1) —
		 * confirm against its implementation.
		 */
		pfx[npfx].ptr = timeofday_as_iso_us(1);
		pfx[npfx].len = 33;
		npfx++;
		goto send;
	}
	else if (sink->fmt == SINK_FMT_RFC5424) {
		pfx[npfx].ptr = logheader_rfc5424;
		pfx[npfx].len = update_log_hdr_rfc5424(date.tv_sec, date.tv_usec) - pfx[npfx].ptr;
		log_format = LOG_FORMAT_RFC5424;
	}
	else {
		pfx[npfx].ptr = logheader;
		pfx[npfx].len = update_log_hdr(date.tv_sec) - pfx[npfx].ptr;
		log_format = LOG_FORMAT_RFC3164;
		/* no structured data in this format */
		sd = NULL;
	}

	/* Write the decimal value of (facility * 8 + level) backwards into the
	 * header, the last digit landing at offset 3 and the loop never going
	 * below the header's first byte. The '<' is then written right before
	 * the first digit and the prefix is shortened so that it starts there.
	 */
	fac_level = (facility << 3) + level;
	hdr_ptr = pfx[npfx].ptr + 3; /* last digit of the log level */
	do {
		*hdr_ptr = '0' + fac_level % 10;
		fac_level /= 10;
		hdr_ptr--;
	} while (fac_level && hdr_ptr > pfx[npfx].ptr);
	*hdr_ptr = '<';
	pfx[npfx].len -= hdr_ptr - pfx[npfx].ptr;
	pfx[npfx].ptr = hdr_ptr;
	npfx++;

	/* optional tag */
	if (tag && tag->len) {
		pfx[npfx].ptr = tag->ptr;
		pfx[npfx].len = tag->len;
		npfx++;
	}
	/* separator before the pid, only kept when not empty */
	pfx[npfx].ptr = get_format_pid_sep1(log_format, &pfx[npfx].len);
	if (pfx[npfx].len)
		npfx++;

	/* optional pid */
	if (pid && pid->len) {
		pfx[npfx].ptr = pid->ptr;
		pfx[npfx].len = pid->len;
		npfx++;
	}

	/* separator after the pid, only kept when not empty */
	pfx[npfx].ptr = get_format_pid_sep2(log_format, &pfx[npfx].len);
	if (pfx[npfx].len)
		npfx++;

	/* optional structured data (was forced to NULL above unless RFC5424) */
	if (sd && sd->len) {
		pfx[npfx].ptr = sd->ptr;
		pfx[npfx].len = sd->len;
		npfx++;
	}

send:
	/* hand the <npfx> prefix parts and the <nmsg> message parts over to the
	 * writer matching the sink's type, limited to sink->maxlen.
	 */
	if (sink->type == SINK_TYPE_FD) {
		return fd_write_frag_line(sink->ctx.fd, sink->maxlen, pfx, npfx, msg, nmsg, 1);
	}
	else if (sink->type == SINK_TYPE_BUFFER) {
		return ring_write(sink->ctx.ring, sink->maxlen, pfx, npfx, msg, nmsg);
	}
	return 0;
}
254
255 /* Tries to emit a message indicating the number of dropped events. In case of
256 * success, the amount of drops is reduced by as much. It's supposed to be
257 * called under an exclusive lock on the sink to avoid multiple produces doing
258 * the same. On success, >0 is returned, otherwise <=0 on failure.
259 */
sink_announce_dropped(struct sink * sink,int facility,struct ist * pid)260 int sink_announce_dropped(struct sink *sink, int facility, struct ist *pid)
261 {
262 unsigned int dropped;
263 struct buffer msg;
264 struct ist msgvec[1];
265 char logbuf[64];
266 struct ist sd;
267 struct ist tag;
268
269 while (unlikely((dropped = sink->ctx.dropped) > 0)) {
270 chunk_init(&msg, logbuf, sizeof(logbuf));
271 chunk_printf(&msg, "%u event%s dropped", dropped, dropped > 1 ? "s" : "");
272 msgvec[0] = ist2(msg.area, msg.data);
273
274 sd.ptr = default_rfc5424_sd_log_format;
275 sd.len = 2;
276 tag.ptr = global.log_tag.area;
277 tag.len = global.log_tag.data;
278 if (__sink_write(sink, msgvec, 1, LOG_NOTICE, facility, &tag, pid, &sd) <= 0)
279 return 0;
280 /* success! */
281 HA_ATOMIC_SUB(&sink->ctx.dropped, dropped);
282 }
283 return 1;
284 }
285
286 /* parse the "show events" command, returns 1 if a message is returned, otherwise zero */
cli_parse_show_events(char ** args,char * payload,struct appctx * appctx,void * private)287 static int cli_parse_show_events(char **args, char *payload, struct appctx *appctx, void *private)
288 {
289 struct sink *sink;
290 int arg;
291
292 args++; // make args[1] the 1st arg
293
294 if (!*args[1]) {
295 /* no arg => report the list of supported sink */
296 chunk_printf(&trash, "Supported events sinks are listed below. Add -w(wait), -n(new). Any key to stop\n");
297 list_for_each_entry(sink, &sink_list, sink_list) {
298 chunk_appendf(&trash, " %-10s : type=%s, %u dropped, %s\n",
299 sink->name,
300 sink->type == SINK_TYPE_NEW ? "init" :
301 sink->type == SINK_TYPE_FD ? "fd" :
302 sink->type == SINK_TYPE_BUFFER ? "buffer" : "?",
303 sink->ctx.dropped, sink->desc);
304 }
305
306 trash.area[trash.data] = 0;
307 return cli_msg(appctx, LOG_WARNING, trash.area);
308 }
309
310 if (!cli_has_level(appctx, ACCESS_LVL_OPER))
311 return 1;
312
313 sink = sink_find(args[1]);
314 if (!sink)
315 return cli_err(appctx, "No such event sink");
316
317 if (sink->type != SINK_TYPE_BUFFER)
318 return cli_msg(appctx, LOG_NOTICE, "Nothing to report for this sink");
319
320 for (arg = 2; *args[arg]; arg++) {
321 if (strcmp(args[arg], "-w") == 0)
322 appctx->ctx.cli.i0 |= 1; // wait mode
323 else if (strcmp(args[arg], "-n") == 0)
324 appctx->ctx.cli.i0 |= 2; // seek to new
325 else if (strcmp(args[arg], "-nw") == 0 || strcmp(args[arg], "-wn") == 0)
326 appctx->ctx.cli.i0 |= 3; // seek to new + wait
327 else
328 return cli_err(appctx, "unknown option");
329 }
330 return ring_attach_cli(sink->ctx.ring, appctx);
331 }
332
333 /* Pre-configures a ring proxy to emit connections */
sink_setup_proxy(struct proxy * px)334 void sink_setup_proxy(struct proxy *px)
335 {
336 px->last_change = now.tv_sec;
337 px->cap = PR_CAP_FE | PR_CAP_BE;
338 px->maxconn = 0;
339 px->conn_retries = 1;
340 px->timeout.server = TICK_ETERNITY;
341 px->timeout.client = TICK_ETERNITY;
342 px->timeout.connect = TICK_ETERNITY;
343 px->accept = NULL;
344 px->options2 |= PR_O2_INDEPSTR | PR_O2_SMARTCON | PR_O2_SMARTACC;
345 px->bind_proc = 0; /* will be filled by users */
346 }
347
/*
 * IO Handler to handle message push to syslog tcp server.
 *
 * Every message found in the sink's ring from the offset saved in the forward
 * target (sft->ofs) is copied into the trash chunk, terminated with a line
 * feed and pushed into the stream interface's input channel with
 * ci_putchk(). The position reached is saved back into sft->ofs. When all
 * messages could be pushed, the applet registers itself in the ring's list of
 * waiters; when the channel lacks room, it stops and waits for room instead.
 *
 * Both directions are shut down when the process is stopping, when one of
 * the channels reports a write error or shutdown, or when this applet is not
 * (anymore) the one referenced by the forward target.
 */
static void sink_forward_io_handler(struct appctx *appctx)
{
	struct stream_interface *si = appctx->owner;
	struct stream *s = si_strm(si);
	struct sink *sink = strm_fe(s)->parent;
	struct sink_forward_target *sft = appctx->ctx.sft.ptr;
	struct ring *ring = sink->ctx.ring;
	struct buffer *buf = &ring->buf;
	uint64_t msg_len;
	size_t len, cnt, ofs;
	int ret = 0;

	/* if stopping was requested, close immediately */
	if (unlikely(stopping))
		goto close;

	/* for rex because it seems reset to timeout
	 * and we don't want expire on this case
	 * with a syslog server
	 */
	si_oc(si)->rex = TICK_ETERNITY;
	/* rto should not change but it seems the case */
	si_oc(si)->rto = TICK_ETERNITY;

	/* an error was detected */
	if (unlikely(si_ic(si)->flags & (CF_WRITE_ERROR|CF_SHUTW)))
		goto close;

	/* con closed by server side */
	if ((si_oc(si)->flags & CF_SHUTW))
		goto close;

	/* if the connection is not established, inform the stream that we want
	 * to be notified whenever the connection completes.
	 */
	if (si_opposite(si)->state < SI_ST_EST) {
		si_cant_get(si);
		si_rx_conn_blk(si);
		si_rx_endp_more(si);
		return;
	}

	/* The target's lock is held until the end of the normal path. The
	 * release callback takes the same lock before detaching an applet, so
	 * the check below tells whether we are still the target's applet.
	 */
	HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
	if (appctx != sft->appctx) {
		HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
		goto close;
	}
	ofs = sft->ofs;

	/* leave the ring's waiters list while processing; we only register
	 * again at the end if everything could be forwarded.
	 */
	HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
	LIST_DEL_INIT(&appctx->wait_entry);
	HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);

	HA_RWLOCK_RDLOCK(LOGSRV_LOCK, &ring->lock);

	/* explanation for the initialization below: it would be better to do
	 * this in the parsing function but this would occasionally result in
	 * dropped events because we'd take a reference on the oldest message
	 * and keep it while being scheduled. Thus instead let's take it the
	 * first time we enter here so that we have a chance to pass many
	 * existing messages before grabbing a reference to a location. This
	 * value cannot be produced after initialization.
	 * (~0 is the value cfg_post_parse_ring() stores in sft->ofs.)
	 */
	if (unlikely(ofs == ~0)) {
		ofs = 0;

		/* take a reference on the counter byte at the buffer's head */
		HA_ATOMIC_ADD(b_peek(buf, ofs), 1);
		ofs += ring->ofs;
	}

	/* in this loop, ofs always points to the counter byte that precedes
	 * the message so that we can take our reference there if we have to
	 * stop before the end (ret=0).
	 */
	if (si_opposite(si)->state == SI_ST_EST) {
		/* we were already there, adjust the offset to be relative to
		 * the buffer's head and remove us from the counter.
		 */
		ofs -= ring->ofs;
		BUG_ON(ofs >= buf->size);
		HA_ATOMIC_SUB(b_peek(buf, ofs), 1);

		ret = 1;
		while (ofs + 1 < b_data(buf)) {
			/* skip the counter byte, then read the varint-encoded
			 * message length that follows it.
			 */
			cnt = 1;
			len = b_peek_varint(buf, ofs + cnt, &msg_len);
			if (!len)
				break;
			cnt += len;
			BUG_ON(msg_len + ofs + cnt + 1 > b_data(buf));

			/* the message plus its trailing LF must fit in trash */
			if (unlikely(msg_len + 1 > b_size(&trash))) {
				/* too large a message to ever fit, let's skip it */
				ofs += cnt + msg_len;
				continue;
			}

			/* copy the message into trash and terminate it with LF */
			chunk_reset(&trash);
			len = b_getblk(buf, trash.area, msg_len, ofs + cnt);
			trash.data += len;
			trash.area[trash.data++] = '\n';

			/* no room in the channel: stop here and remember that
			 * we did not reach the end (ret=0).
			 */
			if (ci_putchk(si_ic(si), &trash) == -1) {
				si_rx_room_blk(si);
				ret = 0;
				break;
			}
			ofs += cnt + msg_len;
		}

		/* take a reference on the position we stopped at and save it
		 * as an offset that includes ring->ofs.
		 */
		HA_ATOMIC_ADD(b_peek(buf, ofs), 1);
		ofs += ring->ofs;
		sft->ofs = ofs;
	}
	HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock);

	if (ret) {
		/* let's be woken up once new data arrive */
		HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
		LIST_ADDQ(&ring->waiters, &appctx->wait_entry);
		HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
		si_rx_endp_done(si);
	}
	HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);

	/* always drain data from server */
	co_skip(si_oc(si), si_oc(si)->output);
	return;

close:
	/* shut both directions down and report the end of input */
	si_shutw(si);
	si_shutr(si);
	si_ic(si)->flags |= CF_READ_NULL;
}
485
/*
 * IO Handler to handle message push to syslog tcp server
 * using octet counting frames.
 *
 * It works like sink_forward_io_handler() except for the framing: each
 * message read from the ring is preceded in the trash chunk by its length
 * written in decimal and a single space, and no line feed is appended. The
 * position reached is saved into sft->ofs. When all messages could be
 * pushed, the applet registers itself in the ring's list of waiters; when
 * the channel lacks room, it stops and waits for room instead.
 *
 * Both directions are shut down when the process is stopping, when one of
 * the channels reports a write error or shutdown, or when this applet is not
 * (anymore) the one referenced by the forward target.
 */
static void sink_forward_oc_io_handler(struct appctx *appctx)
{
	struct stream_interface *si = appctx->owner;
	struct stream *s = si_strm(si);
	struct sink *sink = strm_fe(s)->parent;
	struct sink_forward_target *sft = appctx->ctx.sft.ptr;
	struct ring *ring = sink->ctx.ring;
	struct buffer *buf = &ring->buf;
	uint64_t msg_len;
	size_t len, cnt, ofs;
	int ret = 0;
	char *p;

	/* if stopping was requested, close immediately */
	if (unlikely(stopping))
		goto close;

	/* for rex because it seems reset to timeout
	 * and we don't want expire on this case
	 * with a syslog server
	 */
	si_oc(si)->rex = TICK_ETERNITY;
	/* rto should not change but it seems the case */
	si_oc(si)->rto = TICK_ETERNITY;

	/* an error was detected */
	if (unlikely(si_ic(si)->flags & (CF_WRITE_ERROR|CF_SHUTW)))
		goto close;

	/* con closed by server side */
	if ((si_oc(si)->flags & CF_SHUTW))
		goto close;

	/* if the connection is not established, inform the stream that we want
	 * to be notified whenever the connection completes.
	 */
	if (si_opposite(si)->state < SI_ST_EST) {
		si_cant_get(si);
		si_rx_conn_blk(si);
		si_rx_endp_more(si);
		return;
	}

	/* The target's lock is held until the end of the normal path. The
	 * release callback takes the same lock before detaching an applet, so
	 * the check below tells whether we are still the target's applet.
	 */
	HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
	if (appctx != sft->appctx) {
		HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
		goto close;
	}
	ofs = sft->ofs;

	/* leave the ring's waiters list while processing; we only register
	 * again at the end if everything could be forwarded.
	 */
	HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
	LIST_DEL_INIT(&appctx->wait_entry);
	HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);

	HA_RWLOCK_RDLOCK(LOGSRV_LOCK, &ring->lock);

	/* explanation for the initialization below: it would be better to do
	 * this in the parsing function but this would occasionally result in
	 * dropped events because we'd take a reference on the oldest message
	 * and keep it while being scheduled. Thus instead let's take it the
	 * first time we enter here so that we have a chance to pass many
	 * existing messages before grabbing a reference to a location. This
	 * value cannot be produced after initialization.
	 * (~0 is the value cfg_post_parse_ring() stores in sft->ofs.)
	 */
	if (unlikely(ofs == ~0)) {
		ofs = 0;

		/* take a reference on the counter byte at the buffer's head */
		HA_ATOMIC_ADD(b_peek(buf, ofs), 1);
		ofs += ring->ofs;
	}

	/* in this loop, ofs always points to the counter byte that precedes
	 * the message so that we can take our reference there if we have to
	 * stop before the end (ret=0).
	 */
	if (si_opposite(si)->state == SI_ST_EST) {
		/* we were already there, adjust the offset to be relative to
		 * the buffer's head and remove us from the counter.
		 */
		ofs -= ring->ofs;
		BUG_ON(ofs >= buf->size);
		HA_ATOMIC_SUB(b_peek(buf, ofs), 1);

		ret = 1;
		while (ofs + 1 < b_data(buf)) {
			/* skip the counter byte, then read the varint-encoded
			 * message length that follows it.
			 */
			cnt = 1;
			len = b_peek_varint(buf, ofs + cnt, &msg_len);
			if (!len)
				break;
			cnt += len;
			BUG_ON(msg_len + ofs + cnt + 1 > b_data(buf));

			/* start the frame with the message length in decimal
			 * followed by one space.
			 * NOTE(review): this relies on ulltoa() returning a
			 * pointer just past the last digit written, or NULL
			 * when the number does not fit — confirm.
			 */
			chunk_reset(&trash);
			p = ulltoa(msg_len, trash.area, b_size(&trash));
			if (p) {
				trash.data = (p - trash.area) + 1;
				*p = ' ';
			}

			/* the length prefix plus the message must fit in trash */
			if (!p || (trash.data + msg_len > b_size(&trash))) {
				/* too large a message to ever fit, let's skip it */
				ofs += cnt + msg_len;
				continue;
			}

			/* append the message itself right after the space */
			trash.data += b_getblk(buf, p + 1, msg_len, ofs + cnt);

			/* no room in the channel: stop here and remember that
			 * we did not reach the end (ret=0).
			 */
			if (ci_putchk(si_ic(si), &trash) == -1) {
				si_rx_room_blk(si);
				ret = 0;
				break;
			}
			ofs += cnt + msg_len;
		}

		/* take a reference on the position we stopped at and save it
		 * as an offset that includes ring->ofs.
		 */
		HA_ATOMIC_ADD(b_peek(buf, ofs), 1);
		ofs += ring->ofs;
		sft->ofs = ofs;
	}
	HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock);

	if (ret) {
		/* let's be woken up once new data arrive */
		HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
		LIST_ADDQ(&ring->waiters, &appctx->wait_entry);
		HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
		si_rx_endp_done(si);
	}
	HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);

	/* always drain data from server */
	co_skip(si_oc(si), si_oc(si)->output);
	return;

close:
	/* shut both directions down and report the end of input */
	si_shutw(si);
	si_shutr(si);
	si_ic(si)->flags |= CF_READ_NULL;
}
629
__sink_forward_session_deinit(struct sink_forward_target * sft)630 void __sink_forward_session_deinit(struct sink_forward_target *sft)
631 {
632 struct stream_interface *si;
633 struct stream *s;
634 struct sink *sink;
635
636 if (!sft->appctx)
637 return;
638
639 si = sft->appctx->owner;
640 if (!si)
641 return;
642
643 s = si_strm(si);
644 if (!s)
645 return;
646
647 sink = strm_fe(s)->parent;
648 if (!sink)
649 return;
650
651 HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &sink->ctx.ring->lock);
652 LIST_DEL_INIT(&sft->appctx->wait_entry);
653 HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &sink->ctx.ring->lock);
654
655 sft->appctx = NULL;
656 task_wakeup(sink->forward_task, TASK_WOKEN_MSG);
657 }
658
659
sink_forward_session_release(struct appctx * appctx)660 static void sink_forward_session_release(struct appctx *appctx)
661 {
662 struct sink_forward_target *sft = appctx->ctx.peers.ptr;
663
664 if (!sft)
665 return;
666
667 HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
668 if (sft->appctx == appctx)
669 __sink_forward_session_deinit(sft);
670 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
671 }
672
/* Applet forwarding events with one message per line; it is the default one
 * picked by sink_forward_session_create().
 */
static struct applet sink_forward_applet = {
	.obj_type = OBJ_TYPE_APPLET,
	.name = "<SINKFWD>", /* used for logging */
	.fct = sink_forward_io_handler,
	.release = sink_forward_session_release,
};

/* Applet forwarding events using octet counting frames; it is picked by
 * sink_forward_session_create() for servers whose log protocol is
 * SRV_LOG_PROTO_OCTET_COUNTING.
 */
static struct applet sink_forward_oc_applet = {
	.obj_type = OBJ_TYPE_APPLET,
	.name = "<SINKFWDOC>", /* used for logging */
	.fct = sink_forward_oc_io_handler,
	.release = sink_forward_session_release,
};
686
687 /*
688 * Create a new peer session in assigned state (connect will start automatically)
689 */
sink_forward_session_create(struct sink * sink,struct sink_forward_target * sft)690 static struct appctx *sink_forward_session_create(struct sink *sink, struct sink_forward_target *sft)
691 {
692 struct proxy *p = sink->forward_px;
693 struct appctx *appctx;
694 struct session *sess;
695 struct stream *s;
696 struct applet *applet = &sink_forward_applet;
697
698 if (sft->srv->log_proto == SRV_LOG_PROTO_OCTET_COUNTING)
699 applet = &sink_forward_oc_applet;
700
701 appctx = appctx_new(applet, tid_bit);
702 if (!appctx)
703 goto out_close;
704
705 appctx->ctx.sft.ptr = (void *)sft;
706
707 sess = session_new(p, NULL, &appctx->obj_type);
708 if (!sess) {
709 ha_alert("out of memory in peer_session_create().\n");
710 goto out_free_appctx;
711 }
712
713 if ((s = stream_new(sess, &appctx->obj_type)) == NULL) {
714 ha_alert("Failed to initialize stream in peer_session_create().\n");
715 goto out_free_sess;
716 }
717
718
719 s->target = &sft->srv->obj_type;
720 if (!sockaddr_alloc(&s->target_addr))
721 goto out_free_strm;
722 *s->target_addr = sft->srv->addr;
723 s->flags = SF_ASSIGNED|SF_ADDR_SET;
724 s->si[1].flags |= SI_FL_NOLINGER;
725
726 s->do_log = NULL;
727 s->uniq_id = 0;
728
729 s->res.flags |= CF_READ_DONTWAIT;
730 /* for rto and rex to eternity to not expire on idle recv:
731 * We are using a syslog server.
732 */
733 s->res.rto = TICK_ETERNITY;
734 s->res.rex = TICK_ETERNITY;
735 sft->appctx = appctx;
736 task_wakeup(s->task, TASK_WOKEN_INIT);
737 return appctx;
738
739 /* Error unrolling */
740 out_free_strm:
741 LIST_DEL(&s->list);
742 pool_free(pool_head_stream, s);
743 out_free_sess:
744 session_free(sess);
745 out_free_appctx:
746 appctx_free(appctx);
747 out_close:
748 return NULL;
749 }
750
751 /*
752 * Task to handle connctions to forward servers
753 */
process_sink_forward(struct task * task,void * context,unsigned short state)754 static struct task *process_sink_forward(struct task * task, void *context, unsigned short state)
755 {
756 struct sink *sink = (struct sink *)context;
757 struct sink_forward_target *sft = sink->sft;
758
759 task->expire = TICK_ETERNITY;
760
761 if (!stopping) {
762 while (sft) {
763 HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
764 /* if appctx is NULL, start a new session */
765 if (!sft->appctx)
766 sft->appctx = sink_forward_session_create(sink, sft);
767 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
768 sft = sft->next;
769 }
770 }
771 else {
772 while (sft) {
773 HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
774 /* awake applet to perform a clean close */
775 if (sft->appctx)
776 appctx_wakeup(sft->appctx);
777 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
778 sft = sft->next;
779 }
780 }
781
782 return task;
783 }
784 /*
785 * Init task to manage connctions to forward servers
786 *
787 * returns 0 in case of error.
788 */
sink_init_forward(struct sink * sink)789 int sink_init_forward(struct sink *sink)
790 {
791 sink->forward_task = task_new(MAX_THREADS_MASK);
792 if (!sink->forward_task)
793 return 0;
794
795 sink->forward_task->process = process_sink_forward;
796 sink->forward_task->context = (void *)sink;
797 sink->forward_sighandler = signal_register_task(0, sink->forward_task, 0);
798 task_wakeup(sink->forward_task, TASK_WOKEN_INIT);
799 return 1;
800 }
801 /*
802 * Parse "ring" section and create corresponding sink buffer.
803 *
804 * The function returns 0 in success case, otherwise, it returns error
805 * flags.
806 */
cfg_parse_ring(const char * file,int linenum,char ** args,int kwm)807 int cfg_parse_ring(const char *file, int linenum, char **args, int kwm)
808 {
809 int err_code = 0;
810 const char *inv;
811 size_t size = BUFSIZE;
812 struct proxy *p;
813
814 if (strcmp(args[0], "ring") == 0) { /* new peers section */
815 if (!*args[1]) {
816 ha_alert("parsing [%s:%d] : missing ring name.\n", file, linenum);
817 err_code |= ERR_ALERT | ERR_FATAL;
818 goto err;
819 }
820
821 inv = invalid_char(args[1]);
822 if (inv) {
823 ha_alert("parsing [%s:%d] : invalid ring name '%s' (character '%c' is not permitted).\n", file, linenum, args[1], *inv);
824 err_code |= ERR_ALERT | ERR_FATAL;
825 goto err;
826 }
827
828 if (sink_find(args[1])) {
829 ha_alert("parsing [%s:%d] : sink named '%s' already exists.\n", file, linenum, args[1]);
830 err_code |= ERR_ALERT | ERR_FATAL;
831 goto err;
832 }
833
834 cfg_sink = sink_new_buf(args[1], args[1] , SINK_FMT_RAW, size);
835 if (!cfg_sink || cfg_sink->type != SINK_TYPE_BUFFER) {
836 ha_alert("parsing [%s:%d] : unable to create a new sink buffer for ring '%s'.\n", file, linenum, args[1]);
837 err_code |= ERR_ALERT | ERR_FATAL;
838 goto err;
839 }
840
841 /* allocate new proxy to handle forwards */
842 p = calloc(1, sizeof *p);
843 if (!p) {
844 ha_alert("parsing [%s:%d] : out of memory.\n", file, linenum);
845 err_code |= ERR_ALERT | ERR_FATAL;
846 goto err;
847 }
848
849 init_new_proxy(p);
850 sink_setup_proxy(p);
851 p->parent = cfg_sink;
852 p->id = strdup(args[1]);
853 p->conf.args.file = p->conf.file = strdup(file);
854 p->conf.args.line = p->conf.line = linenum;
855 cfg_sink->forward_px = p;
856 }
857 else if (strcmp(args[0], "size") == 0) {
858 size = atol(args[1]);
859 if (!size) {
860 ha_alert("parsing [%s:%d] : invalid size '%s' for new sink buffer.\n", file, linenum, args[1]);
861 err_code |= ERR_ALERT | ERR_FATAL;
862 goto err;
863 }
864
865 if (!cfg_sink || (cfg_sink->type != SINK_TYPE_BUFFER)
866 || !ring_resize(cfg_sink->ctx.ring, size)) {
867 ha_alert("parsing [%s:%d] : fail to set sink buffer size '%s'.\n", file, linenum, args[1]);
868 err_code |= ERR_ALERT | ERR_FATAL;
869 goto err;
870 }
871 }
872 else if (strcmp(args[0],"server") == 0) {
873 err_code |= parse_server(file, linenum, args, cfg_sink->forward_px, NULL, 1, 0, 1);
874 }
875 else if (strcmp(args[0],"timeout") == 0) {
876 if (!cfg_sink || !cfg_sink->forward_px) {
877 ha_alert("parsing [%s:%d] : unable to set timeout '%s'.\n", file, linenum, args[1]);
878 err_code |= ERR_ALERT | ERR_FATAL;
879 goto err;
880 }
881
882 if (strcmp(args[1], "connect") == 0 ||
883 strcmp(args[1], "server") == 0) {
884 const char *res;
885 unsigned int tout;
886
887 if (!*args[2]) {
888 ha_alert("parsing [%s:%d] : '%s %s' expects <time> as argument.\n",
889 file, linenum, args[0], args[1]);
890 err_code |= ERR_ALERT | ERR_FATAL;
891 goto err;
892 }
893 res = parse_time_err(args[2], &tout, TIME_UNIT_MS);
894 if (res == PARSE_TIME_OVER) {
895 ha_alert("parsing [%s:%d]: timer overflow in argument <%s> to <%s %s>, maximum value is 2147483647 ms (~24.8 days).\n",
896 file, linenum, args[2], args[0], args[1]);
897 err_code |= ERR_ALERT | ERR_FATAL;
898 goto err;
899 }
900 else if (res == PARSE_TIME_UNDER) {
901 ha_alert("parsing [%s:%d]: timer underflow in argument <%s> to <%s %s>, minimum non-null value is 1 ms.\n",
902 file, linenum, args[2], args[0], args[1]);
903 err_code |= ERR_ALERT | ERR_FATAL;
904 goto err;
905 }
906 else if (res) {
907 ha_alert("parsing [%s:%d]: unexpected character '%c' in argument to <%s %s>.\n",
908 file, linenum, *res, args[0], args[1]);
909 err_code |= ERR_ALERT | ERR_FATAL;
910 goto err;
911 }
912 if (args[1][2] == 'c')
913 cfg_sink->forward_px->timeout.connect = tout;
914 else
915 cfg_sink->forward_px->timeout.server = tout;
916 }
917 }
918 else if (strcmp(args[0],"format") == 0) {
919 if (!cfg_sink) {
920 ha_alert("parsing [%s:%d] : unable to set format '%s'.\n", file, linenum, args[1]);
921 err_code |= ERR_ALERT | ERR_FATAL;
922 goto err;
923 }
924
925 if (strcmp(args[1], "raw") == 0) {
926 cfg_sink->fmt = SINK_FMT_RAW;
927 }
928 else if (strcmp(args[1], "short") == 0) {
929 cfg_sink->fmt = SINK_FMT_SHORT;
930 }
931 else if (strcmp(args[1], "iso") == 0) {
932 cfg_sink->fmt = SINK_FMT_ISO;
933 }
934 else if (strcmp(args[1], "timed") == 0) {
935 cfg_sink->fmt = SINK_FMT_TIMED;
936 }
937 else if (strcmp(args[1], "rfc3164") == 0) {
938 cfg_sink->fmt = SINK_FMT_RFC3164;
939 }
940 else if (strcmp(args[1], "rfc5424") == 0) {
941 cfg_sink->fmt = SINK_FMT_RFC5424;
942 }
943 else {
944 ha_alert("parsing [%s:%d] : unknown format '%s'.\n", file, linenum, args[1]);
945 err_code |= ERR_ALERT | ERR_FATAL;
946 goto err;
947 }
948 }
949 else if (strcmp(args[0],"maxlen") == 0) {
950 if (!cfg_sink) {
951 ha_alert("parsing [%s:%d] : unable to set event max length '%s'.\n", file, linenum, args[1]);
952 err_code |= ERR_ALERT | ERR_FATAL;
953 goto err;
954 }
955
956 cfg_sink->maxlen = atol(args[1]);
957 if (!cfg_sink->maxlen) {
958 ha_alert("parsing [%s:%d] : invalid size '%s' for new sink buffer.\n", file, linenum, args[1]);
959 err_code |= ERR_ALERT | ERR_FATAL;
960 goto err;
961 }
962 }
963 else if (strcmp(args[0],"description") == 0) {
964 if (!cfg_sink) {
965 ha_alert("parsing [%s:%d] : unable to set description '%s'.\n", file, linenum, args[1]);
966 err_code |= ERR_ALERT | ERR_FATAL;
967 goto err;
968 }
969
970 if (!*args[1]) {
971 ha_alert("parsing [%s:%d] : missing ring description text.\n", file, linenum);
972 err_code |= ERR_ALERT | ERR_FATAL;
973 goto err;
974 }
975
976 free(cfg_sink->desc);
977
978 cfg_sink->desc = strdup(args[1]);
979 if (!cfg_sink->desc) {
980 ha_alert("parsing [%s:%d] : fail to set description '%s'.\n", file, linenum, args[1]);
981 err_code |= ERR_ALERT | ERR_FATAL;
982 goto err;
983 }
984 }
985 else {
986 ha_alert("parsing [%s:%d] : unknown statement '%s'.\n", file, linenum, args[0]);
987 err_code |= ERR_ALERT | ERR_FATAL;
988 goto err;
989 }
990
991 err:
992 return err_code;
993 }
994
995 /*
996 * Post parsing "ring" section.
997 *
998 * The function returns 0 in success case, otherwise, it returns error
999 * flags.
1000 */
cfg_post_parse_ring()1001 int cfg_post_parse_ring()
1002 {
1003 int err_code = 0;
1004 struct server *srv;
1005
1006 if (cfg_sink && (cfg_sink->type == SINK_TYPE_BUFFER)) {
1007 if (cfg_sink->maxlen > b_size(&cfg_sink->ctx.ring->buf)) {
1008 ha_warning("ring '%s' event max length '%u' exceeds size, forced to size '%lu'.\n",
1009 cfg_sink->name, cfg_sink->maxlen, (unsigned long)b_size(&cfg_sink->ctx.ring->buf));
1010 cfg_sink->maxlen = b_size(&cfg_sink->ctx.ring->buf);
1011 err_code |= ERR_ALERT;
1012 }
1013
1014 /* prepare forward server descriptors */
1015 if (cfg_sink->forward_px) {
1016 srv = cfg_sink->forward_px->srv;
1017 while (srv) {
1018 struct sink_forward_target *sft;
1019 /* init ssl if needed */
1020 if (srv->use_ssl == 1 && xprt_get(XPRT_SSL) && xprt_get(XPRT_SSL)->prepare_srv) {
1021 if (xprt_get(XPRT_SSL)->prepare_srv(srv)) {
1022 ha_alert("unable to prepare SSL for server '%s' in ring '%s'.\n", srv->id, cfg_sink->name);
1023 err_code |= ERR_ALERT | ERR_FATAL;
1024 }
1025 }
1026
1027 /* allocate sink_forward_target descriptor */
1028 sft = calloc(1, sizeof(*sft));
1029 if (!sft) {
1030 ha_alert("memory allocation error initializing server '%s' in ring '%s'.\n",srv->id, cfg_sink->name);
1031 err_code |= ERR_ALERT | ERR_FATAL;
1032 break;
1033 }
1034 sft->srv = srv;
1035 sft->appctx = NULL;
1036 sft->ofs = ~0; /* init ring offset */
1037 sft->next = cfg_sink->sft;
1038 HA_SPIN_INIT(&sft->lock);
1039
1040 /* mark server attached to the ring */
1041 if (!ring_attach(cfg_sink->ctx.ring)) {
1042 ha_alert("server '%s' sets too many watchers > 255 on ring '%s'.\n", srv->id, cfg_sink->name);
1043 err_code |= ERR_ALERT | ERR_FATAL;
1044 }
1045 cfg_sink->sft = sft;
1046 srv = srv->next;
1047 }
1048 sink_init_forward(cfg_sink);
1049 }
1050 }
1051 cfg_sink = NULL;
1052
1053 return err_code;
1054 }
1055
1056 /* resolve sink names at end of config. Returns 0 on success otherwise error
1057 * flags.
1058 */
post_sink_resolve()1059 int post_sink_resolve()
1060 {
1061 int err_code = 0;
1062 struct logsrv *logsrv, *logb;
1063 struct sink *sink;
1064 struct proxy *px;
1065
1066 list_for_each_entry_safe(logsrv, logb, &global.logsrvs, list) {
1067 if (logsrv->type == LOG_TARGET_BUFFER) {
1068 sink = sink_find(logsrv->ring_name);
1069 if (!sink || sink->type != SINK_TYPE_BUFFER) {
1070 ha_alert("global log server uses unknown ring named '%s'.\n", logsrv->ring_name);
1071 err_code |= ERR_ALERT | ERR_FATAL;
1072 }
1073 logsrv->sink = sink;
1074 }
1075 }
1076
1077 for (px = proxies_list; px; px = px->next) {
1078 list_for_each_entry_safe(logsrv, logb, &px->logsrvs, list) {
1079 if (logsrv->type == LOG_TARGET_BUFFER) {
1080 sink = sink_find(logsrv->ring_name);
1081 if (!sink || sink->type != SINK_TYPE_BUFFER) {
1082 ha_alert("proxy '%s' log server uses unknown ring named '%s'.\n", px->id, logsrv->ring_name);
1083 err_code |= ERR_ALERT | ERR_FATAL;
1084 }
1085 logsrv->sink = sink;
1086 }
1087 }
1088 }
1089 return err_code;
1090 }
1091
1092
sink_init()1093 static void sink_init()
1094 {
1095 sink_new_fd("stdout", "standard output (fd#1)", SINK_FMT_RAW, 1);
1096 sink_new_fd("stderr", "standard output (fd#2)", SINK_FMT_RAW, 2);
1097 sink_new_buf("buf0", "in-memory ring buffer", SINK_FMT_TIMED, 1048576);
1098 }
1099
sink_deinit()1100 static void sink_deinit()
1101 {
1102 struct sink *sink, *sb;
1103
1104 list_for_each_entry_safe(sink, sb, &sink_list, sink_list) {
1105 if (sink->type == SINK_TYPE_BUFFER)
1106 ring_free(sink->ctx.ring);
1107 LIST_DEL(&sink->sink_list);
1108 free(sink->name);
1109 free(sink->desc);
1110 free(sink);
1111 }
1112 }
1113
/* create the built-in sinks at the STG_REGISTER init stage and release all
 * sinks through a post-deinit callback.
 */
INITCALL0(STG_REGISTER, sink_init);
REGISTER_POST_DEINIT(sink_deinit);

/* CLI keywords provided by this file: "show events" is handled by
 * cli_parse_show_events().
 */
static struct cli_kw_list cli_kws = {{ },{
	{ { "show", "events", NULL }, "show events [<sink>] : show event sink state", cli_parse_show_events, NULL, NULL },
	{{},}
}};

INITCALL1(STG_REGISTER, cli_register_kw, &cli_kws);

/* config parsers for this section: cfg_parse_ring() handles the lines of a
 * "ring" section and cfg_post_parse_ring() is its post-parsing callback;
 * post_sink_resolve() is registered as a post-check function.
 */
REGISTER_CONFIG_SECTION("ring", cfg_parse_ring, cfg_post_parse_ring);
REGISTER_POST_CHECK(post_sink_resolve);
1127
1128 /*
1129 * Local variables:
1130 * c-indent-level: 8
1131 * c-basic-offset: 8
1132 * End:
1133 */
1134