1 /*
2  * Event sink management
3  *
4  * Copyright (C) 2000-2019 Willy Tarreau - w@1wt.eu
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation, version 2.1
9  * exclusively.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public
17  * License along with this library; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
19  */
20 
21 #include <import/ist.h>
22 #include <haproxy/api.h>
23 #include <haproxy/cfgparse.h>
24 #include <haproxy/cli.h>
25 #include <haproxy/errors.h>
26 #include <haproxy/list.h>
27 #include <haproxy/log.h>
28 #include <haproxy/ring.h>
29 #include <haproxy/signal.h>
30 #include <haproxy/sink.h>
31 #include <haproxy/stream_interface.h>
32 #include <haproxy/time.h>
33 
/* head of the list of all registered sinks; __sink_new() appends to it and
 * sink_find() walks it.
 */
struct list sink_list = LIST_HEAD_INIT(sink_list);

/* sink currently being configured by the "ring" section parser; it is set by
 * cfg_parse_ring() when a "ring" line is accepted.
 */
struct sink *cfg_sink;
37 
sink_find(const char * name)38 struct sink *sink_find(const char *name)
39 {
40 	struct sink *sink;
41 
42 	list_for_each_entry(sink, &sink_list, sink_list)
43 		if (strcmp(sink->name, name) == 0)
44 			return sink;
45 	return NULL;
46 }
47 
48 /* creates a new sink and adds it to the list, it's still generic and not fully
49  * initialized. Returns NULL on allocation failure. If another one already
50  * exists with the same name, it will be returned. The caller can detect it as
51  * a newly created one has type SINK_TYPE_NEW.
52  */
__sink_new(const char * name,const char * desc,enum sink_fmt fmt)53 static struct sink *__sink_new(const char *name, const char *desc, enum sink_fmt fmt)
54 {
55 	struct sink *sink;
56 
57 	sink = sink_find(name);
58 	if (sink)
59 		goto end;
60 
61 	sink = calloc(1, sizeof(*sink));
62 	if (!sink)
63 		goto end;
64 
65 	sink->name = strdup(name);
66 	if (!sink->name)
67 		goto err;
68 
69 	sink->desc = strdup(desc);
70 	if (!sink->desc)
71 		goto err;
72 
73 	sink->fmt  = fmt;
74 	sink->type = SINK_TYPE_NEW;
75 	sink->maxlen = BUFSIZE;
76 	/* address will be filled by the caller if needed */
77 	sink->ctx.fd = -1;
78 	sink->ctx.dropped = 0;
79 	HA_RWLOCK_INIT(&sink->ctx.lock);
80 	LIST_ADDQ(&sink_list, &sink->sink_list);
81  end:
82 	return sink;
83 
84  err:
85 	free(sink->name); sink->name = NULL;
86 	free(sink->desc); sink->desc = NULL;
87 	free(sink); sink = NULL;
88 
89 	return NULL;
90 }
91 
92 /* creates a sink called <name> of type FD associated to fd <fd>, format <fmt>,
93  * and description <desc>. Returns NULL on allocation failure or conflict.
94  * Perfect duplicates are merged (same type, fd, and name).
95  */
sink_new_fd(const char * name,const char * desc,enum sink_fmt fmt,int fd)96 struct sink *sink_new_fd(const char *name, const char *desc, enum sink_fmt fmt, int fd)
97 {
98 	struct sink *sink;
99 
100 	sink = __sink_new(name, desc, fmt);
101 	if (!sink || (sink->type == SINK_TYPE_FD && sink->ctx.fd == fd))
102 		goto end;
103 
104 	if (sink->type != SINK_TYPE_NEW) {
105 		sink = NULL;
106 		goto end;
107 	}
108 
109 	sink->type = SINK_TYPE_FD;
110 	sink->ctx.fd = fd;
111  end:
112 	return sink;
113 }
114 
115 /* creates a sink called <name> of type BUF of size <size>, format <fmt>,
116  * and description <desc>. Returns NULL on allocation failure or conflict.
117  * Perfect duplicates are merged (same type and name). If sizes differ, the
118  * largest one is kept.
119  */
sink_new_buf(const char * name,const char * desc,enum sink_fmt fmt,size_t size)120 struct sink *sink_new_buf(const char *name, const char *desc, enum sink_fmt fmt, size_t size)
121 {
122 	struct sink *sink;
123 
124 	sink = __sink_new(name, desc, fmt);
125 	if (!sink)
126 		goto fail;
127 
128 	if (sink->type == SINK_TYPE_BUFFER) {
129 		/* such a buffer already exists, we may have to resize it */
130 		if (!ring_resize(sink->ctx.ring, size))
131 			goto fail;
132 		goto end;
133 	}
134 
135 	if (sink->type != SINK_TYPE_NEW) {
136 		/* already exists of another type */
137 		goto fail;
138 	}
139 
140 	sink->ctx.ring = ring_new(size);
141 	if (!sink->ctx.ring) {
142 		LIST_DEL(&sink->sink_list);
143 		free(sink->name);
144 		free(sink->desc);
145 		free(sink);
146 		goto fail;
147 	}
148 
149 	sink->type = SINK_TYPE_BUFFER;
150  end:
151 	return sink;
152  fail:
153 	return NULL;
154 }
155 
156 /* tries to send <nmsg> message parts (up to 8, ignored above) from message
157  * array <msg> to sink <sink>. Formatting according to the sink's preference is
158  * done here. Lost messages are NOT accounted for. It is preferable to call
159  * sink_write() instead which will also try to emit the number of dropped
160  * messages when there are any. It returns >0 if it could write anything,
161  * <=0 otherwise.
162  */
__sink_write(struct sink * sink,const struct ist msg[],size_t nmsg,int level,int facility,struct ist * tag,struct ist * pid,struct ist * sd)163 ssize_t __sink_write(struct sink *sink, const struct ist msg[], size_t nmsg,
164 	             int level, int facility, struct ist *tag,
165 		     struct ist *pid, struct ist *sd)
166 {
167 	int log_format;
168 	char short_hdr[4];
169 	struct ist pfx[6];
170 	size_t npfx = 0;
171 	char *hdr_ptr;
172 	int fac_level;
173 
174 	if (sink->fmt == SINK_FMT_RAW)
175 		goto send;
176 
177 	if (sink->fmt == SINK_FMT_SHORT || sink->fmt == SINK_FMT_TIMED) {
178 		short_hdr[0] = '<';
179 		short_hdr[1] = '0' + level;
180 		short_hdr[2] = '>';
181 
182 		pfx[npfx].ptr = short_hdr;
183 		pfx[npfx].len = 3;
184 		npfx++;
185 		if (sink->fmt == SINK_FMT_SHORT)
186 			goto send;
187         }
188 
189 
190 	if (sink->fmt == SINK_FMT_ISO || sink->fmt == SINK_FMT_TIMED) {
191 		pfx[npfx].ptr = timeofday_as_iso_us(1);
192 		pfx[npfx].len = 33;
193 		npfx++;
194 		goto send;
195         }
196 	else if (sink->fmt == SINK_FMT_RFC5424) {
197 		pfx[npfx].ptr = logheader_rfc5424;
198 		pfx[npfx].len = update_log_hdr_rfc5424(date.tv_sec, date.tv_usec) - pfx[npfx].ptr;
199 		log_format = LOG_FORMAT_RFC5424;
200 	}
201 	else {
202 		pfx[npfx].ptr = logheader;
203                 pfx[npfx].len = update_log_hdr(date.tv_sec) - pfx[npfx].ptr;
204 		log_format = LOG_FORMAT_RFC3164;
205 		sd = NULL;
206 	}
207 
208 	fac_level = (facility << 3) + level;
209 	hdr_ptr = pfx[npfx].ptr + 3; /* last digit of the log level */
210         do {
211 		*hdr_ptr = '0' + fac_level % 10;
212 		fac_level /= 10;
213                 hdr_ptr--;
214 	} while (fac_level && hdr_ptr > pfx[npfx].ptr);
215 	*hdr_ptr = '<';
216 	pfx[npfx].len -= hdr_ptr - pfx[npfx].ptr;
217 	pfx[npfx].ptr = hdr_ptr;
218 	npfx++;
219 
220 	if (tag && tag->len) {
221 		pfx[npfx].ptr = tag->ptr;
222 		pfx[npfx].len = tag->len;
223 		npfx++;
224 	}
225 	pfx[npfx].ptr = get_format_pid_sep1(log_format, &pfx[npfx].len);
226 	if (pfx[npfx].len)
227 		npfx++;
228 
229 	if (pid && pid->len) {
230 		pfx[npfx].ptr = pid->ptr;
231 		pfx[npfx].len = pid->len;
232 		npfx++;
233 	}
234 
235 	pfx[npfx].ptr = get_format_pid_sep2(log_format, &pfx[npfx].len);
236 	if (pfx[npfx].len)
237 		npfx++;
238 
239 	if (sd && sd->len) {
240 		pfx[npfx].ptr = sd->ptr;
241 		pfx[npfx].len = sd->len;
242 		npfx++;
243 	}
244 
245 send:
246 	if (sink->type == SINK_TYPE_FD) {
247 		return fd_write_frag_line(sink->ctx.fd, sink->maxlen, pfx, npfx, msg, nmsg, 1);
248 	}
249 	else if (sink->type == SINK_TYPE_BUFFER) {
250 		return ring_write(sink->ctx.ring, sink->maxlen, pfx, npfx, msg, nmsg);
251 	}
252 	return 0;
253 }
254 
255 /* Tries to emit a message indicating the number of dropped events. In case of
256  * success, the amount of drops is reduced by as much. It's supposed to be
257  * called under an exclusive lock on the sink to avoid multiple produces doing
258  * the same. On success, >0 is returned, otherwise <=0 on failure.
259  */
sink_announce_dropped(struct sink * sink,int facility,struct ist * pid)260 int sink_announce_dropped(struct sink *sink, int facility, struct ist *pid)
261 {
262 	unsigned int dropped;
263 	struct buffer msg;
264 	struct ist msgvec[1];
265 	char logbuf[64];
266 	struct ist sd;
267 	struct ist tag;
268 
269 	while (unlikely((dropped = sink->ctx.dropped) > 0)) {
270 		chunk_init(&msg, logbuf, sizeof(logbuf));
271 		chunk_printf(&msg, "%u event%s dropped", dropped, dropped > 1 ? "s" : "");
272 		msgvec[0] = ist2(msg.area, msg.data);
273 
274 		sd.ptr = default_rfc5424_sd_log_format;
275 		sd.len = 2;
276 		tag.ptr = global.log_tag.area;
277 		tag.len = global.log_tag.data;
278 		if (__sink_write(sink, msgvec, 1, LOG_NOTICE, facility, &tag, pid, &sd) <= 0)
279 			return 0;
280 		/* success! */
281 		HA_ATOMIC_SUB(&sink->ctx.dropped, dropped);
282 	}
283 	return 1;
284 }
285 
286 /* parse the "show events" command, returns 1 if a message is returned, otherwise zero */
cli_parse_show_events(char ** args,char * payload,struct appctx * appctx,void * private)287 static int cli_parse_show_events(char **args, char *payload, struct appctx *appctx, void *private)
288 {
289 	struct sink *sink;
290 	int arg;
291 
292 	args++; // make args[1] the 1st arg
293 
294 	if (!*args[1]) {
295 		/* no arg => report the list of supported sink */
296 		chunk_printf(&trash, "Supported events sinks are listed below. Add -w(wait), -n(new). Any key to stop\n");
297 		list_for_each_entry(sink, &sink_list, sink_list) {
298 			chunk_appendf(&trash, "    %-10s : type=%s, %u dropped, %s\n",
299 				      sink->name,
300 				      sink->type == SINK_TYPE_NEW ? "init" :
301 				      sink->type == SINK_TYPE_FD ? "fd" :
302 				      sink->type == SINK_TYPE_BUFFER ? "buffer" : "?",
303 				      sink->ctx.dropped, sink->desc);
304 		}
305 
306 		trash.area[trash.data] = 0;
307 		return cli_msg(appctx, LOG_WARNING, trash.area);
308 	}
309 
310 	if (!cli_has_level(appctx, ACCESS_LVL_OPER))
311 		return 1;
312 
313 	sink = sink_find(args[1]);
314 	if (!sink)
315 		return cli_err(appctx, "No such event sink");
316 
317 	if (sink->type != SINK_TYPE_BUFFER)
318 		return cli_msg(appctx, LOG_NOTICE, "Nothing to report for this sink");
319 
320 	for (arg = 2; *args[arg]; arg++) {
321 		if (strcmp(args[arg], "-w") == 0)
322 			appctx->ctx.cli.i0 |= 1; // wait mode
323 		else if (strcmp(args[arg], "-n") == 0)
324 			appctx->ctx.cli.i0 |= 2; // seek to new
325 		else if (strcmp(args[arg], "-nw") == 0 || strcmp(args[arg], "-wn") == 0)
326 			appctx->ctx.cli.i0 |= 3; // seek to new + wait
327 		else
328 			return cli_err(appctx, "unknown option");
329 	}
330 	return ring_attach_cli(sink->ctx.ring, appctx);
331 }
332 
333 /* Pre-configures a ring proxy to emit connections */
sink_setup_proxy(struct proxy * px)334 void sink_setup_proxy(struct proxy *px)
335 {
336 	px->last_change = now.tv_sec;
337 	px->cap = PR_CAP_FE | PR_CAP_BE;
338 	px->maxconn = 0;
339 	px->conn_retries = 1;
340 	px->timeout.server = TICK_ETERNITY;
341 	px->timeout.client = TICK_ETERNITY;
342 	px->timeout.connect = TICK_ETERNITY;
343 	px->accept = NULL;
344 	px->options2 |= PR_O2_INDEPSTR | PR_O2_SMARTCON | PR_O2_SMARTACC;
345 	px->bind_proc = 0; /* will be filled by users */
346 }
347 
/*
 * IO Handler to handle message push to syslog tcp server.
 *
 * Each message read from the sink's ring is copied into the trash buffer,
 * terminated with a line feed and pushed to the stream interface's input
 * channel. The position reached in the ring is saved in the forward target
 * (sft->ofs) so that the next call resumes from there. The applet closes both
 * directions when the process is stopping, on channel error/shutdown, or when
 * it is no longer the appctx registered in its forward target.
 */
static void sink_forward_io_handler(struct appctx *appctx)
{
	struct stream_interface *si = appctx->owner;
	struct stream *s = si_strm(si);
	struct sink *sink = strm_fe(s)->parent;
	struct sink_forward_target *sft = appctx->ctx.sft.ptr;
	struct ring *ring = sink->ctx.ring;
	struct buffer *buf = &ring->buf;
	uint64_t msg_len;
	size_t len, cnt, ofs;
	int ret = 0; /* set to 1 when all available messages could be pushed */

	/* if stopping was requested, close immediately */
	if (unlikely(stopping))
		goto close;

	/* for rex because it seems reset to timeout
	 * and we don't want expire on this case
	 * with a syslog server
	 */
	si_oc(si)->rex = TICK_ETERNITY;
	/* rto should not change but it seems the case */
	si_oc(si)->rto = TICK_ETERNITY;

	/* an error was detected */
	if (unlikely(si_ic(si)->flags & (CF_WRITE_ERROR|CF_SHUTW)))
		goto close;

	/* con closed by server side */
	if ((si_oc(si)->flags & CF_SHUTW))
		goto close;

	/* if the connection is not established, inform the stream that we want
	 * to be notified whenever the connection completes.
	 */
	if (si_opposite(si)->state < SI_ST_EST) {
		si_cant_get(si);
		si_rx_conn_blk(si);
		si_rx_endp_more(si);
		return;
	}

	/* the target's lock is held from here until the end of the normal
	 * path. If another appctx took over this target, this one must go.
	 */
	HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
	if (appctx != sft->appctx) {
		HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
		goto close;
	}
	ofs = sft->ofs;

	/* leave the ring's waiters list while processing; we only subscribe
	 * again below if everything could be pushed (ret != 0).
	 */
	HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
	LIST_DEL_INIT(&appctx->wait_entry);
	HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);

	HA_RWLOCK_RDLOCK(LOGSRV_LOCK, &ring->lock);

	/* explanation for the initialization below: it would be better to do
	 * this in the parsing function but this would occasionally result in
	 * dropped events because we'd take a reference on the oldest message
	 * and keep it while being scheduled. Thus instead let's take it the
	 * first time we enter here so that we have a chance to pass many
	 * existing messages before grabbing a reference to a location. This
	 * value cannot be produced after initialization.
	 */
	if (unlikely(ofs == ~0)) {
		ofs = 0;

		HA_ATOMIC_ADD(b_peek(buf, ofs), 1);
		ofs += ring->ofs;
	}

	/* in this loop, ofs always points to the counter byte that precedes
	 * the message so that we can take our reference there if we have to
	 * stop before the end (ret=0).
	 */
	if (si_opposite(si)->state == SI_ST_EST) {
		/* we were already there, adjust the offset to be relative to
		 * the buffer's head and remove us from the counter.
		 */
		ofs -= ring->ofs;
		BUG_ON(ofs >= buf->size);
		HA_ATOMIC_SUB(b_peek(buf, ofs), 1);

		ret = 1;
		while (ofs + 1 < b_data(buf)) {
			/* skip the counter byte, then decode the varint
			 * holding the message's length.
			 */
			cnt = 1;
			len = b_peek_varint(buf, ofs + cnt, &msg_len);
			if (!len)
				break;
			cnt += len;
			BUG_ON(msg_len + ofs + cnt + 1 > b_data(buf));

			/* the "+ 1" accounts for the trailing line feed */
			if (unlikely(msg_len + 1 > b_size(&trash))) {
				/* too large a message to ever fit, let's skip it */
				ofs += cnt + msg_len;
				continue;
			}

			chunk_reset(&trash);
			len = b_getblk(buf, trash.area, msg_len, ofs + cnt);
			trash.data += len;
			trash.area[trash.data++] = '\n';

			/* no room in the channel: stop here and keep ofs on
			 * this message so that it is retried later.
			 */
			if (ci_putchk(si_ic(si), &trash) == -1) {
				si_rx_room_blk(si);
				ret = 0;
				break;
			}
			ofs += cnt + msg_len;
		}

		/* take our reference on the position we stopped at and save
		 * it, made relative to the ring's offset, in the target.
		 */
		HA_ATOMIC_ADD(b_peek(buf, ofs), 1);
		ofs += ring->ofs;
		sft->ofs = ofs;
	}
	HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock);

	if (ret) {
		/* let's be woken up once new data arrive */
		HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
		LIST_ADDQ(&ring->waiters, &appctx->wait_entry);
		HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
		si_rx_endp_done(si);
	}
	HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);

	/* always drain data from server */
	co_skip(si_oc(si), si_oc(si)->output);
	return;

close:
	/* shut both directions and flag a null read on the input channel */
	si_shutw(si);
	si_shutr(si);
	si_ic(si)->flags |= CF_READ_NULL;
}
485 
/*
 * IO Handler to handle message push to syslog tcp server
 * using octet counting frames.
 *
 * Each message read from the sink's ring is copied into the trash buffer,
 * preceded by its length (as written by ulltoa()) and a space, then pushed to
 * the stream interface's input channel. The position reached in the ring is
 * saved in the forward target (sft->ofs) so that the next call resumes from
 * there. The applet closes both directions when the process is stopping, on
 * channel error/shutdown, or when it is no longer the appctx registered in
 * its forward target.
 */
static void sink_forward_oc_io_handler(struct appctx *appctx)
{
	struct stream_interface *si = appctx->owner;
	struct stream *s = si_strm(si);
	struct sink *sink = strm_fe(s)->parent;
	struct sink_forward_target *sft = appctx->ctx.sft.ptr;
	struct ring *ring = sink->ctx.ring;
	struct buffer *buf = &ring->buf;
	uint64_t msg_len;
	size_t len, cnt, ofs;
	int ret = 0; /* set to 1 when all available messages could be pushed */
	char *p;

	/* if stopping was requested, close immediately */
	if (unlikely(stopping))
		goto close;

	/* for rex because it seems reset to timeout
	 * and we don't want expire on this case
	 * with a syslog server
	 */
	si_oc(si)->rex = TICK_ETERNITY;
	/* rto should not change but it seems the case */
	si_oc(si)->rto = TICK_ETERNITY;

	/* an error was detected */
	if (unlikely(si_ic(si)->flags & (CF_WRITE_ERROR|CF_SHUTW)))
		goto close;

	/* con closed by server side */
	if ((si_oc(si)->flags & CF_SHUTW))
		goto close;

	/* if the connection is not established, inform the stream that we want
	 * to be notified whenever the connection completes.
	 */
	if (si_opposite(si)->state < SI_ST_EST) {
		si_cant_get(si);
		si_rx_conn_blk(si);
		si_rx_endp_more(si);
		return;
	}

	/* the target's lock is held from here until the end of the normal
	 * path. If another appctx took over this target, this one must go.
	 */
	HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
	if (appctx != sft->appctx) {
		HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
		goto close;
	}
	ofs = sft->ofs;

	/* leave the ring's waiters list while processing; we only subscribe
	 * again below if everything could be pushed (ret != 0).
	 */
	HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
	LIST_DEL_INIT(&appctx->wait_entry);
	HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);

	HA_RWLOCK_RDLOCK(LOGSRV_LOCK, &ring->lock);

	/* explanation for the initialization below: it would be better to do
	 * this in the parsing function but this would occasionally result in
	 * dropped events because we'd take a reference on the oldest message
	 * and keep it while being scheduled. Thus instead let's take it the
	 * first time we enter here so that we have a chance to pass many
	 * existing messages before grabbing a reference to a location. This
	 * value cannot be produced after initialization.
	 */
	if (unlikely(ofs == ~0)) {
		ofs = 0;

		HA_ATOMIC_ADD(b_peek(buf, ofs), 1);
		ofs += ring->ofs;
	}

	/* in this loop, ofs always points to the counter byte that precedes
	 * the message so that we can take our reference there if we have to
	 * stop before the end (ret=0).
	 */
	if (si_opposite(si)->state == SI_ST_EST) {
		/* we were already there, adjust the offset to be relative to
		 * the buffer's head and remove us from the counter.
		 */
		ofs -= ring->ofs;
		BUG_ON(ofs >= buf->size);
		HA_ATOMIC_SUB(b_peek(buf, ofs), 1);

		ret = 1;
		while (ofs + 1 < b_data(buf)) {
			/* skip the counter byte, then decode the varint
			 * holding the message's length.
			 */
			cnt = 1;
			len = b_peek_varint(buf, ofs + cnt, &msg_len);
			if (!len)
				break;
			cnt += len;
			BUG_ON(msg_len + ofs + cnt + 1 > b_data(buf));

			/* start the frame with the message's length followed
			 * by a space; <p> points to that space on success.
			 */
			chunk_reset(&trash);
			p = ulltoa(msg_len, trash.area, b_size(&trash));
			if (p) {
				trash.data = (p - trash.area) + 1;
				*p = ' ';
			}

			if (!p || (trash.data + msg_len > b_size(&trash))) {
				/* too large a message to ever fit, let's skip it */
				ofs += cnt + msg_len;
				continue;
			}

			/* the message's body goes right after the space */
			trash.data += b_getblk(buf, p + 1, msg_len, ofs + cnt);

			/* no room in the channel: stop here and keep ofs on
			 * this message so that it is retried later.
			 */
			if (ci_putchk(si_ic(si), &trash) == -1) {
				si_rx_room_blk(si);
				ret = 0;
				break;
			}
			ofs += cnt + msg_len;
		}

		/* take our reference on the position we stopped at and save
		 * it, made relative to the ring's offset, in the target.
		 */
		HA_ATOMIC_ADD(b_peek(buf, ofs), 1);
		ofs += ring->ofs;
		sft->ofs = ofs;
	}
	HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock);

	if (ret) {
		/* let's be woken up once new data arrive */
		HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
		LIST_ADDQ(&ring->waiters, &appctx->wait_entry);
		HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
		si_rx_endp_done(si);
	}
	HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);

	/* always drain data from server */
	co_skip(si_oc(si), si_oc(si)->output);
	return;

close:
	/* shut both directions and flag a null read on the input channel */
	si_shutw(si);
	si_shutr(si);
	si_ic(si)->flags |= CF_READ_NULL;
}
629 
__sink_forward_session_deinit(struct sink_forward_target * sft)630 void __sink_forward_session_deinit(struct sink_forward_target *sft)
631 {
632 	struct stream_interface *si;
633 	struct stream *s;
634 	struct sink *sink;
635 
636 	if (!sft->appctx)
637 		return;
638 
639 	si = sft->appctx->owner;
640 	if (!si)
641 		return;
642 
643 	s = si_strm(si);
644 	if (!s)
645 		return;
646 
647 	sink = strm_fe(s)->parent;
648 	if (!sink)
649 		return;
650 
651 	HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &sink->ctx.ring->lock);
652 	LIST_DEL_INIT(&sft->appctx->wait_entry);
653 	HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &sink->ctx.ring->lock);
654 
655 	sft->appctx = NULL;
656 	task_wakeup(sink->forward_task, TASK_WOKEN_MSG);
657 }
658 
659 
sink_forward_session_release(struct appctx * appctx)660 static void sink_forward_session_release(struct appctx *appctx)
661 {
662 	struct sink_forward_target *sft = appctx->ctx.peers.ptr;
663 
664 	if (!sft)
665 		return;
666 
667 	HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
668 	if (sft->appctx == appctx)
669 		__sink_forward_session_deinit(sft);
670 	HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
671 }
672 
/* applet forwarding ring messages using sink_forward_io_handler(), which
 * terminates each message with a line feed.
 */
static struct applet sink_forward_applet = {
	.obj_type = OBJ_TYPE_APPLET,
	.name = "<SINKFWD>", /* used for logging */
	.fct = sink_forward_io_handler,
	.release = sink_forward_session_release,
};
679 
/* applet forwarding ring messages using sink_forward_oc_io_handler(), i.e.
 * with octet counting frames.
 */
static struct applet sink_forward_oc_applet = {
	.obj_type = OBJ_TYPE_APPLET,
	.name = "<SINKFWDOC>", /* used for logging */
	.fct = sink_forward_oc_io_handler,
	.release = sink_forward_session_release,
};
686 
687 /*
688  * Create a new peer session in assigned state (connect will start automatically)
689  */
sink_forward_session_create(struct sink * sink,struct sink_forward_target * sft)690 static struct appctx *sink_forward_session_create(struct sink *sink, struct sink_forward_target *sft)
691 {
692 	struct proxy *p = sink->forward_px;
693 	struct appctx *appctx;
694 	struct session *sess;
695 	struct stream *s;
696 	struct applet *applet = &sink_forward_applet;
697 
698 	if (sft->srv->log_proto == SRV_LOG_PROTO_OCTET_COUNTING)
699 		applet = &sink_forward_oc_applet;
700 
701 	appctx = appctx_new(applet, tid_bit);
702 	if (!appctx)
703 		goto out_close;
704 
705 	appctx->ctx.sft.ptr = (void *)sft;
706 
707 	sess = session_new(p, NULL, &appctx->obj_type);
708 	if (!sess) {
709 		ha_alert("out of memory in peer_session_create().\n");
710 		goto out_free_appctx;
711 	}
712 
713 	if ((s = stream_new(sess, &appctx->obj_type)) == NULL) {
714 		ha_alert("Failed to initialize stream in peer_session_create().\n");
715 		goto out_free_sess;
716 	}
717 
718 
719 	s->target = &sft->srv->obj_type;
720 	if (!sockaddr_alloc(&s->target_addr))
721 		goto out_free_strm;
722 	*s->target_addr = sft->srv->addr;
723 	s->flags = SF_ASSIGNED|SF_ADDR_SET;
724 	s->si[1].flags |= SI_FL_NOLINGER;
725 
726 	s->do_log = NULL;
727 	s->uniq_id = 0;
728 
729 	s->res.flags |= CF_READ_DONTWAIT;
730 	/* for rto and rex to eternity to not expire on idle recv:
731 	 * We are using a syslog server.
732 	 */
733 	s->res.rto = TICK_ETERNITY;
734 	s->res.rex = TICK_ETERNITY;
735 	sft->appctx = appctx;
736 	task_wakeup(s->task, TASK_WOKEN_INIT);
737 	return appctx;
738 
739 	/* Error unrolling */
740  out_free_strm:
741 	LIST_DEL(&s->list);
742 	pool_free(pool_head_stream, s);
743  out_free_sess:
744 	session_free(sess);
745  out_free_appctx:
746 	appctx_free(appctx);
747  out_close:
748 	return NULL;
749 }
750 
751 /*
752  * Task to handle connctions to forward servers
753  */
process_sink_forward(struct task * task,void * context,unsigned short state)754 static struct task *process_sink_forward(struct task * task, void *context, unsigned short state)
755 {
756 	struct sink *sink = (struct sink *)context;
757 	struct sink_forward_target *sft = sink->sft;
758 
759 	task->expire = TICK_ETERNITY;
760 
761 	if (!stopping) {
762 		while (sft) {
763 			HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
764 			/* if appctx is NULL, start a new session */
765 			if (!sft->appctx)
766 				sft->appctx = sink_forward_session_create(sink, sft);
767 			HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
768 			sft = sft->next;
769 		}
770 	}
771 	else {
772 		while (sft) {
773 			HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
774 			/* awake applet to perform a clean close */
775 			if (sft->appctx)
776 				appctx_wakeup(sft->appctx);
777 			HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
778 			sft = sft->next;
779 		}
780 	}
781 
782 	return task;
783 }
784 /*
785  * Init task to manage connctions to forward servers
786  *
787  * returns 0 in case of error.
788  */
sink_init_forward(struct sink * sink)789 int sink_init_forward(struct sink *sink)
790 {
791 	sink->forward_task = task_new(MAX_THREADS_MASK);
792 	if (!sink->forward_task)
793 		return 0;
794 
795 	sink->forward_task->process = process_sink_forward;
796 	sink->forward_task->context = (void *)sink;
797 	sink->forward_sighandler = signal_register_task(0, sink->forward_task, 0);
798 	task_wakeup(sink->forward_task, TASK_WOKEN_INIT);
799 	return 1;
800 }
801 /*
802  * Parse "ring" section and create corresponding sink buffer.
803  *
804  * The function returns 0 in success case, otherwise, it returns error
805  * flags.
806  */
cfg_parse_ring(const char * file,int linenum,char ** args,int kwm)807 int cfg_parse_ring(const char *file, int linenum, char **args, int kwm)
808 {
809 	int err_code = 0;
810 	const char *inv;
811 	size_t size = BUFSIZE;
812 	struct proxy *p;
813 
814 	if (strcmp(args[0], "ring") == 0) { /* new peers section */
815 		if (!*args[1]) {
816 			ha_alert("parsing [%s:%d] : missing ring name.\n", file, linenum);
817 			err_code |= ERR_ALERT | ERR_FATAL;
818 			goto err;
819 		}
820 
821 		inv = invalid_char(args[1]);
822 		if (inv) {
823 			ha_alert("parsing [%s:%d] : invalid ring name '%s' (character '%c' is not permitted).\n", file, linenum, args[1], *inv);
824 			err_code |= ERR_ALERT | ERR_FATAL;
825 			goto err;
826 		}
827 
828 		if (sink_find(args[1])) {
829 			ha_alert("parsing [%s:%d] : sink named '%s' already exists.\n", file, linenum, args[1]);
830 			err_code |= ERR_ALERT | ERR_FATAL;
831 			goto err;
832 		}
833 
834 		cfg_sink = sink_new_buf(args[1], args[1] , SINK_FMT_RAW, size);
835 		if (!cfg_sink || cfg_sink->type != SINK_TYPE_BUFFER) {
836 			ha_alert("parsing [%s:%d] : unable to create a new sink buffer for ring '%s'.\n", file, linenum, args[1]);
837 			err_code |= ERR_ALERT | ERR_FATAL;
838 			goto err;
839 		}
840 
841 		/* allocate new proxy to handle forwards */
842 		p = calloc(1, sizeof *p);
843 		if (!p) {
844 			ha_alert("parsing [%s:%d] : out of memory.\n", file, linenum);
845 			err_code |= ERR_ALERT | ERR_FATAL;
846 			goto err;
847 		}
848 
849 		init_new_proxy(p);
850 		sink_setup_proxy(p);
851 		p->parent = cfg_sink;
852 		p->id = strdup(args[1]);
853 		p->conf.args.file = p->conf.file = strdup(file);
854 		p->conf.args.line = p->conf.line = linenum;
855 		cfg_sink->forward_px = p;
856 	}
857 	else if (strcmp(args[0], "size") == 0) {
858 		size = atol(args[1]);
859 		if (!size) {
860 			ha_alert("parsing [%s:%d] : invalid size '%s' for new sink buffer.\n", file, linenum, args[1]);
861 			err_code |= ERR_ALERT | ERR_FATAL;
862 			goto err;
863 		}
864 
865 		if (!cfg_sink || (cfg_sink->type != SINK_TYPE_BUFFER)
866 		              || !ring_resize(cfg_sink->ctx.ring, size)) {
867 			ha_alert("parsing [%s:%d] : fail to set sink buffer size '%s'.\n", file, linenum, args[1]);
868 			err_code |= ERR_ALERT | ERR_FATAL;
869 			goto err;
870 		}
871 	}
872 	else if (strcmp(args[0],"server") == 0) {
873 		err_code |= parse_server(file, linenum, args, cfg_sink->forward_px, NULL, 1, 0, 1);
874 	}
875 	else if (strcmp(args[0],"timeout") == 0) {
876 		if (!cfg_sink || !cfg_sink->forward_px) {
877 			ha_alert("parsing [%s:%d] : unable to set timeout '%s'.\n", file, linenum, args[1]);
878 			err_code |= ERR_ALERT | ERR_FATAL;
879 			goto err;
880 		}
881 
882                 if (strcmp(args[1], "connect") == 0 ||
883 		    strcmp(args[1], "server") == 0) {
884 			const char *res;
885 			unsigned int tout;
886 
887 			if (!*args[2]) {
888 				ha_alert("parsing [%s:%d] : '%s %s' expects <time> as argument.\n",
889 					 file, linenum, args[0], args[1]);
890 				err_code |= ERR_ALERT | ERR_FATAL;
891 				goto err;
892 			}
893 			res = parse_time_err(args[2], &tout, TIME_UNIT_MS);
894 			if (res == PARSE_TIME_OVER) {
895 				ha_alert("parsing [%s:%d]: timer overflow in argument <%s> to <%s %s>, maximum value is 2147483647 ms (~24.8 days).\n",
896 					 file, linenum, args[2], args[0], args[1]);
897 				err_code |= ERR_ALERT | ERR_FATAL;
898 				goto err;
899 			}
900 			else if (res == PARSE_TIME_UNDER) {
901 				ha_alert("parsing [%s:%d]: timer underflow in argument <%s> to <%s %s>, minimum non-null value is 1 ms.\n",
902 					 file, linenum, args[2], args[0], args[1]);
903 				err_code |= ERR_ALERT | ERR_FATAL;
904 				goto err;
905 			}
906 			else if (res) {
907 				ha_alert("parsing [%s:%d]: unexpected character '%c' in argument to <%s %s>.\n",
908 					 file, linenum, *res, args[0], args[1]);
909 				err_code |= ERR_ALERT | ERR_FATAL;
910 				goto err;
911 			}
912                         if (args[1][2] == 'c')
913                                 cfg_sink->forward_px->timeout.connect = tout;
914                         else
915                                 cfg_sink->forward_px->timeout.server = tout;
916 		}
917 	}
918 	else if (strcmp(args[0],"format") == 0) {
919 		if (!cfg_sink) {
920 			ha_alert("parsing [%s:%d] : unable to set format '%s'.\n", file, linenum, args[1]);
921 			err_code |= ERR_ALERT | ERR_FATAL;
922 			goto err;
923 		}
924 
925 		if (strcmp(args[1], "raw") == 0) {
926 			cfg_sink->fmt = SINK_FMT_RAW;
927 		}
928 		else if (strcmp(args[1], "short") == 0) {
929 			cfg_sink->fmt = SINK_FMT_SHORT;
930 		}
931 		else if (strcmp(args[1], "iso") == 0) {
932 			cfg_sink->fmt = SINK_FMT_ISO;
933 		}
934 		else if (strcmp(args[1], "timed") == 0) {
935 			cfg_sink->fmt = SINK_FMT_TIMED;
936 		}
937 		else if (strcmp(args[1], "rfc3164") == 0) {
938 			cfg_sink->fmt = SINK_FMT_RFC3164;
939 		}
940 		else if (strcmp(args[1], "rfc5424") == 0) {
941 			cfg_sink->fmt = SINK_FMT_RFC5424;
942 		}
943 		else {
944 			ha_alert("parsing [%s:%d] : unknown format '%s'.\n", file, linenum, args[1]);
945 			err_code |= ERR_ALERT | ERR_FATAL;
946 			goto err;
947 		}
948 	}
949 	else if (strcmp(args[0],"maxlen") == 0) {
950 		if (!cfg_sink) {
951 			ha_alert("parsing [%s:%d] : unable to set event max length '%s'.\n", file, linenum, args[1]);
952 			err_code |= ERR_ALERT | ERR_FATAL;
953 			goto err;
954 		}
955 
956 		cfg_sink->maxlen = atol(args[1]);
957 		if (!cfg_sink->maxlen) {
958 			ha_alert("parsing [%s:%d] : invalid size '%s' for new sink buffer.\n", file, linenum, args[1]);
959 			err_code |= ERR_ALERT | ERR_FATAL;
960 			goto err;
961 		}
962 	}
963 	else if (strcmp(args[0],"description") == 0) {
964 		if (!cfg_sink) {
965 			ha_alert("parsing [%s:%d] : unable to set description '%s'.\n", file, linenum, args[1]);
966 			err_code |= ERR_ALERT | ERR_FATAL;
967 			goto err;
968 		}
969 
970 		if (!*args[1]) {
971 			ha_alert("parsing [%s:%d] : missing ring description text.\n", file, linenum);
972 			err_code |= ERR_ALERT | ERR_FATAL;
973 			goto err;
974 		}
975 
976 		free(cfg_sink->desc);
977 
978 		cfg_sink->desc = strdup(args[1]);
979 		if (!cfg_sink->desc) {
980 			ha_alert("parsing [%s:%d] : fail to set description '%s'.\n", file, linenum, args[1]);
981 			err_code |= ERR_ALERT | ERR_FATAL;
982 			goto err;
983 		}
984 	}
985 	else {
986 		ha_alert("parsing [%s:%d] : unknown statement '%s'.\n", file, linenum, args[0]);
987 		err_code |= ERR_ALERT | ERR_FATAL;
988 		goto err;
989 	}
990 
991 err:
992 	return err_code;
993 }
994 
995 /*
996  * Post parsing "ring" section.
997  *
998  * The function returns 0 in success case, otherwise, it returns error
999  * flags.
1000  */
cfg_post_parse_ring()1001 int cfg_post_parse_ring()
1002 {
1003 	int err_code = 0;
1004 	struct server *srv;
1005 
1006 	if (cfg_sink && (cfg_sink->type == SINK_TYPE_BUFFER)) {
1007 		if (cfg_sink->maxlen > b_size(&cfg_sink->ctx.ring->buf)) {
1008 			ha_warning("ring '%s' event max length '%u' exceeds size, forced to size '%lu'.\n",
1009 			           cfg_sink->name, cfg_sink->maxlen, (unsigned long)b_size(&cfg_sink->ctx.ring->buf));
1010 			cfg_sink->maxlen = b_size(&cfg_sink->ctx.ring->buf);
1011 			err_code |= ERR_ALERT;
1012 		}
1013 
1014 		/* prepare forward server descriptors */
1015 		if (cfg_sink->forward_px) {
1016 			srv = cfg_sink->forward_px->srv;
1017 			while (srv) {
1018 				struct sink_forward_target *sft;
1019 				/* init ssl if needed */
1020 				if (srv->use_ssl == 1 && xprt_get(XPRT_SSL) && xprt_get(XPRT_SSL)->prepare_srv) {
1021 					if (xprt_get(XPRT_SSL)->prepare_srv(srv)) {
1022 						ha_alert("unable to prepare SSL for server '%s' in ring '%s'.\n", srv->id, cfg_sink->name);
1023 						err_code |= ERR_ALERT | ERR_FATAL;
1024 					}
1025 				}
1026 
1027 				/* allocate sink_forward_target descriptor */
1028 				sft = calloc(1, sizeof(*sft));
1029 				if (!sft) {
1030 					ha_alert("memory allocation error initializing server '%s' in ring '%s'.\n",srv->id, cfg_sink->name);
1031 					err_code |= ERR_ALERT | ERR_FATAL;
1032 					break;
1033 				}
1034 				sft->srv = srv;
1035 				sft->appctx = NULL;
1036 				sft->ofs = ~0; /* init ring offset */
1037 				sft->next = cfg_sink->sft;
1038 				HA_SPIN_INIT(&sft->lock);
1039 
1040 				/* mark server attached to the ring */
1041 				if (!ring_attach(cfg_sink->ctx.ring)) {
1042 					ha_alert("server '%s' sets too many watchers > 255 on ring '%s'.\n", srv->id, cfg_sink->name);
1043 					err_code |= ERR_ALERT | ERR_FATAL;
1044 				}
1045 				cfg_sink->sft = sft;
1046 				srv = srv->next;
1047 			}
1048 			sink_init_forward(cfg_sink);
1049 		}
1050 	}
1051 	cfg_sink = NULL;
1052 
1053 	return err_code;
1054 }
1055 
1056 /* resolve sink names at end of config. Returns 0 on success otherwise error
1057  * flags.
1058 */
post_sink_resolve()1059 int post_sink_resolve()
1060 {
1061 	int err_code = 0;
1062 	struct logsrv *logsrv, *logb;
1063 	struct sink *sink;
1064 	struct proxy *px;
1065 
1066 	list_for_each_entry_safe(logsrv, logb, &global.logsrvs, list) {
1067 		if (logsrv->type == LOG_TARGET_BUFFER) {
1068 			sink = sink_find(logsrv->ring_name);
1069 			if (!sink || sink->type != SINK_TYPE_BUFFER) {
1070 				ha_alert("global log server uses unknown ring named '%s'.\n", logsrv->ring_name);
1071 				err_code |= ERR_ALERT | ERR_FATAL;
1072 			}
1073 			logsrv->sink = sink;
1074 		}
1075 	}
1076 
1077 	for (px = proxies_list; px; px = px->next) {
1078 		list_for_each_entry_safe(logsrv, logb, &px->logsrvs, list) {
1079 			if (logsrv->type == LOG_TARGET_BUFFER) {
1080 				sink = sink_find(logsrv->ring_name);
1081 				if (!sink || sink->type != SINK_TYPE_BUFFER) {
1082 					ha_alert("proxy '%s' log server uses unknown ring named '%s'.\n", px->id, logsrv->ring_name);
1083 					err_code |= ERR_ALERT | ERR_FATAL;
1084 				}
1085 				logsrv->sink = sink;
1086 			}
1087 		}
1088 	}
1089 	return err_code;
1090 }
1091 
1092 
sink_init()1093 static void sink_init()
1094 {
1095 	sink_new_fd("stdout", "standard output (fd#1)", SINK_FMT_RAW, 1);
1096 	sink_new_fd("stderr", "standard output (fd#2)", SINK_FMT_RAW, 2);
1097 	sink_new_buf("buf0",  "in-memory ring buffer", SINK_FMT_TIMED, 1048576);
1098 }
1099 
sink_deinit()1100 static void sink_deinit()
1101 {
1102 	struct sink *sink, *sb;
1103 
1104 	list_for_each_entry_safe(sink, sb, &sink_list, sink_list) {
1105 		if (sink->type == SINK_TYPE_BUFFER)
1106 			ring_free(sink->ctx.ring);
1107 		LIST_DEL(&sink->sink_list);
1108 		free(sink->name);
1109 		free(sink->desc);
1110 		free(sink);
1111 	}
1112 }
1113 
/* sink_init() is registered as an init call for the STG_REGISTER stage, and
 * sink_deinit() as a post-deinit callback.
 */
INITCALL0(STG_REGISTER, sink_init);
REGISTER_POST_DEINIT(sink_deinit);

/* CLI keywords provided by this file: "show events" is handled by
 * cli_parse_show_events(). The list ends with an empty entry.
 */
static struct cli_kw_list cli_kws = {{ },{
	{ { "show", "events", NULL }, "show events [<sink>] : show event sink state", cli_parse_show_events, NULL, NULL },
	{{},}
}};

/* the keyword list above is passed to cli_register_kw() at STG_REGISTER */
INITCALL1(STG_REGISTER, cli_register_kw, &cli_kws);

/* config parsers for this section: cfg_parse_ring() is the parser registered
 * for the "ring" section and cfg_post_parse_ring() its post-section handler.
 */
REGISTER_CONFIG_SECTION("ring", cfg_parse_ring, cfg_post_parse_ring);
/* post_sink_resolve() is registered as a post-check callback */
REGISTER_POST_CHECK(post_sink_resolve);
1127 
1128 /*
1129  * Local variables:
1130  *  c-indent-level: 8
1131  *  c-basic-offset: 8
1132  * End:
1133  */
1134