/*
 * Stream management functions.
 *
 * Copyright 2000-2012 Willy Tarreau <w@1wt.eu>
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version
 * 2 of the License, or (at your option) any later version.
 *
 */

#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>

#include <common/cfgparse.h>
#include <common/config.h>
#include <common/buffer.h>
#include <common/debug.h>
#include <common/memory.h>

#include <types/applet.h>
#include <types/capture.h>
#include <types/cli.h>
#include <types/filters.h>
#include <types/global.h>
#include <types/stats.h>

#include <proto/acl.h>
#include <proto/action.h>
#include <proto/arg.h>
#include <proto/backend.h>
#include <proto/channel.h>
#include <proto/checks.h>
#include <proto/cli.h>
#include <proto/connection.h>
#include <proto/stats.h>
#include <proto/fd.h>
#include <proto/filters.h>
#include <proto/freq_ctr.h>
#include <proto/frontend.h>
#include <proto/hdr_idx.h>
#include <proto/hlua.h>
#include <proto/listener.h>
#include <proto/log.h>
#include <proto/raw_sock.h>
#include <proto/session.h>
#include <proto/stream.h>
#include <proto/pipe.h>
#include <proto/proto_http.h>
#include <proto/proxy.h>
#include <proto/queue.h>
#include <proto/server.h>
#include <proto/sample.h>
#include <proto/stick_table.h>
#include <proto/stream_interface.h>
#include <proto/task.h>
#include <proto/tcp_rules.h>
#include <proto/vars.h>

struct pool_head *pool2_stream;
struct list streams;

/* List of all use-service keywords. */
static struct list service_keywords = LIST_HEAD_INIT(service_keywords);
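/* Note: "use-service" action keywords are appended to this list at startup
 * through a registration helper (service_keywords_register() in current
 * HAProxy sources; the exact name is an assumption here), so lookups only
 * ever walk a fully built list.
 */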

/* This function is called from the session handler which detects the end of
 * handshake, in order to complete initialization of a valid stream. It must be
 * called with a session (which may be embryonic). It returns the pointer to
 * the newly created stream, or NULL in case of fatal error. The client-facing
 * end point is assigned to <origin>, which must be valid. The task's context
 * is set to the new stream, and its function is set to process_stream().
 * Target and analysers are null.
 */
struct stream *stream_new(struct session *sess, struct task *t, enum obj_type *origin)
{
	struct stream *s;
	struct connection *conn = objt_conn(origin);
	struct appctx *appctx   = objt_appctx(origin);

	if (unlikely((s = pool_alloc2(pool2_stream)) == NULL))
		return s;

	/* minimum stream initialization required for an embryonic stream is
	 * fairly low. We need very little to execute L4 ACLs, then we need a
	 * task to make the client-side connection live on its own.
	 *  - flags
	 *  - stick-entry tracking
	 */
	s->flags = 0;
	s->logs.logwait = sess->fe->to_log;
	s->logs.level = 0;
	s->logs.accept_date = sess->accept_date; /* user-visible date for logging */
	s->logs.tv_accept = sess->tv_accept;   /* corrected date for internal use */
	/* This function is called just after the handshake, so the handshake duration is
	 * between the accept time and now.
	 */
	s->logs.t_handshake = tv_ms_elapsed(&sess->tv_accept, &now);
	s->logs.t_idle = -1;
	tv_zero(&s->logs.tv_request);
	s->logs.t_queue = -1;
	s->logs.t_connect = -1;
	s->logs.t_data = -1;
	s->logs.t_close = 0;
	s->logs.bytes_in = s->logs.bytes_out = 0;
	s->logs.prx_queue_size = 0;  /* we get the number of pending conns before us */
	s->logs.srv_queue_size = 0; /* we will get this number soon */

	/* default logging function */
	s->do_log = strm_log;

	/* default error reporting function, may be changed by analysers */
	s->srv_error = default_srv_error;
	/* Initialise the current rule list pointer to NULL. We are certain
	 * that no real rule list will ever match the NULL pointer.
	 */
	s->current_rule_list = NULL;
	s->current_rule = NULL;

	/* Copy SC counters for the stream. We don't touch refcounts because
	 * any reference we have is inherited from the session. Since the stream
	 * doesn't exist without the session, the session's existence guarantees
	 * we don't lose the entry. During the store operation, the stream won't
	 * touch these ones.
	 */
	memcpy(s->stkctr, sess->stkctr, sizeof(s->stkctr));

	s->sess = sess;
	s->si[0].flags = SI_FL_NONE;
	s->si[1].flags = SI_FL_ISBACK;

	s->uniq_id = global.req_count++;

	/* OK, we're keeping the stream, so let's properly initialize the stream */
	LIST_ADDQ(&streams, &s->list);
	LIST_INIT(&s->back_refs);

	LIST_INIT(&s->buffer_wait.list);
	s->buffer_wait.target = s;
	s->buffer_wait.wakeup_cb = (int (*)(void *))stream_res_wakeup;

	s->flags |= SF_INITIALIZED;
	s->unique_id = NULL;

	s->task = t;
	s->pending_events = 0;
	t->process = process_stream;
	t->context = s;
	t->expire = TICK_ETERNITY;

	/* Note: initially, the stream's backend points to the frontend.
	 * This changes later when switching rules are executed or
	 * when the default backend is assigned.
	 */
	s->be  = sess->fe;
	s->req.buf = s->res.buf = NULL;
	s->req_cap = NULL;
	s->res_cap = NULL;
	/* Initialise all the variable contexts even if unused. This permits
	 * pruning these contexts later without errors.
	 */
	vars_init(&s->vars_txn,    SCOPE_TXN);
	vars_init(&s->vars_reqres, SCOPE_REQ);

	/* this part should be common with other protocols */
	si_reset(&s->si[0]);
	si_set_state(&s->si[0], SI_ST_EST);
	s->si[0].hcto = sess->fe->timeout.clientfin;

	/* attach the incoming connection to the stream interface now. */
	if (conn)
		si_attach_conn(&s->si[0], conn);
	else if (appctx)
		si_attach_appctx(&s->si[0], appctx);

	if (likely(sess->fe->options2 & PR_O2_INDEPSTR))
		s->si[0].flags |= SI_FL_INDEP_STR;

	/* pre-initialize the other side's stream interface to an INIT state. The
	 * callbacks will be initialized before attempting to connect.
	 */
	si_reset(&s->si[1]);
	s->si[1].hcto = TICK_ETERNITY;

	if (likely(sess->fe->options2 & PR_O2_INDEPSTR))
		s->si[1].flags |= SI_FL_INDEP_STR;

	stream_init_srv_conn(s);
	s->target = NULL;
	s->pend_pos = NULL;

	/* init store persistence */
	s->store_count = 0;

	channel_init(&s->req);
	s->req.flags |= CF_READ_ATTACHED; /* the producer is already connected */
	s->req.analysers = 0;
	channel_auto_connect(&s->req);  /* don't wait to establish connection */
	channel_auto_close(&s->req);    /* let the producer forward close requests */

	s->req.rto = sess->fe->timeout.client;
	s->req.wto = TICK_ETERNITY;
	s->req.rex = TICK_ETERNITY;
	s->req.wex = TICK_ETERNITY;
	s->req.analyse_exp = TICK_ETERNITY;

	channel_init(&s->res);
	s->res.flags |= CF_ISRESP;
	s->res.analysers = 0;

	if (sess->fe->options2 & PR_O2_NODELAY) {
		s->req.flags |= CF_NEVER_WAIT;
		s->res.flags |= CF_NEVER_WAIT;
	}

	s->res.wto = sess->fe->timeout.client;
	s->res.rto = TICK_ETERNITY;
	s->res.rex = TICK_ETERNITY;
	s->res.wex = TICK_ETERNITY;
	s->res.analyse_exp = TICK_ETERNITY;

	s->txn = NULL;

	HLUA_INIT(&s->hlua);

	if (flt_stream_init(s) < 0 || flt_stream_start(s) < 0)
		goto out_fail_accept;

	/* finish initialization of the accepted file descriptor */
	if (conn)
		conn_data_want_recv(conn);
	else if (appctx)
		si_applet_want_get(&s->si[0]);

	if (sess->fe->accept && sess->fe->accept(s) < 0)
		goto out_fail_accept;

	/* it is important not to call the wakeup function directly but to
	 * pass through task_wakeup(), because this one knows how to apply
	 * priorities to tasks.
	 */
	task_wakeup(t, TASK_WOKEN_INIT);
	return s;

	/* Error unrolling */
 out_fail_accept:
	flt_stream_release(s, 0);
	LIST_DEL(&s->list);
	pool_free2(pool2_stream, s);
	return NULL;
}

/*
 * Frees the context associated with a stream. It must have been removed first.
 */
static void stream_free(struct stream *s)
{
	struct session *sess = strm_sess(s);
	struct proxy *fe = sess->fe;
	struct bref *bref, *back;
	struct connection *cli_conn = objt_conn(sess->origin);
	int i;

	if (s->pend_pos)
		pendconn_free(s->pend_pos);

	if (objt_server(s->target)) { /* there may be requests left pending in queue */
		if (s->flags & SF_CURR_SESS) {
			s->flags &= ~SF_CURR_SESS;
			objt_server(s->target)->cur_sess--;
		}
		if (may_dequeue_tasks(objt_server(s->target), s->be))
			process_srv_queue(objt_server(s->target));
	}

	if (unlikely(s->srv_conn)) {
		/* the stream still has a reserved slot on a server, but
		 * it should normally be only the same as the one above,
		 * so this should not happen in fact.
		 */
		sess_change_server(s, NULL);
	}

	if (s->req.pipe)
		put_pipe(s->req.pipe);

	if (s->res.pipe)
		put_pipe(s->res.pipe);

	/* We may still be present in the buffer wait queue */
	if (!LIST_ISEMPTY(&s->buffer_wait.list)) {
		LIST_DEL(&s->buffer_wait.list);
		LIST_INIT(&s->buffer_wait.list);
	}
	if (s->req.buf->size || s->res.buf->size) {
		b_drop(&s->req.buf);
		b_drop(&s->res.buf);
		offer_buffers(NULL, tasks_run_queue + applets_active_queue);
	}

	pool_free2(pool2_uniqueid, s->unique_id);
	s->unique_id = NULL;

	hlua_ctx_destroy(&s->hlua);
	if (s->txn)
		http_end_txn(s);

	/* ensure the client-side transport layer is destroyed */
	if (cli_conn)
		conn_force_close(cli_conn);

	for (i = 0; i < s->store_count; i++) {
		if (!s->store[i].ts)
			continue;
		stksess_free(s->store[i].table, s->store[i].ts);
		s->store[i].ts = NULL;
	}

	if (s->txn) {
		pool_free2(pool2_hdr_idx, s->txn->hdr_idx.v);
		pool_free2(pool2_http_txn, s->txn);
		s->txn = NULL;
	}

	flt_stream_stop(s);
	flt_stream_release(s, 0);

	if (fe) {
		if (s->req_cap) {
			struct cap_hdr *h;
			for (h = fe->req_cap; h; h = h->next)
				pool_free2(h->pool, s->req_cap[h->index]);
		}

		if (s->res_cap) {
			struct cap_hdr *h;
			for (h = fe->rsp_cap; h; h = h->next)
				pool_free2(h->pool, s->res_cap[h->index]);
		}

		pool_free2(fe->rsp_cap_pool, s->res_cap);
		pool_free2(fe->req_cap_pool, s->req_cap);
	}

	/* Cleanup all variable contexts. */
	vars_prune(&s->vars_txn, s->sess, s);
	vars_prune(&s->vars_reqres, s->sess, s);

	stream_store_counters(s);

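	/* Back-references are held by external observers (e.g. the CLI's
	 * "show sess" dumper in current sources; the exact user is an
	 * assumption) so they can safely resume browsing the streams list
	 * after this stream disappears. Migrate them to the next stream.
	 */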
	list_for_each_entry_safe(bref, back, &s->back_refs, users) {
		/* we have to unlink all watchers. We must not relink them if
		 * this stream was the last one in the list.
		 */
		LIST_DEL(&bref->users);
		LIST_INIT(&bref->users);
		if (s->list.n != &streams)
			LIST_ADDQ(&LIST_ELEM(s->list.n, struct stream *, list)->back_refs, &bref->users);
		bref->ref = s->list.n;
	}
	LIST_DEL(&s->list);
	si_release_endpoint(&s->si[1]);
	si_release_endpoint(&s->si[0]);

	/* FIXME: for now we have a 1:1 relation between stream and session so
	 * the stream must free the session.
	 */
	pool_free2(pool2_stream, s);
	session_free(sess);

	/* We may want to free the maximum amount of pools if the proxy is stopping */
	if (fe && unlikely(fe->state == PR_STSTOPPED)) {
		pool_flush2(pool2_buffer);
		pool_flush2(pool2_http_txn);
		pool_flush2(pool2_hdr_idx);
		pool_flush2(pool2_requri);
		pool_flush2(pool2_capture);
		pool_flush2(pool2_stream);
		pool_flush2(pool2_session);
		pool_flush2(pool2_connection);
		pool_flush2(pool2_pendconn);
		pool_flush2(fe->req_cap_pool);
		pool_flush2(fe->rsp_cap_pool);
	}
}


/* Allocates a work buffer for stream <s>. It is meant to be called inside
 * process_stream(). It will only allocate the side needed for the function
 * to work fine, which is the response buffer so that an error message may be
 * built and returned. Response buffers may be allocated from the reserve, this
 * is critical to ensure that a response may always flow and will never block a
 * server from releasing a connection. Returns 0 in case of failure, non-zero
 * otherwise.
 */
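/* Note: the margin of 0 passed to b_alloc_margin() below is what allows the
 * allocation to dip into the reserve: with a non-zero margin the call would
 * only succeed while the pool keeps at least that many spare buffers
 * (behaviour as understood from current sources; treat as an assumption).
 */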
static int stream_alloc_work_buffer(struct stream *s)
{
	if (!LIST_ISEMPTY(&s->buffer_wait.list)) {
		LIST_DEL(&s->buffer_wait.list);
		LIST_INIT(&s->buffer_wait.list);
	}

	if (b_alloc_margin(&s->res.buf, 0))
		return 1;

	LIST_ADDQ(&buffer_wq, &s->buffer_wait.list);
	return 0;
}

/* releases unused buffers after processing. Typically used at the end of the
 * update() functions. It will try to wake up as many tasks/applets as the
 * number of buffers that it releases. In practice, most often streams are
 * blocked on a single buffer, so it makes sense to try to wake two up when two
 * buffers are released at once.
 */
void stream_release_buffers(struct stream *s)
{
	int offer = 0;

	if (s->req.buf->size && buffer_empty(s->req.buf)) {
		offer = 1;
		b_free(&s->req.buf);
	}
	if (s->res.buf->size && buffer_empty(s->res.buf)) {
		offer = 1;
		b_free(&s->res.buf);
	}

	/* if we're certain to have at least 1 buffer available, and there is
	 * someone waiting, we can wake up a waiter and offer them.
	 */
	if (offer)
		offer_buffers(s, tasks_run_queue + applets_active_queue);
}

/* performs minimal initializations, reports 0 in case of error, 1 if OK. */
int init_stream()
{
	LIST_INIT(&streams);
	pool2_stream = create_pool("stream", sizeof(struct stream), MEM_F_SHARED);
	return pool2_stream != NULL;
}

void stream_process_counters(struct stream *s)
{
	struct session *sess = s->sess;
	unsigned long long bytes;
	void *ptr1, *ptr2;
	int i;

	bytes = s->req.total - s->logs.bytes_in;
	s->logs.bytes_in = s->req.total;
	if (bytes) {
		sess->fe->fe_counters.bytes_in += bytes;

		s->be->be_counters.bytes_in += bytes;

		if (objt_server(s->target))
			objt_server(s->target)->counters.bytes_in += bytes;

		if (sess->listener && sess->listener->counters)
			sess->listener->counters->bytes_in += bytes;

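		/* Update tracked stick counters: each slot prefers the
		 * stream-level tracker and falls back to the session-level
		 * one when the slot is not tracked at stream level.
		 */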
		for (i = 0; i < MAX_SESS_STKCTR; i++) {
			struct stkctr *stkctr = &s->stkctr[i];

			if (!stkctr_entry(stkctr)) {
				stkctr = &sess->stkctr[i];
				if (!stkctr_entry(stkctr))
					continue;
			}

			ptr1 = stktable_data_ptr(stkctr->table, stkctr_entry(stkctr), STKTABLE_DT_BYTES_IN_CNT);
			if (ptr1)
				stktable_data_cast(ptr1, bytes_in_cnt) += bytes;

			ptr2 = stktable_data_ptr(stkctr->table, stkctr_entry(stkctr), STKTABLE_DT_BYTES_IN_RATE);
			if (ptr2)
				update_freq_ctr_period(&stktable_data_cast(ptr2, bytes_in_rate),
						       stkctr->table->data_arg[STKTABLE_DT_BYTES_IN_RATE].u, bytes);

			/* If data was modified, we need to touch to re-schedule sync */
			if (ptr1 || ptr2)
				stktable_touch(stkctr->table, stkctr_entry(stkctr), 1);
		}
	}

	bytes = s->res.total - s->logs.bytes_out;
	s->logs.bytes_out = s->res.total;
	if (bytes) {
		sess->fe->fe_counters.bytes_out += bytes;

		s->be->be_counters.bytes_out += bytes;

		if (objt_server(s->target))
			objt_server(s->target)->counters.bytes_out += bytes;

		if (sess->listener && sess->listener->counters)
			sess->listener->counters->bytes_out += bytes;

		for (i = 0; i < MAX_SESS_STKCTR; i++) {
			struct stkctr *stkctr = &s->stkctr[i];

			if (!stkctr_entry(stkctr)) {
				stkctr = &sess->stkctr[i];
				if (!stkctr_entry(stkctr))
					continue;
			}

			ptr1 = stktable_data_ptr(stkctr->table, stkctr_entry(stkctr), STKTABLE_DT_BYTES_OUT_CNT);
			if (ptr1)
				stktable_data_cast(ptr1, bytes_out_cnt) += bytes;

			ptr2 = stktable_data_ptr(stkctr->table, stkctr_entry(stkctr), STKTABLE_DT_BYTES_OUT_RATE);
			if (ptr2)
				update_freq_ctr_period(&stktable_data_cast(ptr2, bytes_out_rate),
						       stkctr->table->data_arg[STKTABLE_DT_BYTES_OUT_RATE].u, bytes);

			/* If data was modified, we need to touch to re-schedule sync */
			if (ptr1 || ptr2)
				stktable_touch(stkctr->table, stkctr_entry(stkctr), 1);
		}
	}
}

/* This function is called with (si->state == SI_ST_CON) meaning that a
 * connection was attempted and that the file descriptor is already allocated.
 * We must check for establishment, error and abort. Possible output states
 * are SI_ST_EST (established), SI_ST_CER (error), SI_ST_DIS (abort), and
 * SI_ST_CON (no change). The function returns 0 if it switches to SI_ST_CER,
 * otherwise 1. This only works with connection-based streams.
 */
static int sess_update_st_con_tcp(struct stream *s)
{
	struct stream_interface *si = &s->si[1];
	struct channel *req = &s->req;
	struct channel *rep = &s->res;
	struct connection *srv_conn = __objt_conn(si->end);

	/* If we got an error, or if nothing happened and the connection timed
	 * out, we must give up. The CER state handler will take care of retry
	 * attempts and error reports.
	 */
	if (unlikely(si->flags & (SI_FL_EXP|SI_FL_ERR))) {
		if (unlikely(req->flags & CF_WROTE_DATA)) {
			/* Some data were sent past the connection establishment,
			 * so we need to pretend we're established to log correctly
			 * and let later states handle the failure.
			 */
			si->state    = SI_ST_EST;
			si->err_type = SI_ET_DATA_ERR;
			req->flags |= CF_WRITE_ERROR;
			rep->flags |= CF_READ_ERROR;
			return 1;
		}
		si->exp   = TICK_ETERNITY;
		si->state = SI_ST_CER;

		conn_force_close(srv_conn);

		if (si->err_type)
			return 0;

		if (si->flags & SI_FL_ERR)
			si->err_type = SI_ET_CONN_ERR;
		else
			si->err_type = SI_ET_CONN_TO;
		return 0;
	}

	/* OK, maybe we want to abort */
	if (!(req->flags & CF_WROTE_DATA) &&
	    unlikely((rep->flags & CF_SHUTW) ||
		     ((req->flags & CF_SHUTW_NOW) && /* FIXME: this should not prevent a connection from establishing */
		      ((!(req->flags & (CF_WRITE_ACTIVITY|CF_WRITE_EVENT)) && channel_is_empty(req)) ||
		       s->be->options & PR_O_ABRT_CLOSE)))) {
		/* give up */
		si_shutw(si);
		si->err_type |= SI_ET_CONN_ABRT;
		if (s->srv_error)
			s->srv_error(s, si);
		return 1;
	}

	/* we need to wait a bit more if there was no activity either */
	if (!(req->flags & (CF_WRITE_ACTIVITY|CF_WRITE_EVENT)))
		return 1;

	/* OK, this means that a connection succeeded. The caller will be
	 * responsible for handling the transition from CON to EST.
	 */
	si->state    = SI_ST_EST;
	si->err_type = SI_ET_NONE;
	return 1;
}

/* This function is called with (si->state == SI_ST_CER) meaning that a
 * previous connection attempt has failed and that the file descriptor
 * has already been released. Possible causes include asynchronous error
 * notification and time out. Possible output states are SI_ST_CLO when
 * retries are exhausted, SI_ST_TAR when a delay is wanted before a new
 * connection attempt, SI_ST_ASS when it's wise to retry on the same server,
 * and SI_ST_REQ when an immediate redispatch is wanted. The buffers are
 * marked as in error state. It returns 0.
 */
static int sess_update_st_cer(struct stream *s)
{
	struct stream_interface *si = &s->si[1];

	/* we probably have to release last stream from the server */
	if (objt_server(s->target)) {
		health_adjust(objt_server(s->target), HANA_STATUS_L4_ERR);

		if (s->flags & SF_CURR_SESS) {
			s->flags &= ~SF_CURR_SESS;
			objt_server(s->target)->cur_sess--;
		}
	}

	/* ensure that we have enough retries left */
	si->conn_retries--;
	if (si->conn_retries < 0) {
		if (!si->err_type) {
			si->err_type = SI_ET_CONN_ERR;
		}

		if (objt_server(s->target))
			objt_server(s->target)->counters.failed_conns++;
		s->be->be_counters.failed_conns++;
		sess_change_server(s, NULL);
		if (may_dequeue_tasks(objt_server(s->target), s->be))
			process_srv_queue(objt_server(s->target));

		/* shutw is enough to stop a connecting socket */
		si_shutw(si);
		s->req.flags |= CF_WRITE_ERROR;
		s->res.flags |= CF_READ_ERROR;

		si->state = SI_ST_CLO;
		if (s->srv_error)
			s->srv_error(s, si);
		return 0;
	}

	/* If the "redispatch" option is set on the backend, we are allowed to
	 * retry on another server. By default this redispatch occurs on the
	 * last retry, but if configured we allow redispatches to occur on
	 * configurable intervals, e.g. on every retry. In order to achieve this,
	 * we must mark the stream unassigned, and eventually clear the DIRECT
	 * bit to ignore any persistence cookie. We won't count a retry nor a
	 * redispatch yet, because this will depend on what server is selected.
	 * If the connection is not persistent, the balancing algorithm is not
	 * deterministic (round robin) and there is more than one active server,
	 * we accept to perform an immediate redispatch without waiting since
	 * we don't care about this particular server.
	 */
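	/* Illustrative reading of the modulo tests below (example values,
	 * not from the source): (s->be->conn_retries - si->conn_retries) is
	 * the number of attempts already made, so with "retries 3" a
	 * redispatch_after of 2 triggers a redispatch after attempts 2, 4,
	 * etc., while a negative value counts back from the last allowed
	 * attempt (-1 redispatches on the final retry).
	 */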
	if (objt_server(s->target) &&
	    (s->be->options & PR_O_REDISP) && !(s->flags & SF_FORCE_PRST) &&
	    ((__objt_server(s->target)->state < SRV_ST_RUNNING) ||
	     (((s->be->redispatch_after > 0) &&
	       ((s->be->conn_retries - si->conn_retries) %
	        s->be->redispatch_after == 0)) ||
	      ((s->be->redispatch_after < 0) &&
	       ((s->be->conn_retries - si->conn_retries) %
	        (s->be->conn_retries + 1 + s->be->redispatch_after) == 0))) ||
	     (!(s->flags & SF_DIRECT) && s->be->srv_act > 1 &&
	      ((s->be->lbprm.algo & BE_LB_KIND) == BE_LB_KIND_RR)))) {
		sess_change_server(s, NULL);
		if (may_dequeue_tasks(objt_server(s->target), s->be))
			process_srv_queue(objt_server(s->target));

		s->flags &= ~(SF_DIRECT | SF_ASSIGNED | SF_ADDR_SET);
		si->state = SI_ST_REQ;
	} else {
		if (objt_server(s->target))
			objt_server(s->target)->counters.retries++;
		s->be->be_counters.retries++;
		si->state = SI_ST_ASS;
	}

	if (si->flags & SI_FL_ERR) {
		/* The error was an asynchronous connection error, and we will
		 * likely have to retry connecting to the same server, most
		 * likely leading to the same result. To avoid this, we wait
		 * MIN(one second, connect timeout) before retrying.
		 */

		int delay = 1000;

		if (s->be->timeout.connect && s->be->timeout.connect < delay)
			delay = s->be->timeout.connect;

		if (!si->err_type)
			si->err_type = SI_ET_CONN_ERR;

		/* only wait when we're retrying on the same server */
		if (si->state == SI_ST_ASS ||
		    (s->be->lbprm.algo & BE_LB_KIND) != BE_LB_KIND_RR ||
		    (s->be->srv_act <= 1)) {
			si->state = SI_ST_TAR;
			si->exp = tick_add(now_ms, MS_TO_TICKS(delay));
		}
		return 0;
	}
	return 0;
}

/*
 * This function handles the transition between the SI_ST_CON state and the
 * SI_ST_EST state. It must only be called after switching from SI_ST_CON (or
 * SI_ST_INI) to SI_ST_EST, but only when a ->proto is defined.
 */
static void sess_establish(struct stream *s)
{
	struct stream_interface *si = &s->si[1];
	struct channel *req = &s->req;
	struct channel *rep = &s->res;

	/* First, centralize the timers information */
	s->logs.t_connect = tv_ms_elapsed(&s->logs.tv_accept, &now);
	si->exp      = TICK_ETERNITY;

	if (objt_server(s->target))
		health_adjust(objt_server(s->target), HANA_STATUS_L4_OK);

	if (s->be->mode == PR_MODE_TCP) { /* let's allow immediate data connection in this case */
		/* if the user wants to log as soon as possible, without counting
		 * bytes from the server, then this is the right moment. */
		if (!LIST_ISEMPTY(&strm_fe(s)->logformat) && !(s->logs.logwait & LW_BYTES)) {
			s->logs.t_close = s->logs.t_connect; /* to get a valid end date */
			s->do_log(s);
		}
	}
	else {
		rep->flags |= CF_READ_DONTWAIT; /* a single read is enough to get response headers */
	}

	rep->analysers |= strm_fe(s)->fe_rsp_ana | s->be->be_rsp_ana;

	/* Be sure to filter response headers if the backend is an HTTP proxy
	 * and if there are filters attached to the stream. */
	if (s->be->mode == PR_MODE_HTTP && HAS_FILTERS(s))
		rep->analysers |= AN_RES_FLT_HTTP_HDRS;

	rep->flags |= CF_READ_ATTACHED; /* producer is now attached */
	if (req->flags & CF_WAKE_CONNECT) {
		req->flags |= CF_WAKE_ONCE;
		req->flags &= ~CF_WAKE_CONNECT;
	}
	if (objt_conn(si->end)) {
		/* real connections have timeouts */
		req->wto = s->be->timeout.server;
		rep->rto = s->be->timeout.server;
	}
	req->wex = TICK_ETERNITY;
}

/* Check if the connection request is in such a state that it can be aborted. */
static int check_req_may_abort(struct channel *req, struct stream *s)
{
	return ((req->flags & (CF_READ_ERROR)) ||
	        ((req->flags & (CF_SHUTW_NOW|CF_SHUTW)) &&  /* empty and client aborted */
	         (channel_is_empty(req) || s->be->options & PR_O_ABRT_CLOSE)));
}

/* Update back stream interface status for input states SI_ST_ASS, SI_ST_QUE,
 * SI_ST_TAR. Other input states are simply ignored.
 * Possible output states are SI_ST_CLO, SI_ST_TAR, SI_ST_ASS, SI_ST_REQ, SI_ST_CON
 * and SI_ST_EST. Flags must have previously been updated for timeouts and other
 * conditions.
 */
static void sess_update_stream_int(struct stream *s)
{
	struct server *srv = objt_server(s->target);
	struct stream_interface *si = &s->si[1];
	struct channel *req = &s->req;

	DPRINTF(stderr,"[%u] %s: sess=%p rq=%p, rp=%p, exp(r,w)=%u,%u rqf=%08x rpf=%08x rqh=%d rqt=%d rph=%d rpt=%d cs=%d ss=%d\n",
		now_ms, __FUNCTION__,
		s,
		req, &s->res,
		req->rex, s->res.wex,
		req->flags, s->res.flags,
		req->buf->i, req->buf->o, s->res.buf->i, s->res.buf->o, s->si[0].state, s->si[1].state);

	if (si->state == SI_ST_ASS) {
		/* Server assigned to connection request, we have to try to connect now */
		int conn_err;

		/* Before we try to initiate the connection, see if the
		 * request may be aborted instead.
		 */
		if (check_req_may_abort(req, s)) {
			si->err_type |= SI_ET_CONN_ABRT;
			goto abort_connection;
		}

		conn_err = connect_server(s);
		srv = objt_server(s->target);

		if (conn_err == SF_ERR_NONE) {
			/* state = SI_ST_CON or SI_ST_EST now */
			if (srv)
				srv_inc_sess_ctr(srv);
			if (srv)
				srv_set_sess_last(srv);
			return;
		}

		/* We have received a synchronous error. We might have to
		 * abort, retry immediately or redispatch.
		 */
		if (conn_err == SF_ERR_INTERNAL) {
			if (!si->err_type) {
				si->err_type = SI_ET_CONN_OTHER;
			}

			if (srv)
				srv_inc_sess_ctr(srv);
			if (srv)
				srv_set_sess_last(srv);
			if (srv)
				srv->counters.failed_conns++;
			s->be->be_counters.failed_conns++;

			/* release other streams waiting for this server */
			sess_change_server(s, NULL);
			if (may_dequeue_tasks(srv, s->be))
				process_srv_queue(srv);

			/* Failed and not retryable. */
			si_shutr(si);
			si_shutw(si);
			req->flags |= CF_WRITE_ERROR;

			s->logs.t_queue = tv_ms_elapsed(&s->logs.tv_accept, &now);

			/* no stream was ever accounted for this server */
			si->state = SI_ST_CLO;
			if (s->srv_error)
				s->srv_error(s, si);
			return;
		}

		/* We are facing a retryable error, but we don't want to run a
		 * turn-around now, as the problem is likely a source port
		 * allocation problem, so we want to retry now.
		 */
		si->state = SI_ST_CER;
		si->flags &= ~SI_FL_ERR;
		sess_update_st_cer(s);
		/* now si->state is one of SI_ST_CLO, SI_ST_TAR, SI_ST_ASS, SI_ST_REQ */
		return;
	}
	else if (si->state == SI_ST_QUE) {
		/* connection request was queued, check for any update */
		if (!s->pend_pos) {
			/* The connection is not in the queue anymore. Either
			 * we have a server connection slot available and we
			 * go directly to the assigned state, or we need to
			 * load-balance first and go to the INI state.
			 */
			si->exp = TICK_ETERNITY;
			if (unlikely(!(s->flags & SF_ASSIGNED)))
				si->state = SI_ST_REQ;
			else {
				s->logs.t_queue = tv_ms_elapsed(&s->logs.tv_accept, &now);
				si->state = SI_ST_ASS;
			}
			return;
		}

		/* Connection request still in queue... */
		if (si->flags & SI_FL_EXP) {
			/* ... and timeout expired */
			si->exp = TICK_ETERNITY;
			s->logs.t_queue = tv_ms_elapsed(&s->logs.tv_accept, &now);
			if (srv)
				srv->counters.failed_conns++;
			s->be->be_counters.failed_conns++;
			si_shutr(si);
			si_shutw(si);
			req->flags |= CF_WRITE_TIMEOUT;
			if (!si->err_type)
				si->err_type = SI_ET_QUEUE_TO;
			si->state = SI_ST_CLO;
			if (s->srv_error)
				s->srv_error(s, si);
			return;
		}

		/* Connection remains in queue, check if we have to abort it */
		if (check_req_may_abort(req, s)) {
			s->logs.t_queue = tv_ms_elapsed(&s->logs.tv_accept, &now);
			si->err_type |= SI_ET_QUEUE_ABRT;
			goto abort_connection;
		}

		/* Nothing changed */
		return;
	}
	else if (si->state == SI_ST_TAR) {
		/* Connection request might be aborted */
		if (check_req_may_abort(req, s)) {
			si->err_type |= SI_ET_CONN_ABRT;
			goto abort_connection;
		}

		if (!(si->flags & SI_FL_EXP))
			return;  /* still in turn-around */

		si->exp = TICK_ETERNITY;

		/* we keep trying on the same server as long as the stream is
		 * marked "assigned".
		 * FIXME: Should we force a redispatch attempt when the server is down ?
		 */
		if (s->flags & SF_ASSIGNED)
			si->state = SI_ST_ASS;
		else
			si->state = SI_ST_REQ;
		return;
	}
	return;

abort_connection:
	/* give up */
	si->exp = TICK_ETERNITY;
	si_shutr(si);
	si_shutw(si);
	si->state = SI_ST_CLO;
	if (s->srv_error)
		s->srv_error(s, si);
	return;
}

/* Set correct stream termination flags in case no analyser has done it. It
 * also counts a failed request if the server state has not reached the request
 * stage.
 */
static void sess_set_term_flags(struct stream *s)
{
	if (!(s->flags & SF_FINST_MASK)) {
		if (s->si[1].state < SI_ST_REQ) {

			strm_fe(s)->fe_counters.failed_req++;
			if (strm_li(s) && strm_li(s)->counters)
				strm_li(s)->counters->failed_req++;

			s->flags |= SF_FINST_R;
		}
		else if (s->si[1].state == SI_ST_QUE)
			s->flags |= SF_FINST_Q;
		else if (s->si[1].state < SI_ST_EST)
			s->flags |= SF_FINST_C;
		else if (s->si[1].state == SI_ST_EST || s->si[1].prev_state == SI_ST_EST)
			s->flags |= SF_FINST_D;
		else
			s->flags |= SF_FINST_L;
	}
}

/* This function initiates a server connection request on a stream interface
 * already in SI_ST_REQ state. Upon success, the state goes to SI_ST_ASS for
 * a real connection to a server, indicating that a server has been assigned,
 * or SI_ST_EST for a successful connection to an applet. It may also return
 * SI_ST_QUE, or SI_ST_CLO upon error.
 */
static void sess_prepare_conn_req(struct stream *s)
{
	struct stream_interface *si = &s->si[1];

	DPRINTF(stderr,"[%u] %s: sess=%p rq=%p, rp=%p, exp(r,w)=%u,%u rqf=%08x rpf=%08x rqh=%d rqt=%d rph=%d rpt=%d cs=%d ss=%d\n",
		now_ms, __FUNCTION__,
		s,
		&s->req, &s->res,
		s->req.rex, s->res.wex,
		s->req.flags, s->res.flags,
		s->req.buf->i, s->req.buf->o, s->res.buf->i, s->res.buf->o, s->si[0].state, s->si[1].state);

	if (si->state != SI_ST_REQ)
		return;

	if (unlikely(obj_type(s->target) == OBJ_TYPE_APPLET)) {
		/* the applet directly goes to the EST state */
		struct appctx *appctx = objt_appctx(si->end);

		if (!appctx || appctx->applet != __objt_applet(s->target))
			appctx = stream_int_register_handler(si, objt_applet(s->target));

		if (!appctx) {
			/* No more memory, let's immediately abort. Force the
			 * error code to ignore the ERR_LOCAL which is not a
			 * real error.
			 */
			s->flags &= ~(SF_ERR_MASK | SF_FINST_MASK);

			si_shutr(si);
			si_shutw(si);
			s->req.flags |= CF_WRITE_ERROR;
			si->err_type = SI_ET_CONN_RES;
			si->state = SI_ST_CLO;
			if (s->srv_error)
				s->srv_error(s, si);
			return;
		}

		if (tv_iszero(&s->logs.tv_request))
			s->logs.tv_request = now;
		s->logs.t_queue   = tv_ms_elapsed(&s->logs.tv_accept, &now);
		si->state         = SI_ST_EST;
		si->err_type      = SI_ET_NONE;
		be_set_sess_last(s->be);
		/* let sess_establish() finish the job */
		return;
	}

	/* Try to assign a server */
	if (srv_redispatch_connect(s) != 0) {
		/* We did not get a server. Either we queued the
		 * connection request, or we encountered an error.
		 */
		if (si->state == SI_ST_QUE)
			return;

		/* we did not get any server, let's check the cause */
		si_shutr(si);
		si_shutw(si);
		s->req.flags |= CF_WRITE_ERROR;
		if (!si->err_type)
			si->err_type = SI_ET_CONN_OTHER;
		si->state = SI_ST_CLO;
		if (s->srv_error)
			s->srv_error(s, si);
		return;
	}

	/* The server is assigned */
	s->logs.t_queue = tv_ms_elapsed(&s->logs.tv_accept, &now);
	si->state = SI_ST_ASS;
	be_set_sess_last(s->be);
}

/* This function processes the use-service action ruleset. It executes the
 * associated ACL and sets an applet as the stream's or txn's final node. It
 * returns ACT_RET_ERR if an error occurs, leaving the proxy in a consistent
 * state. It returns ACT_RET_STOP on success because use-service must be a
 * terminal action. It returns ACT_RET_YIELD if the initialisation function
 * requires more data.
 */
enum act_return process_use_service(struct act_rule *rule, struct proxy *px,
                                    struct session *sess, struct stream *s, int flags)

{
	struct appctx *appctx;

	/* Initialises the applet if it is required. */
	if (flags & ACT_FLAG_FIRST) {
		/* Register the applet. This function schedules the applet. */
		s->target = &rule->applet.obj_type;
		if (unlikely(!stream_int_register_handler(&s->si[1], objt_applet(s->target))))
			return ACT_RET_ERR;

		/* Initialise the context. */
		appctx = si_appctx(&s->si[1]);
		memset(&appctx->ctx, 0, sizeof(appctx->ctx));
		appctx->rule = rule;

		/* enable the minimally required analyzers in case of HTTP
		 * keep-alive to properly handle keep-alive and compression
		 * on the HTTP response.
		 */
		if (rule->from == ACT_F_HTTP_REQ) {
			s->req.analysers &= AN_REQ_FLT_HTTP_HDRS | AN_REQ_FLT_END;
			s->req.analysers |= AN_REQ_HTTP_XFER_BODY;
		}
	}
	else
		appctx = si_appctx(&s->si[1]);

	/* Stop the applet scheduling in case the init function is missing
	 * some data.
	 */
	appctx_pause(appctx);
	si_applet_stop_get(&s->si[1]);

	/* Call initialisation. */
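	/* Convention implemented by the switch below: the init callback
	 * returns 0 on error, 1 when fully initialised, and any other value
	 * when it needs to yield and be called again later.
	 */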
	if (rule->applet.init)
		switch (rule->applet.init(appctx, px, s)) {
		case 0: return ACT_RET_ERR;
		case 1: break;
		default: return ACT_RET_YIELD;
	}

	/* Now we can schedule the applet. */
	si_applet_cant_get(&s->si[1]);
	appctx_wakeup(appctx);

	if (sess->fe == s->be) /* report it if the request was intercepted by the frontend */
		sess->fe->fe_counters.intercepted_req++;

	/* The SF_ASSIGNED flag prevents server assignment. */
	s->flags |= SF_ASSIGNED;

	return ACT_RET_STOP;
}

/* This stream analyser checks the switching rules and changes the backend
 * if appropriate. The default_backend rule is also considered, then the
 * target backend's forced persistence rules are also evaluated last if any.
 * It returns 1 if the processing can continue on next analysers, or zero if it
 * either needs more data or wants to immediately abort the request.
 */
static int process_switching_rules(struct stream *s, struct channel *req, int an_bit)
{
	struct persist_rule *prst_rule;
	struct session *sess = s->sess;
	struct proxy *fe = sess->fe;

	req->analysers &= ~an_bit;
	req->analyse_exp = TICK_ETERNITY;

	DPRINTF(stderr,"[%u] %s: stream=%p b=%p, exp(r,w)=%u,%u bf=%08x bh=%d analysers=%02x\n",
		now_ms, __FUNCTION__,
		s,
		req,
		req->rex, req->wex,
		req->flags,
		req->buf->i,
		req->analysers);

	/* now check whether we have some switching rules for this request */
	if (!(s->flags & SF_BE_ASSIGNED)) {
		struct switching_rule *rule;

		list_for_each_entry(rule, &fe->switching_rules, list) {
			int ret = 1;

			if (rule->cond) {
				ret = acl_exec_cond(rule->cond, fe, sess, s, SMP_OPT_DIR_REQ|SMP_OPT_FINAL);
				ret = acl_pass(ret);
				if (rule->cond->pol == ACL_COND_UNLESS)
					ret = !ret;
			}

			if (ret) {
				/* If the backend name is dynamic, try to resolve the name.
				 * If we can't resolve the name, or if any error occurs, break
				 * the loop and fallback to the default backend.
				 */
				struct proxy *backend = NULL;

				if (rule->dynamic) {
					struct chunk *tmp;

					tmp = alloc_trash_chunk();
					if (!tmp)
						goto sw_failed;

					if (build_logline(s, tmp->str, tmp->size, &rule->be.expr))
						backend = proxy_be_by_name(tmp->str);

					free_trash_chunk(tmp);
					tmp = NULL;

					if (!backend)
						break;
				}
				else
					backend = rule->be.backend;

				if (!stream_set_backend(s, backend))
					goto sw_failed;
				break;
			}
		}

		/* To ensure correct connection accounting on the backend, we
		 * have to assign one if it was not set (eg: a listen). This
		 * measure also takes care of correctly setting the default
		 * backend if any.
		 */
		if (!(s->flags & SF_BE_ASSIGNED))
			if (!stream_set_backend(s, fe->defbe.be ? fe->defbe.be : s->be))
				goto sw_failed;
	}

	/* we don't want to run the TCP or HTTP filters again if the backend has not changed */
	if (fe == s->be) {
		s->req.analysers &= ~AN_REQ_INSPECT_BE;
		s->req.analysers &= ~AN_REQ_HTTP_PROCESS_BE;
		s->req.analysers &= ~AN_REQ_FLT_START_BE;
	}

	/* as soon as we know the backend, we must check if we have a matching forced or ignored
	 * persistence rule, and report that in the stream.
	 */
	list_for_each_entry(prst_rule, &s->be->persist_rules, list) {
		int ret = 1;

		if (prst_rule->cond) {
			ret = acl_exec_cond(prst_rule->cond, s->be, sess, s, SMP_OPT_DIR_REQ|SMP_OPT_FINAL);
			ret = acl_pass(ret);
			if (prst_rule->cond->pol == ACL_COND_UNLESS)
				ret = !ret;
		}

		if (ret) {
			/* no rule, or the rule matches */
			if (prst_rule->type == PERSIST_TYPE_FORCE) {
				s->flags |= SF_FORCE_PRST;
			} else {
				s->flags |= SF_IGNORE_PRST;
			}
			break;
		}
	}

	return 1;

 sw_failed:
	/* immediately abort this request in case of allocation failure */
	channel_abort(&s->req);
	channel_abort(&s->res);

	if (!(s->flags & SF_ERR_MASK))
		s->flags |= SF_ERR_RESOURCE;
	if (!(s->flags & SF_FINST_MASK))
		s->flags |= SF_FINST_R;

	if (s->txn)
		s->txn->status = 500;
	s->req.analysers &= AN_REQ_FLT_END;
	s->req.analyse_exp = TICK_ETERNITY;
	return 0;
}

/* This stream analyser works on a request. It applies all use-server rules on
 * it then returns 1. The data must already be present in the buffer otherwise
 * they won't match. It always returns 1.
 */
static int process_server_rules(struct stream *s, struct channel *req, int an_bit)
{
	struct proxy *px = s->be;
	struct session *sess = s->sess;
	struct server_rule *rule;

	DPRINTF(stderr,"[%u] %s: stream=%p b=%p, exp(r,w)=%u,%u bf=%08x bl=%d analysers=%02x\n",
		now_ms, __FUNCTION__,
		s,
		req,
		req->rex, req->wex,
		req->flags,
		req->buf->i + req->buf->o,
		req->analysers);

	if (!(s->flags & SF_ASSIGNED)) {
		list_for_each_entry(rule, &px->server_rules, list) {
			int ret;

			ret = acl_exec_cond(rule->cond, s->be, sess, s, SMP_OPT_DIR_REQ|SMP_OPT_FINAL);
			ret = acl_pass(ret);
			if (rule->cond->pol == ACL_COND_UNLESS)
				ret = !ret;

			if (ret) {
				struct server *srv = rule->srv.ptr;

				if ((srv->state != SRV_ST_STOPPED) ||
				    (px->options & PR_O_PERSIST) ||
				    (s->flags & SF_FORCE_PRST)) {
					s->flags |= SF_DIRECT | SF_ASSIGNED;
					s->target = &srv->obj_type;
					break;
				}
				/* if the server is not UP, let's go on with next rules
				 * just in case another one is suited.
				 */
			}
		}
	}

	req->analysers &= ~an_bit;
	req->analyse_exp = TICK_ETERNITY;
	return 1;
}

/* This stream analyser works on a request. It applies all sticking rules on
 * it then returns 1. The data must already be present in the buffer otherwise
 * they won't match. It always returns 1.
 */
static int process_sticking_rules(struct stream *s, struct channel *req, int an_bit)
{
	struct proxy    *px   = s->be;
	struct session *sess  = s->sess;
	struct sticking_rule  *rule;

	DPRINTF(stderr,"[%u] %s: stream=%p b=%p, exp(r,w)=%u,%u bf=%08x bh=%d analysers=%02x\n",
		now_ms, __FUNCTION__,
		s,
		req,
		req->rex, req->wex,
		req->flags,
		req->buf->i,
		req->analysers);

	list_for_each_entry(rule, &px->sticking_rules, list) {
		int ret = 1;
		int i;

		/* Only the first stick store-request of each table is applied
		 * and other ones are ignored. The purpose is to allow complex
		 * configurations which look for multiple entries by decreasing
		 * order of precision and to stop at the first which matches.
		 * An example could be a store of the IP address from an HTTP
		 * header first, then from the source if not found.
		 */
		if (rule->flags & STK_IS_STORE) {
			for (i = 0; i < s->store_count; i++) {
				if (rule->table.t == s->store[i].table)
					break;
			}

			if (i !=  s->store_count)
				continue;
		}

		if (rule->cond) {
			ret = acl_exec_cond(rule->cond, px, sess, s, SMP_OPT_DIR_REQ|SMP_OPT_FINAL);
			ret = acl_pass(ret);
			if (rule->cond->pol == ACL_COND_UNLESS)
				ret = !ret;
		}

		if (ret) {
			struct stktable_key *key;

			key = stktable_fetch_key(rule->table.t, px, sess, s, SMP_OPT_DIR_REQ|SMP_OPT_FINAL, rule->expr, NULL);
			if (!key)
				continue;

			if (rule->flags & STK_IS_MATCH) {
				struct stksess *ts;

				if ((ts = stktable_lookup_key(rule->table.t, key)) != NULL) {
					if (!(s->flags & SF_ASSIGNED)) {
						struct eb32_node *node;
						void *ptr;

						/* srv found in table */
						ptr = stktable_data_ptr(rule->table.t, ts, STKTABLE_DT_SERVER_ID);
						node = eb32_lookup(&px->conf.used_server_id, stktable_data_cast(ptr, server_id));
						if (node) {
							struct server *srv;

							srv = container_of(node, struct server, conf.id);
							if ((srv->state != SRV_ST_STOPPED) ||
							    (px->options & PR_O_PERSIST) ||
							    (s->flags & SF_FORCE_PRST)) {
								s->flags |= SF_DIRECT | SF_ASSIGNED;
								s->target = &srv->obj_type;
							}
						}
					}
					stktable_touch(rule->table.t, ts, 1);
				}
			}
			if (rule->flags & STK_IS_STORE) {
				if (s->store_count < (sizeof(s->store) / sizeof(s->store[0]))) {
					struct stksess *ts;

					ts = stksess_new(rule->table.t, key);
					if (ts) {
						s->store[s->store_count].table = rule->table.t;
						s->store[s->store_count++].ts = ts;
					}
				}
			}
		}
	}

	req->analysers &= ~an_bit;
	req->analyse_exp = TICK_ETERNITY;
	return 1;
}

/* This stream analyser works on a response. It applies all store rules on it
 * then returns 1. The data must already be present in the buffer otherwise
 * they won't match. It always returns 1.
 */
static int process_store_rules(struct stream *s, struct channel *rep, int an_bit)
{
	struct proxy    *px   = s->be;
	struct session *sess  = s->sess;
	struct sticking_rule  *rule;
	int i;
	int nbreq = s->store_count;

	DPRINTF(stderr,"[%u] %s: stream=%p b=%p, exp(r,w)=%u,%u bf=%08x bh=%d analysers=%02x\n",
		now_ms, __FUNCTION__,
		s,
		rep,
		rep->rex, rep->wex,
		rep->flags,
		rep->buf->i,
		rep->analysers);

	list_for_each_entry(rule, &px->storersp_rules, list) {
		int ret = 1;

		/* Only the first stick store-response of each table is applied
		 * and other ones are ignored. The purpose is to allow complex
		 * configurations which look for multiple entries by decreasing
		 * order of precision and to stop at the first which matches.
		 * An example could be a store of a set-cookie value, with a
		 * fallback to a parameter found in a 302 redirect.
		 *
		 * The store-response rules are not allowed to override the
		 * store-request rules for the same table, but they may coexist.
		 * Thus we can have up to one store-request entry and one store-
		 * response entry for the same table at any time.
		 */
		for (i = nbreq; i < s->store_count; i++) {
			if (rule->table.t == s->store[i].table)
				break;
		}

		/* skip existing entries for this table */
		if (i < s->store_count)
			continue;

		if (rule->cond) {
			ret = acl_exec_cond(rule->cond, px, sess, s, SMP_OPT_DIR_RES|SMP_OPT_FINAL);
			ret = acl_pass(ret);
			if (rule->cond->pol == ACL_COND_UNLESS)
				ret = !ret;
		}

		if (ret) {
			struct stktable_key *key;

			key = stktable_fetch_key(rule->table.t, px, sess, s, SMP_OPT_DIR_RES|SMP_OPT_FINAL, rule->expr, NULL);
			if (!key)
				continue;

			if (s->store_count < (sizeof(s->store) / sizeof(s->store[0]))) {
				struct stksess *ts;

				ts = stksess_new(rule->table.t, key);
				if (ts) {
					s->store[s->store_count].table = rule->table.t;
					s->store[s->store_count++].ts = ts;
				}
			}
		}
	}

	/* process store request and store response */
	for (i = 0; i < s->store_count; i++) {
		struct stksess *ts;
		void *ptr;

		if (objt_server(s->target) && objt_server(s->target)->flags & SRV_F_NON_STICK) {
			stksess_free(s->store[i].table, s->store[i].ts);
			s->store[i].ts = NULL;
			continue;
		}

		ts = stktable_lookup(s->store[i].table, s->store[i].ts);
		if (ts) {
			/* the entry already existed, we can free ours */
			stktable_touch(s->store[i].table, ts, 1);
			stksess_free(s->store[i].table, s->store[i].ts);
		}
		else
			ts = stktable_store(s->store[i].table, s->store[i].ts, 1);

		s->store[i].ts = NULL;
		ptr = stktable_data_ptr(s->store[i].table, ts, STKTABLE_DT_SERVER_ID);
		stktable_data_cast(ptr, server_id) = objt_server(s->target)->puid;
	}
	s->store_count = 0; /* everything is stored */

	rep->analysers &= ~an_bit;
	rep->analyse_exp = TICK_ETERNITY;
	return 1;
}

/* This macro is very specific to the function below. See the comments in
 * process_stream() below to understand the logic and the tests.
 */
#define UPDATE_ANALYSERS(real, list, back, flag) {			\
		list = (((list) & ~(flag)) | ~(back)) & (real);		\
		back = real;						\
		if (!(list))						\
			break;						\
		if (((list) ^ ((list) & ((list) - 1))) < (flag))	\
			continue;					\
}

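/* A note on the bit trick above: ((list) & ((list) - 1)) clears the lowest
 * set bit of <list>, so the XOR isolates that lowest bit, i.e. the analyser
 * with the lowest identifier still to be run. If it sorts below <flag>, an
 * earlier analyser was re-enabled and the evaluation loop must restart from
 * the beginning; otherwise it may simply proceed to the next one. Worked
 * example (values illustrative): with list = 0x0a, list & (list - 1) = 0x08
 * and the XOR yields 0x02, the lowest set bit.
 */
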
/* The two following macros call an analyzer for the specified channel if the
 * right flag is set. The first one is used for "filterable" analyzers. If a
 * stream has some registered filters, pre and post analyze callbacks are
 * called. The second one is used for other analyzers (AN_REQ/RES_FLT_* and
 * AN_REQ/RES_HTTP_XFER_BODY) */
#define FLT_ANALYZE(strm, chn, fun, list, back, flag, ...)			\
	{									\
		if ((list) & (flag)) {						\
			if (HAS_FILTERS(strm)) {			        \
				if (!flt_pre_analyze((strm), (chn), (flag)))    \
					break;				        \
				if (!fun((strm), (chn), (flag), ##__VA_ARGS__))	\
					break;					\
				if (!flt_post_analyze((strm), (chn), (flag)))	\
					break;					\
			}							\
			else {							\
				if (!fun((strm), (chn), (flag), ##__VA_ARGS__))	\
					break;					\
			}							\
			UPDATE_ANALYSERS((chn)->analysers, (list),		\
					 (back), (flag));			\
		}								\
	}

#define ANALYZE(strm, chn, fun, list, back, flag, ...)			\
	{								\
		if ((list) & (flag)) {					\
			if (!fun((strm), (chn), (flag), ##__VA_ARGS__))	\
				break;					\
			UPDATE_ANALYSERS((chn)->analysers, (list),	\
					 (back), (flag));		\
		}							\
	}
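
/* Illustrative use of these macros inside the analyser loop of
 * process_stream() (shape assumed from the macro contracts above, not a
 * verbatim excerpt):
 *
 *	ANALYZE    (s, req, tcp_inspect_request,     ana_list, ana_back, AN_REQ_INSPECT_FE);
 *	FLT_ANALYZE(s, req, process_switching_rules, ana_list, ana_back, AN_REQ_SWITCHING_RULES);
 *
 * Each invocation runs the analyser only when its flag is present in
 * <ana_list>, then lets UPDATE_ANALYSERS() decide whether to continue,
 * restart or leave the loop.
 */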

/* Processes the client, server, request and response jobs of a stream task,
 * then puts it back to the wait queue in a clean state, or cleans up its
 * resources if it must be deleted. Returns in <next> the date the task wants
 * to be woken up, or TICK_ETERNITY. In order not to call all functions
 * needlessly, the request and response buffer flags are monitored and each
 * function is called only if at least one other function has changed at
 * least one flag it is interested in.
 */
struct task *process_stream(struct task *t)
1548 {
1549 	struct server *srv;
1550 	struct stream *s = t->context;
1551 	struct session *sess = s->sess;
1552 	unsigned int rqf_last, rpf_last;
1553 	unsigned int rq_prod_last, rq_cons_last;
1554 	unsigned int rp_cons_last, rp_prod_last;
1555 	unsigned int req_ana_back;
1556 	struct channel *req, *res;
1557 	struct stream_interface *si_f, *si_b;
1558 
1559 	req = &s->req;
1560 	res = &s->res;
1561 
1562 	si_f = &s->si[0];
1563 	si_b = &s->si[1];
1564 
1565 	//DPRINTF(stderr, "%s:%d: cs=%d ss=%d(%d) rqf=0x%08x rpf=0x%08x\n", __FUNCTION__, __LINE__,
1566 	//        si_f->state, si_b->state, si_b->err_type, req->flags, res->flags);
1567 
1568 	/* this data may be no longer valid, clear it */
1569 	if (s->txn)
1570 		memset(&s->txn->auth, 0, sizeof(s->txn->auth));
1571 
1572 	/* This flag must explicitly be set every time */
1573 	req->flags &= ~(CF_READ_NOEXP|CF_WAKE_WRITE);
1574 	res->flags &= ~(CF_READ_NOEXP|CF_WAKE_WRITE);
1575 
1576 	/* Keep a copy of req/rep flags so that we can detect shutdowns */
1577 	rqf_last = req->flags & ~CF_MASK_ANALYSER;
1578 	rpf_last = res->flags & ~CF_MASK_ANALYSER;
1579 
1580 	/* we don't want the stream interface functions to recursively wake us up */
1581 	si_f->flags |= SI_FL_DONT_WAKE;
1582 	si_b->flags |= SI_FL_DONT_WAKE;
1583 
1584 	/* update pending events */
1585 	s->pending_events |= (t->state & TASK_WOKEN_ANY);
1586 
1587 	/* 1a: Check for low level timeouts if needed. We just set a flag on
1588 	 * stream interfaces when their timeouts have expired.
1589 	 */
1590 	if (unlikely(s->pending_events & TASK_WOKEN_TIMER)) {
1591 		stream_int_check_timeouts(si_f);
1592 		stream_int_check_timeouts(si_b);
1593 
1594 		/* check channel timeouts, and close the corresponding stream interfaces
1595 		 * for future reads or writes. Note: this will also concern upper layers
1596 		 * but we do not touch any other flag. We must be careful and correctly
1597 		 * detect state changes when calling them.
1598 		 */
1599 
1600 		channel_check_timeouts(req);
1601 
1602 		if (unlikely((req->flags & (CF_SHUTW|CF_WRITE_TIMEOUT)) == CF_WRITE_TIMEOUT)) {
1603 			si_b->flags |= SI_FL_NOLINGER;
1604 			si_shutw(si_b);
1605 		}
1606 
1607 		if (unlikely((req->flags & (CF_SHUTR|CF_READ_TIMEOUT)) == CF_READ_TIMEOUT)) {
1608 			if (si_f->flags & SI_FL_NOHALF)
1609 				si_f->flags |= SI_FL_NOLINGER;
1610 			si_shutr(si_f);
1611 		}
1612 
1613 		channel_check_timeouts(res);
1614 
1615 		if (unlikely((res->flags & (CF_SHUTW|CF_WRITE_TIMEOUT)) == CF_WRITE_TIMEOUT)) {
1616 			si_f->flags |= SI_FL_NOLINGER;
1617 			si_shutw(si_f);
1618 		}
1619 
1620 		if (unlikely((res->flags & (CF_SHUTR|CF_READ_TIMEOUT)) == CF_READ_TIMEOUT)) {
1621 			if (si_b->flags & SI_FL_NOHALF)
1622 				si_b->flags |= SI_FL_NOLINGER;
1623 			si_shutr(si_b);
1624 		}
1625 
1626 		if (HAS_FILTERS(s))
1627 			flt_stream_check_timeouts(s);
1628 
1629 		/* Once in a while we're woken up because the task expires. But
1630 		 * this does not necessarily mean that a timeout has been reached.
1631 		 * So let's not run the whole stream processing if only an
1632 		 * expiration timeout needs to be refreshed.
1633 		 */
1634 		if (!((req->flags | res->flags) &
1635 		      (CF_SHUTR|CF_READ_ACTIVITY|CF_READ_TIMEOUT|CF_SHUTW|
1636 		       CF_WRITE_ACTIVITY|CF_WRITE_EVENT|CF_WRITE_TIMEOUT|CF_ANA_TIMEOUT)) &&
1637 		    !((si_f->flags | si_b->flags) & (SI_FL_EXP|SI_FL_ERR)) &&
1638 		    ((s->pending_events & TASK_WOKEN_ANY) == TASK_WOKEN_TIMER)) {
1639 			si_f->flags &= ~SI_FL_DONT_WAKE;
1640 			si_b->flags &= ~SI_FL_DONT_WAKE;
1641 			goto update_exp_and_leave;
1642 		}
1643 	}
1644 
1645 	/* below we may emit error messages so we have to ensure that we have
1646 	 * our buffers properly allocated.
1647 	 */
1648 	if (!stream_alloc_work_buffer(s)) {
1649 		/* No buffer available, we've been subscribed to the list of
1650 		 * buffer waiters, let's wait for our turn.
1651 		 */
1652 		si_f->flags &= ~SI_FL_DONT_WAKE;
1653 		si_b->flags &= ~SI_FL_DONT_WAKE;
1654 		goto update_exp_and_leave;
1655 	}
1656 
1657 	/* 1b: check for low-level errors reported at the stream interface.
1658 	 * First we check if it's a retryable error (in which case we don't
1659 	 * want to tell the buffer). Otherwise we report the error one level
1660 	 * up by setting flags on the buffers. Note that the side towards
1661 	 * the client cannot have connect (hence retryable) errors. Also, the
1662 	 * connection setup code must be able to deal with any type of abort.
1663 	 */
1664 	srv = objt_server(s->target);
1665 	if (unlikely(si_f->flags & SI_FL_ERR)) {
1666 		if (si_f->state == SI_ST_EST || si_f->state == SI_ST_DIS) {
1667 			si_shutr(si_f);
1668 			si_shutw(si_f);
1669 			stream_int_report_error(si_f);
1670 			if (!(req->analysers) && !(res->analysers)) {
1671 				s->be->be_counters.cli_aborts++;
1672 				sess->fe->fe_counters.cli_aborts++;
1673 				if (srv)
1674 					srv->counters.cli_aborts++;
1675 				if (!(s->flags & SF_ERR_MASK))
1676 					s->flags |= SF_ERR_CLICL;
1677 				if (!(s->flags & SF_FINST_MASK))
1678 					s->flags |= SF_FINST_D;
1679 			}
1680 		}
1681 	}
1682 
1683 	if (unlikely(si_b->flags & SI_FL_ERR)) {
1684 		if (si_b->state == SI_ST_EST || si_b->state == SI_ST_DIS) {
1685 			si_shutr(si_b);
1686 			si_shutw(si_b);
1687 			stream_int_report_error(si_b);
1688 			s->be->be_counters.failed_resp++;
1689 			if (srv)
1690 				srv->counters.failed_resp++;
1691 			if (!(req->analysers) && !(res->analysers)) {
1692 				s->be->be_counters.srv_aborts++;
1693 				sess->fe->fe_counters.srv_aborts++;
1694 				if (srv)
1695 					srv->counters.srv_aborts++;
1696 				if (!(s->flags & SF_ERR_MASK))
1697 					s->flags |= SF_ERR_SRVCL;
1698 				if (!(s->flags & SF_FINST_MASK))
1699 					s->flags |= SF_FINST_D;
1700 			}
1701 		}
1702 		/* note: maybe we should process connection errors here? */
1703 	}
1704 
1705 	if (si_b->state == SI_ST_CON) {
1706 		/* we were trying to establish a connection on the server side,
1707 		 * maybe it succeeded, maybe it failed, maybe we timed out, ...
1708 		 */
1709 		if (unlikely(!sess_update_st_con_tcp(s)))
1710 			sess_update_st_cer(s);
1711 		else if (si_b->state == SI_ST_EST)
1712 			sess_establish(s);
1713 
1714 		/* state is now one of SI_ST_CON (still in progress), SI_ST_EST
1715 		 * (established), SI_ST_DIS (abort), SI_ST_CLO (last error),
1716 		 * SI_ST_ASS/SI_ST_TAR/SI_ST_REQ for retryable errors.
1717 		 */
1718 	}
1719 
1720 	rq_prod_last = si_f->state;
1721 	rq_cons_last = si_b->state;
1722 	rp_cons_last = si_f->state;
1723 	rp_prod_last = si_b->state;
1724 
1725  resync_stream_interface:
1726 	/* Check for connection closure */
1727 
1728 	DPRINTF(stderr,
1729 		"[%u] %s:%d: task=%p s=%p, sfl=0x%08x, rq=%p, rp=%p, exp(r,w)=%u,%u rqf=%08x rpf=%08x rqh=%d rqt=%d rph=%d rpt=%d cs=%d ss=%d, cet=0x%x set=0x%x retr=%d\n",
1730 		now_ms, __FUNCTION__, __LINE__,
1731 		t,
1732 		s, s->flags,
1733 		req, res,
1734 		req->rex, res->wex,
1735 		req->flags, res->flags,
1736 		req->buf->i, req->buf->o, res->buf->i, res->buf->o, si_f->state, si_b->state,
1737 		si_f->err_type, si_b->err_type,
1738 		si_b->conn_retries);
1739 
1740 	/* nothing special to be done on client side */
1741 	if (unlikely(si_f->state == SI_ST_DIS))
1742 		si_f->state = SI_ST_CLO;
1743 
1744 	/* When a server-side connection is released, we have to count it and
1745 	 * check for pending connections on this server.
1746 	 */
1747 	if (unlikely(si_b->state == SI_ST_DIS)) {
1748 		si_b->state = SI_ST_CLO;
1749 		srv = objt_server(s->target);
1750 		if (srv) {
1751 			if (s->flags & SF_CURR_SESS) {
1752 				s->flags &= ~SF_CURR_SESS;
1753 				srv->cur_sess--;
1754 			}
1755 			sess_change_server(s, NULL);
1756 			if (may_dequeue_tasks(srv, s->be))
1757 				process_srv_queue(srv);
1758 		}
1759 	}
1760 
1761 	/*
1762 	 * Note: of the transient states (REQ, CER, DIS), only REQ may remain
1763 	 * at this point.
1764 	 */
1765 
1766  resync_request:
1767 	/* Analyse request */
1768 	if (((req->flags & ~rqf_last) & CF_MASK_ANALYSER) ||
1769 	    ((req->flags ^ rqf_last) & CF_MASK_STATIC) ||
1770 	    (req->analysers && (req->flags & CF_SHUTW)) ||
1771 	    si_f->state != rq_prod_last ||
1772 	    si_b->state != rq_cons_last ||
1773 	    s->pending_events & TASK_WOKEN_MSG) {
1774 		unsigned int flags = req->flags;
1775 
1776 		if (si_f->state >= SI_ST_EST) {
1777 			int max_loops = global.tune.maxpollevents;
1778 			unsigned int ana_list;
1779 			unsigned int ana_back;
1780 
1781 			/* it's up to the analysers to stop new connections,
1782 			 * disable reading or closing. Note: if an analyser
1783 			 * disables any of these bits, it is responsible for
1784 			 * enabling them again when it disables itself, so
1785 			 * that other analysers are called in similar conditions.
1786 			 */
1787 			channel_auto_read(req);
1788 			channel_auto_connect(req);
1789 			channel_auto_close(req);
1790 
1791 			/* We will call all analysers for which a bit is set in
1792 			 * req->analysers, following the bit order from LSB
1793 			 * to MSB. The analysers must remove themselves from
1794 			 * the list when not needed. Any analyser may return 0
1795 			 * to break out of the loop, either because of missing
1796 			 * data to take a decision, or because it decides to
1797 			 * kill the stream. We loop at least once through each
1798 			 * analyser, and we may loop again if other analysers
1799 			 * are added in the middle.
1800 			 *
1801 			 * We build a list of analysers to run. We evaluate all
1802 			 * of these analysers in the order of the lower bit to
1803 			 * the higher bit. This ordering is very important.
1804 			 * An analyser will often add/remove other analysers,
1805 			 * including itself. Any changes to itself have no effect
1806 			 * on the loop. If it removes any other analysers, we
1807 			 * want those analysers not to be called anymore during
1808 			 * this loop. If it adds an analyser that is located
1809 			 * after itself, we want it to be scheduled for being
1810 			 * processed during the loop. If it adds an analyser
1811 			 * which is located before it, we want it to switch to
1812 			 * it immediately, even if it has already been called
1813 			 * once but removed since.
1814 			 *
1815 			 * In order to achieve this, we compare the analyser
1816 			 * list after the call with a copy of it before the
1817 			 * call. The work list is fed with analyser bits that
1818 			 * appeared during the call. Then we compare previous
1819 			 * work list with the new one, and check the bits that
1820 			 * appeared. If the lowest of these bits is lower than
1821 			 * the current bit, it means we have enabled a previous
1822 			 * analyser and must immediately loop again.
1823 			 */
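			/* Worked example (a sketch): with analyser bits
			 * A=0x01, B=0x02, C=0x04 and A|C initially set, if C
			 * enables B while it runs, the newly appeared bit (B)
			 * is lower than the current one (C), so the loop
			 * resumes from B instead of finishing the pass. The
			 * ANALYZE/FLT_ANALYZE macros used below are assumed to
			 * implement this contract: run the analyser only when
			 * its bit is set in <ana_list>, break out of the loop
			 * when it returns 0, and restart the scan when a lower
			 * bit appears.
			 */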
1824 
1825 			ana_list = ana_back = req->analysers;
1826 			while (ana_list && max_loops--) {
1827 				/* Warning! ensure that analysers are always placed in ascending order! */
1828 				ANALYZE    (s, req, flt_start_analyze,          ana_list, ana_back, AN_REQ_FLT_START_FE);
1829 				FLT_ANALYZE(s, req, tcp_inspect_request,        ana_list, ana_back, AN_REQ_INSPECT_FE);
1830 				FLT_ANALYZE(s, req, http_wait_for_request,      ana_list, ana_back, AN_REQ_WAIT_HTTP);
1831 				FLT_ANALYZE(s, req, http_wait_for_request_body, ana_list, ana_back, AN_REQ_HTTP_BODY);
1832 				FLT_ANALYZE(s, req, http_process_req_common,    ana_list, ana_back, AN_REQ_HTTP_PROCESS_FE, sess->fe);
1833 				FLT_ANALYZE(s, req, process_switching_rules,    ana_list, ana_back, AN_REQ_SWITCHING_RULES);
1834 				ANALYZE    (s, req, flt_start_analyze,          ana_list, ana_back, AN_REQ_FLT_START_BE);
1835 				FLT_ANALYZE(s, req, tcp_inspect_request,        ana_list, ana_back, AN_REQ_INSPECT_BE);
1836 				FLT_ANALYZE(s, req, http_process_req_common,    ana_list, ana_back, AN_REQ_HTTP_PROCESS_BE, s->be);
1837 				FLT_ANALYZE(s, req, http_process_tarpit,        ana_list, ana_back, AN_REQ_HTTP_TARPIT);
1838 				FLT_ANALYZE(s, req, process_server_rules,       ana_list, ana_back, AN_REQ_SRV_RULES);
1839 				FLT_ANALYZE(s, req, http_process_request,       ana_list, ana_back, AN_REQ_HTTP_INNER);
1840 				FLT_ANALYZE(s, req, tcp_persist_rdp_cookie,     ana_list, ana_back, AN_REQ_PRST_RDP_COOKIE);
1841 				FLT_ANALYZE(s, req, process_sticking_rules,     ana_list, ana_back, AN_REQ_STICKING_RULES);
1842 				ANALYZE    (s, req, flt_analyze_http_headers,   ana_list, ana_back, AN_REQ_FLT_HTTP_HDRS);
1843 				ANALYZE    (s, req, http_request_forward_body,  ana_list, ana_back, AN_REQ_HTTP_XFER_BODY);
1844 				ANALYZE    (s, req, flt_xfer_data,              ana_list, ana_back, AN_REQ_FLT_XFER_DATA);
1845 				ANALYZE    (s, req, flt_end_analyze,            ana_list, ana_back, AN_REQ_FLT_END);
1846 				break;
1847 			}
1848 		}
1849 
1850 		rq_prod_last = si_f->state;
1851 		rq_cons_last = si_b->state;
1852 		req->flags &= ~CF_WAKE_ONCE;
1853 		rqf_last = req->flags;
1854 
1855 		if ((req->flags ^ flags) & CF_MASK_STATIC)
1856 			goto resync_request;
1857 	}
1858 
1859 	/* we'll monitor the request analysers while parsing the response,
1860 	 * because some response analysers may indirectly enable new request
1861 	 * analysers (eg: HTTP keep-alive).
1862 	 */
1863 	req_ana_back = req->analysers;
1864 
1865  resync_response:
1866 	/* Analyse response */
1867 
1868 	if (((res->flags & ~rpf_last) & CF_MASK_ANALYSER) ||
1869 		 (res->flags ^ rpf_last) & CF_MASK_STATIC ||
1870 		 (res->analysers && (res->flags & CF_SHUTW)) ||
1871 		 si_f->state != rp_cons_last ||
1872 		 si_b->state != rp_prod_last ||
1873 		 s->pending_events & TASK_WOKEN_MSG) {
1874 		unsigned int flags = res->flags;
1875 
1876 		if (si_b->state >= SI_ST_EST) {
1877 			int max_loops = global.tune.maxpollevents;
1878 			unsigned int ana_list;
1879 			unsigned int ana_back;
1880 
1881 			/* it's up to the analysers to disable reading or
1882 			 * closing. Note: if an analyser disables any of these
1883 			 * bits, it is responsible for enabling them again when
1884 			 * it disables itself, so that other analysers are called
1885 			 * in similar conditions.
1886 			 */
1887 			channel_auto_read(res);
1888 			channel_auto_close(res);
1889 
1890 			/* We will call all analysers for which a bit is set in
1891 			 * res->analysers, following the bit order from LSB
1892 			 * to MSB. The analysers must remove themselves from
1893 			 * the list when not needed. Any analyser may return 0
1894 			 * to break out of the loop, either because of missing
1895 			 * data to take a decision, or because it decides to
1896 			 * kill the stream. We loop at least once through each
1897 			 * analyser, and we may loop again if other analysers
1898 			 * are added in the middle.
1899 			 */
1900 
1901 			ana_list = ana_back = res->analysers;
1902 			while (ana_list && max_loops--) {
1903 				/* Warning! ensure that analysers are always placed in ascending order! */
1904 				ANALYZE    (s, res, flt_start_analyze,          ana_list, ana_back, AN_RES_FLT_START_FE);
1905 				ANALYZE    (s, res, flt_start_analyze,          ana_list, ana_back, AN_RES_FLT_START_BE);
1906 				FLT_ANALYZE(s, res, tcp_inspect_response,       ana_list, ana_back, AN_RES_INSPECT);
1907 				FLT_ANALYZE(s, res, http_wait_for_response,     ana_list, ana_back, AN_RES_WAIT_HTTP);
1908 				FLT_ANALYZE(s, res, process_store_rules,        ana_list, ana_back, AN_RES_STORE_RULES);
1909 				FLT_ANALYZE(s, res, http_process_res_common,    ana_list, ana_back, AN_RES_HTTP_PROCESS_BE, s->be);
1910 				ANALYZE    (s, res, flt_analyze_http_headers,   ana_list, ana_back, AN_RES_FLT_HTTP_HDRS);
1911 				ANALYZE    (s, res, http_response_forward_body, ana_list, ana_back, AN_RES_HTTP_XFER_BODY);
1912 				ANALYZE    (s, res, flt_xfer_data,              ana_list, ana_back, AN_RES_FLT_XFER_DATA);
1913 				ANALYZE    (s, res, flt_end_analyze,            ana_list, ana_back, AN_RES_FLT_END);
1914 				break;
1915 			}
1916 		}
1917 
1918 		rp_cons_last = si_f->state;
1919 		rp_prod_last = si_b->state;
1920 		res->flags &= ~CF_WAKE_ONCE;
1921 		rpf_last = res->flags;
1922 
1923 		if ((res->flags ^ flags) & CF_MASK_STATIC)
1924 			goto resync_response;
1925 	}
1926 
1927 	/* maybe someone has added some request analysers, so we must check and loop */
1928 	if (req->analysers & ~req_ana_back)
1929 		goto resync_request;
1930 
1931 	if ((req->flags & ~rqf_last) & CF_MASK_ANALYSER)
1932 		goto resync_request;
1933 
1934 	/* FIXME: here we should call protocol handlers which rely on
1935 	 * both buffers.
1936 	 */
1937 
1938 
1939 	/*
1940 	 * Now we propagate unhandled errors to the stream. Normally
1941 	 * we're just in a data phase here since it means we have not
1942 	 * seen any analyser that could set an error status.
1943 	 */
1944 	srv = objt_server(s->target);
1945 	if (unlikely(!(s->flags & SF_ERR_MASK))) {
1946 		if (req->flags & (CF_READ_ERROR|CF_READ_TIMEOUT|CF_WRITE_ERROR|CF_WRITE_TIMEOUT)) {
1947 			/* Report it if the client got an error or a read timeout expired */
1948 			req->analysers = 0;
1949 			if (req->flags & CF_READ_ERROR) {
1950 				s->be->be_counters.cli_aborts++;
1951 				sess->fe->fe_counters.cli_aborts++;
1952 				if (srv)
1953 					srv->counters.cli_aborts++;
1954 				s->flags |= SF_ERR_CLICL;
1955 			}
1956 			else if (req->flags & CF_READ_TIMEOUT) {
1957 				s->be->be_counters.cli_aborts++;
1958 				sess->fe->fe_counters.cli_aborts++;
1959 				if (srv)
1960 					srv->counters.cli_aborts++;
1961 				s->flags |= SF_ERR_CLITO;
1962 			}
1963 			else if (req->flags & CF_WRITE_ERROR) {
1964 				s->be->be_counters.srv_aborts++;
1965 				sess->fe->fe_counters.srv_aborts++;
1966 				if (srv)
1967 					srv->counters.srv_aborts++;
1968 				s->flags |= SF_ERR_SRVCL;
1969 			}
1970 			else {
1971 				s->be->be_counters.srv_aborts++;
1972 				sess->fe->fe_counters.srv_aborts++;
1973 				if (srv)
1974 					srv->counters.srv_aborts++;
1975 				s->flags |= SF_ERR_SRVTO;
1976 			}
1977 			sess_set_term_flags(s);
1978 		}
1979 		else if (res->flags & (CF_READ_ERROR|CF_READ_TIMEOUT|CF_WRITE_ERROR|CF_WRITE_TIMEOUT)) {
1980 			/* Report it if the server got an error or a read timeout expired */
1981 			res->analysers = 0;
1982 			if (res->flags & CF_READ_ERROR) {
1983 				s->be->be_counters.srv_aborts++;
1984 				sess->fe->fe_counters.srv_aborts++;
1985 				if (srv)
1986 					srv->counters.srv_aborts++;
1987 				s->flags |= SF_ERR_SRVCL;
1988 			}
1989 			else if (res->flags & CF_READ_TIMEOUT) {
1990 				s->be->be_counters.srv_aborts++;
1991 				sess->fe->fe_counters.srv_aborts++;
1992 				if (srv)
1993 					srv->counters.srv_aborts++;
1994 				s->flags |= SF_ERR_SRVTO;
1995 			}
1996 			else if (res->flags & CF_WRITE_ERROR) {
1997 				s->be->be_counters.cli_aborts++;
1998 				sess->fe->fe_counters.cli_aborts++;
1999 				if (srv)
2000 					srv->counters.cli_aborts++;
2001 				s->flags |= SF_ERR_CLICL;
2002 			}
2003 			else {
2004 				s->be->be_counters.cli_aborts++;
2005 				sess->fe->fe_counters.cli_aborts++;
2006 				if (srv)
2007 					srv->counters.cli_aborts++;
2008 				s->flags |= SF_ERR_CLITO;
2009 			}
2010 			sess_set_term_flags(s);
2011 		}
2012 	}
2013 
2014 	/*
2015 	 * Here we take care of forwarding unhandled data. This also includes
2016 	 * connection establishments and shutdown requests.
2017 	 */
2018 
2019 
2020 	/* If no one is interested in analysing data, it's time to forward
2021 	 * everything. We configure the buffer to forward indefinitely.
2022 	 * Note that we're checking CF_SHUTR_NOW as an indication of a possible
2023 	 * recent call to channel_abort().
2024 	 */
2025 	if (unlikely((!req->analysers || (req->analysers == AN_REQ_FLT_END && !(req->flags & CF_FLT_ANALYZE))) &&
2026 	    !(req->flags & (CF_SHUTW|CF_SHUTR_NOW)) &&
2027 	    (si_f->state >= SI_ST_EST) &&
2028 	    (req->to_forward != CHN_INFINITE_FORWARD))) {
2029 		/* This buffer is freewheeling, there's no analyser
2030 		 * attached to it. If any data are left in it, we'll permit them to
2031 		 * move.
2032 		 */
2033 		channel_auto_read(req);
2034 		channel_auto_connect(req);
2035 		channel_auto_close(req);
2036 		buffer_flush(req->buf);
2037 
2038 		/* We'll let data flow from the producer (if still connected)
2039 		 * to the consumer (which might possibly not be connected yet).
2040 		 */
2041 		if (!(req->flags & (CF_SHUTR|CF_SHUTW_NOW)))
2042 			channel_forward_forever(req);
2043 
2044 		/* Just in order to support fetching HTTP contents after start
2045 		 * of forwarding when the HTTP forwarding analyser is not used,
2046 		 * we simply reset msg->sov so that HTTP rewinding points to the
2047 		 * headers.
2048 		 */
2049 		if (s->txn)
2050 			s->txn->req.sov = s->txn->req.eoh + s->txn->req.eol - req->buf->o;
2051 	}
2052 
2053 	/* check if it is wise to enable kernel splicing to forward request data */
2054 	if (!(req->flags & (CF_KERN_SPLICING|CF_SHUTR)) &&
2055 	    req->to_forward &&
2056 	    (global.tune.options & GTUNE_USE_SPLICE) &&
2057 	    (objt_conn(si_f->end) && __objt_conn(si_f->end)->xprt && __objt_conn(si_f->end)->xprt->rcv_pipe) &&
2058 	    (objt_conn(si_b->end) && __objt_conn(si_b->end)->xprt && __objt_conn(si_b->end)->xprt->snd_pipe) &&
2059 	    (pipes_used < global.maxpipes) &&
2060 	    (((sess->fe->options2|s->be->options2) & PR_O2_SPLIC_REQ) ||
2061 	     (((sess->fe->options2|s->be->options2) & PR_O2_SPLIC_AUT) &&
2062 	      (req->flags & CF_STREAMER_FAST)))) {
2063 		req->flags |= CF_KERN_SPLICING;
2064 	}
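	/* In short: splicing is only enabled when both endpoints are real
	 * connections whose transport provides the pipe callbacks, a pipe is
	 * still available, and either the configuration explicitly requests
	 * it (option splice-request) or automatic mode is on and the channel
	 * was flagged as a fast streamer. CF_STREAMER_FAST is assumed to be
	 * set by the lower layers when they observe large regular transfers.
	 */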
2065 
2066 	/* reflect what the L7 analysers have seen last */
2067 	rqf_last = req->flags;
2068 
2069 	/*
2070 	 * Now forward all shutdown requests between both sides of the buffer
2071 	 */
2072 
2073 	/* first, let's check if the request buffer needs to shutdown(write), which may
2074 	 * happen either because the input is closed or because we want to force a close
2075 	 * once the server has begun to respond. If a half-closed timeout is set, we adjust
2076 	 * the other side's timeout as well.
2077 	 */
2078 	if (unlikely((req->flags & (CF_SHUTW|CF_SHUTW_NOW|CF_AUTO_CLOSE|CF_SHUTR)) ==
2079 		     (CF_AUTO_CLOSE|CF_SHUTR))) {
2080 		channel_shutw_now(req);
2081 	}
2082 
2083 	/* shutdown(write) pending */
2084 	if (unlikely((req->flags & (CF_SHUTW|CF_SHUTW_NOW)) == CF_SHUTW_NOW &&
2085 		     channel_is_empty(req))) {
2086 		if (req->flags & CF_READ_ERROR)
2087 			si_b->flags |= SI_FL_NOLINGER;
2088 		si_shutw(si_b);
2089 	}
2090 
2091 	/* shutdown(write) done on server side, we must stop the client too */
2092 	if (unlikely((req->flags & (CF_SHUTW|CF_SHUTR|CF_SHUTR_NOW)) == CF_SHUTW &&
2093 		     !req->analysers))
2094 		channel_shutr_now(req);
2095 
2096 	/* shutdown(read) pending */
2097 	if (unlikely((req->flags & (CF_SHUTR|CF_SHUTR_NOW)) == CF_SHUTR_NOW)) {
2098 		if (si_f->flags & SI_FL_NOHALF)
2099 			si_f->flags |= SI_FL_NOLINGER;
2100 		si_shutr(si_f);
2101 	}
2102 
2103 	/* it's possible that an upper layer has requested a connection setup or abort.
2104 	 * There are 2 situations where we decide to establish a new connection :
2105 	 *  - there are data scheduled for emission in the buffer
2106 	 *  - the CF_AUTO_CONNECT flag is set (active connection)
2107 	 */
2108 	if (si_b->state == SI_ST_INI) {
2109 		if (!(req->flags & CF_SHUTW)) {
2110 			if ((req->flags & CF_AUTO_CONNECT) || !channel_is_empty(req)) {
2111 				/* If we have an appctx, there is no connect method, so we
2112 				 * immediately switch to the connected state, otherwise we
2113 				 * perform a connection request.
2114 				 */
2115 				si_b->state = SI_ST_REQ; /* new connection requested */
2116 				si_b->conn_retries = s->be->conn_retries;
2117 			}
2118 		}
2119 		else {
2120 			si_b->state = SI_ST_CLO; /* shutw+ini = abort */
2121 			channel_shutw_now(req);        /* fix buffer flags upon abort */
2122 			channel_shutr_now(res);
2123 		}
2124 	}
2125 
2126 
2127 	/* we may have a pending connection request, or a connection waiting
2128 	 * for completion.
2129 	 */
2130 	if (si_b->state >= SI_ST_REQ && si_b->state < SI_ST_CON) {
2131 
2132 		/* prune the request variables and swap to the response variables. */
2133 		if (s->vars_reqres.scope != SCOPE_RES) {
2134 			vars_prune(&s->vars_reqres, s->sess, s);
2135 			vars_init(&s->vars_reqres, SCOPE_RES);
2136 		}
2137 
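		/* The loop below drives the connection mini-FSM: SI_ST_REQ
		 * (connection requested) moves to SI_ST_ASS once a server is
		 * assigned, then to SI_ST_CON/SI_ST_EST on success, or back to
		 * a retryable state on failure. We loop while the state is
		 * SI_ST_ASS so that a redirect or an immediate establishment
		 * is handled within the same pass.
		 */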
2138 		do {
2139 			/* nb: step 1 might switch from QUE to ASS, but we first want
2140 			 * to give a chance to step 2 to perform a redirect if needed.
2141 			 */
2142 			if (si_b->state != SI_ST_REQ)
2143 				sess_update_stream_int(s);
2144 			if (si_b->state == SI_ST_REQ)
2145 				sess_prepare_conn_req(s);
2146 
2147 			/* applets directly go to the ESTABLISHED state. Similarly,
2148 			 * servers experience the same fate when their connection
2149 			 * is reused.
2150 			 */
2151 			if (unlikely(si_b->state == SI_ST_EST))
2152 				sess_establish(s);
2153 
2154 			/* Now we can add the server name to a header (if requested) */
2155 			/* check for HTTP mode and proxy server_id_hdr_name != NULL */
2156 			if ((si_b->state >= SI_ST_CON) && (si_b->state < SI_ST_CLO) &&
2157 			    (s->be->server_id_hdr_name != NULL) &&
2158 			    (s->be->mode == PR_MODE_HTTP) &&
2159 			    objt_server(s->target)) {
2160 				http_send_name_header(s->txn, s->be, objt_server(s->target)->id);
2161 			}
2162 
2163 			srv = objt_server(s->target);
2164 			if (si_b->state == SI_ST_ASS && srv && srv->rdr_len && (s->flags & SF_REDIRECTABLE))
2165 				http_perform_server_redirect(s, si_b);
2166 		} while (si_b->state == SI_ST_ASS);
2167 	}
2168 
2169 	/* Benchmarks have shown that it's optimal to do a full resync now */
2170 	if (si_f->state == SI_ST_DIS || si_b->state == SI_ST_DIS)
2171 		goto resync_stream_interface;
2172 
2173 	/* otherwise we want to check if we need to resync the req buffer or not */
2174 	if ((req->flags ^ rqf_last) & CF_MASK_STATIC)
2175 		goto resync_request;
2176 
2177 	/* perform output updates to the response buffer */
2178 
2179 	/* If no one is interested in analysing data, it's time to forward
2180 	 * everything. We configure the buffer to forward indefinitely.
2181 	 * Note that we're checking CF_SHUTR_NOW as an indication of a possible
2182 	 * recent call to channel_abort().
2183 	 */
2184 	if (unlikely((!res->analysers || (res->analysers == AN_RES_FLT_END && !(res->flags & CF_FLT_ANALYZE))) &&
2185 	    !(res->flags & (CF_SHUTW|CF_SHUTR_NOW)) &&
2186 	    (si_b->state >= SI_ST_EST) &&
2187 	    (res->to_forward != CHN_INFINITE_FORWARD))) {
2188 		/* This buffer is freewheeling, there's no analyser
2189 		 * attached to it. If any data are left in it, we'll permit them to
2190 		 * move.
2191 		 */
2192 		channel_auto_read(res);
2193 		channel_auto_close(res);
2194 		buffer_flush(res->buf);
2195 
2196 		/* We'll let data flow from the producer (if still connected)
2197 		 * to the consumer.
2198 		 */
2199 		if (!(res->flags & (CF_SHUTR|CF_SHUTW_NOW)))
2200 			channel_forward_forever(res);
2201 
2202 		/* Just in order to support fetching HTTP contents after start
2203 		 * of forwarding when the HTTP forwarding analyser is not used,
2204 		 * we simply reset msg->sov so that HTTP rewinding points to the
2205 		 * headers.
2206 		 */
2207 		if (s->txn)
2208 			s->txn->rsp.sov = s->txn->rsp.eoh + s->txn->rsp.eol - res->buf->o;
2209 
2210 		/* if we have no analyser anymore in any direction and have a
2211 		 * tunnel timeout set, use it now. Note that we must respect
2212 		 * the half-closed timeouts as well.
2213 		 */
2214 		if (!req->analysers && s->be->timeout.tunnel) {
2215 			req->rto = req->wto = res->rto = res->wto =
2216 				s->be->timeout.tunnel;
2217 
2218 			if ((req->flags & CF_SHUTR) && tick_isset(sess->fe->timeout.clientfin))
2219 				res->wto = sess->fe->timeout.clientfin;
2220 			if ((req->flags & CF_SHUTW) && tick_isset(s->be->timeout.serverfin))
2221 				res->rto = s->be->timeout.serverfin;
2222 			if ((res->flags & CF_SHUTR) && tick_isset(s->be->timeout.serverfin))
2223 				req->wto = s->be->timeout.serverfin;
2224 			if ((res->flags & CF_SHUTW) && tick_isset(sess->fe->timeout.clientfin))
2225 				req->rto = sess->fe->timeout.clientfin;
2226 
2227 			req->rex = tick_add(now_ms, req->rto);
2228 			req->wex = tick_add(now_ms, req->wto);
2229 			res->rex = tick_add(now_ms, res->rto);
2230 			res->wex = tick_add(now_ms, res->wto);
2231 		}
2232 	}
2233 
2234 	/* check if it is wise to enable kernel splicing to forward response data */
2235 	if (!(res->flags & (CF_KERN_SPLICING|CF_SHUTR)) &&
2236 	    res->to_forward &&
2237 	    (global.tune.options & GTUNE_USE_SPLICE) &&
2238 	    (objt_conn(si_f->end) && __objt_conn(si_f->end)->xprt && __objt_conn(si_f->end)->xprt->snd_pipe) &&
2239 	    (objt_conn(si_b->end) && __objt_conn(si_b->end)->xprt && __objt_conn(si_b->end)->xprt->rcv_pipe) &&
2240 	    (pipes_used < global.maxpipes) &&
2241 	    (((sess->fe->options2|s->be->options2) & PR_O2_SPLIC_RTR) ||
2242 	     (((sess->fe->options2|s->be->options2) & PR_O2_SPLIC_AUT) &&
2243 	      (res->flags & CF_STREAMER_FAST)))) {
2244 		res->flags |= CF_KERN_SPLICING;
2245 	}
2246 
2247 	/* reflect what the L7 analysers have seen last */
2248 	rpf_last = res->flags;
2249 
2250 	/*
2251 	 * Now forward all shutdown requests between both sides of the buffer
2252 	 */
2253 
2254 	/*
2255 	 * FIXME: this is probably where we should produce error responses.
2256 	 */
2257 
2258 	/* first, let's check if the response buffer needs to shutdown(write) */
2259 	if (unlikely((res->flags & (CF_SHUTW|CF_SHUTW_NOW|CF_AUTO_CLOSE|CF_SHUTR)) ==
2260 		     (CF_AUTO_CLOSE|CF_SHUTR))) {
2261 		channel_shutw_now(res);
2262 	}
2263 
2264 	/* shutdown(write) pending */
2265 	if (unlikely((res->flags & (CF_SHUTW|CF_SHUTW_NOW)) == CF_SHUTW_NOW &&
2266 		     channel_is_empty(res))) {
2267 		si_shutw(si_f);
2268 	}
2269 
2270 	/* shutdown(write) done on the client side, we must stop the server too */
2271 	if (unlikely((res->flags & (CF_SHUTW|CF_SHUTR|CF_SHUTR_NOW)) == CF_SHUTW) &&
2272 	    !res->analysers)
2273 		channel_shutr_now(res);
2274 
2275 	/* shutdown(read) pending */
2276 	if (unlikely((res->flags & (CF_SHUTR|CF_SHUTR_NOW)) == CF_SHUTR_NOW)) {
2277 		if (si_b->flags & SI_FL_NOHALF)
2278 			si_b->flags |= SI_FL_NOLINGER;
2279 		si_shutr(si_b);
2280 	}
2281 
2282 	if (si_f->state == SI_ST_DIS || si_b->state == SI_ST_DIS)
2283 		goto resync_stream_interface;
2284 
2285 	if (req->flags != rqf_last)
2286 		goto resync_request;
2287 
2288 	if ((res->flags ^ rpf_last) & CF_MASK_STATIC)
2289 		goto resync_response;
2290 
2291 	/* we're interested in getting wakeups again */
2292 	si_f->flags &= ~SI_FL_DONT_WAKE;
2293 	si_b->flags &= ~SI_FL_DONT_WAKE;
2294 
2295 	/* This is needed only when debugging is enabled, to indicate
2296 	 * client-side or server-side close. Please note that in the unlikely
2297 	 * event that both sides close at once, the sequence is reported
2298 	 * on the server side first.
2299 	 */
2300 	if (unlikely((global.mode & MODE_DEBUG) &&
2301 		     (!(global.mode & MODE_QUIET) ||
2302 		      (global.mode & MODE_VERBOSE)))) {
2303 		if (si_b->state == SI_ST_CLO &&
2304 		    si_b->prev_state == SI_ST_EST) {
2305 			chunk_printf(&trash, "%08x:%s.srvcls[%04x:%04x]\n",
2306 				      s->uniq_id, s->be->id,
2307 			              objt_conn(si_f->end) ? (unsigned short)objt_conn(si_f->end)->t.sock.fd : -1,
2308 			              objt_conn(si_b->end) ? (unsigned short)objt_conn(si_b->end)->t.sock.fd : -1);
2309 			shut_your_big_mouth_gcc(write(1, trash.str, trash.len));
2310 		}
2311 
2312 		if (si_f->state == SI_ST_CLO &&
2313 		    si_f->prev_state == SI_ST_EST) {
2314 			chunk_printf(&trash, "%08x:%s.clicls[%04x:%04x]\n",
2315 				      s->uniq_id, s->be->id,
2316 			              objt_conn(si_f->end) ? (unsigned short)objt_conn(si_f->end)->t.sock.fd : -1,
2317 			              objt_conn(si_b->end) ? (unsigned short)objt_conn(si_b->end)->t.sock.fd : -1);
2318 			shut_your_big_mouth_gcc(write(1, trash.str, trash.len));
2319 		}
2320 	}
2321 
2322 	if (likely((si_f->state != SI_ST_CLO) ||
2323 		   (si_b->state > SI_ST_INI && si_b->state < SI_ST_CLO))) {
2324 
2325 		if ((sess->fe->options & PR_O_CONTSTATS) && (s->flags & SF_BE_ASSIGNED))
2326 			stream_process_counters(s);
2327 
2328 		if (si_f->state == SI_ST_EST)
2329 			si_update(si_f);
2330 
2331 		if (si_b->state == SI_ST_EST)
2332 			si_update(si_b);
2333 
2334 		req->flags &= ~(CF_READ_NULL|CF_READ_PARTIAL|CF_WRITE_NULL|CF_WRITE_PARTIAL|CF_READ_ATTACHED|CF_WRITE_EVENT);
2335 		res->flags &= ~(CF_READ_NULL|CF_READ_PARTIAL|CF_WRITE_NULL|CF_WRITE_PARTIAL|CF_READ_ATTACHED|CF_WRITE_EVENT);
2336 		si_f->prev_state = si_f->state;
2337 		si_b->prev_state = si_b->state;
2338 		si_f->flags &= ~(SI_FL_ERR|SI_FL_EXP);
2339 		si_b->flags &= ~(SI_FL_ERR|SI_FL_EXP);
2340 
2341 		/* Trick: if a request is waiting for the server to respond,
2342 		 * and if we know the server can timeout, we don't want the timeout
2343 		 * to expire on the client side first, but we're still interested
2344 		 * in passing data from the client to the server (eg: POST). Thus,
2345 		 * we can cancel the client's request timeout if the server's
2346 		 * request timeout is set and the server has not yet sent a response.
2347 		 */
2348 
2349 		if ((res->flags & (CF_AUTO_CLOSE|CF_SHUTR)) == 0 &&
2350 		    (tick_isset(req->wex) || tick_isset(res->rex))) {
2351 			req->flags |= CF_READ_NOEXP;
2352 			req->rex = TICK_ETERNITY;
2353 		}
2354 
2355 		/* Reset pending events now */
2356 		s->pending_events = 0;
2357 
2358 	update_exp_and_leave:
2359 		/* Note: please ensure that if you branch here you disable SI_FL_DONT_WAKE */
2360 		t->expire = tick_first((tick_is_expired(t->expire, now_ms) ? 0 : t->expire),
2361 				       tick_first(tick_first(req->rex, req->wex),
2362 						  tick_first(res->rex, res->wex)));
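		/* tick_first() is assumed to return the earliest of two ticks,
		 * treating TICK_ETERNITY as "no timeout", so the expression
		 * above simply retains the closest of the four channel dates
		 * and the task's own non-expired date.
		 */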
2363 		if (!req->analysers)
2364 			req->analyse_exp = TICK_ETERNITY;
2365 
2366 		if ((sess->fe->options & PR_O_CONTSTATS) && (s->flags & SF_BE_ASSIGNED) &&
2367 		          (!tick_isset(req->analyse_exp) || tick_is_expired(req->analyse_exp, now_ms)))
2368 			req->analyse_exp = tick_add(now_ms, 5000);
2369 
2370 		t->expire = tick_first(t->expire, req->analyse_exp);
2371 
2372 		t->expire = tick_first(t->expire, res->analyse_exp);
2373 
2374 		if (si_f->exp)
2375 			t->expire = tick_first(t->expire, si_f->exp);
2376 
2377 		if (si_b->exp)
2378 			t->expire = tick_first(t->expire, si_b->exp);
2379 
2380 #ifdef DEBUG_FULL
2381 		fprintf(stderr,
2382 			"[%u] queuing with exp=%u req->rex=%u req->wex=%u req->ana_exp=%u"
2383 			" rep->rex=%u rep->wex=%u, si[0].exp=%u, si[1].exp=%u, cs=%d, ss=%d\n",
2384 			now_ms, t->expire, req->rex, req->wex, req->analyse_exp,
2385 			res->rex, res->wex, si_f->exp, si_b->exp, si_f->state, si_b->state);
2386 #endif
2387 
2388 #ifdef DEBUG_DEV
2389 		/* this may only happen when no timeout is set or in case of an FSM bug */
2390 		if (!tick_isset(t->expire))
2391 			ABORT_NOW();
2392 #endif
2393 		s->pending_events &= ~(TASK_WOKEN_TIMER | TASK_WOKEN_RES);
2394 		stream_release_buffers(s);
2395 		return t; /* nothing more to do */
2396 	}
2397 
2398 	sess->fe->feconn--;
2399 	if (s->flags & SF_BE_ASSIGNED)
2400 		s->be->beconn--;
2401 	jobs--;
2402 	if (sess->listener) {
2403 		if (!(sess->listener->options & LI_O_UNLIMITED))
2404 			actconn--;
2405 		sess->listener->nbconn--;
2406 		if (sess->listener->state == LI_FULL)
2407 			resume_listener(sess->listener);
2408 
2409 		/* Dequeues all of the listeners waiting for a resource */
2410 		if (!LIST_ISEMPTY(&global_listener_queue))
2411 			dequeue_all_listeners(&global_listener_queue);
2412 
2413 		if (!LIST_ISEMPTY(&sess->fe->listener_queue) &&
2414 		    (!sess->fe->fe_sps_lim || freq_ctr_remain(&sess->fe->fe_sess_per_sec, sess->fe->fe_sps_lim, 0) > 0))
2415 			dequeue_all_listeners(&sess->fe->listener_queue);
2416 	}
2417 
2418 	if (unlikely((global.mode & MODE_DEBUG) &&
2419 		     (!(global.mode & MODE_QUIET) || (global.mode & MODE_VERBOSE)))) {
2420 		chunk_printf(&trash, "%08x:%s.closed[%04x:%04x]\n",
2421 			      s->uniq_id, s->be->id,
2422 		              objt_conn(si_f->end) ? (unsigned short)objt_conn(si_f->end)->t.sock.fd : -1,
2423 		              objt_conn(si_b->end) ? (unsigned short)objt_conn(si_b->end)->t.sock.fd : -1);
2424 		shut_your_big_mouth_gcc(write(1, trash.str, trash.len));
2425 	}
2426 
2427 	s->logs.t_close = tv_ms_elapsed(&s->logs.tv_accept, &now);
2428 	stream_process_counters(s);
2429 
2430 	if (s->txn && s->txn->status) {
2431 		int n;
2432 
2433 		n = s->txn->status / 100;
2434 		if (n < 1 || n > 5)
2435 			n = 0;
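		/* out-of-range classes are accounted in rsp[0], which thus
		 * counts invalid or unparsable status codes.
		 */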
2436 
2437 		if (sess->fe->mode == PR_MODE_HTTP) {
2438 			sess->fe->fe_counters.p.http.rsp[n]++;
2439 		}
2440 		if ((s->flags & SF_BE_ASSIGNED) &&
2441 		    (s->be->mode == PR_MODE_HTTP)) {
2442 			s->be->be_counters.p.http.rsp[n]++;
2443 			s->be->be_counters.p.http.cum_req++;
2444 		}
2445 	}
2446 
2447 	/* let's do a final log if we need it */
2448 	if (!LIST_ISEMPTY(&sess->fe->logformat) && s->logs.logwait &&
2449 	    !(s->flags & SF_MONITOR) &&
2450 	    (!(sess->fe->options & PR_O_NULLNOLOG) || req->total)) {
2451 		s->do_log(s);
2452 	}
2453 
2454 	/* update time stats for this stream */
2455 	stream_update_time_stats(s);
2456 
2457 	/* the task MUST not be in the run queue anymore */
2458 	stream_free(s);
2459 	task_delete(t);
2460 	task_free(t);
2461 	return NULL;
2462 }
2463 
2464 /* Update the stream's backend and server time stats */
2465 void stream_update_time_stats(struct stream *s)
2466 {
2467 	int t_request;
2468 	int t_queue;
2469 	int t_connect;
2470 	int t_data;
2471 	int t_close;
2472 	struct server *srv;
2473 
2474 	t_request = 0;
2475 	t_queue   = s->logs.t_queue;
2476 	t_connect = s->logs.t_connect;
2477 	t_close   = s->logs.t_close;
2478 	t_data    = s->logs.t_data;
2479 
2480 	if (s->be->mode != PR_MODE_HTTP)
2481 		t_data = t_connect;
2482 
2483 	if (t_connect < 0 || t_data < 0)
2484 		return;
2485 
2486 	if (tv_isge(&s->logs.tv_request, &s->logs.tv_accept))
2487 		t_request = tv_ms_elapsed(&s->logs.tv_accept, &s->logs.tv_request);
2488 
2489 	t_data    -= t_connect;
2490 	t_connect -= t_queue;
2491 	t_queue   -= t_request;
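	/* After the three subtractions above, each variable holds its own
	 * phase duration instead of a cumulative date. For example, with
	 * t_request=2, t_queue=5, t_connect=9 and t_data=30 (all in ms), we
	 * feed q_time=3, c_time=4 and d_time=21 into the averages below.
	 */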
2492 
2493 	srv = objt_server(s->target);
2494 	if (srv) {
2495 		swrate_add(&srv->counters.q_time, TIME_STATS_SAMPLES, t_queue);
2496 		swrate_add(&srv->counters.c_time, TIME_STATS_SAMPLES, t_connect);
2497 		swrate_add(&srv->counters.d_time, TIME_STATS_SAMPLES, t_data);
2498 		swrate_add(&srv->counters.t_time, TIME_STATS_SAMPLES, t_close);
2499 	}
2500 	swrate_add(&s->be->be_counters.q_time, TIME_STATS_SAMPLES, t_queue);
2501 	swrate_add(&s->be->be_counters.c_time, TIME_STATS_SAMPLES, t_connect);
2502 	swrate_add(&s->be->be_counters.d_time, TIME_STATS_SAMPLES, t_data);
2503 	swrate_add(&s->be->be_counters.t_time, TIME_STATS_SAMPLES, t_close);
2504 }
2505 
2506 /*
2507  * This function adjusts sess->srv_conn and maintains the previous and new
2508  * server's served stream counts. Setting newsrv to NULL is enough to release
2509  * the current connection slot. This function also notifies any LB algo which might
2510  * expect to be informed about any change in the number of active streams on a
2511  * server.
2512  */
2513 void sess_change_server(struct stream *sess, struct server *newsrv)
2514 {
2515 	if (sess->srv_conn == newsrv)
2516 		return;
2517 
2518 	if (sess->srv_conn) {
2519 		sess->srv_conn->served--;
2520 		sess->srv_conn->proxy->served--;
2521 		if (sess->srv_conn->proxy->lbprm.server_drop_conn)
2522 			sess->srv_conn->proxy->lbprm.server_drop_conn(sess->srv_conn);
2523 		stream_del_srv_conn(sess);
2524 	}
2525 
2526 	if (newsrv) {
2527 		newsrv->served++;
2528 		newsrv->proxy->served++;
2529 		if (newsrv->proxy->lbprm.server_take_conn)
2530 			newsrv->proxy->lbprm.server_take_conn(newsrv);
2531 		stream_add_srv_conn(sess, newsrv);
2532 	}
2533 }
2534 
2535 /* Handle server-side errors for default protocols. It is called whenever a
2536  * connection setup is aborted or a request is aborted in queue. It sets the
2537  * stream termination flags so that the caller does not have to worry about
2538  * them. It's installed as ->srv_error for the server-side stream_interface.
2539  */
2540 void default_srv_error(struct stream *s, struct stream_interface *si)
2541 {
2542 	int err_type = si->err_type;
2543 	int err = 0, fin = 0;
2544 
2545 	if (err_type & SI_ET_QUEUE_ABRT) {
2546 		err = SF_ERR_CLICL;
2547 		fin = SF_FINST_Q;
2548 	}
2549 	else if (err_type & SI_ET_CONN_ABRT) {
2550 		err = SF_ERR_CLICL;
2551 		fin = SF_FINST_C;
2552 	}
2553 	else if (err_type & SI_ET_QUEUE_TO) {
2554 		err = SF_ERR_SRVTO;
2555 		fin = SF_FINST_Q;
2556 	}
2557 	else if (err_type & SI_ET_QUEUE_ERR) {
2558 		err = SF_ERR_SRVCL;
2559 		fin = SF_FINST_Q;
2560 	}
2561 	else if (err_type & SI_ET_CONN_TO) {
2562 		err = SF_ERR_SRVTO;
2563 		fin = SF_FINST_C;
2564 	}
2565 	else if (err_type & SI_ET_CONN_ERR) {
2566 		err = SF_ERR_SRVCL;
2567 		fin = SF_FINST_C;
2568 	}
2569 	else if (err_type & SI_ET_CONN_RES) {
2570 		err = SF_ERR_RESOURCE;
2571 		fin = SF_FINST_C;
2572 	}
2573 	else /* SI_ET_CONN_OTHER and others */ {
2574 		err = SF_ERR_INTERNAL;
2575 		fin = SF_FINST_C;
2576 	}
2577 
2578 	if (!(s->flags & SF_ERR_MASK))
2579 		s->flags |= err;
2580 	if (!(s->flags & SF_FINST_MASK))
2581 		s->flags |= fin;
2582 }
2583 
2584 /* kill a stream and set the termination flags to <why> (one of SF_ERR_*) */
2585 void stream_shutdown(struct stream *stream, int why)
2586 {
2587 	if (stream->req.flags & (CF_SHUTW|CF_SHUTW_NOW))
2588 		return;
2589 
2590 	channel_shutw_now(&stream->req);
2591 	channel_shutr_now(&stream->res);
2592 	stream->task->nice = 1024;
2593 	if (!(stream->flags & SF_ERR_MASK))
2594 		stream->flags |= why;
2595 	task_wakeup(stream->task, TASK_WOKEN_OTHER);
2596 }
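/* Typical use, as a sketch: a component that must kill the stream calls
 * stream_shutdown(s, SF_ERR_KILLED) (any of the SF_ERR_* codes fits; this
 * particular one is an assumption). Both channels are then shut down, the
 * task is deprioritized via its nice value and woken up so that
 * process_stream() performs the actual termination work.
 */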
2597 
2598 /************************************************************************/
2599 /*           All supported ACL keywords must be declared here.          */
2600 /************************************************************************/
2601 
2602 /* 0=OK, <0=Alert, >0=Warning */
2603 static enum act_parse_ret stream_parse_use_service(const char **args, int *cur_arg,
2604                                                    struct proxy *px, struct act_rule *rule,
2605                                                    char **err)
2606 {
2607 	struct action_kw *kw;
2608 
2609 	/* Check that a service name was provided. */
2610 	if (*(args[*cur_arg]) == 0) {
2611 		memprintf(err, "'%s' expects a service name.", args[0]);
2612 		return ACT_RET_PRS_ERR;
2613 	}
2614 
2615 	/* look up the keyword corresponding to the service. */
2616 	kw = action_lookup(&service_keywords, args[*cur_arg]);
2617 	if (!kw) {
2618 		memprintf(err, "'%s' unknown service name.", args[1]);
2619 		return ACT_RET_PRS_ERR;
2620 	}
2621 	(*cur_arg)++;
2622 
2623 	/* execute the service-specific rule parser. */
2624 	rule->kw = kw;
2625 	if (kw->parse((const char **)args, cur_arg, px, rule, err) == ACT_RET_PRS_ERR)
2626 		return ACT_RET_PRS_ERR;
2627 
2628 	/* Register processing function. */
2629 	rule->action_ptr = process_use_service;
2630 	rule->action = ACT_CUSTOM;
2631 
2632 	return ACT_RET_PRS_OK;
2633 }
2634 
2635 void service_keywords_register(struct action_kw_list *kw_list)
2636 {
2637 	LIST_ADDQ(&service_keywords, &kw_list->list);
2638 }
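/* Registration happens at startup. A minimal sketch (the "my-svc" keyword,
 * its parser and the initializer layout are illustrative assumptions; only
 * service_keywords_register() itself comes from this file):
 *
 *	static enum act_parse_ret parse_my_svc(const char **args, int *cur_arg,
 *	                                       struct proxy *px,
 *	                                       struct act_rule *rule, char **err)
 *	{
 *		// set up rule->applet or rule->arg as the service requires
 *		return ACT_RET_PRS_OK;
 *	}
 *
 *	static struct action_kw_list my_svc_kws = { { }, {
 *		{ "my-svc", parse_my_svc },
 *		{ NULL, NULL }
 *	}};
 *
 *	service_keywords_register(&my_svc_kws);
 *
 * Once registered, a "use-service my-svc" rule resolves through
 * stream_parse_use_service() above.
 */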
2639 
2640 
2641 /* This function dumps a complete stream state onto the stream interface's
2642  * read buffer. The stream to dump is passed in <strm>. It returns 0 if the
2643  * output buffer is full and the function needs to be called again, otherwise
2644  * non-zero. It is designed to be called from stats_dump_strm_to_buffer() below.
2645  */
2646 static int stats_dump_full_strm_to_buffer(struct stream_interface *si, struct stream *strm)
2647 {
2648 	struct appctx *appctx = __objt_appctx(si->end);
2649 	struct tm tm;
2650 	extern const char *monthname[12];
2651 	char pn[INET6_ADDRSTRLEN];
2652 	struct connection *conn;
2653 	struct appctx *tmpctx;
2654 
2655 	chunk_reset(&trash);
2656 
2657 	if (appctx->ctx.sess.section > 0 && appctx->ctx.sess.uid != strm->uniq_id) {
2658 		/* stream changed, no need to go any further */
2659 		chunk_appendf(&trash, "  *** session terminated while we were watching it ***\n");
2660 		if (bi_putchk(si_ic(si), &trash) == -1) {
2661 			si_applet_cant_put(si);
2662 			return 0;
2663 		}
2664 		appctx->ctx.sess.uid = 0;
2665 		appctx->ctx.sess.section = 0;
2666 		return 1;
2667 	}
2668 
2669 	switch (appctx->ctx.sess.section) {
2670 	case 0: /* main status of the stream */
2671 		appctx->ctx.sess.uid = strm->uniq_id;
2672 		appctx->ctx.sess.section = 1;
2673 		/* fall through */
2674 
2675 	case 1:
2676 		get_localtime(strm->logs.accept_date.tv_sec, &tm);
2677 		chunk_appendf(&trash,
2678 			     "%p: [%02d/%s/%04d:%02d:%02d:%02d.%06d] id=%u proto=%s",
2679 			     strm,
2680 			     tm.tm_mday, monthname[tm.tm_mon], tm.tm_year+1900,
2681 			     tm.tm_hour, tm.tm_min, tm.tm_sec, (int)(strm->logs.accept_date.tv_usec),
2682 			     strm->uniq_id,
2683 			     strm_li(strm) ? strm_li(strm)->proto->name : "?");
2684 
2685 		conn = objt_conn(strm_orig(strm));
2686 		switch (conn ? addr_to_str(&conn->addr.from, pn, sizeof(pn)) : AF_UNSPEC) {
2687 		case AF_INET:
2688 		case AF_INET6:
2689 			chunk_appendf(&trash, " source=%s:%d\n",
2690 			              pn, get_host_port(&conn->addr.from));
2691 			break;
2692 		case AF_UNIX:
2693 			chunk_appendf(&trash, " source=unix:%d\n", strm_li(strm)->luid);
2694 			break;
2695 		default:
2696 			/* no more information to print right now */
2697 			chunk_appendf(&trash, "\n");
2698 			break;
2699 		}
2700 
2701 		chunk_appendf(&trash,
2702 			     "  flags=0x%x, conn_retries=%d, srv_conn=%p, pend_pos=%p\n",
2703 			     strm->flags, strm->si[1].conn_retries, strm->srv_conn, strm->pend_pos);
2704 
2705 		chunk_appendf(&trash,
2706 			     "  frontend=%s (id=%u mode=%s), listener=%s (id=%u)",
2707 			     strm_fe(strm)->id, strm_fe(strm)->uuid, strm_fe(strm)->mode ? "http" : "tcp",
2708 			     strm_li(strm) ? strm_li(strm)->name ? strm_li(strm)->name : "?" : "?",
2709 			     strm_li(strm) ? strm_li(strm)->luid : 0);
2710 
2711 		if (conn)
2712 			conn_get_to_addr(conn);
2713 
2714 		switch (conn ? addr_to_str(&conn->addr.to, pn, sizeof(pn)) : AF_UNSPEC) {
2715 		case AF_INET:
2716 		case AF_INET6:
2717 			chunk_appendf(&trash, " addr=%s:%d\n",
2718 				     pn, get_host_port(&conn->addr.to));
2719 			break;
2720 		case AF_UNIX:
2721 			chunk_appendf(&trash, " addr=unix:%d\n", strm_li(strm)->luid);
2722 			break;
2723 		default:
2724 			/* no more information to print right now */
2725 			chunk_appendf(&trash, "\n");
2726 			break;
2727 		}
2728 
2729 		if (strm->be->cap & PR_CAP_BE)
2730 			chunk_appendf(&trash,
2731 				     "  backend=%s (id=%u mode=%s)",
2732 				     strm->be->id,
2733 				     strm->be->uuid, strm->be->mode ? "http" : "tcp");
2734 		else
2735 			chunk_appendf(&trash, "  backend=<NONE> (id=-1 mode=-)");
2736 
2737 		conn = objt_conn(strm->si[1].end);
2738 		if (conn)
2739 			conn_get_from_addr(conn);
2740 
2741 		switch (conn ? addr_to_str(&conn->addr.from, pn, sizeof(pn)) : AF_UNSPEC) {
2742 		case AF_INET:
2743 		case AF_INET6:
2744 			chunk_appendf(&trash, " addr=%s:%d\n",
2745 				     pn, get_host_port(&conn->addr.from));
2746 			break;
2747 		case AF_UNIX:
2748 			chunk_appendf(&trash, " addr=unix\n");
2749 			break;
2750 		default:
2751 			/* no more information to print right now */
2752 			chunk_appendf(&trash, "\n");
2753 			break;
2754 		}
2755 
2756 		if (strm->be->cap & PR_CAP_BE)
2757 			chunk_appendf(&trash,
2758 				     "  server=%s (id=%u)",
2759 				     objt_server(strm->target) ? objt_server(strm->target)->id : "<none>",
2760 				     objt_server(strm->target) ? objt_server(strm->target)->puid : 0);
2761 		else
2762 			chunk_appendf(&trash, "  server=<NONE> (id=-1)");
2763 
2764 		if (conn)
2765 			conn_get_to_addr(conn);
2766 
2767 		switch (conn ? addr_to_str(&conn->addr.to, pn, sizeof(pn)) : AF_UNSPEC) {
2768 		case AF_INET:
2769 		case AF_INET6:
2770 			chunk_appendf(&trash, " addr=%s:%d\n",
2771 				     pn, get_host_port(&conn->addr.to));
2772 			break;
2773 		case AF_UNIX:
2774 			chunk_appendf(&trash, " addr=unix\n");
2775 			break;
2776 		default:
2777 			/* no more information to print right now */
2778 			chunk_appendf(&trash, "\n");
2779 			break;
2780 		}
2781 
2782 		chunk_appendf(&trash,
2783 			     "  task=%p (state=0x%02x nice=%d calls=%d exp=%s%s",
2784 			     strm->task,
2785 			     strm->task->state,
2786 			     strm->task->nice, strm->task->calls,
2787 			     strm->task->expire ?
2788 			             tick_is_expired(strm->task->expire, now_ms) ? "<PAST>" :
2789 			                     human_time(TICKS_TO_MS(strm->task->expire - now_ms),
2790 			                     TICKS_TO_MS(1000)) : "<NEVER>",
2791 			     task_in_rq(strm->task) ? ", running" : "");
2792 
2793 		chunk_appendf(&trash,
2794 			     " age=%s)\n",
2795 			     human_time(now.tv_sec - strm->logs.accept_date.tv_sec, 1));
2796 
2797 		if (strm->txn)
2798 			chunk_appendf(&trash,
2799 			     "  txn=%p flags=0x%x meth=%d status=%d req.st=%s rsp.st=%s waiting=%d\n",
2800 			      strm->txn, strm->txn->flags, strm->txn->meth, strm->txn->status,
2801 			      http_msg_state_str(strm->txn->req.msg_state), http_msg_state_str(strm->txn->rsp.msg_state), !LIST_ISEMPTY(&strm->buffer_wait.list));
2802 
2803 		chunk_appendf(&trash,
2804 			     "  si[0]=%p (state=%s flags=0x%02x endp0=%s:%p exp=%s, et=0x%03x)\n",
2805 			     &strm->si[0],
2806 			     si_state_str(strm->si[0].state),
2807 			     strm->si[0].flags,
2808 			     obj_type_name(strm->si[0].end),
2809 			     obj_base_ptr(strm->si[0].end),
2810 			     strm->si[0].exp ?
2811 			             tick_is_expired(strm->si[0].exp, now_ms) ? "<PAST>" :
2812 			                     human_time(TICKS_TO_MS(strm->si[0].exp - now_ms),
2813 			                     TICKS_TO_MS(1000)) : "<NEVER>",
2814 			     strm->si[0].err_type);
2815 
2816 		chunk_appendf(&trash,
2817 			     "  si[1]=%p (state=%s flags=0x%02x endp1=%s:%p exp=%s, et=0x%03x)\n",
2818 			     &strm->si[1],
2819 			     si_state_str(strm->si[1].state),
2820 			     strm->si[1].flags,
2821 			     obj_type_name(strm->si[1].end),
2822 			     obj_base_ptr(strm->si[1].end),
2823 			     strm->si[1].exp ?
2824 			             tick_is_expired(strm->si[1].exp, now_ms) ? "<PAST>" :
2825 			                     human_time(TICKS_TO_MS(strm->si[1].exp - now_ms),
2826 			                     TICKS_TO_MS(1000)) : "<NEVER>",
2827 			     strm->si[1].err_type);
2828 
2829 		if ((conn = objt_conn(strm->si[0].end)) != NULL) {
2830 			chunk_appendf(&trash,
2831 			              "  co0=%p ctrl=%s xprt=%s data=%s target=%s:%p\n",
2832 				      conn,
2833 				      conn_get_ctrl_name(conn),
2834 				      conn_get_xprt_name(conn),
2835 				      conn_get_data_name(conn),
2836 			              obj_type_name(conn->target),
2837 			              obj_base_ptr(conn->target));
2838 
2839 			chunk_appendf(&trash,
2840 			              "      flags=0x%08x fd=%d fd.state=%02x fd.cache=%d updt=%d\n",
2841 			              conn->flags,
2842 			              conn->t.sock.fd,
2843 			              conn->t.sock.fd >= 0 ? fdtab[conn->t.sock.fd].state : 0,
2844 			              conn->t.sock.fd >= 0 ? fdtab[conn->t.sock.fd].cache : 0,
2845 			              conn->t.sock.fd >= 0 ? fdtab[conn->t.sock.fd].updated : 0);
2846 		}
2847 		else if ((tmpctx = objt_appctx(strm->si[0].end)) != NULL) {
2848 			chunk_appendf(&trash,
2849 			              "  app0=%p st0=%d st1=%d st2=%d applet=%s\n",
2850 				      tmpctx,
2851 				      tmpctx->st0,
2852 				      tmpctx->st1,
2853 				      tmpctx->st2,
2854 			              tmpctx->applet->name);
2855 		}
2856 
2857 		if ((conn = objt_conn(strm->si[1].end)) != NULL) {
2858 			chunk_appendf(&trash,
2859 			              "  co1=%p ctrl=%s xprt=%s data=%s target=%s:%p\n",
2860 				      conn,
2861 				      conn_get_ctrl_name(conn),
2862 				      conn_get_xprt_name(conn),
2863 				      conn_get_data_name(conn),
2864 			              obj_type_name(conn->target),
2865 			              obj_base_ptr(conn->target));
2866 
2867 			chunk_appendf(&trash,
2868 			              "      flags=0x%08x fd=%d fd.state=%02x fd.cache=%d updt=%d\n",
2869 			              conn->flags,
2870 			              conn->t.sock.fd,
2871 			              conn->t.sock.fd >= 0 ? fdtab[conn->t.sock.fd].state : 0,
2872 			              conn->t.sock.fd >= 0 ? fdtab[conn->t.sock.fd].cache : 0,
2873 			              conn->t.sock.fd >= 0 ? fdtab[conn->t.sock.fd].updated : 0);
2874 		}
2875 		else if ((tmpctx = objt_appctx(strm->si[1].end)) != NULL) {
2876 			chunk_appendf(&trash,
2877 			              "  app1=%p st0=%d st1=%d st2=%d applet=%s\n",
2878 				      tmpctx,
2879 				      tmpctx->st0,
2880 				      tmpctx->st1,
2881 				      tmpctx->st2,
2882 			              tmpctx->applet->name);
2883 		}
2884 
2885 		chunk_appendf(&trash,
2886 			     "  req=%p (f=0x%06x an=0x%x pipe=%d tofwd=%d total=%lld)\n"
2887 			     "      an_exp=%s",
2888 			     &strm->req,
2889 			     strm->req.flags, strm->req.analysers,
2890 			     strm->req.pipe ? strm->req.pipe->data : 0,
2891 			     strm->req.to_forward, strm->req.total,
2892 			     strm->req.analyse_exp ?
2893 			     human_time(TICKS_TO_MS(strm->req.analyse_exp - now_ms),
2894 					TICKS_TO_MS(1000)) : "<NEVER>");
2895 
2896 		chunk_appendf(&trash,
2897 			     " rex=%s",
2898 			     strm->req.rex ?
2899 			     human_time(TICKS_TO_MS(strm->req.rex - now_ms),
2900 					TICKS_TO_MS(1000)) : "<NEVER>");
2901 
2902 		chunk_appendf(&trash,
2903 			     " wex=%s\n"
2904 			     "      buf=%p data=%p o=%d p=%d req.next=%d i=%d size=%d\n",
2905 			     strm->req.wex ?
2906 			     human_time(TICKS_TO_MS(strm->req.wex - now_ms),
2907 					TICKS_TO_MS(1000)) : "<NEVER>",
2908 			     strm->req.buf,
2909 			     strm->req.buf->data, strm->req.buf->o,
2910 			     (int)(strm->req.buf->p - strm->req.buf->data),
2911 			     strm->txn ? strm->txn->req.next : 0, strm->req.buf->i,
2912 			     strm->req.buf->size);
2913 
2914 		chunk_appendf(&trash,
2915 			     "  res=%p (f=0x%06x an=0x%x pipe=%d tofwd=%d total=%lld)\n"
2916 			     "      an_exp=%s",
2917 			     &strm->res,
2918 			     strm->res.flags, strm->res.analysers,
2919 			     strm->res.pipe ? strm->res.pipe->data : 0,
2920 			     strm->res.to_forward, strm->res.total,
2921 			     strm->res.analyse_exp ?
2922 			     human_time(TICKS_TO_MS(strm->res.analyse_exp - now_ms),
2923 					TICKS_TO_MS(1000)) : "<NEVER>");
2924 
2925 		chunk_appendf(&trash,
2926 			     " rex=%s",
2927 			     strm->res.rex ?
2928 			     human_time(TICKS_TO_MS(strm->res.rex - now_ms),
2929 					TICKS_TO_MS(1000)) : "<NEVER>");
2930 
2931 		chunk_appendf(&trash,
2932 			     " wex=%s\n"
2933 			     "      buf=%p data=%p o=%d p=%d rsp.next=%d i=%d size=%d\n",
2934 			     strm->res.wex ?
2935 			     human_time(TICKS_TO_MS(strm->res.wex - now_ms),
2936 					TICKS_TO_MS(1000)) : "<NEVER>",
2937 			     strm->res.buf,
2938 			     strm->res.buf->data, strm->res.buf->o,
2939 			     (int)(strm->res.buf->p - strm->res.buf->data),
2940 			     strm->txn ? strm->txn->rsp.next : 0, strm->res.buf->i,
2941 			     strm->res.buf->size);
2942 
2943 		if (bi_putchk(si_ic(si), &trash) == -1) {
2944 			si_applet_cant_put(si);
2945 			return 0;
2946 		}
2947 
2948 		/* use other states to dump the contents */
2949 	}
2950 	/* end of dump */
2951 	appctx->ctx.sess.uid = 0;
2952 	appctx->ctx.sess.section = 0;
2953 	return 1;
2954 }
2955 
2956 
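/* Parses the "show sess" CLI command. The optional third argument selects
 * either all streams ("all") or a single stream designated by the pointer
 * value printed in the dumps (parsed by strtoul() below); with no argument,
 * all streams are listed in short format.
 */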
2957 static int cli_parse_show_sess(char **args, struct appctx *appctx, void *private)
2958 {
2959 	if (!cli_has_level(appctx, ACCESS_LVL_OPER))
2960 		return 1;
2961 
2962 	if (*args[2] && strcmp(args[2], "all") == 0)
2963 		appctx->ctx.sess.target = (void *)-1;
2964 	else if (*args[2])
2965 		appctx->ctx.sess.target = (void *)strtoul(args[2], NULL, 0);
2966 	else
2967 		appctx->ctx.sess.target = NULL;
2968 	appctx->ctx.sess.section = 0; /* start with stream status */
2969 	appctx->ctx.sess.pos = 0;
2970 
2971 	return 0;
2972 }
2973 
2974 /* This function dumps all streams' states onto the stream interface's
2975  * read buffer. It returns 0 if the output buffer is full and the function
2976  * needs to be called again, otherwise non-zero. It is designed to be called
2977  * from stats_dump_sess_to_buffer() below.
2978  */
2979 static int cli_io_handler_dump_sess(struct appctx *appctx)
{
	struct stream_interface *si = appctx->owner;
	struct connection *conn;

	if (unlikely(si_ic(si)->flags & (CF_WRITE_ERROR|CF_SHUTW))) {
		/* If we're forced to shut down, we might have to remove our
		 * reference to the last stream being dumped.
		 */
		if (appctx->st2 == STAT_ST_LIST) {
			if (!LIST_ISEMPTY(&appctx->ctx.sess.bref.users)) {
				LIST_DEL(&appctx->ctx.sess.bref.users);
				LIST_INIT(&appctx->ctx.sess.bref.users);
			}
		}
		return 1;
	}

	chunk_reset(&trash);

	switch (appctx->st2) {
	case STAT_ST_INIT:
		/* the function had not been called yet, let's prepare the
		 * buffer for a response. We initialize the current stream
		 * pointer to the first in the global list. When a target
		 * stream is being destroyed, it is responsible for updating
		 * this pointer. We know we have reached the end when this
		 * pointer points back to the head of the streams list.
		 */
		LIST_INIT(&appctx->ctx.sess.bref.users);
		appctx->ctx.sess.bref.ref = streams.n;
		appctx->st2 = STAT_ST_LIST;
		/* fall through */

	case STAT_ST_LIST:
		/* first, let's detach the back-ref from a possible previous stream */
		if (!LIST_ISEMPTY(&appctx->ctx.sess.bref.users)) {
			LIST_DEL(&appctx->ctx.sess.bref.users);
			LIST_INIT(&appctx->ctx.sess.bref.users);
		}

		/* and start from where we stopped */
		while (appctx->ctx.sess.bref.ref != &streams) {
			char pn[INET6_ADDRSTRLEN];
			struct stream *curr_strm;

			curr_strm = LIST_ELEM(appctx->ctx.sess.bref.ref, struct stream *, list);

			if (appctx->ctx.sess.target) {
				if (appctx->ctx.sess.target != (void *)-1 && appctx->ctx.sess.target != curr_strm)
					goto next_sess;

				LIST_ADDQ(&curr_strm->back_refs, &appctx->ctx.sess.bref.users);
				/* call the proper dump() function and return if we're missing space */
				if (!stats_dump_full_strm_to_buffer(si, curr_strm))
					return 0;

				/* stream dump complete */
				LIST_DEL(&appctx->ctx.sess.bref.users);
				LIST_INIT(&appctx->ctx.sess.bref.users);
				if (appctx->ctx.sess.target != (void *)-1) {
					appctx->ctx.sess.target = NULL;
					break;
				}
				else
					goto next_sess;
			}

			chunk_appendf(&trash,
				     "%p: proto=%s",
				     curr_strm,
				     strm_li(curr_strm) ? strm_li(curr_strm)->proto->name : "?");

			conn = objt_conn(strm_orig(curr_strm));
			switch (conn ? addr_to_str(&conn->addr.from, pn, sizeof(pn)) : AF_UNSPEC) {
			case AF_INET:
			case AF_INET6:
				chunk_appendf(&trash,
					     " src=%s:%d fe=%s be=%s srv=%s",
					     pn,
					     get_host_port(&conn->addr.from),
					     strm_fe(curr_strm)->id,
					     (curr_strm->be->cap & PR_CAP_BE) ? curr_strm->be->id : "<NONE>",
					     objt_server(curr_strm->target) ? objt_server(curr_strm->target)->id : "<none>"
					     );
				break;
			case AF_UNIX:
				chunk_appendf(&trash,
					     " src=unix:%d fe=%s be=%s srv=%s",
					     strm_li(curr_strm)->luid,
					     strm_fe(curr_strm)->id,
					     (curr_strm->be->cap & PR_CAP_BE) ? curr_strm->be->id : "<NONE>",
					     objt_server(curr_strm->target) ? objt_server(curr_strm->target)->id : "<none>"
					     );
				break;
			}

			chunk_appendf(&trash,
				     " ts=%02x age=%s calls=%d",
				     curr_strm->task->state,
				     human_time(now.tv_sec - curr_strm->logs.tv_accept.tv_sec, 1),
				     curr_strm->task->calls);

			chunk_appendf(&trash,
				     " rq[f=%06xh,i=%d,an=%02xh,rx=%s",
				     curr_strm->req.flags,
				     curr_strm->req.buf->i,
				     curr_strm->req.analysers,
				     curr_strm->req.rex ?
				     human_time(TICKS_TO_MS(curr_strm->req.rex - now_ms),
						TICKS_TO_MS(1000)) : "");

			chunk_appendf(&trash,
				     ",wx=%s",
				     curr_strm->req.wex ?
				     human_time(TICKS_TO_MS(curr_strm->req.wex - now_ms),
						TICKS_TO_MS(1000)) : "");

			chunk_appendf(&trash,
				     ",ax=%s]",
				     curr_strm->req.analyse_exp ?
				     human_time(TICKS_TO_MS(curr_strm->req.analyse_exp - now_ms),
						TICKS_TO_MS(1000)) : "");

			chunk_appendf(&trash,
				     " rp[f=%06xh,i=%d,an=%02xh,rx=%s",
				     curr_strm->res.flags,
				     curr_strm->res.buf->i,
				     curr_strm->res.analysers,
				     curr_strm->res.rex ?
				     human_time(TICKS_TO_MS(curr_strm->res.rex - now_ms),
						TICKS_TO_MS(1000)) : "");

			chunk_appendf(&trash,
				     ",wx=%s",
				     curr_strm->res.wex ?
				     human_time(TICKS_TO_MS(curr_strm->res.wex - now_ms),
						TICKS_TO_MS(1000)) : "");

			chunk_appendf(&trash,
				     ",ax=%s]",
				     curr_strm->res.analyse_exp ?
				     human_time(TICKS_TO_MS(curr_strm->res.analyse_exp - now_ms),
						TICKS_TO_MS(1000)) : "");

			conn = objt_conn(curr_strm->si[0].end);
			chunk_appendf(&trash,
				     " s0=[%d,%1xh,fd=%d,ex=%s]",
				     curr_strm->si[0].state,
				     curr_strm->si[0].flags,
				     conn ? conn->t.sock.fd : -1,
				     curr_strm->si[0].exp ?
				     human_time(TICKS_TO_MS(curr_strm->si[0].exp - now_ms),
						TICKS_TO_MS(1000)) : "");

			conn = objt_conn(curr_strm->si[1].end);
			chunk_appendf(&trash,
				     " s1=[%d,%1xh,fd=%d,ex=%s]",
				     curr_strm->si[1].state,
				     curr_strm->si[1].flags,
				     conn ? conn->t.sock.fd : -1,
				     curr_strm->si[1].exp ?
				     human_time(TICKS_TO_MS(curr_strm->si[1].exp - now_ms),
						TICKS_TO_MS(1000)) : "");

			chunk_appendf(&trash,
				     " exp=%s",
				     curr_strm->task->expire ?
				     human_time(TICKS_TO_MS(curr_strm->task->expire - now_ms),
						TICKS_TO_MS(1000)) : "");
			if (task_in_rq(curr_strm->task))
				chunk_appendf(&trash, " run(nice=%d)", curr_strm->task->nice);

			chunk_appendf(&trash, "\n");

			if (bi_putchk(si_ic(si), &trash) == -1) {
				/* let's try again later from this stream. We add ourselves into
				 * this stream's users so that it can remove us upon termination.
				 */
				si_applet_cant_put(si);
				LIST_ADDQ(&curr_strm->back_refs, &appctx->ctx.sess.bref.users);
				return 0;
			}

		next_sess:
			appctx->ctx.sess.bref.ref = curr_strm->list.n;
		}

		if (appctx->ctx.sess.target && appctx->ctx.sess.target != (void *)-1) {
			/* specified stream not found */
			if (appctx->ctx.sess.section > 0)
				chunk_appendf(&trash, "  *** session terminated while we were watching it ***\n");
			else
				chunk_appendf(&trash, "Session not found.\n");

			if (bi_putchk(si_ic(si), &trash) == -1) {
				si_applet_cant_put(si);
				return 0;
			}

			appctx->ctx.sess.target = NULL;
			appctx->ctx.sess.uid = 0;
			return 1;
		}

		appctx->st2 = STAT_ST_FIN;
		/* fall through */

	default:
		appctx->st2 = STAT_ST_FIN;
		return 1;
	}
}
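
/* Note on the back-reference ("bref") mechanism used above: when a stream
 * is freed while a dump is parked on it, the code releasing the stream is
 * expected to walk the stream's <back_refs> list and move each watcher's
 * <ref> pointer forward so that the dump can resume safely. A minimal
 * sketch of that counterpart logic, assuming <s> is the dying stream (see
 * stream_free() earlier in this file for the actual implementation):
 *
 *	struct bref *bref, *back;
 *
 *	list_for_each_entry_safe(bref, back, &s->back_refs, users) {
 *		// detach the watcher from the dying stream, then re-attach
 *		// it to the next stream in the global list, if any
 *		LIST_DEL(&bref->users);
 *		LIST_INIT(&bref->users);
 *		if (s->list.n != &streams)
 *			LIST_ADDQ(&LIST_ELEM(s->list.n, struct stream *, list)->back_refs,
 *				  &bref->users);
 *		bref->ref = s->list.n;
 *	}
 */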

static void cli_release_show_sess(struct appctx *appctx)
{
	if (appctx->st2 == STAT_ST_LIST) {
		if (!LIST_ISEMPTY(&appctx->ctx.sess.bref.users))
			LIST_DEL(&appctx->ctx.sess.bref.users);
	}
}

/* Parses the "shutdown session" directive. It always returns 1. */
static int cli_parse_shutdown_session(char **args, struct appctx *appctx, void *private)
{
	struct stream *strm, *ptr;

	if (!cli_has_level(appctx, ACCESS_LVL_ADMIN))
		return 1;

	if (!*args[2]) {
		appctx->ctx.cli.msg = "Session pointer expected (use 'show sess').\n";
		appctx->st0 = CLI_ST_PRINT;
		return 1;
	}

	ptr = (void *)strtoul(args[2], NULL, 0);

	/* first, look for the requested stream in the stream table */
	list_for_each_entry(strm, &streams, list) {
		if (strm == ptr)
			break;
	}

	/* do we have the stream? */
	if (strm != ptr) {
		appctx->ctx.cli.msg = "No such session (use 'show sess').\n";
		appctx->st0 = CLI_ST_PRINT;
		return 1;
	}

	stream_shutdown(strm, SF_ERR_KILLED);
	return 1;
}
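
/* Example usage (hypothetical socket path and pointer value): the pointer
 * passed to "shutdown session" is the one printed in the first column of
 * "show sess":
 *
 *	$ echo "show sess" | socat stdio /var/run/haproxy.sock
 *	0x7f12ab34c200: proto=tcpv4 src=127.0.0.1:41324 fe=fe_main be=be_app srv=s1 ...
 *	$ echo "shutdown session 0x7f12ab34c200" | socat stdio /var/run/haproxy.sock
 */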

/* Parses the "shutdown sessions server" directive. It always returns 1. */
static int cli_parse_shutdown_sessions_server(char **args, struct appctx *appctx, void *private)
{
	struct server *sv;
	struct stream *strm, *strm_bck;

	if (!cli_has_level(appctx, ACCESS_LVL_ADMIN))
		return 1;

	sv = cli_find_server(appctx, args[3]);
	if (!sv)
		return 1;

	/* kill all the streams attached to this server */
	list_for_each_entry_safe(strm, strm_bck, &sv->actconns, by_srv)
		if (strm->srv_conn == sv)
			stream_shutdown(strm, SF_ERR_KILLED);
	return 1;
}
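
/* Example usage (hypothetical backend/server names): kill every stream
 * currently attached to server "s1" of backend "be_app":
 *
 *	$ echo "shutdown sessions server be_app/s1" | socat stdio /var/run/haproxy.sock
 */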

/* register cli keywords */
static struct cli_kw_list cli_kws = {{ },{
	{ { "show", "sess",  NULL }, "show sess [id] : report the list of current sessions or dump this session", cli_parse_show_sess, cli_io_handler_dump_sess, cli_release_show_sess },
	{ { "shutdown", "session",  NULL }, "shutdown session : kill a specific session", cli_parse_shutdown_session, NULL, NULL },
	{ { "shutdown", "sessions",  "server" }, "shutdown sessions server : kill sessions on a server", cli_parse_shutdown_sessions_server, NULL, NULL },
	{{},}
}};

/* main configuration keyword registration. */
static struct action_kw_list stream_tcp_keywords = { ILH, {
	{ "use-service", stream_parse_use_service },
	{ /* END */ }
}};

static struct action_kw_list stream_http_keywords = { ILH, {
	{ "use-service", stream_parse_use_service },
	{ /* END */ }
}};
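
/* Once registered, the "use-service" action can be referenced from both
 * "tcp-request content" and "http-request" rules. A configuration sketch
 * (hypothetical frontend and service names):
 *
 *	frontend fe_main
 *	    bind :8080
 *	    http-request use-service lua.my-service if { path_beg /svc }
 */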

__attribute__((constructor))
static void __stream_init(void)
{
	tcp_req_cont_keywords_register(&stream_tcp_keywords);
	http_req_keywords_register(&stream_http_keywords);
	cli_register_kw(&cli_kws);
}

/*
 * Local variables:
 *  c-indent-level: 8
 *  c-basic-offset: 8
 * End:
 */