/*-
 * Copyright (c) 2006 Verdens Gang AS
 * Copyright (c) 2006-2015 Varnish Software AS
 * All rights reserved.
 *
 * Author: Martin Blix Grydeland <martin@varnish-software.com>
 *
 * SPDX-License-Identifier: BSD-2-Clause
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 */

#include "config.h"

#include <stdarg.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include "vdef.h"
#include "vas.h"
#include "miniobj.h"

#include "vqueue.h"
#include "vre.h"
#include "vtim.h"
#include "vtree.h"

#include "vapi/vsl.h"

#include "vsl_api.h"

#define VTX_CACHE 10
#define VTX_BUFSIZE_MIN 64
#define VTX_SHMCHUNKS 3
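
/* Tuning constants: VTX_CACHE is the number of retired vtx structures
   kept on a free list for reuse, VTX_BUFSIZE_MIN is the initial
   allocation (in 32-bit words) of a buf chunk, which doubles on demand,
   and VTX_SHMCHUNKS is the number of chunk structures preallocated per
   vtx for referencing records in place in shared memory. */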

static const char * const vsl_t_names[VSL_t__MAX] = {
	[VSL_t_unknown]	= "unknown",
	[VSL_t_sess]	= "sess",
	[VSL_t_req]	= "req",
	[VSL_t_bereq]	= "bereq",
	[VSL_t_raw]	= "raw",
};

static const char * const vsl_r_names[VSL_r__MAX] = {
	[VSL_r_unknown]	= "unknown",
	[VSL_r_http_1]	= "HTTP/1",
	[VSL_r_rxreq]	= "rxreq",
	[VSL_r_esi]	= "esi",
	[VSL_r_restart]	= "restart",
	[VSL_r_pass]	= "pass",
	[VSL_r_fetch]	= "fetch",
	[VSL_r_bgfetch]	= "bgfetch",
	[VSL_r_pipe]	= "pipe",
};

struct vtx;

struct vslc_raw {
	unsigned		magic;
#define VSLC_RAW_MAGIC		0x247EBD44

	struct VSL_cursor	cursor;

	const uint32_t		*ptr;
};

struct synth {
	unsigned		magic;
#define SYNTH_MAGIC		0xC654479F

	VTAILQ_ENTRY(synth)	list;
	size_t			offset;
	uint32_t		data[2 + 64 / sizeof (uint32_t)];
};
VTAILQ_HEAD(synthhead, synth);
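
/* A synth record mimics the layout of a real VSL record: data[0] holds
   the tag and payload length, data[1] the vxid (with a client or
   backend marker where applicable), and the NUL-terminated payload
   starts at data[2] with room for 64 bytes. The offset is the cursor
   position (in 32-bit words) at which the record is spliced into the
   transaction. */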

enum chunk_t {
	chunk_t__unassigned,
	chunk_t_shm,
	chunk_t_buf,
};

struct chunk {
	unsigned				magic;
#define CHUNK_MAGIC				0x48DC0194
	enum chunk_t				type;
	union {
		struct {
			struct VSLC_ptr		start;
			VTAILQ_ENTRY(chunk)	shmref;
		} shm;
		struct {
			uint32_t		*data;
			size_t			space;
		} buf;
	};
	size_t					len;
	struct vtx				*vtx;
	VTAILQ_ENTRY(chunk)			list;
};
VTAILQ_HEAD(chunkhead, chunk);
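
/* Records are held either as shm chunks, which point straight into the
   log segment in shared memory and are cheap to create, or as buf
   chunks, which own a heap copy. Shm chunks must be converted to buf
   chunks (chunk_shm_to_buf) before varnishd reuses the referenced
   region. */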

struct vslc_vtx {
	unsigned		magic;
#define VSLC_VTX_MAGIC		0x74C6523F

	struct VSL_cursor	cursor;

	struct vtx		*vtx;
	struct synth		*synth;
	struct chunk		*chunk;
	size_t			chunkstart;
	size_t			offset;
};
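
/* Cursor over the records of a vtx. The offset is measured in 32-bit
   words from the start of the logical record stream, chunkstart is the
   offset at which the current chunk begins, and synth points to the
   next synthetic record to be spliced in. */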

struct vtx_key {
	unsigned		vxid;
	VRBT_ENTRY(vtx_key)	entry;
};
VRBT_HEAD(vtx_tree, vtx_key);

struct vtx {
	struct vtx_key		key;
	unsigned		magic;
#define VTX_MAGIC		0xACC21D09
	VTAILQ_ENTRY(vtx)	list_child;
	VTAILQ_ENTRY(vtx)	list_vtx;

	double			t_start;
	unsigned		flags;
#define VTX_F_BEGIN		0x1 /* Begin record processed */
#define VTX_F_END		0x2 /* End record processed */
#define VTX_F_COMPLETE		0x4 /* Marked complete. No new children
				       should be appended */
#define VTX_F_READY		0x8 /* This vtx and all its children are
				       complete */

	enum VSL_transaction_e	type;
	enum VSL_reason_e	reason;

	struct vtx		*parent;
	VTAILQ_HEAD(,vtx)	child;
	unsigned		n_child;
	unsigned		n_childready;
	unsigned		n_descend;

	VTAILQ_HEAD(,synth)	synth;

	struct chunk		shmchunks[VTX_SHMCHUNKS];
	struct chunkhead	shmchunks_free;

	struct chunkhead	chunks;
	size_t			len;

	struct vslc_vtx		c;
};

struct VSLQ {
	unsigned		magic;
#define VSLQ_MAGIC		0x23A8BE97

	struct VSL_data		*vsl;
	struct VSL_cursor	*c;
	struct vslq_query	*query;

	enum VSL_grouping_e	grouping;

	/* Structured mode */
	struct vtx_tree		tree;
	VTAILQ_HEAD(,vtx)	ready;
	VTAILQ_HEAD(,vtx)	incomplete;
	int			n_outstanding;
	struct chunkhead	shmrefs;
	VTAILQ_HEAD(,vtx)	cache;
	unsigned		n_cache;

	/* Rate limiting */
	double			credits;
	vtim_mono		last_use;

	/* Raw mode */
	struct {
		struct vslc_raw		c;
		struct VSL_transaction	trans;
		struct VSL_transaction	*ptrans[2];
		struct VSLC_ptr		start;
		ssize_t			len;
		ssize_t			offset;
	} raw;
};
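
/* A VSLQ runs in one of two modes: raw mode, where every log record is
   reported on its own through the preallocated raw cursor above, or
   structured mode, where records are collected into vtx structures
   indexed by vxid in a red-black tree until the transaction and all of
   its children are complete. */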

static void vtx_synth_rec(struct vtx *vtx, unsigned tag, const char *fmt, ...);
/*lint -esym(534, vtx_diag) */
static int vtx_diag(struct vtx *vtx, const char *msg);
/*lint -esym(534, vtx_diag_tag) */
static int vtx_diag_tag(struct vtx *vtx, const uint32_t *ptr,
    const char *reason);

static inline int
vtx_keycmp(const struct vtx_key *a, const struct vtx_key *b)
{
	if (a->vxid < b->vxid)
		return (-1);
	if (a->vxid > b->vxid)
		return (1);
	return (0);
}

VRBT_PROTOTYPE_STATIC(vtx_tree, vtx_key, entry, vtx_keycmp)
VRBT_GENERATE_STATIC(vtx_tree, vtx_key, entry, vtx_keycmp)

static enum vsl_status v_matchproto_(vslc_next_f)
vslc_raw_next(const struct VSL_cursor *cursor)
{
	struct vslc_raw *c;

	CAST_OBJ_NOTNULL(c, cursor->priv_data, VSLC_RAW_MAGIC);
	assert(&c->cursor == cursor);

	AN(c->ptr);
	if (c->cursor.rec.ptr == NULL) {
		c->cursor.rec.ptr = c->ptr;
		return (vsl_more);
	} else {
		c->cursor.rec.ptr = NULL;
		return (vsl_end);
	}
}

static enum vsl_status v_matchproto_(vslc_reset_f)
vslc_raw_reset(const struct VSL_cursor *cursor)
{
	struct vslc_raw *c;

	CAST_OBJ_NOTNULL(c, cursor->priv_data, VSLC_RAW_MAGIC);
	assert(&c->cursor == cursor);

	AN(c->ptr);
	c->cursor.rec.ptr = NULL;

	return (vsl_end);
}

static const struct vslc_tbl vslc_raw_tbl = {
	.magic	= VSLC_TBL_MAGIC,
	.delete	= NULL,
	.next	= vslc_raw_next,
	.reset	= vslc_raw_reset,
	.check	= NULL,
};

static enum vsl_status v_matchproto_(vslc_next_f)
vslc_vtx_next(const struct VSL_cursor *cursor)
{
	struct vslc_vtx *c;
	const uint32_t *ptr;
	unsigned overrun;

	CAST_OBJ_NOTNULL(c, cursor->priv_data, VSLC_VTX_MAGIC);
	assert(&c->cursor == cursor);
	CHECK_OBJ_NOTNULL(c->vtx, VTX_MAGIC);

	do {
		CHECK_OBJ_ORNULL(c->synth, SYNTH_MAGIC);
		if (c->synth != NULL && c->synth->offset == c->offset) {
			/* We're at the offset of the next synth record,
			   point to it and advance the pointer */
			c->cursor.rec.ptr = c->synth->data;
			c->synth = VTAILQ_NEXT(c->synth, list);
		} else {
			overrun = c->offset > c->vtx->len;
			AZ(overrun);
			if (c->offset == c->vtx->len)
				return (vsl_end);

			/* Advance chunk pointer */
			if (c->chunk == NULL) {
				c->chunk = VTAILQ_FIRST(&c->vtx->chunks);
				c->chunkstart = 0;
			}
			CHECK_OBJ_NOTNULL(c->chunk, CHUNK_MAGIC);
			while (c->offset >= c->chunkstart + c->chunk->len) {
				c->chunkstart += c->chunk->len;
				c->chunk = VTAILQ_NEXT(c->chunk, list);
				CHECK_OBJ_NOTNULL(c->chunk, CHUNK_MAGIC);
			}

			/* Point to the next stored record */
			if (c->chunk->type == chunk_t_shm)
				ptr = c->chunk->shm.start.ptr;
			else {
				assert(c->chunk->type == chunk_t_buf);
				ptr = c->chunk->buf.data;
			}
			c->cursor.rec.ptr = ptr + c->offset - c->chunkstart;
			c->offset += VSL_NEXT(c->cursor.rec.ptr) -
			    c->cursor.rec.ptr;
		}
	} while (VSL_TAG(c->cursor.rec.ptr) == SLT__Batch);

	return (vsl_more);
}

static enum vsl_status v_matchproto_(vslc_reset_f)
vslc_vtx_reset(const struct VSL_cursor *cursor)
{
	struct vslc_vtx *c;

	CAST_OBJ_NOTNULL(c, cursor->priv_data, VSLC_VTX_MAGIC);
	assert(&c->cursor == cursor);
	CHECK_OBJ_NOTNULL(c->vtx, VTX_MAGIC);
	c->synth = VTAILQ_FIRST(&c->vtx->synth);
	c->chunk = NULL;
	c->chunkstart = 0;
	c->offset = 0;
	c->cursor.rec.ptr = NULL;

	return (vsl_end);
}

static const struct vslc_tbl vslc_vtx_tbl = {
	.magic	= VSLC_TBL_MAGIC,
	.delete	= NULL,
	.next	= vslc_vtx_next,
	.reset	= vslc_vtx_reset,
	.check	= NULL,
};

/* Create a buf chunk */
static struct chunk *
chunk_newbuf(struct vtx *vtx, const uint32_t *ptr, size_t len)
{
	struct chunk *chunk;

	ALLOC_OBJ(chunk, CHUNK_MAGIC);
	XXXAN(chunk);
	chunk->type = chunk_t_buf;
	chunk->vtx = vtx;
	chunk->buf.space = VTX_BUFSIZE_MIN;
	while (chunk->buf.space < len)
		chunk->buf.space *= 2;
	chunk->buf.data = malloc(sizeof (uint32_t) * chunk->buf.space);
	AN(chunk->buf.data);
	memcpy(chunk->buf.data, ptr, sizeof (uint32_t) * len);
	chunk->len = len;
	return (chunk);
}

/* Free a buf chunk */
static void
chunk_freebuf(struct chunk **pchunk)
{

	CHECK_OBJ_NOTNULL(*pchunk, CHUNK_MAGIC);
	assert((*pchunk)->type == chunk_t_buf);
	free((*pchunk)->buf.data);
	FREE_OBJ(*pchunk);
	*pchunk = NULL;
}

/* Append a set of records to a chunk */
static void
chunk_appendbuf(struct chunk *chunk, const uint32_t *ptr, size_t len)
{

	CHECK_OBJ_NOTNULL(chunk, CHUNK_MAGIC);
	assert(chunk->type == chunk_t_buf);
	if (chunk->buf.space < chunk->len + len) {
		while (chunk->buf.space < chunk->len + len)
			chunk->buf.space *= 2;
		chunk->buf.data = realloc(chunk->buf.data,
		    sizeof (uint32_t) * chunk->buf.space);
		AN(chunk->buf.data);
	}
	memcpy(chunk->buf.data + chunk->len, ptr, sizeof (uint32_t) * len);
	chunk->len += len;
}

/* Transform a shm chunk to a buf chunk */
static void
chunk_shm_to_buf(struct VSLQ *vslq, struct chunk *chunk)
{
	struct vtx *vtx;
	struct chunk *buf;

	CHECK_OBJ_NOTNULL(chunk, CHUNK_MAGIC);
	assert(chunk->type == chunk_t_shm);
	vtx = chunk->vtx;
	CHECK_OBJ_NOTNULL(vtx, VTX_MAGIC);

	buf = VTAILQ_PREV(chunk, chunkhead, list);
	if (buf != NULL && buf->type == chunk_t_buf)
		/* Previous is a buf chunk, append to it */
		chunk_appendbuf(buf, chunk->shm.start.ptr, chunk->len);
	else {
		/* Create a new buf chunk and insert it before this */
		buf = chunk_newbuf(vtx, chunk->shm.start.ptr, chunk->len);
		AN(buf);
		VTAILQ_INSERT_BEFORE(chunk, buf, list);
	}

	/* Reset cursor chunk pointer, vslc_vtx_next will set it correctly */
	vtx->c.chunk = NULL;

	/* Remove from the shmref list and vtx, and put chunk back
	   on the free list */
	VTAILQ_REMOVE(&vslq->shmrefs, chunk, shm.shmref);
	VTAILQ_REMOVE(&vtx->chunks, chunk, list);
	VTAILQ_INSERT_HEAD(&vtx->shmchunks_free, chunk, list);
}
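
/* Appending to a preceding buf chunk where possible keeps the chunk
   count low: a vtx that repeatedly outlives its shm references ends up
   with a few large heap chunks rather than one per converted
   reference. */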

/* Append a set of records to a vtx structure */
static enum vsl_status
vtx_append(struct VSLQ *vslq, struct vtx *vtx, const struct VSLC_ptr *start,
    size_t len)
{
	struct chunk *chunk;
	enum vsl_check i;

	AN(vtx);
	AN(len);
	AN(start);

	i = VSL_Check(vslq->c, start);
	if (i == vsl_check_e_inval)
		return (vsl_e_overrun);

	if (i == vsl_check_valid && !VTAILQ_EMPTY(&vtx->shmchunks_free)) {
		/* Shmref it */
		chunk = VTAILQ_FIRST(&vtx->shmchunks_free);
		CHECK_OBJ_NOTNULL(chunk, CHUNK_MAGIC);
		assert(chunk->type == chunk_t_shm);
		assert(chunk->vtx == vtx);
		VTAILQ_REMOVE(&vtx->shmchunks_free, chunk, list);
		chunk->shm.start = *start;
		chunk->len = len;
		VTAILQ_INSERT_TAIL(&vtx->chunks, chunk, list);

		/* Append to shmref list */
		VTAILQ_INSERT_TAIL(&vslq->shmrefs, chunk, shm.shmref);
	} else {
		/* Buffer it */
		chunk = VTAILQ_LAST(&vtx->chunks, chunkhead);
		CHECK_OBJ_ORNULL(chunk, CHUNK_MAGIC);
		if (chunk != NULL && chunk->type == chunk_t_buf) {
			/* Tail is a buf chunk, append to that */
			chunk_appendbuf(chunk, start->ptr, len);
		} else {
			/* Append new buf chunk */
			chunk = chunk_newbuf(vtx, start->ptr, len);
			AN(chunk);
			VTAILQ_INSERT_TAIL(&vtx->chunks, chunk, list);
		}
	}
	vtx->len += len;
	return (vsl_more);
}

/* Allocate a new vtx structure */
static struct vtx *
vtx_new(struct VSLQ *vslq)
{
	struct vtx *vtx;
	int i;

	AN(vslq);
	if (vslq->n_cache) {
		AZ(VTAILQ_EMPTY(&vslq->cache));
		vtx = VTAILQ_FIRST(&vslq->cache);
		VTAILQ_REMOVE(&vslq->cache, vtx, list_child);
		vslq->n_cache--;
	} else {
		ALLOC_OBJ(vtx, VTX_MAGIC);
		AN(vtx);

		VTAILQ_INIT(&vtx->child);
		VTAILQ_INIT(&vtx->shmchunks_free);
		for (i = 0; i < VTX_SHMCHUNKS; i++) {
			vtx->shmchunks[i].magic = CHUNK_MAGIC;
			vtx->shmchunks[i].type = chunk_t_shm;
			vtx->shmchunks[i].vtx = vtx;
			VTAILQ_INSERT_TAIL(&vtx->shmchunks_free,
			    &vtx->shmchunks[i], list);
		}
		VTAILQ_INIT(&vtx->chunks);
		VTAILQ_INIT(&vtx->synth);
		vtx->c.magic = VSLC_VTX_MAGIC;
		vtx->c.vtx = vtx;
		vtx->c.cursor.priv_tbl = &vslc_vtx_tbl;
		vtx->c.cursor.priv_data = &vtx->c;
	}

	CHECK_OBJ_NOTNULL(vtx, VTX_MAGIC);
	vtx->key.vxid = 0;
	vtx->t_start = VTIM_mono();
	vtx->flags = 0;
	vtx->type = VSL_t_unknown;
	vtx->reason = VSL_r_unknown;
	vtx->parent = NULL;
	vtx->n_child = 0;
	vtx->n_childready = 0;
	vtx->n_descend = 0;
	vtx->len = 0;
	AN(vslc_vtx_reset(&vtx->c.cursor) == vsl_end);

	return (vtx);
}

/* Disuse a vtx and all its children, freeing any resources held. Free or
   cache the vtx for later use */
static void
vtx_retire(struct VSLQ *vslq, struct vtx **pvtx)
{
	struct vtx *vtx;
	struct vtx *child;
	struct synth *synth;
	struct chunk *chunk;

	AN(vslq);
	TAKE_OBJ_NOTNULL(vtx, pvtx, VTX_MAGIC);

	AN(vtx->flags & VTX_F_COMPLETE);
	AN(vtx->flags & VTX_F_READY);
	AZ(vtx->parent);

	while (!VTAILQ_EMPTY(&vtx->child)) {
		child = VTAILQ_FIRST(&vtx->child);
		assert(child->parent == vtx);
		AN(vtx->n_child);
		assert(vtx->n_descend >= child->n_descend + 1);
		VTAILQ_REMOVE(&vtx->child, child, list_child);
		child->parent = NULL;
		vtx->n_child--;
		vtx->n_descend -= child->n_descend + 1;
		vtx_retire(vslq, &child);
		AZ(child);
	}
	AZ(vtx->n_child);
	AZ(vtx->n_descend);
	vtx->n_childready = 0;
	AN(VRBT_REMOVE(vtx_tree, &vslq->tree, &vtx->key));
	vtx->key.vxid = 0;
	vtx->flags = 0;

	while (!VTAILQ_EMPTY(&vtx->synth)) {
		synth = VTAILQ_FIRST(&vtx->synth);
		CHECK_OBJ_NOTNULL(synth, SYNTH_MAGIC);
		VTAILQ_REMOVE(&vtx->synth, synth, list);
		FREE_OBJ(synth);
	}

	while (!VTAILQ_EMPTY(&vtx->chunks)) {
		chunk = VTAILQ_FIRST(&vtx->chunks);
		CHECK_OBJ_NOTNULL(chunk, CHUNK_MAGIC);
		VTAILQ_REMOVE(&vtx->chunks, chunk, list);
		if (chunk->type == chunk_t_shm) {
			VTAILQ_REMOVE(&vslq->shmrefs, chunk, shm.shmref);
			VTAILQ_INSERT_HEAD(&vtx->shmchunks_free, chunk, list);
		} else {
			assert(chunk->type == chunk_t_buf);
			chunk_freebuf(&chunk);
			AZ(chunk);
		}
	}
	vtx->len = 0;
	AN(vslq->n_outstanding);
	vslq->n_outstanding--;

	if (vslq->n_cache < VTX_CACHE) {
		VTAILQ_INSERT_HEAD(&vslq->cache, vtx, list_child);
		vslq->n_cache++;
	} else {
		FREE_OBJ(vtx);
	}
}

/* Lookup a vtx by vxid from the managed list */
static struct vtx *
vtx_lookup(const struct VSLQ *vslq, unsigned vxid)
{
	struct vtx_key lkey, *key;
	struct vtx *vtx;

	AN(vslq);
	lkey.vxid = vxid;
	key = VRBT_FIND(vtx_tree, &vslq->tree, &lkey);
	if (key == NULL)
		return (NULL);
	CAST_OBJ_NOTNULL(vtx, (void *)key, VTX_MAGIC);
	return (vtx);
}

/* Insert a new vtx into the managed list */
static struct vtx *
vtx_add(struct VSLQ *vslq, unsigned vxid)
{
	struct vtx *vtx;

	AN(vslq);
	vtx = vtx_new(vslq);
	AN(vtx);
	vtx->key.vxid = vxid;
	AZ(VRBT_INSERT(vtx_tree, &vslq->tree, &vtx->key));
	VTAILQ_INSERT_TAIL(&vslq->incomplete, vtx, list_vtx);
	vslq->n_outstanding++;
	return (vtx);
}

/* Mark a vtx complete, update child counters and if possible push it or
   its top parent to the ready state */
static void
vtx_mark_complete(struct VSLQ *vslq, struct vtx *vtx)
{

	AN(vslq);
	AN(vtx->flags & VTX_F_END);
	AZ(vtx->flags & VTX_F_COMPLETE);

	if (vtx->type == VSL_t_unknown)
		vtx_diag(vtx, "vtx of unknown type marked complete");

	vtx->flags |= VTX_F_COMPLETE;
	VTAILQ_REMOVE(&vslq->incomplete, vtx, list_vtx);

	while (1) {
		AZ(vtx->flags & VTX_F_READY);
		if (vtx->flags & VTX_F_COMPLETE &&
		    vtx->n_child == vtx->n_childready)
			vtx->flags |= VTX_F_READY;
		else
			return;
		if (vtx->parent == NULL) {
			/* Top level vtx ready */
			VTAILQ_INSERT_TAIL(&vslq->ready, vtx, list_vtx);
			return;
		}
		vtx = vtx->parent;
		vtx->n_childready++;
		assert(vtx->n_child >= vtx->n_childready);
	}
}

/* Add a child to a parent, and update child counters */
static void
vtx_set_parent(struct vtx *parent, struct vtx *child)
{

	CHECK_OBJ_NOTNULL(parent, VTX_MAGIC);
	CHECK_OBJ_NOTNULL(child, VTX_MAGIC);
	assert(parent != child);
	AZ(parent->flags & VTX_F_COMPLETE);
	AZ(child->flags & VTX_F_COMPLETE);
	AZ(child->parent);
	child->parent = parent;
	VTAILQ_INSERT_TAIL(&parent->child, child, list_child);
	parent->n_child++;
	do
		parent->n_descend += 1 + child->n_descend;
	while ((parent = parent->parent) != NULL);
}

/* Parse a begin or link record. Returns the number of elements that were
   successfully parsed. */
static int
vtx_parse_link(const char *str, enum VSL_transaction_e *ptype,
    unsigned *pvxid, enum VSL_reason_e *preason)
{
	char type[16], reason[16];
	unsigned vxid;
	int i;
	enum VSL_transaction_e et;
	enum VSL_reason_e er;

	AN(str);
	AN(ptype);
	AN(pvxid);
	AN(preason);

	i = sscanf(str, "%15s %u %15s", type, &vxid, reason);
	if (i < 1)
		return (0);

	/* transaction type */
	for (et = VSL_t_unknown; et < VSL_t__MAX; et++)
		if (!strcmp(type, vsl_t_names[et]))
			break;
	if (et >= VSL_t__MAX)
		et = VSL_t_unknown;
	*ptype = et;
	if (i == 1)
		return (1);

	/* vxid */
	assert((vxid & ~VSL_IDENTMASK) == 0);
	*pvxid = vxid;
	if (i == 2)
		return (2);

	/* transaction reason */
	for (er = VSL_r_unknown; er < VSL_r__MAX; er++)
		if (!strcmp(reason, vsl_r_names[er]))
			break;
	if (er >= VSL_r__MAX)
		er = VSL_r_unknown;
	*preason = er;
	return (3);
}
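
/* For example, the payload of a "Begin: req 32771 rxreq" record is the
   string "req 32771 rxreq", which parses to VSL_t_req, vxid 32771 and
   VSL_r_rxreq, returning 3. The vxid here is illustrative. */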

/* Parse and process a begin record */
static int
vtx_scan_begin(struct VSLQ *vslq, struct vtx *vtx, const uint32_t *ptr)
{
	int i;
	enum VSL_transaction_e type;
	enum VSL_reason_e reason;
	unsigned p_vxid;
	struct vtx *p_vtx;

	assert(VSL_TAG(ptr) == SLT_Begin);

	AZ(vtx->flags & VTX_F_READY);

	i = vtx_parse_link(VSL_CDATA(ptr), &type, &p_vxid, &reason);
	if (i != 3)
		return (vtx_diag_tag(vtx, ptr, "parse error"));
	if (type == VSL_t_unknown)
		(void)vtx_diag_tag(vtx, ptr, "unknown vxid type");

	/* Check/set vtx type */
	if (vtx->type != VSL_t_unknown && vtx->type != type)
		/* Type not matching the one previously set by a link
		   record */
		(void)vtx_diag_tag(vtx, ptr, "type mismatch");
	vtx->type = type;
	vtx->reason = reason;

	if (p_vxid == 0)
		/* Zero means no parent */
		return (0);
	if (p_vxid == vtx->key.vxid)
		return (vtx_diag_tag(vtx, ptr, "link to self"));

	if (vslq->grouping == VSL_g_vxid)
		return (0);	/* No links */
	if (vslq->grouping == VSL_g_request && vtx->type == VSL_t_req &&
	    vtx->reason == VSL_r_rxreq)
		return (0);	/* No links */

	if (vtx->parent != NULL) {
		if (vtx->parent->key.vxid != p_vxid) {
			/* This vtx already belongs to a different
			   parent */
			return (vtx_diag_tag(vtx, ptr, "link mismatch"));
		} else
			/* Link already exists */
			return (0);
	}

	p_vtx = vtx_lookup(vslq, p_vxid);
	if (p_vtx == NULL) {
		/* Not seen parent yet. Create it. */
		p_vtx = vtx_add(vslq, p_vxid);
		AN(p_vtx);
	} else {
		CHECK_OBJ_NOTNULL(p_vtx, VTX_MAGIC);
		if (p_vtx->flags & VTX_F_COMPLETE)
			return (vtx_diag_tag(vtx, ptr, "link too late"));
	}

	/* Create link */
	vtx_set_parent(p_vtx, vtx);

	return (0);
}

/* Parse and process a link record */
static int
vtx_scan_link(struct VSLQ *vslq, struct vtx *vtx, const uint32_t *ptr)
{
	int i;
	enum VSL_transaction_e c_type;
	enum VSL_reason_e c_reason;
	unsigned c_vxid;
	struct vtx *c_vtx;

	assert(VSL_TAG(ptr) == SLT_Link);

	AZ(vtx->flags & VTX_F_READY);

	i = vtx_parse_link(VSL_CDATA(ptr), &c_type, &c_vxid, &c_reason);
	if (i != 3)
		return (vtx_diag_tag(vtx, ptr, "parse error"));
	if (c_type == VSL_t_unknown)
		(void)vtx_diag_tag(vtx, ptr, "unknown vxid type");

	if (vslq->grouping == VSL_g_vxid)
		return (0);	/* No links */
	if (vslq->grouping == VSL_g_request && vtx->type == VSL_t_sess)
		return (0);	/* No links */

	if (c_vxid == 0)
		return (vtx_diag_tag(vtx, ptr, "illegal link vxid"));
	if (c_vxid == vtx->key.vxid)
		return (vtx_diag_tag(vtx, ptr, "link to self"));

	/* Lookup and check child vtx */
	c_vtx = vtx_lookup(vslq, c_vxid);
	if (c_vtx == NULL) {
		/* Child not seen before. Insert it and create link */
		c_vtx = vtx_add(vslq, c_vxid);
		AN(c_vtx);
		AZ(c_vtx->parent);
		c_vtx->type = c_type;
		c_vtx->reason = c_reason;
		vtx_set_parent(vtx, c_vtx);
		return (0);
	}

	CHECK_OBJ_NOTNULL(c_vtx, VTX_MAGIC);
	if (c_vtx->parent == vtx)
		/* Link already exists */
		return (0);
	if (c_vtx->parent != NULL && c_vtx->parent != vtx)
		return (vtx_diag_tag(vtx, ptr, "duplicate link"));
	if (c_vtx->flags & VTX_F_COMPLETE)
		return (vtx_diag_tag(vtx, ptr, "link too late"));
	if (c_vtx->type != VSL_t_unknown && c_vtx->type != c_type)
		(void)vtx_diag_tag(vtx, ptr, "type mismatch");

	c_vtx->type = c_type;
	c_vtx->reason = c_reason;
	vtx_set_parent(vtx, c_vtx);
	return (0);
}

/* Scan the records of a vtx, performing processing actions on specific
   records */
static void
vtx_scan(struct VSLQ *vslq, struct vtx *vtx)
{
	const uint32_t *ptr;
	enum VSL_tag_e tag;

	while (!(vtx->flags & VTX_F_COMPLETE) &&
	    vslc_vtx_next(&vtx->c.cursor) == vsl_more) {
		ptr = vtx->c.cursor.rec.ptr;
		if (VSL_ID(ptr) != vtx->key.vxid) {
			(void)vtx_diag_tag(vtx, ptr, "vxid mismatch");
			continue;
		}

		tag = VSL_TAG(ptr);
		assert(tag != SLT__Batch);

		switch (tag) {
		case SLT_Begin:
			if (vtx->flags & VTX_F_BEGIN)
				(void)vtx_diag_tag(vtx, ptr, "duplicate begin");
			else {
				(void)vtx_scan_begin(vslq, vtx, ptr);
				vtx->flags |= VTX_F_BEGIN;
			}
			break;

		case SLT_Link:
			(void)vtx_scan_link(vslq, vtx, ptr);
			break;

		case SLT_End:
			AZ(vtx->flags & VTX_F_END);
			vtx->flags |= VTX_F_END;
			vtx_mark_complete(vslq, vtx);
			break;

		default:
			break;
		}
	}
}

/* Force a vtx into complete status by synthing the necessary outstanding
   records */
static void
vtx_force(struct VSLQ *vslq, struct vtx *vtx, const char *reason)
{

	AZ(vtx->flags & VTX_F_COMPLETE);
	AZ(vtx->flags & VTX_F_READY);
	vtx_scan(vslq, vtx);
	if (!(vtx->flags & VTX_F_BEGIN))
		vtx_synth_rec(vtx, SLT_Begin, "%s %u synth",
		    vsl_t_names[vtx->type], 0);
	vtx_diag(vtx, reason);
	if (!(vtx->flags & VTX_F_END))
		vtx_synth_rec(vtx, SLT_End, "synth");
	vtx_scan(vslq, vtx);
	AN(vtx->flags & VTX_F_COMPLETE);
}

static int
vslq_ratelimit(struct VSLQ *vslq)
{
	vtim_mono now;
	vtim_dur delta;

	CHECK_OBJ_NOTNULL(vslq, VSLQ_MAGIC);
	CHECK_OBJ_NOTNULL(vslq->vsl, VSL_MAGIC);

	now = VTIM_mono();
	delta = now - vslq->last_use;
	vslq->credits += (delta / vslq->vsl->R_opt_p) * vslq->vsl->R_opt_l;
	if (vslq->credits > vslq->vsl->R_opt_l)
		vslq->credits = vslq->vsl->R_opt_l;
	vslq->last_use = now;

	if (vslq->credits < 1.0)
		return (0);

	vslq->credits -= 1.0;
	return (1);
}
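
/* The rate limiter is a token bucket: credits refill at R_opt_l
   transactions per R_opt_p seconds and are capped at R_opt_l, and each
   reported transaction costs one credit. With, say, a limit of 10 per
   2 seconds, bursts of up to 10 transactions pass, 5 per second
   sustained. */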

/* Build transaction array, do the query and callback. Returns 0 or the
   return value from func */
static int
vslq_callback(struct VSLQ *vslq, struct vtx *vtx, VSLQ_dispatch_f *func,
    void *priv)
{
	unsigned n = vtx->n_descend + 1;
	struct vtx *vtxs[n];
	struct VSL_transaction trans[n];
	struct VSL_transaction *ptrans[n + 1];
	unsigned i, j;

	AN(vslq);
	CHECK_OBJ_NOTNULL(vtx, VTX_MAGIC);
	AN(vtx->flags & VTX_F_READY);
	AN(func);

	if (vslq->grouping == VSL_g_session &&
	    vtx->type != VSL_t_sess)
		return (0);
	if (vslq->grouping == VSL_g_request &&
	    vtx->type != VSL_t_req)
		return (0);

	/* Build transaction array */
	AN(vslc_vtx_reset(&vtx->c.cursor) == vsl_end);
	vtxs[0] = vtx;
	trans[0].level = 1;
	trans[0].vxid = vtx->key.vxid;
	trans[0].vxid_parent = 0;
	trans[0].type = vtx->type;
	trans[0].reason = vtx->reason;
	trans[0].c = &vtx->c.cursor;
	i = 1;
	j = 0;
	while (j < i) {
		VTAILQ_FOREACH(vtx, &vtxs[j]->child, list_child) {
			assert(i < n);
			AN(vslc_vtx_reset(&vtx->c.cursor) == vsl_end);
			vtxs[i] = vtx;
			if (vtx->reason == VSL_r_restart)
				/* Restarts stay at the same level as parent */
				trans[i].level = trans[j].level;
			else
				trans[i].level = trans[j].level + 1;
			trans[i].vxid = vtx->key.vxid;
			trans[i].vxid_parent = trans[j].vxid;
			trans[i].type = vtx->type;
			trans[i].reason = vtx->reason;
			trans[i].c = &vtx->c.cursor;
			i++;
		}
		j++;
	}
	assert(i == n);

	/* Build pointer array */
	for (i = 0; i < n; i++)
		ptrans[i] = &trans[i];
	ptrans[i] = NULL;

	/* Query test goes here */
	if (vslq->query != NULL && !vslq_runquery(vslq->query, ptrans))
		return (0);

	if (vslq->vsl->R_opt_l != 0 && !vslq_ratelimit(vslq))
		return (0);

	/* Callback */
	return ((func)(vslq->vsl, ptrans, priv));
}
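
/* The transaction array is filled breadth-first from the root, so a
   parent always precedes its children and trans[0] is the root
   transaction; ptrans is the NULL-terminated view handed to the query
   engine and the callback. */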

/* Create a synthetic log record. The record will be inserted at the
   current cursor offset */
static void
vtx_synth_rec(struct vtx *vtx, unsigned tag, const char *fmt, ...)
{
	struct synth *synth, *it;
	va_list ap;
	char *buf;
	int l, buflen;

	ALLOC_OBJ(synth, SYNTH_MAGIC);
	AN(synth);

	buf = (char *)&synth->data[2];
	buflen = sizeof (synth->data) - 2 * sizeof (uint32_t);
	va_start(ap, fmt);
	l = vsnprintf(buf, buflen, fmt, ap);
	assert(l >= 0);
	va_end(ap);
	if (l > buflen - 1)
		l = buflen - 1;
	buf[l++] = '\0';	/* NUL-terminated */
	synth->data[1] = vtx->key.vxid;
	switch (vtx->type) {
	case VSL_t_req:
		synth->data[1] |= VSL_CLIENTMARKER;
		break;
	case VSL_t_bereq:
		synth->data[1] |= VSL_BACKENDMARKER;
		break;
	default:
		break;
	}
	synth->data[0] = (((tag & 0xff) << 24) | l);
	synth->offset = vtx->c.offset;

	VTAILQ_FOREACH_REVERSE(it, &vtx->synth, synthhead, list) {
		/* Make sure the synth list is sorted on offset */
		CHECK_OBJ_NOTNULL(it, SYNTH_MAGIC);
		if (synth->offset >= it->offset)
			break;
	}
	if (it != NULL)
		VTAILQ_INSERT_AFTER(&vtx->synth, it, synth, list);
	else
		VTAILQ_INSERT_HEAD(&vtx->synth, synth, list);

	/* Update cursor */
	CHECK_OBJ_ORNULL(vtx->c.synth, SYNTH_MAGIC);
	if (vtx->c.synth == NULL || vtx->c.synth->offset > synth->offset)
		vtx->c.synth = synth;
}

/* Add a diagnostic SLT_VSL synth record to the vtx. */
static int
vtx_diag(struct vtx *vtx, const char *msg)
{

	vtx_synth_rec(vtx, SLT_VSL, msg);
	return (-1);
}

/* Add a SLT_VSL diag synth record to the vtx. Takes an offending record
   that will be included in the log record */
static int
vtx_diag_tag(struct vtx *vtx, const uint32_t *ptr, const char *reason)
{

	vtx_synth_rec(vtx, SLT_VSL, "%s (%u:%s \"%.*s\")", reason, VSL_ID(ptr),
	    VSL_tags[VSL_TAG(ptr)], (int)VSL_LEN(ptr), VSL_CDATA(ptr));
	return (-1);
}
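
/* Typical use of the VSLQ API, as a minimal sketch (error handling and
   polling backoff omitted; the VSM attach and cursor options shown are
   illustrative):

	struct VSL_data *vsl = VSL_New();
	struct vsm *vsm = VSM_New();
	(void)VSM_Attach(vsm, -1);
	struct VSL_cursor *c = VSL_CursorVSM(vsl, vsm, VSL_COPT_BATCH);
	struct VSLQ *vslq = VSLQ_New(vsl, &c, VSL_g_vxid, NULL);
	int i;
	do
		i = VSLQ_Dispatch(vslq, my_dispatch_f, NULL);
	while (i == vsl_more || i == vsl_end);
	VSLQ_Delete(&vslq);

   where my_dispatch_f is an application-supplied VSLQ_dispatch_f and
   the caller should back off briefly whenever vsl_end is returned. */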
struct VSLQ *
VSLQ_New(struct VSL_data *vsl, struct VSL_cursor **cp,
    enum VSL_grouping_e grouping, const char *querystring)
{
	struct vslq_query *query;
	struct VSLQ *vslq;

	CHECK_OBJ_NOTNULL(vsl, VSL_MAGIC);
	if (grouping >= VSL_g__MAX) {
		(void)vsl_diag(vsl, "Illegal query grouping");
		return (NULL);
	}
	if (querystring != NULL) {
		query = vslq_newquery(vsl, grouping, querystring);
		if (query == NULL)
			return (NULL);
	} else
		query = NULL;

	ALLOC_OBJ(vslq, VSLQ_MAGIC);
	AN(vslq);
	vslq->vsl = vsl;
	if (cp != NULL) {
		vslq->c = *cp;
		*cp = NULL;
	}
	vslq->grouping = grouping;
	vslq->query = query;
	if (vslq->vsl->R_opt_l != 0) {
		vslq->last_use = VTIM_mono();
		vslq->credits = 1;
	}

	/* Setup normal mode */
	VRBT_INIT(&vslq->tree);
	VTAILQ_INIT(&vslq->ready);
	VTAILQ_INIT(&vslq->incomplete);
	VTAILQ_INIT(&vslq->shmrefs);
	VTAILQ_INIT(&vslq->cache);

	/* Setup raw mode */
	vslq->raw.c.magic = VSLC_RAW_MAGIC;
	vslq->raw.c.cursor.priv_tbl = &vslc_raw_tbl;
	vslq->raw.c.cursor.priv_data = &vslq->raw.c;
	vslq->raw.trans.level = 0;
	vslq->raw.trans.type = VSL_t_raw;
	vslq->raw.trans.reason = VSL_r_unknown;
	vslq->raw.trans.c = &vslq->raw.c.cursor;
	vslq->raw.ptrans[0] = &vslq->raw.trans;
	vslq->raw.ptrans[1] = NULL;

	return (vslq);
}

void
VSLQ_Delete(struct VSLQ **pvslq)
{
	struct VSLQ *vslq;
	struct vtx *vtx;

	TAKE_OBJ_NOTNULL(vslq, pvslq, VSLQ_MAGIC);

	(void)VSLQ_Flush(vslq, NULL, NULL);
	AZ(vslq->n_outstanding);

	if (vslq->c != NULL) {
		VSL_DeleteCursor(vslq->c);
		vslq->c = NULL;
	}

	if (vslq->query != NULL)
		vslq_deletequery(&vslq->query);
	AZ(vslq->query);

	while (!VTAILQ_EMPTY(&vslq->cache)) {
		AN(vslq->n_cache);
		vtx = VTAILQ_FIRST(&vslq->cache);
		VTAILQ_REMOVE(&vslq->cache, vtx, list_child);
		vslq->n_cache--;
		FREE_OBJ(vtx);
	}

	FREE_OBJ(vslq);
}

void
VSLQ_SetCursor(struct VSLQ *vslq, struct VSL_cursor **cp)
{

	CHECK_OBJ_NOTNULL(vslq, VSLQ_MAGIC);

	if (vslq->c != NULL) {
		(void)VSLQ_Flush(vslq, NULL, NULL);
		AZ(vslq->n_outstanding);
		VSL_DeleteCursor(vslq->c);
		vslq->c = NULL;
	}

	if (cp != NULL) {
		AN(*cp);
		vslq->c = *cp;
		*cp = NULL;
	}
}

/* Regard each log line as a single transaction, feed it through the query
   and do the callback */
static int
vslq_raw(struct VSLQ *vslq, VSLQ_dispatch_f *func, void *priv)
{
	enum vsl_status r = vsl_more;
	int i;

	assert(vslq->grouping == VSL_g_raw);

	assert(vslq->raw.offset <= vslq->raw.len);
	do {
		if (vslq->raw.offset == vslq->raw.len) {
			r = VSL_Next(vslq->c);
			if (r != vsl_more)
				return (r);
			AN(vslq->c->rec.ptr);
			vslq->raw.start = vslq->c->rec;
			if (VSL_TAG(vslq->c->rec.ptr) == SLT__Batch)
				vslq->raw.len = VSL_END(vslq->c->rec.ptr,
				    VSL_BATCHLEN(vslq->c->rec.ptr)) -
				    vslq->c->rec.ptr;
			else
				vslq->raw.len = VSL_NEXT(vslq->raw.start.ptr) -
				    vslq->raw.start.ptr;
			assert(vslq->raw.len > 0);
			vslq->raw.offset = 0;
		}

		vslq->raw.c.ptr = vslq->raw.start.ptr + vslq->raw.offset;
		vslq->raw.c.cursor.rec.ptr = NULL;
		vslq->raw.trans.vxid = VSL_ID(vslq->raw.c.ptr);
		vslq->raw.offset += VSL_NEXT(vslq->raw.c.ptr) - vslq->raw.c.ptr;
	} while (VSL_TAG(vslq->raw.c.ptr) == SLT__Batch);

	assert (r == vsl_more);

	if (func == NULL)
		return (r);

	if (vslq->query != NULL &&
	    !vslq_runquery(vslq->query, vslq->raw.ptrans))
		return (r);

	if (vslq->vsl->R_opt_l != 0 && !vslq_ratelimit(vslq))
		return (r);

	i = (func)(vslq->vsl, vslq->raw.ptrans, priv);
	if (i)
		return (i);

	return (r);
}

/* Check the beginning of the shmref list, and buffer refs that are at
 * warning level.
 */
static enum vsl_status
vslq_shmref_check(struct VSLQ *vslq)
{
	struct chunk *chunk;
	enum vsl_check i;

	while ((chunk = VTAILQ_FIRST(&vslq->shmrefs)) != NULL) {
		CHECK_OBJ_NOTNULL(chunk, CHUNK_MAGIC);
		assert(chunk->type == chunk_t_shm);
		i = VSL_Check(vslq->c, &chunk->shm.start);
		switch (i) {
		case vsl_check_valid:
			/* First on list is OK, refs behind it must also
			   be OK */
			return (vsl_more);
		case vsl_check_warn:
			/* Buffer this chunk */
			chunk_shm_to_buf(vslq, chunk);
			break;
		default:
			/* Too late to buffer */
			return (vsl_e_overrun);
		}
	}

	return (vsl_more);
}
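
/* Decide whether a Begin record should open a new vtx. Session grouping
   needs every transaction; otherwise session records are skipped, and in
   vxid grouping the client (-c), backend (-b) and ESI (-E) options can
   rule transactions out up front. */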
static unsigned
vslq_candidate(struct VSLQ *vslq, const uint32_t *ptr)
{
	enum VSL_transaction_e type;
	enum VSL_reason_e reason;
	struct VSL_data *vsl;
	enum VSL_tag_e tag;
	unsigned p_vxid;
	int i;

	CHECK_OBJ_NOTNULL(vslq, VSLQ_MAGIC);
	AN(ptr);

	assert(vslq->grouping != VSL_g_raw);
	if (vslq->grouping == VSL_g_session)
		return (1); /* All are needed */

	vsl = vslq->vsl;
	CHECK_OBJ_NOTNULL(vsl, VSL_MAGIC);
	if (vslq->grouping == VSL_g_vxid) {
		if (!vsl->c_opt && !vsl->b_opt)
			return (1); /* Implies also !vsl->E_opt */
		if (!vsl->b_opt && !VSL_CLIENT(ptr))
			return (0);
		if (!vsl->c_opt && !VSL_BACKEND(ptr))
			return (0);
		/* Need to parse the Begin tag - fallthrough to below */
	}

	tag = VSL_TAG(ptr);
	assert(tag == SLT_Begin);
	i = vtx_parse_link(VSL_CDATA(ptr), &type, &p_vxid, &reason);
	if (i != 3 || type == VSL_t_unknown)
		return (0);

	if (type == VSL_t_sess)
		return (0);

	if (vslq->grouping == VSL_g_vxid && reason == VSL_r_esi && !vsl->E_opt)
		return (0);

	return (1);
}

/* Process next input record */
static enum vsl_status
vslq_next(struct VSLQ *vslq)
{
	const uint32_t *ptr;
	struct VSL_cursor *c;
	enum vsl_status r;
	enum VSL_tag_e tag;
	ssize_t len;
	unsigned vxid, keep;
	struct vtx *vtx;

	c = vslq->c;
	r = VSL_Next(c);
	if (r != vsl_more)
		return (r);

	assert (r == vsl_more);

	tag = (enum VSL_tag_e)VSL_TAG(c->rec.ptr);
	if (tag == SLT__Batch) {
		vxid = VSL_BATCHID(c->rec.ptr);
		len = VSL_END(c->rec.ptr, VSL_BATCHLEN(c->rec.ptr)) -
		    c->rec.ptr;
		if (len == 0)
			return (r);
		ptr = VSL_NEXT(c->rec.ptr);
		tag = (enum VSL_tag_e)VSL_TAG(ptr);
	} else {
		vxid = VSL_ID(c->rec.ptr);
		len = VSL_NEXT(c->rec.ptr) - c->rec.ptr;
		ptr = c->rec.ptr;
	}
	assert(len > 0);
	if (vxid == 0)
		/* Skip non-transactional records */
		return (r);

	vtx = vtx_lookup(vslq, vxid);
	keep = tag != SLT_Begin || vslq_candidate(vslq, ptr);
	if (vtx == NULL && tag == SLT_Begin && keep) {
		vtx = vtx_add(vslq, vxid);
		AN(vtx);
	}
	if (vtx != NULL) {
		AN(keep);
		r = vtx_append(vslq, vtx, &c->rec, len);
		if (r == vsl_more)
			vtx_scan(vslq, vtx);
	}

	return (r);
}

/* Test query and report any ready transactions */
static int
vslq_process_ready(struct VSLQ *vslq, VSLQ_dispatch_f *func, void *priv)
{
	struct vtx *vtx;
	int i = 0;

	AN(vslq);

	while (!VTAILQ_EMPTY(&vslq->ready)) {
		vtx = VTAILQ_FIRST(&vslq->ready);
		CHECK_OBJ_NOTNULL(vtx, VTX_MAGIC);
		VTAILQ_REMOVE(&vslq->ready, vtx, list_vtx);
		AN(vtx->flags & VTX_F_READY);
		if (func != NULL)
			i = vslq_callback(vslq, vtx, func, priv);
		vtx_retire(vslq, &vtx);
		AZ(vtx);
		if (i)
			return (i);
	}

	return (0);
}

/* Process the input cursor, calling the callback function on matching
   transaction sets. Returns vsl_more when it should be called again,
   another vsl_status at the end of the log or on error, or a nonzero
   value passed through from the callback */
int
VSLQ_Dispatch(struct VSLQ *vslq, VSLQ_dispatch_f *func, void *priv)
{
	enum vsl_status r;
	int i;
	double now;
	struct vtx *vtx;

	CHECK_OBJ_NOTNULL(vslq, VSLQ_MAGIC);

	/* Check that we have a cursor */
	if (vslq->c == NULL)
		return (vsl_e_abandon);

	if (vslq->grouping == VSL_g_raw)
		return (vslq_raw(vslq, func, priv));

	/* Process next cursor input */
	r = vslq_next(vslq);
	if (r != vsl_more)
		/* At end of log or cursor reports error condition */
		return (r);

	/* Check shmref list and buffer if necessary */
	r = vslq_shmref_check(vslq);
	if (r != vsl_more)
		/* Buffering of shm ref failed */
		return (r);

	assert (r == vsl_more);

	/* Check vtx timeout */
	now = VTIM_mono();
	while (!VTAILQ_EMPTY(&vslq->incomplete)) {
		vtx = VTAILQ_FIRST(&vslq->incomplete);
		CHECK_OBJ_NOTNULL(vtx, VTX_MAGIC);
		if (now - vtx->t_start < vslq->vsl->T_opt)
			break;
		vtx_force(vslq, vtx, "timeout");
		AN(vtx->flags & VTX_F_COMPLETE);
	}

	/* Check store limit */
	while (vslq->n_outstanding > vslq->vsl->L_opt &&
	    !(VTAILQ_EMPTY(&vslq->incomplete))) {
		vtx = VTAILQ_FIRST(&vslq->incomplete);
		CHECK_OBJ_NOTNULL(vtx, VTX_MAGIC);
		vtx_force(vslq, vtx, "store overflow");
		AN(vtx->flags & VTX_F_COMPLETE);
		i = vslq_process_ready(vslq, func, priv);
		if (i)
			/* User return code */
			return (i);
	}

	/* Check ready list */
	if (!VTAILQ_EMPTY(&vslq->ready)) {
		i = vslq_process_ready(vslq, func, priv);
		if (i)
			/* User return code */
			return (i);
	}

	return (vsl_more);
}

/* Flush any incomplete vtx held on to. Do callbacks if func != NULL */
int
VSLQ_Flush(struct VSLQ *vslq, VSLQ_dispatch_f *func, void *priv)
{
	struct vtx *vtx;

	CHECK_OBJ_NOTNULL(vslq, VSLQ_MAGIC);

	while (!VTAILQ_EMPTY(&vslq->incomplete)) {
		vtx = VTAILQ_FIRST(&vslq->incomplete);
		CHECK_OBJ_NOTNULL(vtx, VTX_MAGIC);
		AZ(vtx->flags & VTX_F_COMPLETE);
		vtx_force(vslq, vtx, "flush");
	}

	return (vslq_process_ready(vslq, func, priv));
}