xref: /dragonfly/sbin/jscan/jstream.c (revision 0db87cb7)
1 /*
2  * Copyright (c) 2004,2005 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@backplane.com>
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  *
34  * $DragonFly: src/sbin/jscan/jstream.c,v 1.8 2005/09/07 07:20:23 dillon Exp $
35  */
36 
37 #include "jscan.h"
38 
/*
 * Hash table of transactions that are still being assembled.  Buckets
 * are selected with (streamid & JHASH_MASK) and collisions are chained
 * through jh_hash.  jaddrecord() inserts an entry when it sees the first
 * record of a multi-record transaction and unlinks/frees it once both
 * the BEGIN and END control bits have been observed.
 */
39 static struct jhash	*JHashAry[JHASH_SIZE];
40 
/* Forward declarations of file-local helpers defined later in this file. */
41 static void jnormalize(struct jstream *js);
42 static int jaddrecord_backtrack(struct jsession *ss, struct jdata *jd);
43 
44 /*
45  * Integrate a raw record.  Deal with the transaction begin and end flags
46  * to create a forward-referenced collection of jstream records.  If we are
47  * able to complete a transaction, the first js associated with that
48  * transaction is returned.
49  *
50  * XXX we need to store the data for very large multi-record transactions
51  * separately since it might not fit into memory.
52  *
53  * Note that a transaction might represent a huge I/O operation, resulting
54  * in an overall node structure that spans gigabytes, but individual
55  * subrecord leaf nodes are limited in size and we depend on this to simplify
56  * the handling of leaf records.
57  *
58  * A transaction may cover several raw records.  The jstream collection for
59  * a transaction is only returned when the entire transaction has been
60  * successfully scanned.  Due to the interleaving of transactions the ordering
61  * of returned JS's may be different (not exactly reversed) when scanning a
62  * journal backwards verses forwards.  Since parallel operations are
63  * theoretically non-conflicting, this should not present a problem.
64  */
65 struct jstream *
66 jaddrecord(struct jsession *ss, struct jdata *jd)
67 {
68     struct journal_rawrecbeg *head;
69     struct jstream *js;
70     struct jhash *jh;
71     struct jhash **jhp;
72 
73     js = malloc(sizeof(struct jstream));
74     bzero(js, sizeof(struct jstream));
75     js->js_jdata = jref(jd);
76     js->js_head = (void *)jd->jd_data;
77     js->js_session = ss;
78     head = js->js_head;
79 
80     /*
81      * Check for a completely self-contained transaction, just return the
82      * js if possible.
83      */
84     if ((head->streamid & (JREC_STREAMCTL_BEGIN|JREC_STREAMCTL_END)) ==
85 	(JREC_STREAMCTL_BEGIN|JREC_STREAMCTL_END)
86     ) {
87 	jnormalize(js);
88 	return (js);
89     }
90 
91 retry:
92     /*
93      * Check for an open transaction in the hash table.
94      */
95     jhp = &JHashAry[head->streamid & JHASH_MASK];
96     while ((jh = *jhp) != NULL) {
97 	if (jh->jh_session == ss &&
98 	   ((jh->jh_transid ^ head->streamid) & JREC_STREAMID_MASK) == 0
99 	) {
100 	    break;
101 	}
102 	jhp = &jh->jh_hash;
103     }
104 
105     /*
106      * We might have picked up a transaction in the middle, which we
107      * detect by not finding a hash record coupled with a raw record
108      * whos JREC_STREAMCTL_BEGIN bit is not set (or JREC_STREAMCTL_END
109      * bit if we are scanning backwards).
110      *
111      * When this case occurs we have to backtrack to locate the
112      * BEGIN (END if scanning backwards) and collect those records
113      * in order to obtain a complete transaction.
114      *
115      * This case most often occurs when a batch operation runs in
116      * prefix set whos starting raw-record transaction id is in
117      * the middle of one or more meta-transactions.  It's a bit of
118      * a tricky situation, but easily resolvable by scanning the
119      * prefix set backwards (forwards if originally scanning backwards)
120      * to locate the raw record representing the start (end) of the
121      * transaction.
122      */
123     if (jh == NULL) {
124 	if (ss->ss_direction == JD_FORWARDS &&
125 	    (head->streamid & JREC_STREAMCTL_BEGIN) == 0
126 	) {
127 	    if (verbose_opt > 1)
128 		fprintf(stderr, "mid-transaction detected transid %016jx "
129 				"streamid %04x\n",
130 			(uintmax_t)jd->jd_transid,
131 			head->streamid & JREC_STREAMID_MASK);
132 	    if (jaddrecord_backtrack(ss, jd) == 0) {
133 		if (verbose_opt)
134 		    fprintf(stderr, "mid-transaction streamid %04x collection "
135 				    "succeeded\n",
136 			    head->streamid & JREC_STREAMID_MASK);
137 		goto retry;
138 	    }
139 	    fprintf(stderr, "mid-transaction streamid %04x collection failed\n",
140 		    head->streamid & JREC_STREAMID_MASK);
141 	    jscan_dispose(js);
142 	    return(NULL);
143 	} else if (ss->ss_direction == JD_BACKWARDS &&
144 	    (head->streamid & JREC_STREAMCTL_END) == 0
145 	) {
146 	    if (verbose_opt > 1)
147 		fprintf(stderr, "mid-transaction detected transid %016jx "
148 				"streamid %04x\n",
149 				(uintmax_t)jd->jd_transid,
150 				head->streamid & JREC_STREAMID_MASK);
151 	    if (jaddrecord_backtrack(ss, jd) == 0) {
152 		if (verbose_opt)
153 		    fprintf(stderr, "mid-transaction streamid %04x "
154 				    "collection succeeded\n",
155 			    head->streamid & JREC_STREAMID_MASK);
156 		goto retry;
157 	    }
158 	    fprintf(stderr, "mid-transaction streamid %04x collection failed\n",
159 		    head->streamid & JREC_STREAMID_MASK);
160 	    jscan_dispose(js);
161 	    return(NULL);
162 	}
163     }
164 
165     /*
166      * If we've made it to here and we still don't have a hash record
167      * to track the transaction, create one.
168      */
169     if (jh == NULL) {
170 	jh = malloc(sizeof(*jh));
171 	bzero(jh, sizeof(*jh));
172 	*jhp = jh;
173 	jh->jh_first = js;
174 	jh->jh_last = js;
175 	jh->jh_transid = head->streamid;
176 	jh->jh_session = ss;
177 	return (NULL);
178     }
179 
180     /*
181      * Emplace the stream segment
182      */
183     jh->jh_transid |= head->streamid & JREC_STREAMCTL_MASK;
184     if (ss->ss_direction == JD_FORWARDS) {
185 	jh->jh_last->js_next = js;
186 	jh->jh_last = js;
187     } else {
188 	js->js_next = jh->jh_first;
189 	jh->jh_first = js;
190     }
191 
192     /*
193      * If the transaction is complete, remove the hash entry and return the
194      * js representing the beginning of the transaction.  Otherwise leave
195      * the hash entry intact and return NULL.
196      */
197     if ((jh->jh_transid & (JREC_STREAMCTL_BEGIN|JREC_STREAMCTL_END)) ==
198 	(JREC_STREAMCTL_BEGIN|JREC_STREAMCTL_END)
199     ) {
200 	*jhp = jh->jh_hash;
201 	js = jh->jh_first;
202 	free(jh);
203 
204 	jnormalize(js);
205     } else {
206 	js = NULL;
207     }
208     return (js);
209 }
210 
211 /*
212  * Renormalize the jscan list to remove all the meta record headers
213  * and trailers except for the very first one.
214  */
215 static
216 void
217 jnormalize(struct jstream *js)
218 {
219     struct jstream *jscan;
220     off_t off;
221 
222     js->js_normalized_off = 0;
223     js->js_normalized_base = (void *)js->js_head;
224     js->js_normalized_size = js->js_head->recsize - sizeof(struct journal_rawrecend);
225     js->js_normalized_total = js->js_normalized_size;
226     off = js->js_normalized_size;
227     for (jscan = js->js_next; jscan; jscan = jscan->js_next) {
228 	jscan->js_normalized_off = off;
229 	jscan->js_normalized_base = (char *)jscan->js_head +
230 		sizeof(struct journal_rawrecbeg);
231 	jscan->js_normalized_size = jscan->js_head->recsize -
232 	       sizeof(struct journal_rawrecbeg) -
233 	       sizeof(struct journal_rawrecend);
234 	off += jscan->js_normalized_size;
235 	js->js_normalized_total += jscan->js_normalized_size;
236     }
237 }
238 
239 /*
240  * For sanity's sake I will describe the normal backtracking that must occur,
241  * but this routine must also operate on reverse-scanned (undo) records
242  * by forward tracking.
243  *
244  * A record has been found that represents the middle or end of a transaction
245  * when we were expecting the beginning of a transaction.  We must backtrack
246  * to locate the start of the transaction, then process raw records relating
247  * to the transaction until we reach our current point (jd) again.  If
248  * we find a matching streamid representing the end of a transaction instead
249  * of the expected start-of-transaction that record belongs to a wholely
250  * different meta-transaction and the record we seek is known to not be
251  * available.
252  *
253  * jd is the current record, directon is the normal scan direction (we have
254  * to scan in the reverse direction).
255  */
256 static
257 int
258 jaddrecord_backtrack(struct jsession *ss, struct jdata *jd)
259 {
260     struct jfile *jf = ss->ss_jfin;
261     struct jdata *scan;
262     struct jstream *js;
263     u_int16_t streamid;
264     u_int16_t scanid;
265 
266     assert(ss->ss_direction == JD_FORWARDS || ss->ss_direction == JD_BACKWARDS);
267     if (jmodes & JMODEF_INPUT_PIPE)
268 	return(-1);
269 
270     streamid = ((struct journal_rawrecbeg *)jd->jd_data)->streamid & JREC_STREAMID_MASK;
271 
272     if (ss->ss_direction == JD_FORWARDS) {
273 	/*
274 	 * Backtrack in the reverse direction looking for the transaction
275 	 * start bit.  If we find an end bit instead it belongs to an
276 	 * unrelated transaction using the same streamid and there is no
277 	 * point continuing.
278 	 */
279 	scan = jref(jd);
280 	while ((scan = jread(jf, scan, JD_BACKWARDS)) != NULL) {
281 	    scanid = ((struct journal_rawrecbeg *)scan->jd_data)->streamid;
282 	    if ((scanid & JREC_STREAMID_MASK) != streamid)
283 		continue;
284 	    if (scanid & JREC_STREAMCTL_END) {
285 		jfree(jf, scan);
286 		return(-1);
287 	    }
288 	    if (scanid & JREC_STREAMCTL_BEGIN)
289 		break;
290 	}
291 
292 	/*
293 	 * Now jaddrecord the related records.
294 	 */
295 	while (scan != NULL && scan->jd_transid < jd->jd_transid) {
296 	    scanid = ((struct journal_rawrecbeg *)scan->jd_data)->streamid;
297 	    if ((scanid & JREC_STREAMID_MASK) == streamid) {
298 		js = jaddrecord(ss, scan);
299 		assert(js == NULL);
300 	    }
301 	    scan = jread(jf, scan, JD_FORWARDS);
302 	}
303 	if (scan == NULL)
304 	    return(-1);
305 	jfree(jf, scan);
306     } else {
307 	/*
308 	 * Backtrack in the forwards direction looking for the transaction
309 	 * end bit.  If we find a start bit instead if belongs to an
310 	 * unrelated transaction using the same streamid and there is no
311 	 * point continuing.
312 	 */
313 	scan = jref(jd);
314 	while ((scan = jread(jf, scan, JD_FORWARDS)) != NULL) {
315 	    scanid = ((struct journal_rawrecbeg *)scan->jd_data)->streamid;
316 	    if ((scanid & JREC_STREAMID_MASK) != streamid)
317 		continue;
318 	    if (scanid & JREC_STREAMCTL_BEGIN) {
319 		jfree(jf, scan);
320 		return(-1);
321 	    }
322 	    if (scanid & JREC_STREAMCTL_END)
323 		break;
324 	}
325 
326 	/*
327 	 * Now jaddrecord the related records.
328 	 */
329 	while (scan != NULL && scan->jd_transid > jd->jd_transid) {
330 	    scanid = ((struct journal_rawrecbeg *)scan->jd_data)->streamid;
331 	    if ((scanid & JREC_STREAMID_MASK) == streamid) {
332 		js = jaddrecord(ss, scan);
333 		assert(js == NULL);
334 	    }
335 	    scan = jread(jf, scan, JD_BACKWARDS);
336 	}
337 	if (scan == NULL)
338 	    return(-1);
339 	jfree(jf, scan);
340     }
341     return(0);
342 }
343 
344 void
345 jscan_dispose(struct jstream *js)
346 {
347     struct jstream *jnext;
348 
349     if (js->js_alloc_buf) {
350 	free(js->js_alloc_buf);
351 	js->js_alloc_buf = NULL;
352 	js->js_alloc_size = 0;
353     }
354 
355     while (js) {
356 	jnext = js->js_next;
357 	jfree(js->js_session->ss_jfin, js->js_jdata);
358 	js->js_jdata = NULL;
359 	free(js);
360 	js = jnext;
361     }
362 }
363 
/*
 * Copy 'bytes' bytes starting at normalized offset 'off' out of a linked
 * set of jstream structures into 'buf', crossing segment boundaries as
 * needed.  Returns 0 on success or ENOENT if the stream ends before the
 * request is satisfied.
 */
int
jsread(struct jstream *js, off_t off, void *buf, int bytes)
{
    char *dst = buf;
    const void *src;
    int chunk;

    for (; bytes != 0; bytes -= chunk) {
	/* Largest contiguous run available at the current offset. */
	chunk = jsreadany(js, off, &src);
	if (chunk == 0)
	    return (ENOENT);
	if (chunk > bytes)
	    chunk = bytes;
	bcopy(src, dst, chunk);
	dst += chunk;
	off += chunk;
    }
    return(0);
}
387 
388 /*
389  * Read the specified block of data out of a linked set of jstream
390  * structures.  Attempt to return a pointer into the data set but
391  * allocate and copy if that is not possible.  Returns 0 on success
392  * or an error code on error.
393  */
394 int
395 jsreadp(struct jstream *js, off_t off, const void **bufp,
396 	int bytes)
397 {
398     int error = 0;
399     int n;
400 
401     n = jsreadany(js, off, bufp);
402     if (n < bytes) {
403 	if (js->js_alloc_size < bytes) {
404 	    if (js->js_alloc_buf)
405 		free(js->js_alloc_buf);
406 	    js->js_alloc_buf = malloc(bytes);
407 	    js->js_alloc_size = bytes;
408 	    if (js->js_alloc_buf == NULL)
409 		fprintf(stderr, "attempt to allocate %d bytes failed\n", bytes);
410 	    assert(js->js_alloc_buf != NULL);
411 	}
412 	error = jsread(js, off, js->js_alloc_buf, bytes);
413 	if (error) {
414 	    *bufp = NULL;
415 	} else {
416 	    *bufp = js->js_alloc_buf;
417 	}
418     }
419     return(error);
420 }
421 
/*
 * Feed 'bytes' bytes of stream data starting at normalized offset 'off'
 * to a write(2)-style callback, one contiguous chunk at a time.
 *
 * func is called as func(fd, chunk, chunklen) and is expected to return
 * chunklen on success.  Processing stops at the first call that returns
 * anything else.
 *
 * Returns the number of bytes successfully handed to func.  The result
 * is short if the stream ends early or a callback fails after some data
 * was delivered, and -1 if the very first callback fails.  Callers can
 * detect any failure by comparing the result against 'bytes'.
 */
int
jsreadcallback(struct jstream *js, ssize_t (*func)(int, const void *, size_t),
		int fd, off_t off, int bytes)
{
    const void *bufp;
    ssize_t r;
    int res;
    int n;

    res = 0;
    while (bytes && (n = jsreadany(js, off, &bufp)) > 0) {
	if (n > bytes)
	    n = bytes;
	r = func(fd, bufp, n);
	if (r != n) {
	    /*
	     * Failed or short callback: do not count this chunk and do
	     * not keep pushing data past the point of failure.
	     */
	    if (res == 0)
		res = -1;
	    break;
	}
	res += n;
	bytes -= n;
	off += n;
    }
    return(res);
}
446 
447 /*
448  * Return the largest contiguous buffer starting at the specified offset,
449  * or 0.
450  */
451 int
452 jsreadany(struct jstream *js, off_t off, const void **bufp)
453 {
454     struct jstream *scan;
455     int n;
456 
457     if ((scan = js->js_cache) == NULL || scan->js_normalized_off > off)
458 	scan = js;
459     while (scan && scan->js_normalized_off <= off) {
460 	js->js_cache = scan;
461 	if (scan->js_normalized_off + scan->js_normalized_size > off) {
462 	    n = (int)(off - scan->js_normalized_off);
463 	    *bufp = scan->js_normalized_base + n;
464 	    return(scan->js_normalized_size - n);
465 	}
466 	scan = scan->js_next;
467     }
468     return(0);
469 }
470 
471