1 /*
2 * Pass-through mux-demux for connections
3 *
4 * Copyright 2017 Willy Tarreau <w@1wt.eu>
5 *
6 * This program is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public License
8 * as published by the Free Software Foundation; either version
9 * 2 of the License, or (at your option) any later version.
10 *
11 */
12
13 #include <haproxy/api.h>
14 #include <haproxy/buf.h>
15 #include <haproxy/connection.h>
16 #include <haproxy/pipe-t.h>
17 #include <haproxy/stream.h>
18 #include <haproxy/task.h>
19 #include <haproxy/trace.h>
20
21 struct mux_pt_ctx {
22 struct conn_stream *cs;
23 struct connection *conn;
24 struct wait_event wait_event;
25 };
26
27 DECLARE_STATIC_POOL(pool_head_pt_ctx, "mux_pt", sizeof(struct mux_pt_ctx));
28
29 /* trace source and events */
30 static void pt_trace(enum trace_level level, uint64_t mask,
31 const struct trace_source *src,
32 const struct ist where, const struct ist func,
33 const void *a1, const void *a2, const void *a3, const void *a4);
34
35 /* The event representation is split like this :
36 * pt_ctx - internal PT context
37 * strm - application layer
38 */
39 static const struct trace_event pt_trace_events[] = {
40 #define PT_EV_CONN_NEW (1ULL << 0)
41 { .mask = PT_EV_CONN_NEW, .name = "pt_conn_new", .desc = "new PT connection" },
42 #define PT_EV_CONN_WAKE (1ULL << 1)
43 { .mask = PT_EV_CONN_WAKE, .name = "pt_conn_wake", .desc = "PT connection woken up" },
44 #define PT_EV_CONN_END (1ULL << 2)
45 { .mask = PT_EV_CONN_END, .name = "pt_conn_end", .desc = "PT connection terminated" },
46 #define PT_EV_CONN_ERR (1ULL << 3)
47 { .mask = PT_EV_CONN_ERR, .name = "pt_conn_err", .desc = "error on PT connection" },
48 #define PT_EV_STRM_NEW (1ULL << 4)
49 { .mask = PT_EV_STRM_NEW, .name = "strm_new", .desc = "app-layer stream creation" },
50 #define PT_EV_STRM_SHUT (1ULL << 5)
51 { .mask = PT_EV_STRM_SHUT, .name = "strm_shut", .desc = "stream shutdown" },
52 #define PT_EV_STRM_END (1ULL << 6)
53 { .mask = PT_EV_STRM_END, .name = "strm_end", .desc = "detaching app-layer stream" },
54 #define PT_EV_STRM_ERR (1ULL << 7)
55 { .mask = PT_EV_STRM_ERR, .name = "strm_err", .desc = "stream error" },
56 #define PT_EV_RX_DATA (1ULL << 8)
57 { .mask = PT_EV_RX_DATA, .name = "pt_rx_data", .desc = "Rx on PT connection" },
58 #define PT_EV_TX_DATA (1ULL << 9)
59 { .mask = PT_EV_TX_DATA, .name = "pt_tx_data", .desc = "Tx on PT connection" },
60
61 {}
62 };
63
64
65 static const struct name_desc pt_trace_decoding[] = {
66 #define PT_VERB_CLEAN 1
67 { .name="clean", .desc="only user-friendly stuff, generally suitable for level \"user\"" },
68 #define PT_VERB_MINIMAL 2
69 { .name="minimal", .desc="report only h1c/h1s state and flags, no real decoding" },
70 #define PT_VERB_SIMPLE 3
71 { .name="simple", .desc="add request/response status line or htx info when available" },
72 #define PT_VERB_ADVANCED 4
73 { .name="advanced", .desc="add header fields or frame decoding when available" },
74 #define PT_VERB_COMPLETE 5
75 { .name="complete", .desc="add full data dump when available" },
76 { /* end */ }
77 };
78
79 static struct trace_source trace_pt __read_mostly = {
80 .name = IST("pt"),
81 .desc = "Passthrough multiplexer",
82 .arg_def = TRC_ARG1_CONN, // TRACE()'s first argument is always a connection
83 .default_cb = pt_trace,
84 .known_events = pt_trace_events,
85 .lockon_args = NULL,
86 .decoding = pt_trace_decoding,
87 .report_events = ~0, // report everything by default
88 };
89
90 #define TRACE_SOURCE &trace_pt
91 INITCALL1(STG_REGISTER, trace_register_source, TRACE_SOURCE);
92
pt_trace_buf(const struct buffer * buf,size_t ofs,size_t len)93 static inline void pt_trace_buf(const struct buffer *buf, size_t ofs, size_t len)
94 {
95 size_t block1, block2;
96 int line, ptr, newptr;
97
98 block1 = b_contig_data(buf, ofs);
99 block2 = 0;
100 if (block1 > len)
101 block1 = len;
102 block2 = len - block1;
103
104 ofs = b_peek_ofs(buf, ofs);
105
106 line = 0;
107 ptr = ofs;
108 while (ptr < ofs + block1) {
109 newptr = dump_text_line(&trace_buf, b_orig(buf), b_size(buf), ofs + block1, &line, ptr);
110 if (newptr == ptr)
111 break;
112 ptr = newptr;
113 }
114
115 line = ptr = 0;
116 while (ptr < block2) {
117 newptr = dump_text_line(&trace_buf, b_orig(buf), b_size(buf), block2, &line, ptr);
118 if (newptr == ptr)
119 break;
120 ptr = newptr;
121 }
122 }
123
124 /* the PT traces always expect that arg1, if non-null, is of type connection
125 * (from which we can derive the pt context), that arg2, if non-null, is a
126 * conn-stream, and that arg3, if non-null, is a buffer.
127 */
pt_trace(enum trace_level level,uint64_t mask,const struct trace_source * src,const struct ist where,const struct ist func,const void * a1,const void * a2,const void * a3,const void * a4)128 static void pt_trace(enum trace_level level, uint64_t mask, const struct trace_source *src,
129 const struct ist where, const struct ist func,
130 const void *a1, const void *a2, const void *a3, const void *a4)
131 {
132 const struct connection *conn = a1;
133 const struct mux_pt_ctx *ctx = conn ? conn->ctx : NULL;
134 const struct conn_stream *cs = a2;
135 const struct buffer *buf = a3;
136 const size_t *val = a4;
137
138 if (!ctx|| src->verbosity < PT_VERB_CLEAN)
139 return;
140
141 /* Display frontend/backend info by default */
142 chunk_appendf(&trace_buf, " : [%c]", (conn_is_back(conn) ? 'B' : 'F'));
143
144 if (src->verbosity == PT_VERB_CLEAN)
145 return;
146
147 /* Display the value to the 4th argument (level > STATE) */
148 if (src->level > TRACE_LEVEL_STATE && val)
149 chunk_appendf(&trace_buf, " - VAL=%lu", (long)*val);
150
151 /* Display conn and cs info, if defined (pointer + flags) */
152 chunk_appendf(&trace_buf, " - conn=%p(0x%08x)", conn, conn->flags);
153 if (cs)
154 chunk_appendf(&trace_buf, " cs=%p(0x%08x)", cs, cs->flags);
155
156 if (src->verbosity == PT_VERB_MINIMAL)
157 return;
158
159 /* Display buffer info, if defined (level > USER & verbosity > SIMPLE) */
160 if (src->level > TRACE_LEVEL_USER && buf) {
161 int full = 0, max = 3000, chunk = 1024;
162
163 /* Full info (level > STATE && verbosity > SIMPLE) */
164 if (src->level > TRACE_LEVEL_STATE) {
165 if (src->verbosity == PT_VERB_COMPLETE)
166 full = 1;
167 else if (src->verbosity == PT_VERB_ADVANCED) {
168 full = 1;
169 max = 256;
170 chunk = 64;
171 }
172 }
173
174 chunk_appendf(&trace_buf, " buf=%u@%p+%u/%u",
175 (unsigned int)b_data(buf), b_orig(buf),
176 (unsigned int)b_head_ofs(buf), (unsigned int)b_size(buf));
177
178 if (b_data(buf) && full) {
179 chunk_memcat(&trace_buf, "\n", 1);
180 if (b_data(buf) < max)
181 pt_trace_buf(buf, 0, b_data(buf));
182 else {
183 pt_trace_buf(buf, 0, chunk);
184 chunk_memcat(&trace_buf, " ...\n", 6);
185 pt_trace_buf(buf, b_data(buf) - chunk, chunk);
186 }
187 }
188 }
189 }
190
mux_pt_destroy(struct mux_pt_ctx * ctx)191 static void mux_pt_destroy(struct mux_pt_ctx *ctx)
192 {
193 struct connection *conn = NULL;
194
195 TRACE_POINT(PT_EV_CONN_END);
196
197 if (ctx) {
198 /* The connection must be attached to this mux to be released */
199 if (ctx->conn && ctx->conn->ctx == ctx)
200 conn = ctx->conn;
201
202 TRACE_DEVEL("freeing pt context", PT_EV_CONN_END, conn);
203
204 tasklet_free(ctx->wait_event.tasklet);
205
206 if (conn && ctx->wait_event.events != 0)
207 conn->xprt->unsubscribe(conn, conn->xprt_ctx, ctx->wait_event.events,
208 &ctx->wait_event);
209 pool_free(pool_head_pt_ctx, ctx);
210 }
211
212 if (conn) {
213 conn->mux = NULL;
214 conn->ctx = NULL;
215 TRACE_DEVEL("freeing conn", PT_EV_CONN_END, conn);
216
217 conn_stop_tracking(conn);
218 conn_full_close(conn);
219 if (conn->destroy_cb)
220 conn->destroy_cb(conn);
221 conn_free(conn);
222 }
223 }
224
225 /* Callback, used when we get I/Os while in idle mode. This one is exported so
226 * that "show fd" can resolve it.
227 */
mux_pt_io_cb(struct task * t,void * tctx,unsigned int status)228 struct task *mux_pt_io_cb(struct task *t, void *tctx, unsigned int status)
229 {
230 struct mux_pt_ctx *ctx = tctx;
231
232 TRACE_ENTER(PT_EV_CONN_WAKE, ctx->conn, ctx->cs);
233 if (ctx->cs) {
234 /* There's a small race condition.
235 * mux_pt_io_cb() is only supposed to be called if we have no
236 * stream attached. However, maybe the tasklet got woken up,
237 * and this connection was then attached to a new stream.
238 * If this happened, just wake the tasklet up if anybody
239 * subscribed to receive events, and otherwise call the wake
240 * method, to make sure the event is noticed.
241 */
242 if (ctx->conn->subs) {
243 ctx->conn->subs->events = 0;
244 tasklet_wakeup(ctx->conn->subs->tasklet);
245 ctx->conn->subs = NULL;
246 } else if (ctx->cs->data_cb->wake)
247 ctx->cs->data_cb->wake(ctx->cs);
248 TRACE_DEVEL("leaving waking up CS", PT_EV_CONN_WAKE, ctx->conn, ctx->cs);
249 return t;
250 }
251 conn_ctrl_drain(ctx->conn);
252 if (ctx->conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH | CO_FL_SOCK_WR_SH)) {
253 TRACE_DEVEL("leaving destroying pt context", PT_EV_CONN_WAKE, ctx->conn);
254 mux_pt_destroy(ctx);
255 t = NULL;
256 }
257 else {
258 ctx->conn->xprt->subscribe(ctx->conn, ctx->conn->xprt_ctx, SUB_RETRY_RECV,
259 &ctx->wait_event);
260 TRACE_DEVEL("leaving subscribing for reads", PT_EV_CONN_WAKE, ctx->conn);
261 }
262
263 return t;
264 }
265
266 /* Initialize the mux once it's attached. It is expected that conn->ctx
267 * points to the existing conn_stream (for outgoing connections) or NULL (for
268 * incoming ones, in which case one will be allocated and a new stream will be
269 * instantiated). Returns < 0 on error.
270 */
mux_pt_init(struct connection * conn,struct proxy * prx,struct session * sess,struct buffer * input)271 static int mux_pt_init(struct connection *conn, struct proxy *prx, struct session *sess,
272 struct buffer *input)
273 {
274 struct conn_stream *cs = conn->ctx;
275 struct mux_pt_ctx *ctx = pool_alloc(pool_head_pt_ctx);
276
277 TRACE_ENTER(PT_EV_CONN_NEW);
278
279 if (!ctx) {
280 TRACE_ERROR("PT context allocation failure", PT_EV_CONN_NEW|PT_EV_CONN_END|PT_EV_CONN_ERR);
281 goto fail;
282 }
283
284 ctx->wait_event.tasklet = tasklet_new();
285 if (!ctx->wait_event.tasklet)
286 goto fail_free_ctx;
287 ctx->wait_event.tasklet->context = ctx;
288 ctx->wait_event.tasklet->process = mux_pt_io_cb;
289 ctx->wait_event.events = 0;
290 ctx->conn = conn;
291
292 if (!cs) {
293 cs = cs_new(conn, conn->target);
294 if (!cs) {
295 TRACE_ERROR("CS allocation failure", PT_EV_STRM_NEW|PT_EV_STRM_END|PT_EV_STRM_ERR, conn);
296 goto fail_free_ctx;
297 }
298
299 if (stream_create_from_cs(cs, &BUF_NULL) < 0) {
300 TRACE_ERROR("stream creation failure", PT_EV_STRM_NEW|PT_EV_STRM_END|PT_EV_STRM_ERR, conn, cs);
301 goto fail_free;
302 }
303 TRACE_POINT(PT_EV_STRM_NEW, conn, cs);
304 }
305 conn->ctx = ctx;
306 ctx->cs = cs;
307 cs->flags |= CS_FL_RCV_MORE;
308 if (global.tune.options & GTUNE_USE_SPLICE)
309 cs->flags |= CS_FL_MAY_SPLICE;
310
311 TRACE_LEAVE(PT_EV_CONN_NEW, conn, cs);
312 return 0;
313
314 fail_free:
315 cs_free(cs);
316 fail_free_ctx:
317 if (ctx->wait_event.tasklet)
318 tasklet_free(ctx->wait_event.tasklet);
319 pool_free(pool_head_pt_ctx, ctx);
320 fail:
321 TRACE_DEVEL("leaving in error", PT_EV_CONN_NEW|PT_EV_CONN_END|PT_EV_CONN_ERR);
322 return -1;
323 }
324
325 /* callback to be used by default for the pass-through mux. It calls the data
326 * layer wake() callback if it is set otherwise returns 0.
327 */
mux_pt_wake(struct connection * conn)328 static int mux_pt_wake(struct connection *conn)
329 {
330 struct mux_pt_ctx *ctx = conn->ctx;
331 struct conn_stream *cs = ctx->cs;
332 int ret = 0;
333
334 TRACE_ENTER(PT_EV_CONN_WAKE, ctx->conn, cs);
335 if (cs) {
336 ret = cs->data_cb->wake ? cs->data_cb->wake(cs) : 0;
337
338 if (ret < 0) {
339 TRACE_DEVEL("leaving waking up CS", PT_EV_CONN_WAKE, ctx->conn, cs);
340 return ret;
341 }
342 } else {
343 conn_ctrl_drain(conn);
344 if (conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH)) {
345 TRACE_DEVEL("leaving destroying PT context", PT_EV_CONN_WAKE, ctx->conn);
346 mux_pt_destroy(ctx);
347 return -1;
348 }
349 }
350
351 /* If we had early data, and we're done with the handshake
352 * then we know the data are safe, and we can remove the flag.
353 */
354 if ((conn->flags & (CO_FL_EARLY_DATA | CO_FL_EARLY_SSL_HS | CO_FL_WAIT_XPRT)) ==
355 CO_FL_EARLY_DATA)
356 conn->flags &= ~CO_FL_EARLY_DATA;
357
358 TRACE_LEAVE(PT_EV_CONN_WAKE, ctx->conn);
359 return ret;
360 }
361
362 /*
363 * Attach a new stream to a connection
364 * (Used for outgoing connections)
365 */
mux_pt_attach(struct connection * conn,struct session * sess)366 static struct conn_stream *mux_pt_attach(struct connection *conn, struct session *sess)
367 {
368 struct conn_stream *cs;
369 struct mux_pt_ctx *ctx = conn->ctx;
370
371 TRACE_ENTER(PT_EV_STRM_NEW, conn);
372 if (ctx->wait_event.events)
373 conn->xprt->unsubscribe(ctx->conn, conn->xprt_ctx, SUB_RETRY_RECV, &ctx->wait_event);
374 cs = cs_new(conn, conn->target);
375 if (!cs) {
376 TRACE_ERROR("CS allocation failure", PT_EV_STRM_NEW|PT_EV_STRM_END|PT_EV_STRM_ERR, conn);
377 goto fail;
378 }
379
380 ctx->cs = cs;
381 cs->flags |= CS_FL_RCV_MORE;
382
383 TRACE_LEAVE(PT_EV_STRM_NEW, conn, cs);
384 return (cs);
385 fail:
386 TRACE_DEVEL("leaving on error", PT_EV_STRM_NEW|PT_EV_STRM_END|PT_EV_STRM_ERR, conn);
387 return NULL;
388 }
389
390 /* Retrieves a valid conn_stream from this connection, or returns NULL. For
391 * this mux, it's easy as we can only store a single conn_stream.
392 */
mux_pt_get_first_cs(const struct connection * conn)393 static const struct conn_stream *mux_pt_get_first_cs(const struct connection *conn)
394 {
395 struct mux_pt_ctx *ctx = conn->ctx;
396 struct conn_stream *cs = ctx->cs;
397
398 return cs;
399 }
400
401 /* Destroy the mux and the associated connection if still attached to this mux
402 * and no longer used */
mux_pt_destroy_meth(void * ctx)403 static void mux_pt_destroy_meth(void *ctx)
404 {
405 struct mux_pt_ctx *pt = ctx;
406
407 TRACE_POINT(PT_EV_CONN_END, pt->conn, pt->cs);
408 if (!(pt->cs) || !(pt->conn) || pt->conn->ctx != pt)
409 mux_pt_destroy(pt);
410 }
411
412 /*
413 * Detach the stream from the connection and possibly release the connection.
414 */
mux_pt_detach(struct conn_stream * cs)415 static void mux_pt_detach(struct conn_stream *cs)
416 {
417 struct connection *conn = cs->conn;
418 struct mux_pt_ctx *ctx = cs->conn->ctx;
419
420 TRACE_ENTER(PT_EV_STRM_END, conn, cs);
421
422 /* Subscribe, to know if we got disconnected */
423 if (conn->owner != NULL &&
424 !(conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH | CO_FL_SOCK_WR_SH))) {
425 ctx->cs = NULL;
426 conn->xprt->subscribe(conn, conn->xprt_ctx, SUB_RETRY_RECV, &ctx->wait_event);
427 } else {
428 /* There's no session attached to that connection, destroy it */
429 TRACE_DEVEL("killing dead connection", PT_EV_STRM_END, conn, cs);
430 mux_pt_destroy(ctx);
431 }
432
433 TRACE_LEAVE(PT_EV_STRM_END);
434 }
435
436 /* returns the number of streams in use on a connection */
mux_pt_used_streams(struct connection * conn)437 static int mux_pt_used_streams(struct connection *conn)
438 {
439 struct mux_pt_ctx *ctx = conn->ctx;
440
441 return ctx->cs ? 1 : 0;
442 }
443
444 /* returns the number of streams still available on a connection */
mux_pt_avail_streams(struct connection * conn)445 static int mux_pt_avail_streams(struct connection *conn)
446 {
447 return 1 - mux_pt_used_streams(conn);
448 }
449
mux_pt_shutr(struct conn_stream * cs,enum cs_shr_mode mode)450 static void mux_pt_shutr(struct conn_stream *cs, enum cs_shr_mode mode)
451 {
452 TRACE_ENTER(PT_EV_STRM_SHUT, cs->conn, cs);
453
454 if (cs->flags & CS_FL_SHR)
455 return;
456 cs->flags &= ~(CS_FL_RCV_MORE | CS_FL_WANT_ROOM);
457 if (conn_xprt_ready(cs->conn) && cs->conn->xprt->shutr)
458 cs->conn->xprt->shutr(cs->conn, cs->conn->xprt_ctx,
459 (mode == CS_SHR_DRAIN));
460 else if (mode == CS_SHR_DRAIN)
461 conn_ctrl_drain(cs->conn);
462 if (cs->flags & CS_FL_SHW)
463 conn_full_close(cs->conn);
464
465 TRACE_LEAVE(PT_EV_STRM_SHUT, cs->conn, cs);
466 }
467
mux_pt_shutw(struct conn_stream * cs,enum cs_shw_mode mode)468 static void mux_pt_shutw(struct conn_stream *cs, enum cs_shw_mode mode)
469 {
470 TRACE_ENTER(PT_EV_STRM_SHUT, cs->conn, cs);
471
472 if (cs->flags & CS_FL_SHW)
473 return;
474 if (conn_xprt_ready(cs->conn) && cs->conn->xprt->shutw)
475 cs->conn->xprt->shutw(cs->conn, cs->conn->xprt_ctx,
476 (mode == CS_SHW_NORMAL));
477 if (!(cs->flags & CS_FL_SHR))
478 conn_sock_shutw(cs->conn, (mode == CS_SHW_NORMAL));
479 else
480 conn_full_close(cs->conn);
481
482 TRACE_LEAVE(PT_EV_STRM_SHUT, cs->conn, cs);
483 }
484
485 /*
486 * Called from the upper layer, to get more data
487 *
488 * The caller is responsible for defragmenting <buf> if necessary. But <flags>
489 * must be tested to know the calling context. If CO_RFL_BUF_FLUSH is set, it
490 * means the caller wants to flush input data (from the mux buffer and the
491 * channel buffer) to be able to use kernel splicing or any kind of mux-to-mux
492 * xfer. If CO_RFL_KEEP_RECV is set, the mux must always subscribe for read
493 * events before giving back. CO_RFL_BUF_WET is set if <buf> is congested with
494 * data scheduled for leaving soon. CO_RFL_BUF_NOT_STUCK is set to instruct the
495 * mux it may optimize the data copy to <buf> if necessary. Otherwise, it should
496 * copy as much data as possible.
497 */
mux_pt_rcv_buf(struct conn_stream * cs,struct buffer * buf,size_t count,int flags)498 static size_t mux_pt_rcv_buf(struct conn_stream *cs, struct buffer *buf, size_t count, int flags)
499 {
500 size_t ret = 0;
501
502 TRACE_ENTER(PT_EV_RX_DATA, cs->conn, cs, buf, (size_t[]){count});
503
504 if (!count) {
505 cs->flags |= (CS_FL_RCV_MORE | CS_FL_WANT_ROOM);
506 goto end;
507 }
508 b_realign_if_empty(buf);
509 ret = cs->conn->xprt->rcv_buf(cs->conn, cs->conn->xprt_ctx, buf, count, flags);
510 if (conn_xprt_read0_pending(cs->conn)) {
511 cs->flags &= ~(CS_FL_RCV_MORE | CS_FL_WANT_ROOM);
512 cs->flags |= CS_FL_EOS;
513 TRACE_DEVEL("read0 on connection", PT_EV_RX_DATA, cs->conn, cs);
514 }
515 if (cs->conn->flags & CO_FL_ERROR) {
516 cs->flags &= ~(CS_FL_RCV_MORE | CS_FL_WANT_ROOM);
517 cs->flags |= CS_FL_ERROR;
518 TRACE_DEVEL("error on connection", PT_EV_RX_DATA|PT_EV_CONN_ERR, cs->conn, cs);
519 }
520 end:
521 TRACE_LEAVE(PT_EV_RX_DATA, cs->conn, cs, buf, (size_t[]){ret});
522 return ret;
523 }
524
525 /* Called from the upper layer, to send data */
mux_pt_snd_buf(struct conn_stream * cs,struct buffer * buf,size_t count,int flags)526 static size_t mux_pt_snd_buf(struct conn_stream *cs, struct buffer *buf, size_t count, int flags)
527 {
528 size_t ret;
529
530 TRACE_ENTER(PT_EV_TX_DATA, cs->conn, cs, buf, (size_t[]){count});
531
532 ret = cs->conn->xprt->snd_buf(cs->conn, cs->conn->xprt_ctx, buf, count, flags);
533
534 if (ret > 0)
535 b_del(buf, ret);
536
537 TRACE_LEAVE(PT_EV_TX_DATA, cs->conn, cs, buf, (size_t[]){ret});
538 return ret;
539 }
540
541 /* Called from the upper layer, to subscribe <es> to events <event_type>. The
542 * event subscriber <es> is not allowed to change from a previous call as long
543 * as at least one event is still subscribed. The <event_type> must only be a
544 * combination of SUB_RETRY_RECV and SUB_RETRY_SEND. It always returns 0.
545 */
mux_pt_subscribe(struct conn_stream * cs,int event_type,struct wait_event * es)546 static int mux_pt_subscribe(struct conn_stream *cs, int event_type, struct wait_event *es)
547 {
548 TRACE_POINT(PT_EV_RX_DATA|PT_EV_TX_DATA, cs->conn, cs, 0, (size_t[]){event_type});
549 return cs->conn->xprt->subscribe(cs->conn, cs->conn->xprt_ctx, event_type, es);
550 }
551
552 /* Called from the upper layer, to unsubscribe <es> from events <event_type>.
553 * The <es> pointer is not allowed to differ from the one passed to the
554 * subscribe() call. It always returns zero.
555 */
mux_pt_unsubscribe(struct conn_stream * cs,int event_type,struct wait_event * es)556 static int mux_pt_unsubscribe(struct conn_stream *cs, int event_type, struct wait_event *es)
557 {
558 TRACE_POINT(PT_EV_RX_DATA|PT_EV_TX_DATA, cs->conn, cs, 0, (size_t[]){event_type});
559 return cs->conn->xprt->unsubscribe(cs->conn, cs->conn->xprt_ctx, event_type, es);
560 }
561
562 #if defined(USE_LINUX_SPLICE)
563 /* Send and get, using splicing */
mux_pt_rcv_pipe(struct conn_stream * cs,struct pipe * pipe,unsigned int count)564 static int mux_pt_rcv_pipe(struct conn_stream *cs, struct pipe *pipe, unsigned int count)
565 {
566 int ret;
567
568 TRACE_ENTER(PT_EV_RX_DATA, cs->conn, cs, 0, (size_t[]){count});
569
570 ret = cs->conn->xprt->rcv_pipe(cs->conn, cs->conn->xprt_ctx, pipe, count);
571 if (conn_xprt_read0_pending(cs->conn)) {
572 cs->flags |= CS_FL_EOS;
573 TRACE_DEVEL("read0 on connection", PT_EV_RX_DATA, cs->conn, cs);
574 }
575 if (cs->conn->flags & CO_FL_ERROR) {
576 cs->flags |= CS_FL_ERROR;
577 TRACE_DEVEL("error on connection", PT_EV_RX_DATA|PT_EV_CONN_ERR, cs->conn, cs);
578 }
579
580 TRACE_LEAVE(PT_EV_RX_DATA, cs->conn, cs, 0, (size_t[]){ret});
581 return (ret);
582 }
583
mux_pt_snd_pipe(struct conn_stream * cs,struct pipe * pipe)584 static int mux_pt_snd_pipe(struct conn_stream *cs, struct pipe *pipe)
585 {
586 int ret;
587
588 TRACE_ENTER(PT_EV_TX_DATA, cs->conn, cs, 0, (size_t[]){pipe->data});
589
590 ret = cs->conn->xprt->snd_pipe(cs->conn, cs->conn->xprt_ctx, pipe);
591
592 TRACE_LEAVE(PT_EV_TX_DATA, cs->conn, cs, 0, (size_t[]){ret});
593 return ret;
594 }
595 #endif
596
mux_pt_ctl(struct connection * conn,enum mux_ctl_type mux_ctl,void * output)597 static int mux_pt_ctl(struct connection *conn, enum mux_ctl_type mux_ctl, void *output)
598 {
599 int ret = 0;
600 switch (mux_ctl) {
601 case MUX_STATUS:
602 if (!(conn->flags & CO_FL_WAIT_XPRT))
603 ret |= MUX_STATUS_READY;
604 return ret;
605 case MUX_EXIT_STATUS:
606 return MUX_ES_UNKNOWN;
607 default:
608 return -1;
609 }
610 }
611
612 /* The mux operations */
613 const struct mux_ops mux_tcp_ops = {
614 .init = mux_pt_init,
615 .wake = mux_pt_wake,
616 .rcv_buf = mux_pt_rcv_buf,
617 .snd_buf = mux_pt_snd_buf,
618 .subscribe = mux_pt_subscribe,
619 .unsubscribe = mux_pt_unsubscribe,
620 #if defined(USE_LINUX_SPLICE)
621 .rcv_pipe = mux_pt_rcv_pipe,
622 .snd_pipe = mux_pt_snd_pipe,
623 #endif
624 .attach = mux_pt_attach,
625 .get_first_cs = mux_pt_get_first_cs,
626 .detach = mux_pt_detach,
627 .avail_streams = mux_pt_avail_streams,
628 .used_streams = mux_pt_used_streams,
629 .destroy = mux_pt_destroy_meth,
630 .ctl = mux_pt_ctl,
631 .shutr = mux_pt_shutr,
632 .shutw = mux_pt_shutw,
633 .flags = MX_FL_NONE,
634 .name = "PASS",
635 };
636
637
638 const struct mux_ops mux_pt_ops = {
639 .init = mux_pt_init,
640 .wake = mux_pt_wake,
641 .rcv_buf = mux_pt_rcv_buf,
642 .snd_buf = mux_pt_snd_buf,
643 .subscribe = mux_pt_subscribe,
644 .unsubscribe = mux_pt_unsubscribe,
645 #if defined(USE_LINUX_SPLICE)
646 .rcv_pipe = mux_pt_rcv_pipe,
647 .snd_pipe = mux_pt_snd_pipe,
648 #endif
649 .attach = mux_pt_attach,
650 .get_first_cs = mux_pt_get_first_cs,
651 .detach = mux_pt_detach,
652 .avail_streams = mux_pt_avail_streams,
653 .used_streams = mux_pt_used_streams,
654 .destroy = mux_pt_destroy_meth,
655 .ctl = mux_pt_ctl,
656 .shutr = mux_pt_shutr,
657 .shutw = mux_pt_shutw,
658 .flags = MX_FL_NONE|MX_FL_NO_UPG,
659 .name = "PASS",
660 };
661
662 /* PROT selection : default mux has empty name */
663 static struct mux_proto_list mux_proto_none =
664 { .token = IST("none"), .mode = PROTO_MODE_TCP, .side = PROTO_SIDE_BOTH, .mux = &mux_pt_ops };
665 static struct mux_proto_list mux_proto_tcp =
666 { .token = IST(""), .mode = PROTO_MODE_TCP, .side = PROTO_SIDE_BOTH, .mux = &mux_tcp_ops };
667
668 INITCALL1(STG_REGISTER, register_mux_proto, &mux_proto_none);
669 INITCALL1(STG_REGISTER, register_mux_proto, &mux_proto_tcp);
670