xref: /dragonfly/sbin/jscan/jstream.c (revision 1bf4b486)
1 /*
2  * Copyright (c) 2004,2005 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@backplane.com>
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  *
34  * $DragonFly: src/sbin/jscan/jstream.c,v 1.4 2005/07/06 06:06:44 dillon Exp $
35  */
36 
37 #include "jscan.h"
38 
/*
 * Hash table of transactions that have been started but not yet completed.
 * jaddrecord() selects a chain with (streamid & JHASH_MASK) and links the
 * entries of a chain through jh_hash.
 */
static struct jhash	*JHashAry[JHASH_SIZE];

/* Forward declarations of the file-local helpers defined below. */
static struct jstream *jaddrecord(struct jfile *jf, struct jstream *js);
static void jnormalize(struct jstream *js);
43 
44 /*
45  * Locate the next (or previous) complete virtual stream transaction given a
46  * file descriptor and direction.  Keep track of partial stream records as
47  * a side effect.
48  *
49  * Note that a transaction might represent a huge I/O operation, resulting
50  * in an overall node structure that spans gigabytes, but individual
51  * subrecord leaf nodes are limited in size and we depend on this to simplify
52  * the handling of leaf records.
53  *
54  * A transaction may cover several raw records.  The jstream collection for
55  * a transaction is only returned when the entire transaction has been
56  * successfully scanned.  Due to the interleaving of transactions the ordering
57  * of returned JS's may be different (not exactly reversed) when scanning a
58  * journal backwards verses forwards.  Since parallel operations are
59  * theoretically non-conflicting, this should not present a problem.
60  */
61 struct jstream *
62 jscan_stream(struct jfile *jf)
63 {
64     struct journal_rawrecbeg head;
65     struct journal_rawrecend tail;
66     struct journal_ackrecord ack;
67     int recsize;
68     int search;
69     int error;
70     struct jstream *js;
71 
72     /*
73      * Get the current offset and make sure it is 16-byte aligned.  If it
74      * isn't, align it and enter search mode.
75      */
76     if (jf->jf_pos & 15) {
77 	jf_warn(jf, "realigning bad offset and entering search mode");
78 	jalign(jf);
79 	search = 1;
80     } else {
81 	search = 0;
82     }
83 
84     error = 0;
85     js = NULL;
86 
87     if (jf->jf_direction == JF_FORWARDS) {
88 	/*
89 	 * Scan the journal forwards.  Note that the file pointer might not
90 	 * be seekable.
91 	 */
92 	while ((error = jread(jf, &head, sizeof(head))) == 0) {
93 	    if (head.begmagic != JREC_BEGMAGIC) {
94 		if (search == 0)
95 		    jf_warn(jf, "bad beginmagic, searching for new record");
96 		search = 1;
97 		jalign(jf);
98 		continue;
99 	    }
100 	    recsize = (head.recsize + 15) & ~15;
101 	    if (recsize <= 0) {
102 		jf_warn(jf, "bad recordsize: %d\n", recsize);
103 		search = 1;
104 		jalign(jf);
105 		continue;
106 	    }
107 	    jset(jf);
108 	    js = malloc(offsetof(struct jstream, js_data[recsize]));
109 	    bzero(js, sizeof(struct jstream));
110 	    bcopy(&head, js->js_data, sizeof(head));
111 	    error = jread(jf, js->js_data + sizeof(head), recsize - sizeof(head));
112 	    if (error) {
113 		jf_warn(jf, "Incomplete stream record\n");
114 		jreturn(jf);
115 		free(js);
116 		js = NULL;
117 		break;
118 	    }
119 
120 	    /*
121 	     * XXX if the stream is full duplex send the ack back now.  This
122 	     * really needs to be delayed until the transaction is committed,
123 	     * but there are stalling issues if the transaction being
124 	     * collected exceeds to the size of the FIFO.  So for now this
125 	     * is just for testing.
126 	     */
127 	    if (jf->jf_flags & JF_FULL_DUPLEX) {
128 		bzero(&ack, sizeof(ack));
129 		ack.rbeg.begmagic = JREC_BEGMAGIC;
130 		ack.rbeg.streamid = JREC_STREAMID_ACK;
131 		ack.rbeg.transid = head.transid;
132 		ack.rbeg.recsize = sizeof(ack);
133 		ack.rend.endmagic = JREC_ENDMAGIC;
134 		ack.rend.recsize = sizeof(ack);
135 		jwrite(jf, &ack, sizeof(ack));
136 	    }
137 
138 	    /*
139 	     * note: recsize is aligned (the actual record size),
140 	     * head.recsize is unaligned (the actual payload size).
141 	     */
142 	    js->js_size = head.recsize;
143 	    bcopy(js->js_data + recsize - sizeof(tail), &tail, sizeof(tail));
144 	    if (tail.endmagic != JREC_ENDMAGIC) {
145 		jf_warn(jf, "bad endmagic, searching for new record");
146 		search = 1;
147 		jreturn(jf);
148 		free(js);
149 		js = NULL;
150 		continue;
151 	    }
152 	    jflush(jf);
153 	    if ((js = jaddrecord(jf, js)) != NULL)
154 		break;
155 	}
156     } else {
157 	/*
158 	 * Scan the journal backwards.  Note that jread()'s reverse-seek and
159 	 * read.  The data read will be forward ordered, however.
160 	 */
161 	while ((error = jread(jf, &tail, sizeof(tail))) == 0) {
162 	    if (tail.endmagic != JREC_ENDMAGIC) {
163 		if (search == 0)
164 		    jf_warn(jf, "bad endmagic, searching for new record");
165 		search = 1;
166 		jalign(jf);
167 		continue;
168 	    }
169 	    recsize = (tail.recsize + 15) & ~15;
170 	    if (recsize <= 0) {
171 		jf_warn(jf, "bad recordsize: %d\n", recsize);
172 		search = 1;
173 		jalign(jf);
174 		continue;
175 	    }
176 	    jset(jf);
177 	    js = malloc(offsetof(struct jstream, js_data[recsize]));
178 	    bzero(js, sizeof(struct jstream));
179 	    bcopy(&tail, js->js_data + recsize - sizeof(tail), sizeof(tail));
180 	    error = jread(jf, js->js_data, recsize - sizeof(tail));
181 
182 	    if (error) {
183 		jf_warn(jf, "Incomplete stream record\n");
184 		jreturn(jf);
185 		free(js);
186 		js = NULL;
187 		break;
188 	    }
189 	    js->js_size = tail.recsize;
190 	    bcopy(js->js_data + recsize - sizeof(tail), &tail, sizeof(tail));
191 	    bcopy(js->js_data, &head, sizeof(head));
192 	    if (head.begmagic != JREC_BEGMAGIC) {
193 		jf_warn(jf, "bad begmagic, searching for new record");
194 		search = 1;
195 		jreturn(jf);
196 		free(js);
197 		continue;
198 	    }
199 	    jflush(jf);
200 	    if ((js = jaddrecord(jf, js)) != NULL)
201 		break;
202 	}
203     }
204     jf->jf_error = error;
205     return(js);
206 }
207 
208 /*
209  * Integrate a jstream record.  Deal with the transaction begin and end flags
210  * to create a forward-referenced collection of jstream records.  If we are
211  * able to complete a transaction, the first js associated with that
212  * transaction is returned.
213  *
214  * XXX we need to store the data for very large multi-record transactions
215  * separately since it might not fit into memory.
216  */
217 static struct jstream *
218 jaddrecord(struct jfile *jf, struct jstream *js)
219 {
220     struct journal_rawrecbeg *head = (void *)js->js_data;
221     struct jhash *jh;
222     struct jhash **jhp;
223 
224     /*
225      * Check for a completely self-contained transaction, just return the
226      * js if possible.
227      */
228     if ((head->streamid & (JREC_STREAMCTL_BEGIN|JREC_STREAMCTL_END)) ==
229 	(JREC_STREAMCTL_BEGIN|JREC_STREAMCTL_END)
230     ) {
231 	jnormalize(js);
232 	return (js);
233     }
234 
235     /*
236      * Check for an open transaction in the hash table, create a new one
237      * if necessary.
238      */
239     jhp = &JHashAry[head->streamid & JHASH_MASK];
240     while ((jh = *jhp) != NULL) {
241 	if (((jh->jh_transid ^ head->streamid) & JREC_STREAMID_MASK) == 0)
242 	    break;
243 	jhp = &jh->jh_hash;
244     }
245     if (jh == NULL) {
246 	jh = malloc(sizeof(*jh));
247 	bzero(jh, sizeof(*jh));
248 	*jhp = jh;
249 	jh->jh_first = js;
250 	jh->jh_last = js;
251 	jh->jh_transid = head->streamid;
252 	return (NULL);
253     }
254 
255     /*
256      * Emplace the stream segment
257      */
258     jh->jh_transid |= head->streamid & JREC_STREAMCTL_MASK;
259     if (jf->jf_direction == JF_FORWARDS) {
260 	jh->jh_last->js_next = js;
261 	jh->jh_last = js;
262     } else {
263 	js->js_next = jh->jh_first;
264 	jh->jh_first = js;
265     }
266 
267     /*
268      * If the transaction is complete, remove the hash entry and return the
269      * js representing the beginning of the transaction.
270      */
271     if ((jh->jh_transid & (JREC_STREAMCTL_BEGIN|JREC_STREAMCTL_END)) ==
272 	(JREC_STREAMCTL_BEGIN|JREC_STREAMCTL_END)
273     ) {
274 	*jhp = jh->jh_hash;
275 	js = jh->jh_first;
276 	free(jh);
277 
278 	jnormalize(js);
279     } else {
280 	js = NULL;
281     }
282     return (js);
283 }
284 
285 /*
286  * Renormalize the jscan list to remove all the meta record headers
287  * and trailers except for the very first one.
288  */
289 static
290 void
291 jnormalize(struct jstream *js)
292 {
293     struct jstream *jscan;
294     off_t off;
295 
296     js->js_normalized_off = 0;
297     js->js_normalized_base = js->js_data;
298     js->js_normalized_size = ((struct journal_rawrecbeg *)js->js_data)->recsize - sizeof(struct journal_rawrecend);
299     js->js_normalized_total = js->js_normalized_size;
300     off = js->js_normalized_size;
301     for (jscan = js->js_next; jscan; jscan = jscan->js_next) {
302 	jscan->js_normalized_off = off;
303 	jscan->js_normalized_base = jscan->js_data +
304 		sizeof(struct journal_rawrecbeg);
305 	jscan->js_normalized_size = jscan->js_size -
306 	       sizeof(struct journal_rawrecbeg) -
307 	       sizeof(struct journal_rawrecend);
308 	off += jscan->js_normalized_size;
309 	js->js_normalized_total += jscan->js_normalized_size;
310     }
311 }
312 
313 void
314 jscan_dispose(struct jstream *js)
315 {
316     struct jstream *jnext;
317 
318     if (js->js_alloc_buf) {
319 	free(js->js_alloc_buf);
320 	js->js_alloc_buf = NULL;
321 	js->js_alloc_size = 0;
322     }
323 
324     while (js) {
325 	jnext = js->js_next;
326 	free(js);
327 	js = jnext;
328     }
329 }
330 
/*
 * Copy `bytes' bytes starting at normalized offset `off' out of a linked
 * set of jstream structures into buf, gathering across record boundaries
 * as needed.  Returns 0 on success, or ENOENT if the requested range runs
 * past the data that is available.
 */
int
jsread(struct jstream *js, off_t off, void *buf, int bytes)
{
    char *dst = buf;
    const void *src;
    int chunk;

    for (; bytes != 0; bytes -= chunk, off += chunk, dst += chunk) {
	/* Largest contiguous run available at the current offset. */
	chunk = jsreadany(js, off, &src);
	if (chunk == 0)
	    return (ENOENT);
	if (chunk > bytes)
	    chunk = bytes;
	bcopy(src, dst, chunk);
    }
    return(0);
}
354 
355 /*
356  * Read the specified block of data out of a linked set of jstream
357  * structures.  Attempt to return a pointer into the data set but
358  * allocate and copy if that is not possible.  Returns 0 on success
359  * or an error code on error.
360  */
361 int
362 jsreadp(struct jstream *js, off_t off, const void **bufp,
363 	int bytes)
364 {
365     int error = 0;
366     int n;
367 
368     n = jsreadany(js, off, bufp);
369     if (n < bytes) {
370 	if (js->js_alloc_size < bytes) {
371 	    if (js->js_alloc_buf)
372 		free(js->js_alloc_buf);
373 	    js->js_alloc_buf = malloc(bytes);
374 	    js->js_alloc_size = bytes;
375 	    assert(js->js_alloc_buf != NULL);
376 	}
377 	error = jsread(js, off, js->js_alloc_buf, bytes);
378 	if (error) {
379 	    *bufp = NULL;
380 	} else {
381 	    *bufp = js->js_alloc_buf;
382 	}
383     }
384     return(error);
385 }
386 
/*
 * Feed `bytes' bytes starting at normalized offset `off' to func, one
 * contiguous chunk at a time.  func is called as func(fd, buf, len) and
 * is expected to return the number of bytes it consumed, in the manner
 * of write(2).
 *
 * Returns the number of bytes func actually consumed.  Processing stops
 * at the first failed or short call; if nothing at all was consumed by
 * that point, -1 is returned.  A return value smaller than `bytes'
 * therefore always indicates a failure or that the stream ran out of
 * data.
 */
int
jsreadcallback(struct jstream *js, ssize_t (*func)(int, const void *, size_t),
		int fd, off_t off, int bytes)
{
    const void *bufp;
    ssize_t r;
    int res;
    int n;

    res = 0;
    while (bytes > 0 && (n = jsreadany(js, off, &bufp)) > 0) {
	if (n > bytes)
	    n = bytes;
	r = func(fd, bufp, n);
	if (r != n) {
	    /*
	     * Count only what was really consumed and stop; continuing
	     * would report bytes that func never accepted.
	     */
	    if (r > 0)
		res += (int)r;
	    if (res == 0)
		res = -1;
	    break;
	}
	res += n;
	bytes -= n;
	off += n;
    }
    return(res);
}
411 
412 /*
413  * Return the largest contiguous buffer starting at the specified offset,
414  * or 0.
415  */
416 int
417 jsreadany(struct jstream *js, off_t off, const void **bufp)
418 {
419     struct jstream *scan;
420     int n;
421 
422     if ((scan = js->js_cache) == NULL || scan->js_normalized_off > off)
423 	scan = js;
424     while (scan && scan->js_normalized_off <= off) {
425 	js->js_cache = scan;
426 	if (scan->js_normalized_off + scan->js_normalized_size > off) {
427 	    n = (int)(off - scan->js_normalized_off);
428 	    *bufp = scan->js_normalized_base + n;
429 	    return(scan->js_normalized_size - n);
430 	}
431 	scan = scan->js_next;
432     }
433     return(0);
434 }
435 
436