/*
 * Pass-through mux-demux for connections
 *
 * Copyright 2017 Willy Tarreau <w@1wt.eu>
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version
 * 2 of the License, or (at your option) any later version.
 *
 */

#include <haproxy/api.h>
#include <haproxy/buf.h>
#include <haproxy/connection.h>
#include <haproxy/pipe-t.h>
#include <haproxy/stream.h>
#include <haproxy/task.h>
#include <haproxy/trace.h>

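/* Per-connection context for the pass-through mux: it only tracks the single
 * conn_stream attached to the connection, the connection itself, and the
 * wait_event used to subscribe to transport-layer events.
 */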
struct mux_pt_ctx {
	struct conn_stream *cs;
	struct connection *conn;
	struct wait_event wait_event;
};

DECLARE_STATIC_POOL(pool_head_pt_ctx, "mux_pt", sizeof(struct mux_pt_ctx));

/* trace source and events */
static void pt_trace(enum trace_level level, uint64_t mask,
                     const struct trace_source *src,
                     const struct ist where, const struct ist func,
                     const void *a1, const void *a2, const void *a3, const void *a4);

/* The event representation is split like this :
 *   pt_ctx - internal PT context
 *   strm   - application layer
 */
static const struct trace_event pt_trace_events[] = {
#define           PT_EV_CONN_NEW      (1ULL <<  0)
	{ .mask = PT_EV_CONN_NEW,     .name = "pt_conn_new",  .desc = "new PT connection" },
#define           PT_EV_CONN_WAKE     (1ULL <<  1)
	{ .mask = PT_EV_CONN_WAKE,    .name = "pt_conn_wake", .desc = "PT connection woken up" },
#define           PT_EV_CONN_END      (1ULL <<  2)
	{ .mask = PT_EV_CONN_END,     .name = "pt_conn_end",  .desc = "PT connection terminated" },
#define           PT_EV_CONN_ERR      (1ULL <<  3)
	{ .mask = PT_EV_CONN_ERR,     .name = "pt_conn_err",  .desc = "error on PT connection" },
#define           PT_EV_STRM_NEW      (1ULL <<  4)
	{ .mask = PT_EV_STRM_NEW,     .name = "strm_new",     .desc = "app-layer stream creation" },
#define           PT_EV_STRM_SHUT     (1ULL <<  5)
	{ .mask = PT_EV_STRM_SHUT,    .name = "strm_shut",    .desc = "stream shutdown" },
#define           PT_EV_STRM_END      (1ULL <<  6)
	{ .mask = PT_EV_STRM_END,     .name = "strm_end",     .desc = "detaching app-layer stream" },
#define           PT_EV_STRM_ERR      (1ULL <<  7)
	{ .mask = PT_EV_STRM_ERR,     .name = "strm_err",     .desc = "stream error" },
#define           PT_EV_RX_DATA       (1ULL <<  8)
	{ .mask = PT_EV_RX_DATA,      .name = "pt_rx_data",   .desc = "Rx on PT connection" },
#define           PT_EV_TX_DATA       (1ULL <<  9)
	{ .mask = PT_EV_TX_DATA,      .name = "pt_tx_data",   .desc = "Tx on PT connection" },

	{}
};


static const struct name_desc pt_trace_decoding[] = {
#define PT_VERB_CLEAN    1
	{ .name="clean",    .desc="only user-friendly stuff, generally suitable for level \"user\"" },
#define PT_VERB_MINIMAL  2
	{ .name="minimal",  .desc="report only pt context state and flags, no real decoding" },
#define PT_VERB_SIMPLE   3
	{ .name="simple",   .desc="add request/response status line or htx info when available" },
#define PT_VERB_ADVANCED 4
	{ .name="advanced", .desc="add header fields or frame decoding when available" },
#define PT_VERB_COMPLETE 5
	{ .name="complete", .desc="add full data dump when available" },
	{ /* end */ }
};

static struct trace_source trace_pt __read_mostly = {
	.name = IST("pt"),
	.desc = "Passthrough multiplexer",
	.arg_def = TRC_ARG1_CONN,  // TRACE()'s first argument is always a connection
	.default_cb = pt_trace,
	.known_events = pt_trace_events,
	.lockon_args = NULL,
	.decoding = pt_trace_decoding,
	.report_events = ~0,  // report everything by default
};

#define TRACE_SOURCE &trace_pt
INITCALL1(STG_REGISTER, trace_register_source, TRACE_SOURCE);

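/* Dumps <len> bytes of <buf>, starting at relative offset <ofs>, into the
 * trace buffer, handling buffer wrapping by dumping the two contiguous areas
 * separately.
 */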
static inline void pt_trace_buf(const struct buffer *buf, size_t ofs, size_t len)
{
	size_t block1, block2;
	int line, ptr, newptr;

	block1 = b_contig_data(buf, ofs);
	block2 = 0;
	if (block1 > len)
		block1 = len;
	block2 = len - block1;

	ofs = b_peek_ofs(buf, ofs);

	line = 0;
	ptr = ofs;
	while (ptr < ofs + block1) {
		newptr = dump_text_line(&trace_buf, b_orig(buf), b_size(buf), ofs + block1, &line, ptr);
		if (newptr == ptr)
			break;
		ptr = newptr;
	}

	line = ptr = 0;
	while (ptr < block2) {
		newptr = dump_text_line(&trace_buf, b_orig(buf), b_size(buf), block2, &line, ptr);
		if (newptr == ptr)
			break;
		ptr = newptr;
	}
}

/* the PT traces always expect that arg1, if non-null, is of type connection
 * (from which we can derive the pt context), that arg2, if non-null, is a
 * conn-stream, and that arg3, if non-null, is a buffer.
 */
static void pt_trace(enum trace_level level, uint64_t mask, const struct trace_source *src,
                     const struct ist where, const struct ist func,
                     const void *a1, const void *a2, const void *a3, const void *a4)
{
	const struct connection *conn = a1;
	const struct mux_pt_ctx *ctx = conn ? conn->ctx : NULL;
	const struct conn_stream *cs = a2;
	const struct buffer *buf = a3;
	const size_t *val = a4;

	if (!ctx || src->verbosity < PT_VERB_CLEAN)
		return;

	/* Display frontend/backend info by default */
	chunk_appendf(&trace_buf, " : [%c]", (conn_is_back(conn) ? 'B' : 'F'));

	if (src->verbosity == PT_VERB_CLEAN)
		return;

	/* Display the value of the 4th argument (level > STATE) */
	if (src->level > TRACE_LEVEL_STATE && val)
		chunk_appendf(&trace_buf, " - VAL=%lu", (long)*val);

	/* Display conn and cs info, if defined (pointer + flags) */
	chunk_appendf(&trace_buf, " - conn=%p(0x%08x)", conn, conn->flags);
	if (cs)
		chunk_appendf(&trace_buf, " cs=%p(0x%08x)", cs, cs->flags);

	if (src->verbosity == PT_VERB_MINIMAL)
		return;

	/* Display buffer info, if defined (level > USER & verbosity > SIMPLE) */
	if (src->level > TRACE_LEVEL_USER && buf) {
		int full = 0, max = 3000, chunk = 1024;

		/* Full info (level > STATE && verbosity > SIMPLE) */
		if (src->level > TRACE_LEVEL_STATE) {
			if (src->verbosity == PT_VERB_COMPLETE)
				full = 1;
			else if (src->verbosity == PT_VERB_ADVANCED) {
				full = 1;
				max = 256;
				chunk = 64;
			}
		}

		chunk_appendf(&trace_buf, " buf=%u@%p+%u/%u",
			      (unsigned int)b_data(buf), b_orig(buf),
			      (unsigned int)b_head_ofs(buf), (unsigned int)b_size(buf));

		if (b_data(buf) && full) {
			chunk_memcat(&trace_buf, "\n", 1);
			if (b_data(buf) < max)
				pt_trace_buf(buf, 0, b_data(buf));
			else {
				pt_trace_buf(buf, 0, chunk);
				chunk_memcat(&trace_buf, "  ...\n", 6);
				pt_trace_buf(buf, b_data(buf) - chunk, chunk);
			}
		}
	}
}

static void mux_pt_destroy(struct mux_pt_ctx *ctx)
{
	struct connection *conn = NULL;

	TRACE_POINT(PT_EV_CONN_END);

	if (ctx) {
		/* The connection must be attached to this mux to be released */
		if (ctx->conn && ctx->conn->ctx == ctx)
			conn = ctx->conn;

		TRACE_DEVEL("freeing pt context", PT_EV_CONN_END, conn);

		tasklet_free(ctx->wait_event.tasklet);

		if (conn && ctx->wait_event.events != 0)
			conn->xprt->unsubscribe(conn, conn->xprt_ctx, ctx->wait_event.events,
						&ctx->wait_event);
		pool_free(pool_head_pt_ctx, ctx);
	}

	if (conn) {
		conn->mux = NULL;
		conn->ctx = NULL;
		TRACE_DEVEL("freeing conn", PT_EV_CONN_END, conn);

		conn_stop_tracking(conn);
		conn_full_close(conn);
		if (conn->destroy_cb)
			conn->destroy_cb(conn);
		conn_free(conn);
	}
}

/* Callback, used when we get I/Os while in idle mode. This one is exported so
 * that "show fd" can resolve it.
 */
struct task *mux_pt_io_cb(struct task *t, void *tctx, unsigned int status)
{
	struct mux_pt_ctx *ctx = tctx;

	TRACE_ENTER(PT_EV_CONN_WAKE, ctx->conn, ctx->cs);
	if (ctx->cs) {
		/* There's a small race condition.
		 * mux_pt_io_cb() is only supposed to be called if we have no
		 * stream attached. However, maybe the tasklet got woken up,
		 * and this connection was then attached to a new stream.
		 * If this happened, just wake the tasklet up if anybody
		 * subscribed to receive events, and otherwise call the wake
		 * method, to make sure the event is noticed.
		 */
		if (ctx->conn->subs) {
			ctx->conn->subs->events = 0;
			tasklet_wakeup(ctx->conn->subs->tasklet);
			ctx->conn->subs = NULL;
		} else if (ctx->cs->data_cb->wake)
			ctx->cs->data_cb->wake(ctx->cs);
		TRACE_DEVEL("leaving waking up CS", PT_EV_CONN_WAKE, ctx->conn, ctx->cs);
		return t;
	}
	conn_ctrl_drain(ctx->conn);
	if (ctx->conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH | CO_FL_SOCK_WR_SH)) {
		TRACE_DEVEL("leaving destroying pt context", PT_EV_CONN_WAKE, ctx->conn);
		mux_pt_destroy(ctx);
		t = NULL;
	}
	else {
		ctx->conn->xprt->subscribe(ctx->conn, ctx->conn->xprt_ctx, SUB_RETRY_RECV,
					   &ctx->wait_event);
		TRACE_DEVEL("leaving subscribing for reads", PT_EV_CONN_WAKE, ctx->conn);
	}

	return t;
}

/* Initialize the mux once it's attached. It is expected that conn->ctx
 * points to the existing conn_stream (for outgoing connections) or NULL (for
 * incoming ones, in which case one will be allocated and a new stream will be
 * instantiated). Returns < 0 on error.
 */
static int mux_pt_init(struct connection *conn, struct proxy *prx, struct session *sess,
		       struct buffer *input)
{
	struct conn_stream *cs = conn->ctx;
	struct mux_pt_ctx *ctx = pool_alloc(pool_head_pt_ctx);

	TRACE_ENTER(PT_EV_CONN_NEW);

	if (!ctx) {
		TRACE_ERROR("PT context allocation failure", PT_EV_CONN_NEW|PT_EV_CONN_END|PT_EV_CONN_ERR);
		goto fail;
	}

	ctx->wait_event.tasklet = tasklet_new();
	if (!ctx->wait_event.tasklet)
		goto fail_free_ctx;
	ctx->wait_event.tasklet->context = ctx;
	ctx->wait_event.tasklet->process = mux_pt_io_cb;
	ctx->wait_event.events = 0;
	ctx->conn = conn;

	if (!cs) {
		cs = cs_new(conn, conn->target);
		if (!cs) {
			TRACE_ERROR("CS allocation failure", PT_EV_STRM_NEW|PT_EV_STRM_END|PT_EV_STRM_ERR, conn);
			goto fail_free_ctx;
		}

		if (stream_create_from_cs(cs, &BUF_NULL) < 0) {
			TRACE_ERROR("stream creation failure", PT_EV_STRM_NEW|PT_EV_STRM_END|PT_EV_STRM_ERR, conn, cs);
			goto fail_free;
		}
		TRACE_POINT(PT_EV_STRM_NEW, conn, cs);
	}
	conn->ctx = ctx;
	ctx->cs = cs;
	cs->flags |= CS_FL_RCV_MORE;
	if (global.tune.options & GTUNE_USE_SPLICE)
		cs->flags |= CS_FL_MAY_SPLICE;

	TRACE_LEAVE(PT_EV_CONN_NEW, conn, cs);
	return 0;

 fail_free:
	cs_free(cs);
 fail_free_ctx:
	if (ctx->wait_event.tasklet)
		tasklet_free(ctx->wait_event.tasklet);
	pool_free(pool_head_pt_ctx, ctx);
 fail:
	TRACE_DEVEL("leaving in error", PT_EV_CONN_NEW|PT_EV_CONN_END|PT_EV_CONN_ERR);
	return -1;
}

/* Callback to be used by default for the pass-through mux. It calls the data
 * layer wake() callback if it is set, and otherwise returns 0.
 */
static int mux_pt_wake(struct connection *conn)
{
	struct mux_pt_ctx *ctx = conn->ctx;
	struct conn_stream *cs = ctx->cs;
	int ret = 0;

	TRACE_ENTER(PT_EV_CONN_WAKE, ctx->conn, cs);
	if (cs) {
		ret = cs->data_cb->wake ? cs->data_cb->wake(cs) : 0;

		if (ret < 0) {
			TRACE_DEVEL("leaving waking up CS", PT_EV_CONN_WAKE, ctx->conn, cs);
			return ret;
		}
	} else {
		conn_ctrl_drain(conn);
		if (conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH)) {
			TRACE_DEVEL("leaving destroying PT context", PT_EV_CONN_WAKE, ctx->conn);
			mux_pt_destroy(ctx);
			return -1;
		}
	}

	/* If we had early data, and we're done with the handshake
	 * then we know the data are safe, and we can remove the flag.
	 */
	if ((conn->flags & (CO_FL_EARLY_DATA | CO_FL_EARLY_SSL_HS | CO_FL_WAIT_XPRT)) ==
	    CO_FL_EARLY_DATA)
		conn->flags &= ~CO_FL_EARLY_DATA;

	TRACE_LEAVE(PT_EV_CONN_WAKE, ctx->conn);
	return ret;
}

/*
 * Attach a new stream to a connection
 * (Used for outgoing connections)
 */
static struct conn_stream *mux_pt_attach(struct connection *conn, struct session *sess)
{
	struct conn_stream *cs;
	struct mux_pt_ctx *ctx = conn->ctx;

	TRACE_ENTER(PT_EV_STRM_NEW, conn);
	if (ctx->wait_event.events)
		conn->xprt->unsubscribe(ctx->conn, conn->xprt_ctx, SUB_RETRY_RECV, &ctx->wait_event);
	cs = cs_new(conn, conn->target);
	if (!cs) {
		TRACE_ERROR("CS allocation failure", PT_EV_STRM_NEW|PT_EV_STRM_END|PT_EV_STRM_ERR, conn);
		goto fail;
	}

	ctx->cs = cs;
	cs->flags |= CS_FL_RCV_MORE;

	TRACE_LEAVE(PT_EV_STRM_NEW, conn, cs);
	return (cs);
 fail:
	TRACE_DEVEL("leaving on error", PT_EV_STRM_NEW|PT_EV_STRM_END|PT_EV_STRM_ERR, conn);
	return NULL;
}

/* Retrieves a valid conn_stream from this connection, or returns NULL. For
 * this mux, it's easy as we can only store a single conn_stream.
 */
static const struct conn_stream *mux_pt_get_first_cs(const struct connection *conn)
{
	struct mux_pt_ctx *ctx = conn->ctx;
	struct conn_stream *cs = ctx->cs;

	return cs;
}

/* Destroy the mux and the associated connection if still attached to this mux
 * and no longer used */
static void mux_pt_destroy_meth(void *ctx)
{
	struct mux_pt_ctx *pt = ctx;

	TRACE_POINT(PT_EV_CONN_END, pt->conn, pt->cs);
	if (!(pt->cs) || !(pt->conn) || pt->conn->ctx != pt)
		mux_pt_destroy(pt);
}

/*
 * Detach the stream from the connection and possibly release the connection.
 */
static void mux_pt_detach(struct conn_stream *cs)
{
	struct connection *conn = cs->conn;
	struct mux_pt_ctx *ctx = cs->conn->ctx;

	TRACE_ENTER(PT_EV_STRM_END, conn, cs);

	/* Subscribe, to know if we got disconnected */
	if (conn->owner != NULL &&
	    !(conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH | CO_FL_SOCK_WR_SH))) {
		ctx->cs = NULL;
		conn->xprt->subscribe(conn, conn->xprt_ctx, SUB_RETRY_RECV, &ctx->wait_event);
	} else {
		/* There's no session attached to that connection, destroy it */
		TRACE_DEVEL("killing dead connection", PT_EV_STRM_END, conn, cs);
		mux_pt_destroy(ctx);
	}

	TRACE_LEAVE(PT_EV_STRM_END);
}

/* returns the number of streams in use on a connection */
static int mux_pt_used_streams(struct connection *conn)
{
	struct mux_pt_ctx *ctx = conn->ctx;

	return ctx->cs ? 1 : 0;
}

/* returns the number of streams still available on a connection */
static int mux_pt_avail_streams(struct connection *conn)
{
	return 1 - mux_pt_used_streams(conn);
}

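/* Performs a read shutdown on the conn_stream. Does nothing if reads were
 * already shut; otherwise notifies the transport layer (draining pending data
 * when <mode> is CS_SHR_DRAIN) and fully closes the connection if the write
 * side was already shut.
 */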
static void mux_pt_shutr(struct conn_stream *cs, enum cs_shr_mode mode)
{
	TRACE_ENTER(PT_EV_STRM_SHUT, cs->conn, cs);

	if (cs->flags & CS_FL_SHR)
		return;
	cs->flags &= ~(CS_FL_RCV_MORE | CS_FL_WANT_ROOM);
	if (conn_xprt_ready(cs->conn) && cs->conn->xprt->shutr)
		cs->conn->xprt->shutr(cs->conn, cs->conn->xprt_ctx,
		    (mode == CS_SHR_DRAIN));
	else if (mode == CS_SHR_DRAIN)
		conn_ctrl_drain(cs->conn);
	if (cs->flags & CS_FL_SHW)
		conn_full_close(cs->conn);

	TRACE_LEAVE(PT_EV_STRM_SHUT, cs->conn, cs);
}

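/* Performs a write shutdown on the conn_stream. Does nothing if writes were
 * already shut; otherwise notifies the transport layer, then shuts the socket
 * down for writes, or fully closes the connection if the read side was
 * already shut.
 */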
static void mux_pt_shutw(struct conn_stream *cs, enum cs_shw_mode mode)
{
	TRACE_ENTER(PT_EV_STRM_SHUT, cs->conn, cs);

	if (cs->flags & CS_FL_SHW)
		return;
	if (conn_xprt_ready(cs->conn) && cs->conn->xprt->shutw)
		cs->conn->xprt->shutw(cs->conn, cs->conn->xprt_ctx,
		    (mode == CS_SHW_NORMAL));
	if (!(cs->flags & CS_FL_SHR))
		conn_sock_shutw(cs->conn, (mode == CS_SHW_NORMAL));
	else
		conn_full_close(cs->conn);

	TRACE_LEAVE(PT_EV_STRM_SHUT, cs->conn, cs);
}

/*
 * Called from the upper layer, to get more data
 *
 * The caller is responsible for defragmenting <buf> if necessary. But <flags>
 * must be tested to know the calling context. If CO_RFL_BUF_FLUSH is set, it
 * means the caller wants to flush input data (from the mux buffer and the
 * channel buffer) to be able to use kernel splicing or any kind of mux-to-mux
 * xfer. If CO_RFL_KEEP_RECV is set, the mux must always subscribe for read
 * events before giving back. CO_RFL_BUF_WET is set if <buf> is congested with
 * data scheduled for leaving soon. CO_RFL_BUF_NOT_STUCK is set to instruct the
 * mux that it may optimize the data copy to <buf> if necessary. Otherwise, it
 * should copy as much data as possible.
 */
static size_t mux_pt_rcv_buf(struct conn_stream *cs, struct buffer *buf, size_t count, int flags)
{
	size_t ret = 0;

	TRACE_ENTER(PT_EV_RX_DATA, cs->conn, cs, buf, (size_t[]){count});

	if (!count) {
		cs->flags |= (CS_FL_RCV_MORE | CS_FL_WANT_ROOM);
		goto end;
	}
	b_realign_if_empty(buf);
	ret = cs->conn->xprt->rcv_buf(cs->conn, cs->conn->xprt_ctx, buf, count, flags);
	if (conn_xprt_read0_pending(cs->conn)) {
		cs->flags &= ~(CS_FL_RCV_MORE | CS_FL_WANT_ROOM);
		cs->flags |= CS_FL_EOS;
		TRACE_DEVEL("read0 on connection", PT_EV_RX_DATA, cs->conn, cs);
	}
	if (cs->conn->flags & CO_FL_ERROR) {
		cs->flags &= ~(CS_FL_RCV_MORE | CS_FL_WANT_ROOM);
		cs->flags |= CS_FL_ERROR;
		TRACE_DEVEL("error on connection", PT_EV_RX_DATA|PT_EV_CONN_ERR, cs->conn, cs);
	}
  end:
	TRACE_LEAVE(PT_EV_RX_DATA, cs->conn, cs, buf, (size_t[]){ret});
	return ret;
}

/* Called from the upper layer, to send data */
static size_t mux_pt_snd_buf(struct conn_stream *cs, struct buffer *buf, size_t count, int flags)
{
	size_t ret;

	TRACE_ENTER(PT_EV_TX_DATA, cs->conn, cs, buf, (size_t[]){count});

	ret = cs->conn->xprt->snd_buf(cs->conn, cs->conn->xprt_ctx, buf, count, flags);

	if (ret > 0)
		b_del(buf, ret);

	TRACE_LEAVE(PT_EV_TX_DATA, cs->conn, cs, buf, (size_t[]){ret});
	return ret;
}

/* Called from the upper layer, to subscribe <es> to events <event_type>. The
 * event subscriber <es> is not allowed to change from a previous call as long
 * as at least one event is still subscribed. The <event_type> must only be a
 * combination of SUB_RETRY_RECV and SUB_RETRY_SEND. It always returns 0.
 */
static int mux_pt_subscribe(struct conn_stream *cs, int event_type, struct wait_event *es)
{
	TRACE_POINT(PT_EV_RX_DATA|PT_EV_TX_DATA, cs->conn, cs, 0, (size_t[]){event_type});
	return cs->conn->xprt->subscribe(cs->conn, cs->conn->xprt_ctx, event_type, es);
}

/* Called from the upper layer, to unsubscribe <es> from events <event_type>.
 * The <es> pointer is not allowed to differ from the one passed to the
 * subscribe() call. It always returns zero.
 */
static int mux_pt_unsubscribe(struct conn_stream *cs, int event_type, struct wait_event *es)
{
	TRACE_POINT(PT_EV_RX_DATA|PT_EV_TX_DATA, cs->conn, cs, 0, (size_t[]){event_type});
	return cs->conn->xprt->unsubscribe(cs->conn, cs->conn->xprt_ctx, event_type, es);
}

#if defined(USE_LINUX_SPLICE)
/* Send and get, using splicing */
static int mux_pt_rcv_pipe(struct conn_stream *cs, struct pipe *pipe, unsigned int count)
{
	int ret;

	TRACE_ENTER(PT_EV_RX_DATA, cs->conn, cs, 0, (size_t[]){count});

	ret = cs->conn->xprt->rcv_pipe(cs->conn, cs->conn->xprt_ctx, pipe, count);
	if (conn_xprt_read0_pending(cs->conn)) {
		cs->flags |= CS_FL_EOS;
		TRACE_DEVEL("read0 on connection", PT_EV_RX_DATA, cs->conn, cs);
	}
	if (cs->conn->flags & CO_FL_ERROR) {
		cs->flags |= CS_FL_ERROR;
		TRACE_DEVEL("error on connection", PT_EV_RX_DATA|PT_EV_CONN_ERR, cs->conn, cs);
	}

	TRACE_LEAVE(PT_EV_RX_DATA, cs->conn, cs, 0, (size_t[]){ret});
	return (ret);
}

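/* Sends the contents of <pipe> to the connection via the transport layer's
 * snd_pipe() and returns its result.
 */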
static int mux_pt_snd_pipe(struct conn_stream *cs, struct pipe *pipe)
{
	int ret;

	TRACE_ENTER(PT_EV_TX_DATA, cs->conn, cs, 0, (size_t[]){pipe->data});

	ret = cs->conn->xprt->snd_pipe(cs->conn, cs->conn->xprt_ctx, pipe);

	TRACE_LEAVE(PT_EV_TX_DATA, cs->conn, cs, 0, (size_t[]){ret});
	return ret;
}
#endif

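/* Queries information about the mux: MUX_STATUS reports MUX_STATUS_READY once
 * the transport handshake is over (CO_FL_WAIT_XPRT cleared), MUX_EXIT_STATUS
 * is always MUX_ES_UNKNOWN, and any other request returns -1.
 */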
static int mux_pt_ctl(struct connection *conn, enum mux_ctl_type mux_ctl, void *output)
{
	int ret = 0;
	switch (mux_ctl) {
	case MUX_STATUS:
		if (!(conn->flags & CO_FL_WAIT_XPRT))
			ret |= MUX_STATUS_READY;
		return ret;
	case MUX_EXIT_STATUS:
		return MUX_ES_UNKNOWN;
	default:
		return -1;
	}
}

/* The mux operations */
const struct mux_ops mux_tcp_ops = {
	.init = mux_pt_init,
	.wake = mux_pt_wake,
	.rcv_buf = mux_pt_rcv_buf,
	.snd_buf = mux_pt_snd_buf,
	.subscribe = mux_pt_subscribe,
	.unsubscribe = mux_pt_unsubscribe,
#if defined(USE_LINUX_SPLICE)
	.rcv_pipe = mux_pt_rcv_pipe,
	.snd_pipe = mux_pt_snd_pipe,
#endif
	.attach = mux_pt_attach,
	.get_first_cs = mux_pt_get_first_cs,
	.detach = mux_pt_detach,
	.avail_streams = mux_pt_avail_streams,
	.used_streams = mux_pt_used_streams,
	.destroy = mux_pt_destroy_meth,
	.ctl = mux_pt_ctl,
	.shutr = mux_pt_shutr,
	.shutw = mux_pt_shutw,
	.flags = MX_FL_NONE,
	.name = "PASS",
};


const struct mux_ops mux_pt_ops = {
	.init = mux_pt_init,
	.wake = mux_pt_wake,
	.rcv_buf = mux_pt_rcv_buf,
	.snd_buf = mux_pt_snd_buf,
	.subscribe = mux_pt_subscribe,
	.unsubscribe = mux_pt_unsubscribe,
#if defined(USE_LINUX_SPLICE)
	.rcv_pipe = mux_pt_rcv_pipe,
	.snd_pipe = mux_pt_snd_pipe,
#endif
	.attach = mux_pt_attach,
	.get_first_cs = mux_pt_get_first_cs,
	.detach = mux_pt_detach,
	.avail_streams = mux_pt_avail_streams,
	.used_streams = mux_pt_used_streams,
	.destroy = mux_pt_destroy_meth,
	.ctl = mux_pt_ctl,
	.shutr = mux_pt_shutr,
	.shutw = mux_pt_shutw,
	.flags = MX_FL_NONE|MX_FL_NO_UPG,
	.name = "PASS",
};

/* PROT selection : default mux has empty name */
static struct mux_proto_list mux_proto_none =
	{ .token = IST("none"), .mode = PROTO_MODE_TCP, .side = PROTO_SIDE_BOTH, .mux = &mux_pt_ops };
static struct mux_proto_list mux_proto_tcp =
	{ .token = IST(""), .mode = PROTO_MODE_TCP, .side = PROTO_SIDE_BOTH, .mux = &mux_tcp_ops };

INITCALL1(STG_REGISTER, register_mux_proto, &mux_proto_none);
INITCALL1(STG_REGISTER, register_mux_proto, &mux_proto_tcp);