xref: /dragonfly/sbin/jscan/jstream.c (revision 3d201fd0)
1 /*
2  * Copyright (c) 2004,2005 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@backplane.com>
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  *
34  * $DragonFly: src/sbin/jscan/jstream.c,v 1.8 2005/09/07 07:20:23 dillon Exp $
35  */
36 
37 #include "jscan.h"
38 
39 static struct jhash	*JHashAry[JHASH_SIZE];
40 
41 static void jnormalize(struct jstream *js);
42 static int jaddrecord_backtrack(struct jsession *ss, struct jdata *jd);
43 
44 /*
45  * Integrate a raw record.  Deal with the transaction begin and end flags
46  * to create a forward-referenced collection of jstream records.  If we are
47  * able to complete a transaction, the first js associated with that
48  * transaction is returned.
49  *
50  * XXX we need to store the data for very large multi-record transactions
51  * separately since it might not fit into memory.
52  *
53  * Note that a transaction might represent a huge I/O operation, resulting
54  * in an overall node structure that spans gigabytes, but individual
55  * subrecord leaf nodes are limited in size and we depend on this to simplify
56  * the handling of leaf records.
57  *
58  * A transaction may cover several raw records.  The jstream collection for
59  * a transaction is only returned when the entire transaction has been
60  * successfully scanned.  Due to the interleaving of transactions the ordering
61  * of returned JS's may be different (not exactly reversed) when scanning a
62  * journal backwards verses forwards.  Since parallel operations are
63  * theoretically non-conflicting, this should not present a problem.
64  */
65 struct jstream *
66 jaddrecord(struct jsession *ss, struct jdata *jd)
67 {
68     struct journal_rawrecbeg *head;
69     struct jstream *js;
70     struct jhash *jh;
71     struct jhash **jhp;
72 
73     js = malloc(sizeof(struct jstream));
74     bzero(js, sizeof(struct jstream));
75     js->js_jdata = jref(jd);
76     js->js_head = (void *)jd->jd_data;
77     js->js_session = ss;
78     head = js->js_head;
79 
80     /*
81      * Check for a completely self-contained transaction, just return the
82      * js if possible.
83      */
84     if ((head->streamid & (JREC_STREAMCTL_BEGIN|JREC_STREAMCTL_END)) ==
85 	(JREC_STREAMCTL_BEGIN|JREC_STREAMCTL_END)
86     ) {
87 	jnormalize(js);
88 	return (js);
89     }
90 
91 retry:
92     /*
93      * Check for an open transaction in the hash table.
94      */
95     jhp = &JHashAry[head->streamid & JHASH_MASK];
96     while ((jh = *jhp) != NULL) {
97 	if (jh->jh_session == ss &&
98 	   ((jh->jh_transid ^ head->streamid) & JREC_STREAMID_MASK) == 0
99 	) {
100 	    break;
101 	}
102 	jhp = &jh->jh_hash;
103     }
104 
105     /*
106      * We might have picked up a transaction in the middle, which we
107      * detect by not finding a hash record coupled with a raw record
108      * whos JREC_STREAMCTL_BEGIN bit is not set (or JREC_STREAMCTL_END
109      * bit if we are scanning backwards).
110      *
111      * When this case occurs we have to backtrack to locate the
112      * BEGIN (END if scanning backwards) and collect those records
113      * in order to obtain a complete transaction.
114      *
115      * This case most often occurs when a batch operation runs in
116      * prefix set whos starting raw-record transaction id is in
117      * the middle of one or more meta-transactions.  It's a bit of
118      * a tricky situation, but easily resolvable by scanning the
119      * prefix set backwards (forwards if originally scanning backwards)
120      * to locate the raw record representing the start (end) of the
121      * transaction.
122      */
123     if (jh == NULL) {
124 	if (ss->ss_direction == JD_FORWARDS &&
125 	    (head->streamid & JREC_STREAMCTL_BEGIN) == 0
126 	) {
127 	    if (verbose_opt > 1)
128 		fprintf(stderr, "mid-transaction detected transid %016llx streamid %04x\n", jd->jd_transid, head->streamid & JREC_STREAMID_MASK);
129 	    if (jaddrecord_backtrack(ss, jd) == 0) {
130 		if (verbose_opt)
131 		    fprintf(stderr, "mid-transaction streamid %04x collection succeeded\n", head->streamid & JREC_STREAMID_MASK);
132 		goto retry;
133 	    }
134 	    fprintf(stderr, "mid-transaction streamid %04x collection failed\n", head->streamid & JREC_STREAMID_MASK);
135 	    jscan_dispose(js);
136 	    return(NULL);
137 	} else if (ss->ss_direction == JD_BACKWARDS &&
138 	    (head->streamid & JREC_STREAMCTL_END) == 0
139 	) {
140 	    if (verbose_opt > 1)
141 		fprintf(stderr, "mid-transaction detected transid %016llx streamid %04x\n", jd->jd_transid, head->streamid & JREC_STREAMID_MASK);
142 	    if (jaddrecord_backtrack(ss, jd) == 0) {
143 		if (verbose_opt)
144 		    fprintf(stderr, "mid-transaction streamid %04x collection succeeded\n", head->streamid & JREC_STREAMID_MASK);
145 		goto retry;
146 	    }
147 	    fprintf(stderr, "mid-transaction streamid %04x collection failed\n", head->streamid & JREC_STREAMID_MASK);
148 	    jscan_dispose(js);
149 	    return(NULL);
150 	}
151     }
152 
153     /*
154      * If we've made it to here and we still don't have a hash record
155      * to track the transaction, create one.
156      */
157     if (jh == NULL) {
158 	jh = malloc(sizeof(*jh));
159 	bzero(jh, sizeof(*jh));
160 	*jhp = jh;
161 	jh->jh_first = js;
162 	jh->jh_last = js;
163 	jh->jh_transid = head->streamid;
164 	jh->jh_session = ss;
165 	return (NULL);
166     }
167 
168     /*
169      * Emplace the stream segment
170      */
171     jh->jh_transid |= head->streamid & JREC_STREAMCTL_MASK;
172     if (ss->ss_direction == JD_FORWARDS) {
173 	jh->jh_last->js_next = js;
174 	jh->jh_last = js;
175     } else {
176 	js->js_next = jh->jh_first;
177 	jh->jh_first = js;
178     }
179 
180     /*
181      * If the transaction is complete, remove the hash entry and return the
182      * js representing the beginning of the transaction.  Otherwise leave
183      * the hash entry intact and return NULL.
184      */
185     if ((jh->jh_transid & (JREC_STREAMCTL_BEGIN|JREC_STREAMCTL_END)) ==
186 	(JREC_STREAMCTL_BEGIN|JREC_STREAMCTL_END)
187     ) {
188 	*jhp = jh->jh_hash;
189 	js = jh->jh_first;
190 	free(jh);
191 
192 	jnormalize(js);
193     } else {
194 	js = NULL;
195     }
196     return (js);
197 }
198 
199 /*
200  * Renormalize the jscan list to remove all the meta record headers
201  * and trailers except for the very first one.
202  */
203 static
204 void
205 jnormalize(struct jstream *js)
206 {
207     struct jstream *jscan;
208     off_t off;
209 
210     js->js_normalized_off = 0;
211     js->js_normalized_base = (void *)js->js_head;
212     js->js_normalized_size = js->js_head->recsize - sizeof(struct journal_rawrecend);
213     js->js_normalized_total = js->js_normalized_size;
214     off = js->js_normalized_size;
215     for (jscan = js->js_next; jscan; jscan = jscan->js_next) {
216 	jscan->js_normalized_off = off;
217 	jscan->js_normalized_base = (char *)jscan->js_head +
218 		sizeof(struct journal_rawrecbeg);
219 	jscan->js_normalized_size = jscan->js_head->recsize -
220 	       sizeof(struct journal_rawrecbeg) -
221 	       sizeof(struct journal_rawrecend);
222 	off += jscan->js_normalized_size;
223 	js->js_normalized_total += jscan->js_normalized_size;
224     }
225 }
226 
227 /*
228  * For sanity's sake I will describe the normal backtracking that must occur,
229  * but this routine must also operate on reverse-scanned (undo) records
230  * by forward tracking.
231  *
232  * A record has been found that represents the middle or end of a transaction
233  * when we were expecting the beginning of a transaction.  We must backtrack
234  * to locate the start of the transaction, then process raw records relating
235  * to the transaction until we reach our current point (jd) again.  If
236  * we find a matching streamid representing the end of a transaction instead
237  * of the expected start-of-transaction that record belongs to a wholely
238  * different meta-transaction and the record we seek is known to not be
239  * available.
240  *
241  * jd is the current record, directon is the normal scan direction (we have
242  * to scan in the reverse direction).
243  */
244 static
245 int
246 jaddrecord_backtrack(struct jsession *ss, struct jdata *jd)
247 {
248     struct jfile *jf = ss->ss_jfin;
249     struct jdata *scan;
250     struct jstream *js;
251     u_int16_t streamid;
252     u_int16_t scanid;
253 
254     assert(ss->ss_direction == JD_FORWARDS || ss->ss_direction == JD_BACKWARDS);
255     if (jmodes & JMODEF_INPUT_PIPE)
256 	return(-1);
257 
258     streamid = ((struct journal_rawrecbeg *)jd->jd_data)->streamid & JREC_STREAMID_MASK;
259 
260     if (ss->ss_direction == JD_FORWARDS) {
261 	/*
262 	 * Backtrack in the reverse direction looking for the transaction
263 	 * start bit.  If we find an end bit instead it belongs to an
264 	 * unrelated transaction using the same streamid and there is no
265 	 * point continuing.
266 	 */
267 	scan = jref(jd);
268 	while ((scan = jread(jf, scan, JD_BACKWARDS)) != NULL) {
269 	    scanid = ((struct journal_rawrecbeg *)scan->jd_data)->streamid;
270 	    if ((scanid & JREC_STREAMID_MASK) != streamid)
271 		continue;
272 	    if (scanid & JREC_STREAMCTL_END) {
273 		jfree(jf, scan);
274 		return(-1);
275 	    }
276 	    if (scanid & JREC_STREAMCTL_BEGIN)
277 		break;
278 	}
279 
280 	/*
281 	 * Now jaddrecord the related records.
282 	 */
283 	while (scan != NULL && scan->jd_transid < jd->jd_transid) {
284 	    scanid = ((struct journal_rawrecbeg *)scan->jd_data)->streamid;
285 	    if ((scanid & JREC_STREAMID_MASK) == streamid) {
286 		js = jaddrecord(ss, scan);
287 		assert(js == NULL);
288 	    }
289 	    scan = jread(jf, scan, JD_FORWARDS);
290 	}
291 	if (scan == NULL)
292 	    return(-1);
293 	jfree(jf, scan);
294     } else {
295 	/*
296 	 * Backtrack in the forwards direction looking for the transaction
297 	 * end bit.  If we find a start bit instead if belongs to an
298 	 * unrelated transaction using the same streamid and there is no
299 	 * point continuing.
300 	 */
301 	scan = jref(jd);
302 	while ((scan = jread(jf, scan, JD_FORWARDS)) != NULL) {
303 	    scanid = ((struct journal_rawrecbeg *)scan->jd_data)->streamid;
304 	    if ((scanid & JREC_STREAMID_MASK) != streamid)
305 		continue;
306 	    if (scanid & JREC_STREAMCTL_BEGIN) {
307 		jfree(jf, scan);
308 		return(-1);
309 	    }
310 	    if (scanid & JREC_STREAMCTL_END)
311 		break;
312 	}
313 
314 	/*
315 	 * Now jaddrecord the related records.
316 	 */
317 	while (scan != NULL && scan->jd_transid > jd->jd_transid) {
318 	    scanid = ((struct journal_rawrecbeg *)scan->jd_data)->streamid;
319 	    if ((scanid & JREC_STREAMID_MASK) == streamid) {
320 		js = jaddrecord(ss, scan);
321 		assert(js == NULL);
322 	    }
323 	    scan = jread(jf, scan, JD_BACKWARDS);
324 	}
325 	if (scan == NULL)
326 	    return(-1);
327 	jfree(jf, scan);
328     }
329     return(0);
330 }
331 
332 void
333 jscan_dispose(struct jstream *js)
334 {
335     struct jstream *jnext;
336 
337     if (js->js_alloc_buf) {
338 	free(js->js_alloc_buf);
339 	js->js_alloc_buf = NULL;
340 	js->js_alloc_size = 0;
341     }
342 
343     while (js) {
344 	jnext = js->js_next;
345 	jfree(js->js_session->ss_jfin, js->js_jdata);
346 	js->js_jdata = NULL;
347 	free(js);
348 	js = jnext;
349     }
350 }
351 
/*
 * Read the specified block of data out of a linked set of jstream
 * structures: copy bytes bytes of the normalized stream, starting at
 * offset off, into buf.  Returns 0 on success or ENOENT if the stream ends
 * before the request is satisfied (buf may then hold a partial copy).
 * A non-positive byte count copies nothing and returns 0.
 */
int
jsread(struct jstream *js, off_t off, void *buf, int bytes)
{
    const void *ptr;
    int n;

    /*
     * Loop on bytes > 0 rather than bytes != 0 so a negative count can
     * never reach bcopy(), where it would become a huge size_t.
     */
    while (bytes > 0) {
	n = jsreadany(js, off, &ptr);
	if (n == 0)
	    return (ENOENT);
	if (n > bytes)
	    n = bytes;
	bcopy(ptr, buf, n);
	buf = (char *)buf + n;
	off += n;
	bytes -= n;
    }
    return(0);
}
375 
376 /*
377  * Read the specified block of data out of a linked set of jstream
378  * structures.  Attempt to return a pointer into the data set but
379  * allocate and copy if that is not possible.  Returns 0 on success
380  * or an error code on error.
381  */
382 int
383 jsreadp(struct jstream *js, off_t off, const void **bufp,
384 	int bytes)
385 {
386     int error = 0;
387     int n;
388 
389     n = jsreadany(js, off, bufp);
390     if (n < bytes) {
391 	if (js->js_alloc_size < bytes) {
392 	    if (js->js_alloc_buf)
393 		free(js->js_alloc_buf);
394 	    js->js_alloc_buf = malloc(bytes);
395 	    js->js_alloc_size = bytes;
396 	    if (js->js_alloc_buf == NULL)
397 		fprintf(stderr, "attempt to allocate %d bytes failed\n", bytes);
398 	    assert(js->js_alloc_buf != NULL);
399 	}
400 	error = jsread(js, off, js->js_alloc_buf, bytes);
401 	if (error) {
402 	    *bufp = NULL;
403 	} else {
404 	    *bufp = js->js_alloc_buf;
405 	}
406     }
407     return(error);
408 }
409 
/*
 * Feed bytes bytes of the normalized stream, starting at offset off, to
 * func in contiguous chunks as func(fd, ptr, len).  func has write(2)'s
 * signature and is expected to return len on success.
 *
 * Returns the number of bytes func accepted.  Processing stops at the first
 * chunk func does not fully accept.  If func accepted nothing at all before
 * failing, -1 is returned.  The result may be less than bytes, without an
 * error, if the stream ends early.
 */
int
jsreadcallback(struct jstream *js, ssize_t (*func)(int, const void *, size_t),
		int fd, off_t off, int bytes)
{
    const void *bufp;
    ssize_t r;
    int res;
    int n;

    res = 0;
    while (bytes > 0 && (n = jsreadany(js, off, &bufp)) > 0) {
	if (n > bytes)
	    n = bytes;
	r = func(fd, bufp, n);
	if (r != n) {
	    /*
	     * Error or short transfer.  Report what was accepted so far
	     * (write(2) style) instead of carrying on past the failure,
	     * which would also mask the error in the returned count.
	     */
	    if (r > 0 && r < n)
		res += (int)r;
	    return (res > 0 ? res : -1);
	}
	res += n;
	bytes -= n;
	off += n;
    }
    return(res);
}
434 
435 /*
436  * Return the largest contiguous buffer starting at the specified offset,
437  * or 0.
438  */
439 int
440 jsreadany(struct jstream *js, off_t off, const void **bufp)
441 {
442     struct jstream *scan;
443     int n;
444 
445     if ((scan = js->js_cache) == NULL || scan->js_normalized_off > off)
446 	scan = js;
447     while (scan && scan->js_normalized_off <= off) {
448 	js->js_cache = scan;
449 	if (scan->js_normalized_off + scan->js_normalized_size > off) {
450 	    n = (int)(off - scan->js_normalized_off);
451 	    *bufp = scan->js_normalized_base + n;
452 	    return(scan->js_normalized_size - n);
453 	}
454 	scan = scan->js_next;
455     }
456     return(0);
457 }
458 
459