1 /*-
2 * Copyright (c) 2006 Verdens Gang AS
3 * Copyright (c) 2006-2015 Varnish Software AS
4 * All rights reserved.
5 *
6 * Author: Martin Blix Grydeland <martin@varnish-software.com>
7 *
8 * SPDX-License-Identifier: BSD-2-Clause
9 *
10 * Redistribution and use in source and binary forms, with or without
11 * modification, are permitted provided that the following conditions
12 * are met:
13 * 1. Redistributions of source code must retain the above copyright
14 * notice, this list of conditions and the following disclaimer.
15 * 2. Redistributions in binary form must reproduce the above copyright
16 * notice, this list of conditions and the following disclaimer in the
17 * documentation and/or other materials provided with the distribution.
18 *
19 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
20 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22 * ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
23 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
25 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
26 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
27 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
28 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
29 * SUCH DAMAGE.
30 *
31 */
32
33 #include "config.h"
34
35 #include <stdarg.h>
36 #include <stdint.h>
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <string.h>
40
41 #include "vdef.h"
42 #include "vas.h"
43 #include "miniobj.h"
44
45 #include "vqueue.h"
46 #include "vre.h"
47 #include "vtim.h"
48 #include "vtree.h"
49
50 #include "vapi/vsl.h"
51
52 #include "vsl_api.h"
53
54 #define VTX_CACHE 10
55 #define VTX_BUFSIZE_MIN 64
56 #define VTX_SHMCHUNKS 3
57
/* String names for enum VSL_transaction_e, matching the type token in
   Begin/Link record payloads (parsed back in vtx_parse_link). */
static const char * const vsl_t_names[VSL_t__MAX] = {
	[VSL_t_unknown]	= "unknown",
	[VSL_t_sess]	= "sess",
	[VSL_t_req]	= "req",
	[VSL_t_bereq]	= "bereq",
	[VSL_t_raw]	= "raw",
};
65
/* String names for enum VSL_reason_e, matching the reason token in
   Begin/Link record payloads (parsed back in vtx_parse_link). */
static const char * const vsl_r_names[VSL_r__MAX] = {
	[VSL_r_unknown]	= "unknown",
	[VSL_r_http_1]	= "HTTP/1",
	[VSL_r_rxreq]	= "rxreq",
	[VSL_r_esi]	= "esi",
	[VSL_r_restart]	= "restart",
	[VSL_r_pass]	= "pass",
	[VSL_r_fetch]	= "fetch",
	[VSL_r_bgfetch]	= "bgfetch",
	[VSL_r_pipe]	= "pipe",
};
77
78 struct vtx;
79
/* Cursor over a single log record, used for VSL_g_raw grouping where
   each record is dispatched as its own transaction. */
struct vslc_raw {
	unsigned		magic;
#define VSLC_RAW_MAGIC		0x247EBD44

	struct VSL_cursor	cursor;		/* Public cursor interface */

	const uint32_t		*ptr;		/* The one record exposed */
};
88
/* A synthetic log record (e.g. diagnostics) injected into a vtx at a
   given cursor offset. */
struct synth {
	unsigned		magic;
#define SYNTH_MAGIC		0xC654479F

	VTAILQ_ENTRY(synth)	list;		/* On vtx->synth, offset order */
	size_t			offset;		/* Cursor offset to insert at */
	/* Record storage: 2 header words + up to 64 bytes of payload */
	uint32_t		data[2 + 64 / sizeof (uint32_t)];
};
VTAILQ_HEAD(synthhead, synth);
98
/* How a chunk stores its records: referencing shared memory directly
   (chunk_t_shm) or holding a private malloc'ed copy (chunk_t_buf). */
enum chunk_t {
	chunk_t__unassigned,
	chunk_t_shm,
	chunk_t_buf,
};
104
/* A contiguous run of log records belonging to a vtx, either a shm
   reference or a buffered copy (see enum chunk_t). */
struct chunk {
	unsigned				magic;
#define CHUNK_MAGIC				0x48DC0194
	enum chunk_t				type;
	union {
		/* Valid when type == chunk_t_shm */
		struct {
			struct VSLC_ptr		start;	/* First record in shm */
			VTAILQ_ENTRY(chunk)	shmref;	/* On vslq->shmrefs */
		} shm;
		/* Valid when type == chunk_t_buf */
		struct {
			uint32_t		*data;	/* Copied records */
			size_t			space;	/* Capacity in words */
		} buf;
	};
	size_t					len;	/* Length in words */
	struct vtx				*vtx;	/* Owning transaction */
	VTAILQ_ENTRY(chunk)			list;	/* On vtx->chunks */
};
VTAILQ_HEAD(chunkhead, chunk);
124
/* Cursor iterating over all records of a vtx: its chunk list merged
   with any synthetic records, in offset order. */
struct vslc_vtx {
	unsigned		magic;
#define VSLC_VTX_MAGIC		0x74C6523F

	struct VSL_cursor	cursor;		/* Public cursor interface */

	struct vtx		*vtx;		/* Transaction iterated over */
	struct synth		*synth;		/* Next synth record to emit */
	struct chunk		*chunk;		/* Current chunk, NULL=rewound */
	size_t			chunkstart;	/* Offset of current chunk */
	size_t			offset;		/* Current offset in words */
};
137
/* Red-black tree key for looking up a vtx by vxid. Embedded as the
   first member of struct vtx so the containing vtx can be recovered by
   a plain cast (see vtx_lookup). */
struct vtx_key {
	unsigned		vxid;
	VRBT_ENTRY(vtx_key)	entry;
};
VRBT_HEAD(vtx_tree, vtx_key);
143
/* A single transaction being assembled from the log: its records, its
   parent/child links, and its iteration cursor. */
struct vtx {
	struct vtx_key		key;		/* Must be first member */
	unsigned		magic;
#define VTX_MAGIC		0xACC21D09
	VTAILQ_ENTRY(vtx)	list_child;	/* On parent->child or cache */
	VTAILQ_ENTRY(vtx)	list_vtx;	/* On incomplete or ready */

	double			t_start;	/* Creation time (VTIM_mono) */
	unsigned		flags;
#define VTX_F_BEGIN		0x1 /* Begin record processed */
#define VTX_F_END		0x2 /* End record processed */
#define VTX_F_COMPLETE		0x4 /* Marked complete. No new children
				       should be appended */
#define VTX_F_READY		0x8 /* This vtx and all it's children are
				       complete */

	enum VSL_transaction_e	type;
	enum VSL_reason_e	reason;

	struct vtx		*parent;
	VTAILQ_HEAD(,vtx)	child;
	unsigned		n_child;	/* Direct children */
	unsigned		n_childready;	/* Direct children READY */
	unsigned		n_descend;	/* All descendants */

	VTAILQ_HEAD(,synth)	synth;		/* Synth records, offset order */

	/* Fixed pool of shm-reference chunks; when exhausted, records
	   are buffered instead (see vtx_append). */
	struct chunk		shmchunks[VTX_SHMCHUNKS];
	struct chunkhead	shmchunks_free;

	struct chunkhead	chunks;		/* Record chunks, in order */
	size_t			len;		/* Total record length, words */

	struct vslc_vtx		c;		/* Cursor over this vtx */
};
179
/* A log query session: wraps a VSL cursor and groups records into
   transactions according to the configured grouping mode. */
struct VSLQ {
	unsigned		magic;
#define VSLQ_MAGIC		0x23A8BE97

	struct VSL_data		*vsl;
	struct VSL_cursor	*c;		/* Owned; freed on delete */
	struct vslq_query	*query;		/* Optional filter query */

	enum VSL_grouping_e	grouping;

	/* Structured mode */
	struct vtx_tree		tree;		/* vxid -> vtx lookup */
	VTAILQ_HEAD(,vtx)	ready;		/* Complete, to be dispatched */
	VTAILQ_HEAD(,vtx)	incomplete;	/* Still collecting records */
	int			n_outstanding;	/* Live (non-cached) vtxs */
	struct chunkhead	shmrefs;	/* All shm chunks, oldest first */
	VTAILQ_HEAD(,vtx)	cache;		/* Retired vtxs for reuse */
	unsigned		n_cache;

	/* Rate limiting */
	double			credits;	/* Token bucket fill level */
	vtim_mono		last_use;

	/* Raw mode */
	struct {
		struct vslc_raw		c;
		struct VSL_transaction	trans;
		struct VSL_transaction	*ptrans[2];
		struct VSLC_ptr		start;
		ssize_t			len;
		ssize_t			offset;
	} raw;
};
213
214 static void vtx_synth_rec(struct vtx *vtx, unsigned tag, const char *fmt, ...);
215 /*lint -esym(534, vtx_diag) */
216 static int vtx_diag(struct vtx *vtx, const char *msg);
217 /*lint -esym(534, vtx_diag_tag) */
218 static int vtx_diag_tag(struct vtx *vtx, const uint32_t *ptr,
219 const char *reason);
220
221 static inline int
vtx_keycmp(const struct vtx_key * a,const struct vtx_key * b)222 vtx_keycmp(const struct vtx_key *a, const struct vtx_key *b)
223 {
224 if (a->vxid < b->vxid)
225 return (-1);
226 if (a->vxid > b->vxid)
227 return (1);
228 return (0);
229 }
230
/* Generate the static red-black tree functions for the vxid lookup
   tree (VRBT_FIND, VRBT_INSERT, VRBT_REMOVE on vtx_tree). */
VRBT_PROTOTYPE_STATIC(vtx_tree, vtx_key, entry, vtx_keycmp)
VRBT_GENERATE_STATIC(vtx_tree, vtx_key, entry, vtx_keycmp)
233
234 static enum vsl_status v_matchproto_(vslc_next_f)
235 vslc_raw_next(const struct VSL_cursor *cursor)
236 {
237 struct vslc_raw *c;
238
239 CAST_OBJ_NOTNULL(c, cursor->priv_data, VSLC_RAW_MAGIC);
240 assert(&c->cursor == cursor);
241
242 AN(c->ptr);
243 if (c->cursor.rec.ptr == NULL) {
244 c->cursor.rec.ptr = c->ptr;
245 return (vsl_more);
246 } else {
247 c->cursor.rec.ptr = NULL;
248 return (vsl_end);
249 }
250 }
251
v_matchproto_(vslc_reset_f)252 static enum vsl_status v_matchproto_(vslc_reset_f)
253 vslc_raw_reset(const struct VSL_cursor *cursor)
254 {
255 struct vslc_raw *c;
256
257 CAST_OBJ_NOTNULL(c, cursor->priv_data, VSLC_RAW_MAGIC);
258 assert(&c->cursor == cursor);
259
260 AN(c->ptr);
261 c->cursor.rec.ptr = NULL;
262
263 return (vsl_end);
264 }
265
/* Cursor operations for raw-mode cursors. No delete/check: the cursor
   is embedded in the VSLQ and shm validity is not tracked. */
static const struct vslc_tbl vslc_raw_tbl = {
	.magic	= VSLC_TBL_MAGIC,
	.delete	= NULL,
	.next	= vslc_raw_next,
	.reset	= vslc_raw_reset,
	.check	= NULL,
};
273
/* Advance a vtx cursor to the next record, merging the chunk list with
   any synthetic records by offset and transparently skipping
   SLT__Batch wrapper records. Returns vsl_more while records remain,
   vsl_end when the stored length is exhausted. */
static enum vsl_status v_matchproto_(vslc_next_f)
vslc_vtx_next(const struct VSL_cursor *cursor)
{
	struct vslc_vtx *c;
	const uint32_t *ptr;
	unsigned overrun;

	CAST_OBJ_NOTNULL(c, cursor->priv_data, VSLC_VTX_MAGIC);
	assert(&c->cursor == cursor);
	CHECK_OBJ_NOTNULL(c->vtx, VTX_MAGIC);

	do {
		CHECK_OBJ_ORNULL(c->synth, SYNTH_MAGIC);
		if (c->synth != NULL && c->synth->offset == c->offset) {
			/* We're at the offset of the next synth record,
			   point to it and advance the pointer */
			c->cursor.rec.ptr = c->synth->data;
			c->synth = VTAILQ_NEXT(c->synth, list);
		} else {
			/* Offset past the stored length would mean the
			   bookkeeping is corrupt */
			overrun = c->offset > c->vtx->len;
			AZ(overrun);
			if (c->offset == c->vtx->len)
				return (vsl_end);

			/* Advance chunk pointer */
			if (c->chunk == NULL) {
				/* Freshly reset cursor: start at the
				   first chunk */
				c->chunk = VTAILQ_FIRST(&c->vtx->chunks);
				c->chunkstart = 0;
			}
			CHECK_OBJ_NOTNULL(c->chunk, CHUNK_MAGIC);
			while (c->offset >= c->chunkstart + c->chunk->len) {
				c->chunkstart += c->chunk->len;
				c->chunk = VTAILQ_NEXT(c->chunk, list);
				CHECK_OBJ_NOTNULL(c->chunk, CHUNK_MAGIC);
			}

			/* Point to the next stored record */
			if (c->chunk->type == chunk_t_shm)
				ptr = c->chunk->shm.start.ptr;
			else {
				assert(c->chunk->type == chunk_t_buf);
				ptr = c->chunk->buf.data;
			}
			c->cursor.rec.ptr = ptr + c->offset - c->chunkstart;
			/* Step the offset by this record's full length
			   (header included), as computed by VSL_NEXT */
			c->offset += VSL_NEXT(c->cursor.rec.ptr) -
			    c->cursor.rec.ptr;
		}
	} while (VSL_TAG(c->cursor.rec.ptr) == SLT__Batch);

	return (vsl_more);
}
325
v_matchproto_(vslc_reset_f)326 static enum vsl_status v_matchproto_(vslc_reset_f)
327 vslc_vtx_reset(const struct VSL_cursor *cursor)
328 {
329 struct vslc_vtx *c;
330
331 CAST_OBJ_NOTNULL(c, cursor->priv_data, VSLC_VTX_MAGIC);
332 assert(&c->cursor == cursor);
333 CHECK_OBJ_NOTNULL(c->vtx, VTX_MAGIC);
334 c->synth = VTAILQ_FIRST(&c->vtx->synth);
335 c->chunk = NULL;
336 c->chunkstart = 0;
337 c->offset = 0;
338 c->cursor.rec.ptr = NULL;
339
340 return (vsl_end);
341 }
342
/* Cursor operations for vtx cursors. No delete/check: the cursor is
   embedded in its vtx and freed with it. */
static const struct vslc_tbl vslc_vtx_tbl = {
	.magic	= VSLC_TBL_MAGIC,
	.delete	= NULL,
	.next	= vslc_vtx_next,
	.reset	= vslc_vtx_reset,
	.check	= NULL,
};
350
351 /* Create a buf chunk */
352 static struct chunk *
chunk_newbuf(struct vtx * vtx,const uint32_t * ptr,size_t len)353 chunk_newbuf(struct vtx *vtx, const uint32_t *ptr, size_t len)
354 {
355 struct chunk *chunk;
356
357 ALLOC_OBJ(chunk, CHUNK_MAGIC);
358 XXXAN(chunk);
359 chunk->type = chunk_t_buf;
360 chunk->vtx = vtx;
361 chunk->buf.space = VTX_BUFSIZE_MIN;
362 while (chunk->buf.space < len)
363 chunk->buf.space *= 2;
364 chunk->buf.data = malloc(sizeof (uint32_t) * chunk->buf.space);
365 AN(chunk->buf.data);
366 memcpy(chunk->buf.data, ptr, sizeof (uint32_t) * len);
367 chunk->len = len;
368 return (chunk);
369 }
370
371 /* Free a buf chunk */
372 static void
chunk_freebuf(struct chunk ** pchunk)373 chunk_freebuf(struct chunk **pchunk)
374 {
375
376 CHECK_OBJ_NOTNULL(*pchunk, CHUNK_MAGIC);
377 assert((*pchunk)->type == chunk_t_buf);
378 free((*pchunk)->buf.data);
379 FREE_OBJ(*pchunk);
380 *pchunk = NULL;
381 }
382
383 /* Append a set of records to a chunk */
384 static void
chunk_appendbuf(struct chunk * chunk,const uint32_t * ptr,size_t len)385 chunk_appendbuf(struct chunk *chunk, const uint32_t *ptr, size_t len)
386 {
387
388 CHECK_OBJ_NOTNULL(chunk, CHUNK_MAGIC);
389 assert(chunk->type == chunk_t_buf);
390 if (chunk->buf.space < chunk->len + len) {
391 while (chunk->buf.space < chunk->len + len)
392 chunk->buf.space *= 2;
393 chunk->buf.data = realloc(chunk->buf.data,
394 sizeof (uint32_t) * chunk->buf.space);
395 }
396 memcpy(chunk->buf.data + chunk->len, ptr, sizeof (uint32_t) * len);
397 chunk->len += len;
398 }
399
/* Transform a shm chunk to a buf chunk, copying its records out of
   shared memory before the shm segment is overwritten. The shm chunk
   itself is recycled onto the vtx's free pool. */
static void
chunk_shm_to_buf(struct VSLQ *vslq, struct chunk *chunk)
{
	struct vtx *vtx;
	struct chunk *buf;

	CHECK_OBJ_NOTNULL(chunk, CHUNK_MAGIC);
	assert(chunk->type == chunk_t_shm);
	vtx = chunk->vtx;
	CHECK_OBJ_NOTNULL(vtx, VTX_MAGIC);

	buf = VTAILQ_PREV(chunk, chunkhead, list);
	if (buf != NULL && buf->type == chunk_t_buf)
		/* Previous is a buf chunk, append to it */
		chunk_appendbuf(buf, chunk->shm.start.ptr, chunk->len);
	else {
		/* Create a new buf chunk and insert it before this */
		buf = chunk_newbuf(vtx, chunk->shm.start.ptr, chunk->len);
		AN(buf);
		VTAILQ_INSERT_BEFORE(chunk, buf, list);
	}

	/* Reset cursor chunk pointer, vslc_vtx_next will set it correctly */
	vtx->c.chunk = NULL;

	/* Remove from the shmref list and vtx, and put chunk back
	   on the free list */
	VTAILQ_REMOVE(&vslq->shmrefs, chunk, shm.shmref);
	VTAILQ_REMOVE(&vtx->chunks, chunk, list);
	VTAILQ_INSERT_HEAD(&vtx->shmchunks_free, chunk, list);
}
432
/* Append a set of records (len words starting at *start) to a vtx.
   Prefers a zero-copy shm reference when the shm region is still valid
   and a free shm chunk is available; otherwise copies the records into
   a buf chunk. Returns vsl_more on success, vsl_e_overrun if the shm
   region was already overwritten. */
static enum vsl_status
vtx_append(struct VSLQ *vslq, struct vtx *vtx, const struct VSLC_ptr *start,
    size_t len)
{
	struct chunk *chunk;
	enum vsl_check i;

	AN(vtx);
	AN(len);
	AN(start);

	i = VSL_Check(vslq->c, start);
	if (i == vsl_check_e_inval)
		return (vsl_e_overrun);

	if (i == vsl_check_valid && !VTAILQ_EMPTY(&vtx->shmchunks_free)) {
		/* Shmref it */
		chunk = VTAILQ_FIRST(&vtx->shmchunks_free);
		CHECK_OBJ_NOTNULL(chunk, CHUNK_MAGIC);
		assert(chunk->type == chunk_t_shm);
		assert(chunk->vtx == vtx);
		VTAILQ_REMOVE(&vtx->shmchunks_free, chunk, list);
		chunk->shm.start = *start;
		chunk->len = len;
		VTAILQ_INSERT_TAIL(&vtx->chunks, chunk, list);

		/* Append to shmref list */
		VTAILQ_INSERT_TAIL(&vslq->shmrefs, chunk, shm.shmref);
	} else {
		/* Buffer it */
		chunk = VTAILQ_LAST(&vtx->chunks, chunkhead);
		CHECK_OBJ_ORNULL(chunk, CHUNK_MAGIC);
		if (chunk != NULL && chunk->type == chunk_t_buf) {
			/* Tail is a buf chunk, append to that */
			chunk_appendbuf(chunk, start->ptr, len);
		} else {
			/* Append new buf chunk */
			chunk = chunk_newbuf(vtx, start->ptr, len);
			AN(chunk);
			VTAILQ_INSERT_TAIL(&vtx->chunks, chunk, list);
		}
	}
	vtx->len += len;
	return (vsl_more);
}
479
/* Allocate a new vtx structure, reusing one from the VSLQ cache when
   available. One-time initialization (lists, shm chunk pool, cursor
   wiring) happens only for freshly malloc'ed vtxs; the per-use fields
   are (re)set unconditionally below. */
static struct vtx *
vtx_new(struct VSLQ *vslq)
{
	struct vtx *vtx;
	int i;

	AN(vslq);
	if (vslq->n_cache) {
		AZ(VTAILQ_EMPTY(&vslq->cache));
		vtx = VTAILQ_FIRST(&vslq->cache);
		VTAILQ_REMOVE(&vslq->cache, vtx, list_child);
		vslq->n_cache--;
	} else {
		ALLOC_OBJ(vtx, VTX_MAGIC);
		AN(vtx);

		VTAILQ_INIT(&vtx->child);
		VTAILQ_INIT(&vtx->shmchunks_free);
		/* Pre-populate the fixed pool of shm reference chunks */
		for (i = 0; i < VTX_SHMCHUNKS; i++) {
			vtx->shmchunks[i].magic = CHUNK_MAGIC;
			vtx->shmchunks[i].type = chunk_t_shm;
			vtx->shmchunks[i].vtx = vtx;
			VTAILQ_INSERT_TAIL(&vtx->shmchunks_free,
			    &vtx->shmchunks[i], list);
		}
		VTAILQ_INIT(&vtx->chunks);
		VTAILQ_INIT(&vtx->synth);
		vtx->c.magic = VSLC_VTX_MAGIC;
		vtx->c.vtx = vtx;
		vtx->c.cursor.priv_tbl = &vslc_vtx_tbl;
		vtx->c.cursor.priv_data = &vtx->c;
	}

	/* Reset per-use state */
	CHECK_OBJ_NOTNULL(vtx, VTX_MAGIC);
	vtx->key.vxid = 0;
	vtx->t_start = VTIM_mono();
	vtx->flags = 0;
	vtx->type = VSL_t_unknown;
	vtx->reason = VSL_r_unknown;
	vtx->parent = NULL;
	vtx->n_child = 0;
	vtx->n_childready = 0;
	vtx->n_descend = 0;
	vtx->len = 0;
	AN(vslc_vtx_reset(&vtx->c.cursor) == vsl_end);

	return (vtx);
}
529
/* Disuse a vtx and all it's children, freeing any resources held. Free or
   cache the vtx for later use. Only complete+ready top-level vtxs may
   be retired; children are detached and retired recursively. */
static void
vtx_retire(struct VSLQ *vslq, struct vtx **pvtx)
{
	struct vtx *vtx;
	struct vtx *child;
	struct synth *synth;
	struct chunk *chunk;

	AN(vslq);
	TAKE_OBJ_NOTNULL(vtx, pvtx, VTX_MAGIC);

	AN(vtx->flags & VTX_F_COMPLETE);
	AN(vtx->flags & VTX_F_READY);
	AZ(vtx->parent);

	/* Detach and recursively retire each child, keeping the
	   descendant counters consistent as we go */
	while (!VTAILQ_EMPTY(&vtx->child)) {
		child = VTAILQ_FIRST(&vtx->child);
		assert(child->parent == vtx);
		AN(vtx->n_child);
		assert(vtx->n_descend >= child->n_descend + 1);
		VTAILQ_REMOVE(&vtx->child, child, list_child);
		child->parent = NULL;
		vtx->n_child--;
		vtx->n_descend -= child->n_descend + 1;
		vtx_retire(vslq, &child);
		AZ(child);
	}
	AZ(vtx->n_child);
	AZ(vtx->n_descend);
	vtx->n_childready = 0;
	AN(VRBT_REMOVE(vtx_tree, &vslq->tree, &vtx->key));
	vtx->key.vxid = 0;
	vtx->flags = 0;

	/* Free all synthetic records */
	while (!VTAILQ_EMPTY(&vtx->synth)) {
		synth = VTAILQ_FIRST(&vtx->synth);
		CHECK_OBJ_NOTNULL(synth, SYNTH_MAGIC);
		VTAILQ_REMOVE(&vtx->synth, synth, list);
		FREE_OBJ(synth);
	}

	/* Release record chunks: shm chunks go back to the vtx's free
	   pool (and off the global shmref list), buf chunks are freed */
	while (!VTAILQ_EMPTY(&vtx->chunks)) {
		chunk = VTAILQ_FIRST(&vtx->chunks);
		CHECK_OBJ_NOTNULL(chunk, CHUNK_MAGIC);
		VTAILQ_REMOVE(&vtx->chunks, chunk, list);
		if (chunk->type == chunk_t_shm) {
			VTAILQ_REMOVE(&vslq->shmrefs, chunk, shm.shmref);
			VTAILQ_INSERT_HEAD(&vtx->shmchunks_free, chunk, list);
		} else {
			assert(chunk->type == chunk_t_buf);
			chunk_freebuf(&chunk);
			AZ(chunk);
		}
	}
	vtx->len = 0;
	AN(vslq->n_outstanding);
	vslq->n_outstanding--;

	/* Keep up to VTX_CACHE retired vtxs around for reuse */
	if (vslq->n_cache < VTX_CACHE) {
		VTAILQ_INSERT_HEAD(&vslq->cache, vtx, list_child);
		vslq->n_cache++;
	} else {
		FREE_OBJ(vtx);
	}
}
597
598 /* Lookup a vtx by vxid from the managed list */
599 static struct vtx *
vtx_lookup(const struct VSLQ * vslq,unsigned vxid)600 vtx_lookup(const struct VSLQ *vslq, unsigned vxid)
601 {
602 struct vtx_key lkey, *key;
603 struct vtx *vtx;
604
605 AN(vslq);
606 lkey.vxid = vxid;
607 key = VRBT_FIND(vtx_tree, &vslq->tree, &lkey);
608 if (key == NULL)
609 return (NULL);
610 CAST_OBJ_NOTNULL(vtx, (void *)key, VTX_MAGIC);
611 return (vtx);
612 }
613
/* Insert a new vtx for the given vxid into the managed tree and the
   incomplete list, bumping the outstanding count. The vxid must not
   already be present in the tree (asserted via VRBT_INSERT). */
static struct vtx *
vtx_add(struct VSLQ *vslq, unsigned vxid)
{
	struct vtx *vtx;

	AN(vslq);
	vtx = vtx_new(vslq);
	AN(vtx);
	vtx->key.vxid = vxid;
	AZ(VRBT_INSERT(vtx_tree, &vslq->tree, &vtx->key));
	VTAILQ_INSERT_TAIL(&vslq->incomplete, vtx, list_vtx);
	vslq->n_outstanding++;
	return (vtx);
}
629
/* Mark a vtx complete, update child counters and if possible push it or
   it's top parent to the ready state. The loop walks up the ancestor
   chain: a vtx becomes READY when it is COMPLETE and all its direct
   children are ready; each newly-ready vtx bumps its parent's
   n_childready, possibly cascading readiness upwards. */
static void
vtx_mark_complete(struct VSLQ *vslq, struct vtx *vtx)
{

	AN(vslq);
	AN(vtx->flags & VTX_F_END);
	AZ(vtx->flags & VTX_F_COMPLETE);

	if (vtx->type == VSL_t_unknown)
		vtx_diag(vtx, "vtx of unknown type marked complete");

	vtx->flags |= VTX_F_COMPLETE;
	VTAILQ_REMOVE(&vslq->incomplete, vtx, list_vtx);

	while (1) {
		AZ(vtx->flags & VTX_F_READY);
		if (vtx->flags & VTX_F_COMPLETE &&
		    vtx->n_child == vtx->n_childready)
			vtx->flags |= VTX_F_READY;
		else
			/* An ancestor is not ready yet; stop here */
			return;
		if (vtx->parent == NULL) {
			/* Top level vtx ready */
			VTAILQ_INSERT_TAIL(&vslq->ready, vtx, list_vtx);
			return;
		}
		vtx = vtx->parent;
		vtx->n_childready++;
		assert(vtx->n_child >= vtx->n_childready);
	}
}
663
664 /* Add a child to a parent, and update child counters */
665 static void
vtx_set_parent(struct vtx * parent,struct vtx * child)666 vtx_set_parent(struct vtx *parent, struct vtx *child)
667 {
668
669 CHECK_OBJ_NOTNULL(parent, VTX_MAGIC);
670 CHECK_OBJ_NOTNULL(child, VTX_MAGIC);
671 assert(parent != child);
672 AZ(parent->flags & VTX_F_COMPLETE);
673 AZ(child->flags & VTX_F_COMPLETE);
674 AZ(child->parent);
675 child->parent = parent;
676 VTAILQ_INSERT_TAIL(&parent->child, child, list_child);
677 parent->n_child++;
678 do
679 parent->n_descend += 1 + child->n_descend;
680 while ((parent = parent->parent) != NULL);
681 }
682
683 /* Parse a begin or link record. Returns the number of elements that was
684 successfully parsed. */
685 static int
vtx_parse_link(const char * str,enum VSL_transaction_e * ptype,unsigned * pvxid,enum VSL_reason_e * preason)686 vtx_parse_link(const char *str, enum VSL_transaction_e *ptype,
687 unsigned *pvxid, enum VSL_reason_e *preason)
688 {
689 char type[16], reason[16];
690 unsigned vxid;
691 int i;
692 enum VSL_transaction_e et;
693 enum VSL_reason_e er;
694
695 AN(str);
696 AN(ptype);
697 AN(pvxid);
698 AN(preason);
699
700 i = sscanf(str, "%15s %u %15s", type, &vxid, reason);
701 if (i < 1)
702 return (0);
703
704 /* transaction type */
705 for (et = VSL_t_unknown; et < VSL_t__MAX; et++)
706 if (!strcmp(type, vsl_t_names[et]))
707 break;
708 if (et >= VSL_t__MAX)
709 et = VSL_t_unknown;
710 *ptype = et;
711 if (i == 1)
712 return (1);
713
714 /* vxid */
715 assert((vxid & ~VSL_IDENTMASK) == 0);
716 *pvxid = vxid;
717 if (i == 2)
718 return (2);
719
720 /* transaction reason */
721 for (er = VSL_r_unknown; er < VSL_r__MAX; er++)
722 if (!strcmp(reason, vsl_r_names[er]))
723 break;
724 if (er >= VSL_r__MAX)
725 er = VSL_r_unknown;
726 *preason = er;
727 return (3);
728 }
729
/* Parse and process a begin record: set this vtx's type/reason and, in
   the grouping modes that track links, attach it to its parent vtx
   (creating the parent if it hasn't been seen yet). Returns 0 on
   success or a soft failure, -1 (via vtx_diag_tag) on parse/link
   errors. */
static int
vtx_scan_begin(struct VSLQ *vslq, struct vtx *vtx, const uint32_t *ptr)
{
	int i;
	enum VSL_transaction_e type;
	enum VSL_reason_e reason;
	unsigned p_vxid;
	struct vtx *p_vtx;

	assert(VSL_TAG(ptr) == SLT_Begin);

	AZ(vtx->flags & VTX_F_READY);

	i = vtx_parse_link(VSL_CDATA(ptr), &type, &p_vxid, &reason);
	if (i != 3)
		return (vtx_diag_tag(vtx, ptr, "parse error"));
	if (type == VSL_t_unknown)
		(void)vtx_diag_tag(vtx, ptr, "unknown vxid type");

	/* Check/set vtx type */
	if (vtx->type != VSL_t_unknown && vtx->type != type)
		/* Type not matching the one previously set by a link
		   record */
		(void)vtx_diag_tag(vtx, ptr, "type mismatch");
	vtx->type = type;
	vtx->reason = reason;

	if (p_vxid == 0)
		/* Zero means no parent */
		return (0);
	if (p_vxid == vtx->key.vxid)
		return (vtx_diag_tag(vtx, ptr, "link to self"));

	if (vslq->grouping == VSL_g_vxid)
		return (0);	/* No links */
	if (vslq->grouping == VSL_g_request && vtx->type == VSL_t_req &&
	    vtx->reason == VSL_r_rxreq)
		return (0);	/* No links */

	if (vtx->parent != NULL) {
		if (vtx->parent->key.vxid != p_vxid) {
			/* This vtx already belongs to a different
			   parent */
			return (vtx_diag_tag(vtx, ptr, "link mismatch"));
		} else
			/* Link already exists */
			return (0);
	}

	p_vtx = vtx_lookup(vslq, p_vxid);
	if (p_vtx == NULL) {
		/* Not seen parent yet. Create it. */
		p_vtx = vtx_add(vslq, p_vxid);
		AN(p_vtx);
	} else {
		CHECK_OBJ_NOTNULL(p_vtx, VTX_MAGIC);
		if (p_vtx->flags & VTX_F_COMPLETE)
			return (vtx_diag_tag(vtx, ptr, "link too late"));
	}

	/* Create link */
	vtx_set_parent(p_vtx, vtx);

	return (0);
}
796
/* Parse and process a link record: attach (or pre-create) the child
   vtx named in the record under this vtx, setting the child's
   type/reason. Returns 0 on success or a soft failure, -1 (via
   vtx_diag_tag) on parse/link errors. */
static int
vtx_scan_link(struct VSLQ *vslq, struct vtx *vtx, const uint32_t *ptr)
{
	int i;
	enum VSL_transaction_e c_type;
	enum VSL_reason_e c_reason;
	unsigned c_vxid;
	struct vtx *c_vtx;

	assert(VSL_TAG(ptr) == SLT_Link);

	AZ(vtx->flags & VTX_F_READY);

	i = vtx_parse_link(VSL_CDATA(ptr), &c_type, &c_vxid, &c_reason);
	if (i != 3)
		return (vtx_diag_tag(vtx, ptr, "parse error"));
	if (c_type == VSL_t_unknown)
		(void)vtx_diag_tag(vtx, ptr, "unknown vxid type");

	if (vslq->grouping == VSL_g_vxid)
		return (0);	/* No links */
	if (vslq->grouping == VSL_g_request && vtx->type == VSL_t_sess)
		return (0);	/* No links */

	if (c_vxid == 0)
		return (vtx_diag_tag(vtx, ptr, "illegal link vxid"));
	if (c_vxid == vtx->key.vxid)
		return (vtx_diag_tag(vtx, ptr, "link to self"));

	/* Lookup and check child vtx */
	c_vtx = vtx_lookup(vslq, c_vxid);
	if (c_vtx == NULL) {
		/* Child not seen before. Insert it and create link */
		c_vtx = vtx_add(vslq, c_vxid);
		AN(c_vtx);
		AZ(c_vtx->parent);
		c_vtx->type = c_type;
		c_vtx->reason = c_reason;
		vtx_set_parent(vtx, c_vtx);
		return (0);
	}

	CHECK_OBJ_NOTNULL(c_vtx, VTX_MAGIC);
	if (c_vtx->parent == vtx)
		/* Link already exists */
		return (0);
	if (c_vtx->parent != NULL && c_vtx->parent != vtx)
		return (vtx_diag_tag(vtx, ptr, "duplicate link"));
	if (c_vtx->flags & VTX_F_COMPLETE)
		return (vtx_diag_tag(vtx, ptr, "link too late"));
	if (c_vtx->type != VSL_t_unknown && c_vtx->type != c_type)
		(void)vtx_diag_tag(vtx, ptr, "type mismatch");

	c_vtx->type = c_type;
	c_vtx->reason = c_reason;
	vtx_set_parent(vtx, c_vtx);
	return (0);
}
856
857 /* Scan the records of a vtx, performing processing actions on specific
858 records */
859 static void
vtx_scan(struct VSLQ * vslq,struct vtx * vtx)860 vtx_scan(struct VSLQ *vslq, struct vtx *vtx)
861 {
862 const uint32_t *ptr;
863 enum VSL_tag_e tag;
864
865 while (!(vtx->flags & VTX_F_COMPLETE) &&
866 vslc_vtx_next(&vtx->c.cursor) == 1) {
867 ptr = vtx->c.cursor.rec.ptr;
868 if (VSL_ID(ptr) != vtx->key.vxid) {
869 (void)vtx_diag_tag(vtx, ptr, "vxid mismatch");
870 continue;
871 }
872
873 tag = VSL_TAG(ptr);
874 assert(tag != SLT__Batch);
875
876 switch (tag) {
877 case SLT_Begin:
878 if (vtx->flags & VTX_F_BEGIN)
879 (void)vtx_diag_tag(vtx, ptr, "duplicate begin");
880 else {
881 (void)vtx_scan_begin(vslq, vtx, ptr);
882 vtx->flags |= VTX_F_BEGIN;
883 }
884 break;
885
886 case SLT_Link:
887 (void)vtx_scan_link(vslq, vtx, ptr);
888 break;
889
890 case SLT_End:
891 AZ(vtx->flags & VTX_F_END);
892 vtx->flags |= VTX_F_END;
893 vtx_mark_complete(vslq, vtx);
894 break;
895
896 default:
897 break;
898 }
899 }
900 }
901
/* Force a vtx into complete status by synthing the necessary outstanding
   records (a Begin if none was seen, a diagnostic, and an End), then
   rescanning so vtx_mark_complete runs. Used e.g. on timeout/overrun. */
static void
vtx_force(struct VSLQ *vslq, struct vtx *vtx, const char *reason)
{

	AZ(vtx->flags & VTX_F_COMPLETE);
	AZ(vtx->flags & VTX_F_READY);
	/* Consume any records already available first */
	vtx_scan(vslq, vtx);
	if (!(vtx->flags & VTX_F_BEGIN))
		vtx_synth_rec(vtx, SLT_Begin, "%s %u synth",
		    vsl_t_names[vtx->type], 0);
	vtx_diag(vtx, reason);
	if (!(vtx->flags & VTX_F_END))
		vtx_synth_rec(vtx, SLT_End, "synth");
	/* Rescan picks up the synth End and marks the vtx complete */
	vtx_scan(vslq, vtx);
	AN(vtx->flags & VTX_F_COMPLETE);
}
920
921 static int
vslq_ratelimit(struct VSLQ * vslq)922 vslq_ratelimit(struct VSLQ *vslq)
923 {
924 vtim_mono now;
925 vtim_dur delta;
926
927 CHECK_OBJ_NOTNULL(vslq, VSLQ_MAGIC);
928 CHECK_OBJ_NOTNULL(vslq->vsl, VSL_MAGIC);
929
930 now = VTIM_mono();
931 delta = now - vslq->last_use;
932 vslq->credits += (delta / vslq->vsl->R_opt_p) * vslq->vsl->R_opt_l;
933 if (vslq->credits > vslq->vsl->R_opt_l)
934 vslq->credits = vslq->vsl->R_opt_l;
935 vslq->last_use = now;
936
937 if (vslq->credits < 1.0)
938 return (0);
939
940 vslq->credits -= 1.0;
941 return (1);
942 }
943
/* Build transaction array, do the query and callback. Returns 0 or the
   return value from func. The vtx tree is flattened breadth-first into
   VLA arrays sized by n_descend + 1; each transaction gets a rewound
   cursor and its level/parent-vxid filled in. */
static int
vslq_callback(struct VSLQ *vslq, struct vtx *vtx, VSLQ_dispatch_f *func,
    void *priv)
{
	unsigned n = vtx->n_descend + 1;
	struct vtx *vtxs[n];
	struct VSL_transaction trans[n];
	struct VSL_transaction *ptrans[n + 1];
	unsigned i, j;

	AN(vslq);
	CHECK_OBJ_NOTNULL(vtx, VTX_MAGIC);
	AN(vtx->flags & VTX_F_READY);
	AN(func);

	/* Session/request grouping only dispatches on the matching
	   top-level transaction type */
	if (vslq->grouping == VSL_g_session &&
	    vtx->type != VSL_t_sess)
		return (0);
	if (vslq->grouping == VSL_g_request &&
	    vtx->type != VSL_t_req)
		return (0);

	/* Build transaction array (breadth-first: i appends, j reads) */
	AN(vslc_vtx_reset(&vtx->c.cursor) == vsl_end);
	vtxs[0] = vtx;
	trans[0].level = 1;
	trans[0].vxid = vtx->key.vxid;
	trans[0].vxid_parent = 0;
	trans[0].type = vtx->type;
	trans[0].reason = vtx->reason;
	trans[0].c = &vtx->c.cursor;
	i = 1;
	j = 0;
	while (j < i) {
		VTAILQ_FOREACH(vtx, &vtxs[j]->child, list_child) {
			assert(i < n);
			AN(vslc_vtx_reset(&vtx->c.cursor) == vsl_end);
			vtxs[i] = vtx;
			if (vtx->reason == VSL_r_restart)
				/* Restarts stay at the same level as parent */
				trans[i].level = trans[j].level;
			else
				trans[i].level = trans[j].level + 1;
			trans[i].vxid = vtx->key.vxid;
			trans[i].vxid_parent = trans[j].vxid;
			trans[i].type = vtx->type;
			trans[i].reason = vtx->reason;
			trans[i].c = &vtx->c.cursor;
			i++;
		}
		j++;
	}
	assert(i == n);

	/* Build pointer array (NULL terminated) */
	for (i = 0; i < n; i++)
		ptrans[i] = &trans[i];
	ptrans[i] = NULL;

	/* Query test goes here */
	if (vslq->query != NULL && !vslq_runquery(vslq->query, ptrans))
		return (0);

	if (vslq->vsl->R_opt_l != 0 && !vslq_ratelimit(vslq))
		return (0);

	/* Callback */
	return ((func)(vslq->vsl, ptrans, priv));
}
1015
/* Create a synthetic log record. The record will be inserted at the
   current cursor offset. The record is built in VSL wire format:
   data[0] packs tag and payload length, data[1] the vxid plus
   client/backend marker, payload from data[2] (NUL-terminated,
   truncated to the fixed synth buffer). */
static void
vtx_synth_rec(struct vtx *vtx, unsigned tag, const char *fmt, ...)
{
	struct synth *synth, *it;
	va_list ap;
	char *buf;
	int l, buflen;

	ALLOC_OBJ(synth, SYNTH_MAGIC);
	AN(synth);

	/* Payload area starts after the two header words */
	buf = (char *)&synth->data[2];
	buflen = sizeof (synth->data) - 2 * sizeof (uint32_t);
	va_start(ap, fmt);
	l = vsnprintf(buf, buflen, fmt, ap);
	assert(l >= 0);
	va_end(ap);
	if (l > buflen - 1)
		l = buflen - 1;	/* vsnprintf truncated; clamp length */
	buf[l++] = '\0';	/* NUL-terminated */
	synth->data[1] = vtx->key.vxid;
	switch (vtx->type) {
	case VSL_t_req:
		synth->data[1] |= VSL_CLIENTMARKER;
		break;
	case VSL_t_bereq:
		synth->data[1] |= VSL_BACKENDMARKER;
		break;
	default:
		break;
	}
	/* Header word: tag in the top byte, length in the rest */
	synth->data[0] = (((tag & 0xff) << 24) | l);
	synth->offset = vtx->c.offset;

	VTAILQ_FOREACH_REVERSE(it, &vtx->synth, synthhead, list) {
		/* Make sure the synth list is sorted on offset */
		CHECK_OBJ_NOTNULL(it, SYNTH_MAGIC);
		if (synth->offset >= it->offset)
			break;
	}
	if (it != NULL)
		VTAILQ_INSERT_AFTER(&vtx->synth, it, synth, list);
	else
		VTAILQ_INSERT_HEAD(&vtx->synth, synth, list);

	/* Update cursor so the new record is picked up if it sorts
	   before the cursor's current synth pointer */
	CHECK_OBJ_ORNULL(vtx->c.synth, SYNTH_MAGIC);
	if (vtx->c.synth == NULL || vtx->c.synth->offset > synth->offset)
		vtx->c.synth = synth;
}
1068
1069 /* Add a diagnostic SLT_VSL synth record to the vtx. */
1070 static int
vtx_diag(struct vtx * vtx,const char * msg)1071 vtx_diag(struct vtx *vtx, const char *msg)
1072 {
1073
1074 vtx_synth_rec(vtx, SLT_VSL, msg);
1075 return (-1);
1076 }
1077
/* Add a SLT_VSL diag synth record to the vtx. Takes an offending record
   that will be included in the log record (its vxid, tag name and
   payload). Always returns -1 for caller convenience. */
static int
vtx_diag_tag(struct vtx *vtx, const uint32_t *ptr, const char *reason)
{

	vtx_synth_rec(vtx, SLT_VSL, "%s (%u:%s \"%.*s\")", reason, VSL_ID(ptr),
	    VSL_tags[VSL_TAG(ptr)], (int)VSL_LEN(ptr), VSL_CDATA(ptr));
	return (-1);
}
1088
/* Create a new VSLQ query session. Takes ownership of *cp (cleared on
   success) if cp is non-NULL. Returns NULL on illegal grouping or a
   querystring that fails to compile; in the latter case *cp is left
   untouched. */
struct VSLQ *
VSLQ_New(struct VSL_data *vsl, struct VSL_cursor **cp,
    enum VSL_grouping_e grouping, const char *querystring)
{
	struct vslq_query *query;
	struct VSLQ *vslq;

	CHECK_OBJ_NOTNULL(vsl, VSL_MAGIC);
	if (grouping >= VSL_g__MAX) {
		(void)vsl_diag(vsl, "Illegal query grouping");
		return (NULL);
	}
	if (querystring != NULL) {
		query = vslq_newquery(vsl, grouping, querystring);
		if (query == NULL)
			return (NULL);
	} else
		query = NULL;

	ALLOC_OBJ(vslq, VSLQ_MAGIC);
	AN(vslq);
	vslq->vsl = vsl;
	if (cp != NULL) {
		/* Take ownership of the cursor */
		vslq->c = *cp;
		*cp = NULL;
	}
	vslq->grouping = grouping;
	vslq->query = query;
	if (vslq->vsl->R_opt_l != 0) {
		/* Rate limiting enabled: start with one credit */
		vslq->last_use = VTIM_mono();
		vslq->credits = 1;
	}

	/* Setup normal mode */
	VRBT_INIT(&vslq->tree);
	VTAILQ_INIT(&vslq->ready);
	VTAILQ_INIT(&vslq->incomplete);
	VTAILQ_INIT(&vslq->shmrefs);
	VTAILQ_INIT(&vslq->cache);

	/* Setup raw mode */
	vslq->raw.c.magic = VSLC_RAW_MAGIC;
	vslq->raw.c.cursor.priv_tbl = &vslc_raw_tbl;
	vslq->raw.c.cursor.priv_data = &vslq->raw.c;
	vslq->raw.trans.level = 0;
	vslq->raw.trans.type = VSL_t_raw;
	vslq->raw.trans.reason = VSL_r_unknown;
	vslq->raw.trans.c = &vslq->raw.c.cursor;
	vslq->raw.ptrans[0] = &vslq->raw.trans;
	vslq->raw.ptrans[1] = NULL;

	return (vslq);
}
1142
1143 void
VSLQ_Delete(struct VSLQ ** pvslq)1144 VSLQ_Delete(struct VSLQ **pvslq)
1145 {
1146 struct VSLQ *vslq;
1147 struct vtx *vtx;
1148
1149 TAKE_OBJ_NOTNULL(vslq, pvslq, VSLQ_MAGIC);
1150
1151 (void)VSLQ_Flush(vslq, NULL, NULL);
1152 AZ(vslq->n_outstanding);
1153
1154 if (vslq->c != NULL) {
1155 VSL_DeleteCursor(vslq->c);
1156 vslq->c = NULL;
1157 }
1158
1159 if (vslq->query != NULL)
1160 vslq_deletequery(&vslq->query);
1161 AZ(vslq->query);
1162
1163 while (!VTAILQ_EMPTY(&vslq->cache)) {
1164 AN(vslq->n_cache);
1165 vtx = VTAILQ_FIRST(&vslq->cache);
1166 VTAILQ_REMOVE(&vslq->cache, vtx, list_child);
1167 vslq->n_cache--;
1168 FREE_OBJ(vtx);
1169 }
1170
1171 FREE_OBJ(vslq);
1172 }
1173
1174 void
VSLQ_SetCursor(struct VSLQ * vslq,struct VSL_cursor ** cp)1175 VSLQ_SetCursor(struct VSLQ *vslq, struct VSL_cursor **cp)
1176 {
1177
1178 CHECK_OBJ_NOTNULL(vslq, VSLQ_MAGIC);
1179
1180 if (vslq->c != NULL) {
1181 (void)VSLQ_Flush(vslq, NULL, NULL);
1182 AZ(vslq->n_outstanding);
1183 VSL_DeleteCursor(vslq->c);
1184 vslq->c = NULL;
1185 }
1186
1187 if (cp != NULL) {
1188 AN(*cp);
1189 vslq->c = *cp;
1190 *cp = NULL;
1191 }
1192 }
1193
1194 /* Regard each log line as a single transaction, feed it through the query
1195 and do the callback */
1196 static int
vslq_raw(struct VSLQ * vslq,VSLQ_dispatch_f * func,void * priv)1197 vslq_raw(struct VSLQ *vslq, VSLQ_dispatch_f *func, void *priv)
1198 {
1199 enum vsl_status r = vsl_more;
1200 int i;
1201
1202 assert(vslq->grouping == VSL_g_raw);
1203
1204 assert(vslq->raw.offset <= vslq->raw.len);
1205 do {
1206 if (vslq->raw.offset == vslq->raw.len) {
1207 r = VSL_Next(vslq->c);
1208 if (r != vsl_more)
1209 return (r);
1210 AN(vslq->c->rec.ptr);
1211 vslq->raw.start = vslq->c->rec;
1212 if (VSL_TAG(vslq->c->rec.ptr) == SLT__Batch)
1213 vslq->raw.len = VSL_END(vslq->c->rec.ptr,
1214 VSL_BATCHLEN(vslq->c->rec.ptr)) -
1215 vslq->c->rec.ptr;
1216 else
1217 vslq->raw.len = VSL_NEXT(vslq->raw.start.ptr) -
1218 vslq->raw.start.ptr;
1219 assert(vslq->raw.len > 0);
1220 vslq->raw.offset = 0;
1221 }
1222
1223 vslq->raw.c.ptr = vslq->raw.start.ptr + vslq->raw.offset;
1224 vslq->raw.c.cursor.rec.ptr = NULL;
1225 vslq->raw.trans.vxid = VSL_ID(vslq->raw.c.ptr);
1226 vslq->raw.offset += VSL_NEXT(vslq->raw.c.ptr) - vslq->raw.c.ptr;
1227 } while (VSL_TAG(vslq->raw.c.ptr) == SLT__Batch);
1228
1229 assert (r == vsl_more);
1230
1231 if (func == NULL)
1232 return (r);
1233
1234 if (vslq->query != NULL &&
1235 !vslq_runquery(vslq->query, vslq->raw.ptrans))
1236 return (r);
1237
1238 if (vslq->vsl->R_opt_l != 0 && !vslq_ratelimit(vslq))
1239 return (r);
1240
1241 i = (func)(vslq->vsl, vslq->raw.ptrans, priv);
1242 if (i)
1243 return (i);
1244
1245 return (r);
1246 }
1247
1248 /* Check the beginning of the shmref list, and buffer refs that are at
1249 * warning level.
1250 */
1251 static enum vsl_status
vslq_shmref_check(struct VSLQ * vslq)1252 vslq_shmref_check(struct VSLQ *vslq)
1253 {
1254 struct chunk *chunk;
1255 enum vsl_check i;
1256
1257 while ((chunk = VTAILQ_FIRST(&vslq->shmrefs)) != NULL) {
1258 CHECK_OBJ_NOTNULL(chunk, CHUNK_MAGIC);
1259 assert(chunk->type == chunk_t_shm);
1260 i = VSL_Check(vslq->c, &chunk->shm.start);
1261 switch (i) {
1262 case vsl_check_valid:
1263 /* First on list is OK, refs behind it must also
1264 be OK */
1265 return (vsl_more);
1266 case vsl_check_warn:
1267 /* Buffer this chunk */
1268 chunk_shm_to_buf(vslq, chunk);
1269 break;
1270 default:
1271 /* Too late to buffer */
1272 return (vsl_e_overrun);
1273 }
1274 }
1275
1276 return (vsl_more);
1277 }
1278
1279 static unsigned
vslq_candidate(struct VSLQ * vslq,const uint32_t * ptr)1280 vslq_candidate(struct VSLQ *vslq, const uint32_t *ptr)
1281 {
1282 enum VSL_transaction_e type;
1283 enum VSL_reason_e reason;
1284 struct VSL_data *vsl;
1285 enum VSL_tag_e tag;
1286 unsigned p_vxid;
1287 int i;
1288
1289 CHECK_OBJ_NOTNULL(vslq, VSLQ_MAGIC);
1290 AN(ptr);
1291
1292 assert(vslq->grouping != VSL_g_raw);
1293 if (vslq->grouping == VSL_g_session)
1294 return (1); /* All are needed */
1295
1296 vsl = vslq->vsl;
1297 CHECK_OBJ_NOTNULL(vsl, VSL_MAGIC);
1298 if (vslq->grouping == VSL_g_vxid) {
1299 if (!vsl->c_opt && !vsl->b_opt)
1300 return (1); /* Implies also !vsl->E_opt */
1301 if (!vsl->b_opt && !VSL_CLIENT(ptr))
1302 return (0);
1303 if (!vsl->c_opt && !VSL_BACKEND(ptr))
1304 return (0);
1305 /* Need to parse the Begin tag - fallthrough to below */
1306 }
1307
1308 tag = VSL_TAG(ptr);
1309 assert(tag == SLT_Begin);
1310 i = vtx_parse_link(VSL_CDATA(ptr), &type, &p_vxid, &reason);
1311 if (i != 3 || type == VSL_t_unknown)
1312 return (0);
1313
1314 if (type == VSL_t_sess)
1315 return (0);
1316
1317 if (vslq->grouping == VSL_g_vxid && reason == VSL_r_esi && !vsl->E_opt)
1318 return (0);
1319
1320 return (1);
1321 }
1322
1323 /* Process next input record */
1324 static enum vsl_status
vslq_next(struct VSLQ * vslq)1325 vslq_next(struct VSLQ *vslq)
1326 {
1327 const uint32_t *ptr;
1328 struct VSL_cursor *c;
1329 enum vsl_status r;
1330 enum VSL_tag_e tag;
1331 ssize_t len;
1332 unsigned vxid, keep;
1333 struct vtx *vtx;
1334
1335 c = vslq->c;
1336 r = VSL_Next(c);
1337 if (r != vsl_more)
1338 return (r);
1339
1340 assert (r == vsl_more);
1341
1342 tag = (enum VSL_tag_e)VSL_TAG(c->rec.ptr);
1343 if (tag == SLT__Batch) {
1344 vxid = VSL_BATCHID(c->rec.ptr);
1345 len = VSL_END(c->rec.ptr, VSL_BATCHLEN(c->rec.ptr)) -
1346 c->rec.ptr;
1347 if (len == 0)
1348 return (r);
1349 ptr = VSL_NEXT(c->rec.ptr);
1350 tag = (enum VSL_tag_e)VSL_TAG(ptr);
1351 } else {
1352 vxid = VSL_ID(c->rec.ptr);
1353 len = VSL_NEXT(c->rec.ptr) - c->rec.ptr;
1354 ptr = c->rec.ptr;
1355 }
1356 assert(len > 0);
1357 if (vxid == 0)
1358 /* Skip non-transactional records */
1359 return (r);
1360
1361 vtx = vtx_lookup(vslq, vxid);
1362 keep = tag != SLT_Begin || vslq_candidate(vslq, ptr);
1363 if (vtx == NULL && tag == SLT_Begin && keep) {
1364 vtx = vtx_add(vslq, vxid);
1365 AN(vtx);
1366 }
1367 if (vtx != NULL) {
1368 AN(keep);
1369 r = vtx_append(vslq, vtx, &c->rec, len);
1370 if (r == vsl_more)
1371 vtx_scan(vslq, vtx);
1372 }
1373
1374 return (r);
1375 }
1376
1377 /* Test query and report any ready transactions */
1378 static int
vslq_process_ready(struct VSLQ * vslq,VSLQ_dispatch_f * func,void * priv)1379 vslq_process_ready(struct VSLQ *vslq, VSLQ_dispatch_f *func, void *priv)
1380 {
1381 struct vtx *vtx;
1382 int i = 0;
1383
1384 AN(vslq);
1385
1386 while (!VTAILQ_EMPTY(&vslq->ready)) {
1387 vtx = VTAILQ_FIRST(&vslq->ready);
1388 CHECK_OBJ_NOTNULL(vtx, VTX_MAGIC);
1389 VTAILQ_REMOVE(&vslq->ready, vtx, list_vtx);
1390 AN(vtx->flags & VTX_F_READY);
1391 if (func != NULL)
1392 i = vslq_callback(vslq, vtx, func, priv);
1393 vtx_retire(vslq, &vtx);
1394 AZ(vtx);
1395 if (i)
1396 return (i);
1397 }
1398
1399 return (0);
1400 }
1401
1402 /* Process the input cursor, calling the callback function on matching
1403 transaction sets */
1404 int
VSLQ_Dispatch(struct VSLQ * vslq,VSLQ_dispatch_f * func,void * priv)1405 VSLQ_Dispatch(struct VSLQ *vslq, VSLQ_dispatch_f *func, void *priv)
1406 {
1407 enum vsl_status r;
1408 int i;
1409 double now;
1410 struct vtx *vtx;
1411
1412 CHECK_OBJ_NOTNULL(vslq, VSLQ_MAGIC);
1413
1414 /* Check that we have a cursor */
1415 if (vslq->c == NULL)
1416 return (vsl_e_abandon);
1417
1418 if (vslq->grouping == VSL_g_raw)
1419 return (vslq_raw(vslq, func, priv));
1420
1421 /* Process next cursor input */
1422 r = vslq_next(vslq);
1423 if (r != vsl_more)
1424 /* At end of log or cursor reports error condition */
1425 return (r);
1426
1427 /* Check shmref list and buffer if necessary */
1428 r = vslq_shmref_check(vslq);
1429 if (r != vsl_more)
1430 /* Buffering of shm ref failed */
1431 return (r);
1432
1433 assert (r == vsl_more);
1434
1435 /* Check vtx timeout */
1436 now = VTIM_mono();
1437 while (!VTAILQ_EMPTY(&vslq->incomplete)) {
1438 vtx = VTAILQ_FIRST(&vslq->incomplete);
1439 CHECK_OBJ_NOTNULL(vtx, VTX_MAGIC);
1440 if (now - vtx->t_start < vslq->vsl->T_opt)
1441 break;
1442 vtx_force(vslq, vtx, "timeout");
1443 AN(vtx->flags & VTX_F_COMPLETE);
1444 }
1445
1446 /* Check store limit */
1447 while (vslq->n_outstanding > vslq->vsl->L_opt &&
1448 !(VTAILQ_EMPTY(&vslq->incomplete))) {
1449 vtx = VTAILQ_FIRST(&vslq->incomplete);
1450 CHECK_OBJ_NOTNULL(vtx, VTX_MAGIC);
1451 vtx_force(vslq, vtx, "store overflow");
1452 AN(vtx->flags & VTX_F_COMPLETE);
1453 i = vslq_process_ready(vslq, func, priv);
1454 if (i)
1455 /* User return code */
1456 return (i);
1457 }
1458
1459 /* Check ready list */
1460 if (!VTAILQ_EMPTY(&vslq->ready)) {
1461 i = vslq_process_ready(vslq, func, priv);
1462 if (i)
1463 /* User return code */
1464 return (i);
1465 }
1466
1467 return (vsl_more);
1468 }
1469
1470 /* Flush any incomplete vtx held on to. Do callbacks if func != NULL */
1471 int
VSLQ_Flush(struct VSLQ * vslq,VSLQ_dispatch_f * func,void * priv)1472 VSLQ_Flush(struct VSLQ *vslq, VSLQ_dispatch_f *func, void *priv)
1473 {
1474 struct vtx *vtx;
1475
1476 CHECK_OBJ_NOTNULL(vslq, VSLQ_MAGIC);
1477
1478 while (!VTAILQ_EMPTY(&vslq->incomplete)) {
1479 vtx = VTAILQ_FIRST(&vslq->incomplete);
1480 CHECK_OBJ_NOTNULL(vtx, VTX_MAGIC);
1481 AZ(vtx->flags & VTX_F_COMPLETE);
1482 vtx_force(vslq, vtx, "flush");
1483 }
1484
1485 return (vslq_process_ready(vslq, func, priv));
1486 }
1487