xref: /dragonfly/sbin/jscan/jstream.c (revision 8164c1fe)
1 /*
2  * Copyright (c) 2004,2005 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@backplane.com>
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  *
34  * $DragonFly: src/sbin/jscan/jstream.c,v 1.1 2005/03/07 02:38:28 dillon Exp $
35  */
36 
37 #include "jscan.h"
38 
39 static struct jhash	*JHashAry[JHASH_SIZE];
40 
41 static struct jstream *jaddrecord(struct jfile *jf, struct jstream *js);
42 
43 /*
44  * Locate the next (or previous) complete virtual stream transaction given a
45  * file descriptor and direction.  Keep track of partial stream records as
46  * a side effect.
47  *
48  * Note that a transaction might represent a huge I/O operation, resulting
49  * in an overall node structure that spans gigabytes, but individual
50  * subrecord leaf nodes are limited in size and we depend on this to simplify
51  * the handling of leaf records.
52  *
53  * A transaction may cover several raw records.  The jstream collection for
54  * a transaction is only returned when the entire transaction has been
55  * successfully scanned.  Due to the interleaving of transactions the ordering
56  * of returned JS's may be different (not exactly reversed) when scanning a
57  * journal backwards verses forwards.  Since parallel operations are
58  * theoretically non-conflicting, this should not present a problem.
59  */
60 struct jstream *
61 jscan_stream(struct jfile *jf)
62 {
63     struct journal_rawrecbeg head;
64     struct journal_rawrecend tail;
65     int recsize;
66     int search;
67     int error;
68     struct jstream *js;
69 
70     /*
71      * Get the current offset and make sure it is 16-byte aligned.  If it
72      * isn't, align it and enter search mode.
73      */
74     if (jf->jf_pos & 15) {
75 	jf_warn(jf, "realigning bad offset and entering search mode");
76 	jalign(jf);
77 	search = 1;
78     } else {
79 	search = 0;
80     }
81 
82     error = 0;
83     js = NULL;
84 
85     if (jf->jf_direction == JF_FORWARDS) {
86 	/*
87 	 * Scan the journal forwards.  Note that the file pointer might not
88 	 * be seekable.
89 	 */
90 	while ((error = jread(jf, &head, sizeof(head))) == 0) {
91 	    if (head.begmagic != JREC_BEGMAGIC) {
92 		if (search == 0)
93 		    jf_warn(jf, "bad beginmagic, searching for new record");
94 		search = 1;
95 		jalign(jf);
96 		continue;
97 	    }
98 	    recsize = (head.recsize + 15) & ~15;
99 	    if (recsize <= 0) {
100 		jf_warn(jf, "bad recordsize: %d\n", recsize);
101 		search = 1;
102 		jalign(jf);
103 		continue;
104 	    }
105 	    jset(jf);
106 	    js = malloc(offsetof(struct jstream, js_data[recsize]));
107 	    bzero(js, sizeof(struct jstream));
108 	    bcopy(&head, js->js_data, sizeof(head));
109 	    error = jread(jf, js->js_data + sizeof(head), recsize - sizeof(head));
110 	    if (error) {
111 		jf_warn(jf, "Incomplete stream record\n");
112 		jreturn(jf);
113 		free(js);
114 		js = NULL;
115 		break;
116 	    }
117 	    js->js_size = recsize;
118 	    bcopy(js->js_data + recsize - sizeof(tail), &tail, sizeof(tail));
119 	    if (tail.endmagic != JREC_ENDMAGIC) {
120 		jf_warn(jf, "bad endmagic, searching for new record");
121 		search = 1;
122 		jreturn(jf);
123 		free(js);
124 		js = NULL;
125 		continue;
126 	    }
127 	    jflush(jf);
128 	    if ((js = jaddrecord(jf, js)) != NULL)
129 		break;
130 	}
131     } else {
132 	/*
133 	 * Scan the journal backwards.  Note that jread()'s reverse-seek and
134 	 * read.  The data read will be forward ordered, however.
135 	 */
136 	while ((error = jread(jf, &tail, sizeof(tail))) == 0) {
137 	    if (tail.endmagic != JREC_ENDMAGIC) {
138 		if (search == 0)
139 		    jf_warn(jf, "bad endmagic, searching for new record");
140 		search = 1;
141 		jalign(jf);
142 		continue;
143 	    }
144 	    recsize = (tail.recsize + 15) & ~15;
145 	    if (recsize <= 0) {
146 		jf_warn(jf, "bad recordsize: %d\n", recsize);
147 		search = 1;
148 		jalign(jf);
149 		continue;
150 	    }
151 	    jset(jf);
152 	    js = malloc(offsetof(struct jstream, js_data[recsize]));
153 	    bzero(js, sizeof(struct jstream));
154 	    bcopy(&tail, js->js_data + recsize - sizeof(tail), sizeof(tail));
155 	    error = jread(jf, js->js_data, recsize - sizeof(tail));
156 
157 	    if (error) {
158 		jf_warn(jf, "Incomplete stream record\n");
159 		jreturn(jf);
160 		free(js);
161 		js = NULL;
162 		break;
163 	    }
164 	    js->js_size = recsize;
165 	    bcopy(js->js_data + recsize - sizeof(tail), &tail, sizeof(tail));
166 	    bcopy(js->js_data, &head, sizeof(head));
167 	    if (head.begmagic != JREC_BEGMAGIC) {
168 		jf_warn(jf, "bad begmagic, searching for new record");
169 		search = 1;
170 		jreturn(jf);
171 		free(js);
172 		continue;
173 	    }
174 	    jflush(jf);
175 	    if ((js = jaddrecord(jf, js)) != NULL)
176 		break;
177 	}
178     }
179     jf->jf_error = error;
180     return(js);
181 }
182 
183 /*
184  * Integrate a jstream record.  Deal with the transaction begin and end flags
185  * to create a forward-referenced collection of jstream records.  If we are
186  * able to complete a transaction, the first js associated with that
187  * transaction is returned.
188  *
189  * XXX we need to store the data for very large multi-record transactions
190  * separately since it might not fit into memory.
191  */
192 static struct jstream *
193 jaddrecord(struct jfile *jf, struct jstream *js)
194 {
195     struct journal_rawrecbeg *head = (void *)js->js_data;
196     struct jhash *jh;
197     struct jhash **jhp;
198     struct jstream *jscan;
199     off_t off;
200 
201     /*
202      * Check for a completely self-contained transaction, just return the
203      * js if possible.
204      */
205     if ((head->streamid & (JREC_STREAMCTL_BEGIN|JREC_STREAMCTL_END)) ==
206 	(JREC_STREAMCTL_BEGIN|JREC_STREAMCTL_END)
207     ) {
208 	return (js);
209     }
210 
211     /*
212      * Check for an open transaction in the hash table, create a new one
213      * if necessary.
214      */
215     jhp = &JHashAry[head->streamid & JHASH_MASK];
216     while ((jh = *jhp) != NULL) {
217 	if (((jh->jh_transid ^ head->streamid) & JREC_STREAMID_MASK) == 0)
218 	    break;
219 	jhp = &jh->jh_hash;
220     }
221     if (jh == NULL) {
222 	jh = malloc(sizeof(*jh));
223 	bzero(jh, sizeof(*jh));
224 	*jhp = jh;
225 	jh->jh_first = js;
226 	jh->jh_last = js;
227 	jh->jh_transid = head->streamid;
228 	return (NULL);
229     }
230 
231     /*
232      * Emplace the stream segment
233      */
234     jh->jh_transid |= head->streamid & JREC_STREAMCTL_MASK;
235     if (jf->jf_direction == JF_FORWARDS) {
236 	jh->jh_last->js_next = js;
237 	jh->jh_last = js;
238     } else {
239 	js->js_next = jh->jh_first;
240 	jh->jh_first = js;
241     }
242 
243     /*
244      * If the transaction is complete, remove the hash entry and return the
245      * js representing the beginning of the transaction.
246      */
247     if ((jh->jh_transid & (JREC_STREAMCTL_BEGIN|JREC_STREAMCTL_END)) ==
248 	(JREC_STREAMCTL_BEGIN|JREC_STREAMCTL_END)
249     ) {
250 	*jhp = jh->jh_hash;
251 	js = jh->jh_first;
252 	free(jh);
253 	off = 0;
254 	for (jscan = js; jscan; jscan = jscan->js_next) {
255 	    jscan->js_off = off;
256 	    off += jscan->js_size;
257 	}
258     } else {
259 	js = NULL;
260     }
261     return (js);
262 }
263 
264 void
265 jscan_dispose(struct jstream *js)
266 {
267     struct jstream *jnext;
268 
269     if (js->js_alloc_buf) {
270 	free(js->js_alloc_buf);
271 	js->js_alloc_buf = NULL;
272 	js->js_alloc_size = 0;
273     }
274 
275     while (js) {
276 	jnext = js->js_next;
277 	free(js);
278 	js = jnext;
279     }
280 }
281 
282 /*
283  * Read the specified block of data out of a linked set of jstream
284  * structures.  Returns 0 on success or an error code on error.
285  */
286 int
287 jsread(struct jstream *js, off_t off, void *buf, int bytes)
288 {
289     const void *ptr;
290     int n;
291 
292     while (bytes) {
293 	n = jsreadany(js, off, &ptr);
294 	if (n == 0)
295 	    return (ENOENT);
296 	if (n > bytes)
297 	    n = bytes;
298 	bcopy(ptr, buf, n);
299 	buf = (char *)buf + n;
300 	off += n;
301 	bytes -= n;
302     }
303     return(0);
304 }
305 
306 /*
307  * Read the specified block of data out of a linked set of jstream
308  * structures.  Attempt to return a pointer into the data set but
309  * allocate and copy if that is not possible.  Returns 0 on success
310  * or an error code on error.
311  */
312 int
313 jsreadp(struct jstream *js, off_t off, const void **bufp,
314 	int bytes)
315 {
316     char *ptr;
317     int error;
318     int n;
319     int o;
320 
321     n = jsreadany(js, off, bufp);
322     if (n < bytes) {
323 	if (js->js_alloc_size < bytes) {
324 	    if (js->js_alloc_buf)
325 		free(js->js_alloc_buf);
326 	    js->js_alloc_buf = malloc(bytes);
327 	    js->js_alloc_size = bytes;
328 	}
329 	error = jsread(js, off, js->js_alloc_buf, bytes);
330 	if (error) {
331 	    *bufp = NULL;
332 	} else {
333 	    *bufp = js->js_alloc_buf;
334 	}
335     }
336     return(error);
337 }
338 
339 /*
340  * Return the largest contiguous buffer starting at the specified offset,
341  * or 0.
342  */
343 int
344 jsreadany(struct jstream *js, off_t off, const void **bufp)
345 {
346     struct jstream *scan;
347     int n;
348 
349     if ((scan = js->js_cache) == NULL || scan->js_cache_off > off)
350 	scan = js;
351     while (scan && scan->js_off <= off) {
352 	js->js_cache = scan;
353 	js->js_cache_off = scan->js_off;
354 	if (scan->js_off + scan->js_size > off) {
355 	    n = (int)(off - scan->js_off);
356 	    *bufp = scan->js_data + n;
357 	    return(scan->js_size - n);
358 	}
359 	scan = scan->js_next;
360     }
361     return(0);
362 }
363 
364