1 /* 2 * Copyright (c) 2004,2005 The DragonFly Project. All rights reserved. 3 * 4 * This code is derived from software contributed to The DragonFly Project 5 * by Matthew Dillon <dillon@backplane.com> 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * 1. Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * 2. Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * 3. Neither the name of The DragonFly Project nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific, prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 25 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 26 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING, 27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED 29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, 30 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT 31 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 32 * SUCH DAMAGE. 33 * 34 * $DragonFly: src/sbin/jscan/jstream.c,v 1.4 2005/07/06 06:06:44 dillon Exp $ 35 */ 36 37 #include "jscan.h" 38 39 static struct jhash *JHashAry[JHASH_SIZE]; 40 41 static struct jstream *jaddrecord(struct jfile *jf, struct jstream *js); 42 static void jnormalize(struct jstream *js); 43 44 /* 45 * Locate the next (or previous) complete virtual stream transaction given a 46 * file descriptor and direction. Keep track of partial stream records as 47 * a side effect. 48 * 49 * Note that a transaction might represent a huge I/O operation, resulting 50 * in an overall node structure that spans gigabytes, but individual 51 * subrecord leaf nodes are limited in size and we depend on this to simplify 52 * the handling of leaf records. 53 * 54 * A transaction may cover several raw records. The jstream collection for 55 * a transaction is only returned when the entire transaction has been 56 * successfully scanned. Due to the interleaving of transactions the ordering 57 * of returned JS's may be different (not exactly reversed) when scanning a 58 * journal backwards verses forwards. Since parallel operations are 59 * theoretically non-conflicting, this should not present a problem. 60 */ 61 struct jstream * 62 jscan_stream(struct jfile *jf) 63 { 64 struct journal_rawrecbeg head; 65 struct journal_rawrecend tail; 66 struct journal_ackrecord ack; 67 int recsize; 68 int search; 69 int error; 70 struct jstream *js; 71 72 /* 73 * Get the current offset and make sure it is 16-byte aligned. If it 74 * isn't, align it and enter search mode. 75 */ 76 if (jf->jf_pos & 15) { 77 jf_warn(jf, "realigning bad offset and entering search mode"); 78 jalign(jf); 79 search = 1; 80 } else { 81 search = 0; 82 } 83 84 error = 0; 85 js = NULL; 86 87 if (jf->jf_direction == JF_FORWARDS) { 88 /* 89 * Scan the journal forwards. Note that the file pointer might not 90 * be seekable. 91 */ 92 while ((error = jread(jf, &head, sizeof(head))) == 0) { 93 if (head.begmagic != JREC_BEGMAGIC) { 94 if (search == 0) 95 jf_warn(jf, "bad beginmagic, searching for new record"); 96 search = 1; 97 jalign(jf); 98 continue; 99 } 100 recsize = (head.recsize + 15) & ~15; 101 if (recsize <= 0) { 102 jf_warn(jf, "bad recordsize: %d\n", recsize); 103 search = 1; 104 jalign(jf); 105 continue; 106 } 107 jset(jf); 108 js = malloc(offsetof(struct jstream, js_data[recsize])); 109 bzero(js, sizeof(struct jstream)); 110 bcopy(&head, js->js_data, sizeof(head)); 111 error = jread(jf, js->js_data + sizeof(head), recsize - sizeof(head)); 112 if (error) { 113 jf_warn(jf, "Incomplete stream record\n"); 114 jreturn(jf); 115 free(js); 116 js = NULL; 117 break; 118 } 119 120 /* 121 * XXX if the stream is full duplex send the ack back now. This 122 * really needs to be delayed until the transaction is committed, 123 * but there are stalling issues if the transaction being 124 * collected exceeds to the size of the FIFO. So for now this 125 * is just for testing. 126 */ 127 if (jf->jf_flags & JF_FULL_DUPLEX) { 128 bzero(&ack, sizeof(ack)); 129 ack.rbeg.begmagic = JREC_BEGMAGIC; 130 ack.rbeg.streamid = JREC_STREAMID_ACK; 131 ack.rbeg.transid = head.transid; 132 ack.rbeg.recsize = sizeof(ack); 133 ack.rend.endmagic = JREC_ENDMAGIC; 134 ack.rend.recsize = sizeof(ack); 135 jwrite(jf, &ack, sizeof(ack)); 136 } 137 138 /* 139 * note: recsize is aligned (the actual record size), 140 * head.recsize is unaligned (the actual payload size). 141 */ 142 js->js_size = head.recsize; 143 bcopy(js->js_data + recsize - sizeof(tail), &tail, sizeof(tail)); 144 if (tail.endmagic != JREC_ENDMAGIC) { 145 jf_warn(jf, "bad endmagic, searching for new record"); 146 search = 1; 147 jreturn(jf); 148 free(js); 149 js = NULL; 150 continue; 151 } 152 jflush(jf); 153 if ((js = jaddrecord(jf, js)) != NULL) 154 break; 155 } 156 } else { 157 /* 158 * Scan the journal backwards. Note that jread()'s reverse-seek and 159 * read. The data read will be forward ordered, however. 160 */ 161 while ((error = jread(jf, &tail, sizeof(tail))) == 0) { 162 if (tail.endmagic != JREC_ENDMAGIC) { 163 if (search == 0) 164 jf_warn(jf, "bad endmagic, searching for new record"); 165 search = 1; 166 jalign(jf); 167 continue; 168 } 169 recsize = (tail.recsize + 15) & ~15; 170 if (recsize <= 0) { 171 jf_warn(jf, "bad recordsize: %d\n", recsize); 172 search = 1; 173 jalign(jf); 174 continue; 175 } 176 jset(jf); 177 js = malloc(offsetof(struct jstream, js_data[recsize])); 178 bzero(js, sizeof(struct jstream)); 179 bcopy(&tail, js->js_data + recsize - sizeof(tail), sizeof(tail)); 180 error = jread(jf, js->js_data, recsize - sizeof(tail)); 181 182 if (error) { 183 jf_warn(jf, "Incomplete stream record\n"); 184 jreturn(jf); 185 free(js); 186 js = NULL; 187 break; 188 } 189 js->js_size = tail.recsize; 190 bcopy(js->js_data + recsize - sizeof(tail), &tail, sizeof(tail)); 191 bcopy(js->js_data, &head, sizeof(head)); 192 if (head.begmagic != JREC_BEGMAGIC) { 193 jf_warn(jf, "bad begmagic, searching for new record"); 194 search = 1; 195 jreturn(jf); 196 free(js); 197 continue; 198 } 199 jflush(jf); 200 if ((js = jaddrecord(jf, js)) != NULL) 201 break; 202 } 203 } 204 jf->jf_error = error; 205 return(js); 206 } 207 208 /* 209 * Integrate a jstream record. Deal with the transaction begin and end flags 210 * to create a forward-referenced collection of jstream records. If we are 211 * able to complete a transaction, the first js associated with that 212 * transaction is returned. 213 * 214 * XXX we need to store the data for very large multi-record transactions 215 * separately since it might not fit into memory. 216 */ 217 static struct jstream * 218 jaddrecord(struct jfile *jf, struct jstream *js) 219 { 220 struct journal_rawrecbeg *head = (void *)js->js_data; 221 struct jhash *jh; 222 struct jhash **jhp; 223 224 /* 225 * Check for a completely self-contained transaction, just return the 226 * js if possible. 227 */ 228 if ((head->streamid & (JREC_STREAMCTL_BEGIN|JREC_STREAMCTL_END)) == 229 (JREC_STREAMCTL_BEGIN|JREC_STREAMCTL_END) 230 ) { 231 jnormalize(js); 232 return (js); 233 } 234 235 /* 236 * Check for an open transaction in the hash table, create a new one 237 * if necessary. 238 */ 239 jhp = &JHashAry[head->streamid & JHASH_MASK]; 240 while ((jh = *jhp) != NULL) { 241 if (((jh->jh_transid ^ head->streamid) & JREC_STREAMID_MASK) == 0) 242 break; 243 jhp = &jh->jh_hash; 244 } 245 if (jh == NULL) { 246 jh = malloc(sizeof(*jh)); 247 bzero(jh, sizeof(*jh)); 248 *jhp = jh; 249 jh->jh_first = js; 250 jh->jh_last = js; 251 jh->jh_transid = head->streamid; 252 return (NULL); 253 } 254 255 /* 256 * Emplace the stream segment 257 */ 258 jh->jh_transid |= head->streamid & JREC_STREAMCTL_MASK; 259 if (jf->jf_direction == JF_FORWARDS) { 260 jh->jh_last->js_next = js; 261 jh->jh_last = js; 262 } else { 263 js->js_next = jh->jh_first; 264 jh->jh_first = js; 265 } 266 267 /* 268 * If the transaction is complete, remove the hash entry and return the 269 * js representing the beginning of the transaction. 270 */ 271 if ((jh->jh_transid & (JREC_STREAMCTL_BEGIN|JREC_STREAMCTL_END)) == 272 (JREC_STREAMCTL_BEGIN|JREC_STREAMCTL_END) 273 ) { 274 *jhp = jh->jh_hash; 275 js = jh->jh_first; 276 free(jh); 277 278 jnormalize(js); 279 } else { 280 js = NULL; 281 } 282 return (js); 283 } 284 285 /* 286 * Renormalize the jscan list to remove all the meta record headers 287 * and trailers except for the very first one. 288 */ 289 static 290 void 291 jnormalize(struct jstream *js) 292 { 293 struct jstream *jscan; 294 off_t off; 295 296 js->js_normalized_off = 0; 297 js->js_normalized_base = js->js_data; 298 js->js_normalized_size = ((struct journal_rawrecbeg *)js->js_data)->recsize - sizeof(struct journal_rawrecend); 299 js->js_normalized_total = js->js_normalized_size; 300 off = js->js_normalized_size; 301 for (jscan = js->js_next; jscan; jscan = jscan->js_next) { 302 jscan->js_normalized_off = off; 303 jscan->js_normalized_base = jscan->js_data + 304 sizeof(struct journal_rawrecbeg); 305 jscan->js_normalized_size = jscan->js_size - 306 sizeof(struct journal_rawrecbeg) - 307 sizeof(struct journal_rawrecend); 308 off += jscan->js_normalized_size; 309 js->js_normalized_total += jscan->js_normalized_size; 310 } 311 } 312 313 void 314 jscan_dispose(struct jstream *js) 315 { 316 struct jstream *jnext; 317 318 if (js->js_alloc_buf) { 319 free(js->js_alloc_buf); 320 js->js_alloc_buf = NULL; 321 js->js_alloc_size = 0; 322 } 323 324 while (js) { 325 jnext = js->js_next; 326 free(js); 327 js = jnext; 328 } 329 } 330 331 /* 332 * Read the specified block of data out of a linked set of jstream 333 * structures. Returns 0 on success or an error code on error. 334 */ 335 int 336 jsread(struct jstream *js, off_t off, void *buf, int bytes) 337 { 338 const void *ptr; 339 int n; 340 341 while (bytes) { 342 n = jsreadany(js, off, &ptr); 343 if (n == 0) 344 return (ENOENT); 345 if (n > bytes) 346 n = bytes; 347 bcopy(ptr, buf, n); 348 buf = (char *)buf + n; 349 off += n; 350 bytes -= n; 351 } 352 return(0); 353 } 354 355 /* 356 * Read the specified block of data out of a linked set of jstream 357 * structures. Attempt to return a pointer into the data set but 358 * allocate and copy if that is not possible. Returns 0 on success 359 * or an error code on error. 360 */ 361 int 362 jsreadp(struct jstream *js, off_t off, const void **bufp, 363 int bytes) 364 { 365 int error = 0; 366 int n; 367 368 n = jsreadany(js, off, bufp); 369 if (n < bytes) { 370 if (js->js_alloc_size < bytes) { 371 if (js->js_alloc_buf) 372 free(js->js_alloc_buf); 373 js->js_alloc_buf = malloc(bytes); 374 js->js_alloc_size = bytes; 375 assert(js->js_alloc_buf != NULL); 376 } 377 error = jsread(js, off, js->js_alloc_buf, bytes); 378 if (error) { 379 *bufp = NULL; 380 } else { 381 *bufp = js->js_alloc_buf; 382 } 383 } 384 return(error); 385 } 386 387 int 388 jsreadcallback(struct jstream *js, ssize_t (*func)(int, const void *, size_t), 389 int fd, off_t off, int bytes) 390 { 391 const void *bufp; 392 int res; 393 int n; 394 int r; 395 396 res = 0; 397 while (bytes && (n = jsreadany(js, off, &bufp)) > 0) { 398 if (n > bytes) 399 n = bytes; 400 r = func(fd, bufp, n); 401 if (r != n) { 402 if (res == 0) 403 res = -1; 404 } 405 res += n; 406 bytes -= n; 407 off += n; 408 } 409 return(res); 410 } 411 412 /* 413 * Return the largest contiguous buffer starting at the specified offset, 414 * or 0. 415 */ 416 int 417 jsreadany(struct jstream *js, off_t off, const void **bufp) 418 { 419 struct jstream *scan; 420 int n; 421 422 if ((scan = js->js_cache) == NULL || scan->js_normalized_off > off) 423 scan = js; 424 while (scan && scan->js_normalized_off <= off) { 425 js->js_cache = scan; 426 if (scan->js_normalized_off + scan->js_normalized_size > off) { 427 n = (int)(off - scan->js_normalized_off); 428 *bufp = scan->js_normalized_base + n; 429 return(scan->js_normalized_size - n); 430 } 431 scan = scan->js_next; 432 } 433 return(0); 434 } 435 436