1 /* 2 * Copyright (c) 2004,2005 The DragonFly Project. All rights reserved. 3 * 4 * This code is derived from software contributed to The DragonFly Project 5 * by Matthew Dillon <dillon@backplane.com> 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * 1. Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * 2. Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * 3. Neither the name of The DragonFly Project nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific, prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 25 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 26 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING, 27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED 29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, 30 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT 31 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 32 * SUCH DAMAGE. 33 * 34 * $DragonFly: src/sbin/jscan/jstream.c,v 1.8 2005/09/07 07:20:23 dillon Exp $ 35 */ 36 37 #include "jscan.h" 38 39 static struct jhash *JHashAry[JHASH_SIZE]; 40 41 static void jnormalize(struct jstream *js); 42 static int jaddrecord_backtrack(struct jsession *ss, struct jdata *jd); 43 44 /* 45 * Integrate a raw record. Deal with the transaction begin and end flags 46 * to create a forward-referenced collection of jstream records. If we are 47 * able to complete a transaction, the first js associated with that 48 * transaction is returned. 49 * 50 * XXX we need to store the data for very large multi-record transactions 51 * separately since it might not fit into memory. 52 * 53 * Note that a transaction might represent a huge I/O operation, resulting 54 * in an overall node structure that spans gigabytes, but individual 55 * subrecord leaf nodes are limited in size and we depend on this to simplify 56 * the handling of leaf records. 57 * 58 * A transaction may cover several raw records. The jstream collection for 59 * a transaction is only returned when the entire transaction has been 60 * successfully scanned. Due to the interleaving of transactions the ordering 61 * of returned JS's may be different (not exactly reversed) when scanning a 62 * journal backwards verses forwards. Since parallel operations are 63 * theoretically non-conflicting, this should not present a problem. 64 */ 65 struct jstream * 66 jaddrecord(struct jsession *ss, struct jdata *jd) 67 { 68 struct journal_rawrecbeg *head; 69 struct jstream *js; 70 struct jhash *jh; 71 struct jhash **jhp; 72 73 js = malloc(sizeof(struct jstream)); 74 bzero(js, sizeof(struct jstream)); 75 js->js_jdata = jref(jd); 76 js->js_head = (void *)jd->jd_data; 77 js->js_session = ss; 78 head = js->js_head; 79 80 /* 81 * Check for a completely self-contained transaction, just return the 82 * js if possible. 83 */ 84 if ((head->streamid & (JREC_STREAMCTL_BEGIN|JREC_STREAMCTL_END)) == 85 (JREC_STREAMCTL_BEGIN|JREC_STREAMCTL_END) 86 ) { 87 jnormalize(js); 88 return (js); 89 } 90 91 retry: 92 /* 93 * Check for an open transaction in the hash table. 94 */ 95 jhp = &JHashAry[head->streamid & JHASH_MASK]; 96 while ((jh = *jhp) != NULL) { 97 if (jh->jh_session == ss && 98 ((jh->jh_transid ^ head->streamid) & JREC_STREAMID_MASK) == 0 99 ) { 100 break; 101 } 102 jhp = &jh->jh_hash; 103 } 104 105 /* 106 * We might have picked up a transaction in the middle, which we 107 * detect by not finding a hash record coupled with a raw record 108 * whos JREC_STREAMCTL_BEGIN bit is not set (or JREC_STREAMCTL_END 109 * bit if we are scanning backwards). 110 * 111 * When this case occurs we have to backtrack to locate the 112 * BEGIN (END if scanning backwards) and collect those records 113 * in order to obtain a complete transaction. 114 * 115 * This case most often occurs when a batch operation runs in 116 * prefix set whos starting raw-record transaction id is in 117 * the middle of one or more meta-transactions. It's a bit of 118 * a tricky situation, but easily resolvable by scanning the 119 * prefix set backwards (forwards if originally scanning backwards) 120 * to locate the raw record representing the start (end) of the 121 * transaction. 122 */ 123 if (jh == NULL) { 124 if (ss->ss_direction == JD_FORWARDS && 125 (head->streamid & JREC_STREAMCTL_BEGIN) == 0 126 ) { 127 if (verbose_opt > 1) 128 fprintf(stderr, "mid-transaction detected transid %016llx streamid %04x\n", jd->jd_transid, head->streamid & JREC_STREAMID_MASK); 129 if (jaddrecord_backtrack(ss, jd) == 0) { 130 if (verbose_opt) 131 fprintf(stderr, "mid-transaction streamid %04x collection succeeded\n", head->streamid & JREC_STREAMID_MASK); 132 goto retry; 133 } 134 fprintf(stderr, "mid-transaction streamid %04x collection failed\n", head->streamid & JREC_STREAMID_MASK); 135 jscan_dispose(js); 136 return(NULL); 137 } else if (ss->ss_direction == JD_BACKWARDS && 138 (head->streamid & JREC_STREAMCTL_END) == 0 139 ) { 140 if (verbose_opt > 1) 141 fprintf(stderr, "mid-transaction detected transid %016llx streamid %04x\n", jd->jd_transid, head->streamid & JREC_STREAMID_MASK); 142 if (jaddrecord_backtrack(ss, jd) == 0) { 143 if (verbose_opt) 144 fprintf(stderr, "mid-transaction streamid %04x collection succeeded\n", head->streamid & JREC_STREAMID_MASK); 145 goto retry; 146 } 147 fprintf(stderr, "mid-transaction streamid %04x collection failed\n", head->streamid & JREC_STREAMID_MASK); 148 jscan_dispose(js); 149 return(NULL); 150 } 151 } 152 153 /* 154 * If we've made it to here and we still don't have a hash record 155 * to track the transaction, create one. 156 */ 157 if (jh == NULL) { 158 jh = malloc(sizeof(*jh)); 159 bzero(jh, sizeof(*jh)); 160 *jhp = jh; 161 jh->jh_first = js; 162 jh->jh_last = js; 163 jh->jh_transid = head->streamid; 164 jh->jh_session = ss; 165 return (NULL); 166 } 167 168 /* 169 * Emplace the stream segment 170 */ 171 jh->jh_transid |= head->streamid & JREC_STREAMCTL_MASK; 172 if (ss->ss_direction == JD_FORWARDS) { 173 jh->jh_last->js_next = js; 174 jh->jh_last = js; 175 } else { 176 js->js_next = jh->jh_first; 177 jh->jh_first = js; 178 } 179 180 /* 181 * If the transaction is complete, remove the hash entry and return the 182 * js representing the beginning of the transaction. Otherwise leave 183 * the hash entry intact and return NULL. 184 */ 185 if ((jh->jh_transid & (JREC_STREAMCTL_BEGIN|JREC_STREAMCTL_END)) == 186 (JREC_STREAMCTL_BEGIN|JREC_STREAMCTL_END) 187 ) { 188 *jhp = jh->jh_hash; 189 js = jh->jh_first; 190 free(jh); 191 192 jnormalize(js); 193 } else { 194 js = NULL; 195 } 196 return (js); 197 } 198 199 /* 200 * Renormalize the jscan list to remove all the meta record headers 201 * and trailers except for the very first one. 202 */ 203 static 204 void 205 jnormalize(struct jstream *js) 206 { 207 struct jstream *jscan; 208 off_t off; 209 210 js->js_normalized_off = 0; 211 js->js_normalized_base = (void *)js->js_head; 212 js->js_normalized_size = js->js_head->recsize - sizeof(struct journal_rawrecend); 213 js->js_normalized_total = js->js_normalized_size; 214 off = js->js_normalized_size; 215 for (jscan = js->js_next; jscan; jscan = jscan->js_next) { 216 jscan->js_normalized_off = off; 217 jscan->js_normalized_base = (char *)jscan->js_head + 218 sizeof(struct journal_rawrecbeg); 219 jscan->js_normalized_size = jscan->js_head->recsize - 220 sizeof(struct journal_rawrecbeg) - 221 sizeof(struct journal_rawrecend); 222 off += jscan->js_normalized_size; 223 js->js_normalized_total += jscan->js_normalized_size; 224 } 225 } 226 227 /* 228 * For sanity's sake I will describe the normal backtracking that must occur, 229 * but this routine must also operate on reverse-scanned (undo) records 230 * by forward tracking. 231 * 232 * A record has been found that represents the middle or end of a transaction 233 * when we were expecting the beginning of a transaction. We must backtrack 234 * to locate the start of the transaction, then process raw records relating 235 * to the transaction until we reach our current point (jd) again. If 236 * we find a matching streamid representing the end of a transaction instead 237 * of the expected start-of-transaction that record belongs to a wholely 238 * different meta-transaction and the record we seek is known to not be 239 * available. 240 * 241 * jd is the current record, directon is the normal scan direction (we have 242 * to scan in the reverse direction). 243 */ 244 static 245 int 246 jaddrecord_backtrack(struct jsession *ss, struct jdata *jd) 247 { 248 struct jfile *jf = ss->ss_jfin; 249 struct jdata *scan; 250 struct jstream *js; 251 u_int16_t streamid; 252 u_int16_t scanid; 253 254 assert(ss->ss_direction == JD_FORWARDS || ss->ss_direction == JD_BACKWARDS); 255 if (jmodes & JMODEF_INPUT_PIPE) 256 return(-1); 257 258 streamid = ((struct journal_rawrecbeg *)jd->jd_data)->streamid & JREC_STREAMID_MASK; 259 260 if (ss->ss_direction == JD_FORWARDS) { 261 /* 262 * Backtrack in the reverse direction looking for the transaction 263 * start bit. If we find an end bit instead it belongs to an 264 * unrelated transaction using the same streamid and there is no 265 * point continuing. 266 */ 267 scan = jref(jd); 268 while ((scan = jread(jf, scan, JD_BACKWARDS)) != NULL) { 269 scanid = ((struct journal_rawrecbeg *)scan->jd_data)->streamid; 270 if ((scanid & JREC_STREAMID_MASK) != streamid) 271 continue; 272 if (scanid & JREC_STREAMCTL_END) { 273 jfree(jf, scan); 274 return(-1); 275 } 276 if (scanid & JREC_STREAMCTL_BEGIN) 277 break; 278 } 279 280 /* 281 * Now jaddrecord the related records. 282 */ 283 while (scan != NULL && scan->jd_transid < jd->jd_transid) { 284 scanid = ((struct journal_rawrecbeg *)scan->jd_data)->streamid; 285 if ((scanid & JREC_STREAMID_MASK) == streamid) { 286 js = jaddrecord(ss, scan); 287 assert(js == NULL); 288 } 289 scan = jread(jf, scan, JD_FORWARDS); 290 } 291 if (scan == NULL) 292 return(-1); 293 jfree(jf, scan); 294 } else { 295 /* 296 * Backtrack in the forwards direction looking for the transaction 297 * end bit. If we find a start bit instead if belongs to an 298 * unrelated transaction using the same streamid and there is no 299 * point continuing. 300 */ 301 scan = jref(jd); 302 while ((scan = jread(jf, scan, JD_FORWARDS)) != NULL) { 303 scanid = ((struct journal_rawrecbeg *)scan->jd_data)->streamid; 304 if ((scanid & JREC_STREAMID_MASK) != streamid) 305 continue; 306 if (scanid & JREC_STREAMCTL_BEGIN) { 307 jfree(jf, scan); 308 return(-1); 309 } 310 if (scanid & JREC_STREAMCTL_END) 311 break; 312 } 313 314 /* 315 * Now jaddrecord the related records. 316 */ 317 while (scan != NULL && scan->jd_transid > jd->jd_transid) { 318 scanid = ((struct journal_rawrecbeg *)scan->jd_data)->streamid; 319 if ((scanid & JREC_STREAMID_MASK) == streamid) { 320 js = jaddrecord(ss, scan); 321 assert(js == NULL); 322 } 323 scan = jread(jf, scan, JD_BACKWARDS); 324 } 325 if (scan == NULL) 326 return(-1); 327 jfree(jf, scan); 328 } 329 return(0); 330 } 331 332 void 333 jscan_dispose(struct jstream *js) 334 { 335 struct jstream *jnext; 336 337 if (js->js_alloc_buf) { 338 free(js->js_alloc_buf); 339 js->js_alloc_buf = NULL; 340 js->js_alloc_size = 0; 341 } 342 343 while (js) { 344 jnext = js->js_next; 345 jfree(js->js_session->ss_jfin, js->js_jdata); 346 js->js_jdata = NULL; 347 free(js); 348 js = jnext; 349 } 350 } 351 352 /* 353 * Read the specified block of data out of a linked set of jstream 354 * structures. Returns 0 on success or an error code on error. 355 */ 356 int 357 jsread(struct jstream *js, off_t off, void *buf, int bytes) 358 { 359 const void *ptr; 360 int n; 361 362 while (bytes) { 363 n = jsreadany(js, off, &ptr); 364 if (n == 0) 365 return (ENOENT); 366 if (n > bytes) 367 n = bytes; 368 bcopy(ptr, buf, n); 369 buf = (char *)buf + n; 370 off += n; 371 bytes -= n; 372 } 373 return(0); 374 } 375 376 /* 377 * Read the specified block of data out of a linked set of jstream 378 * structures. Attempt to return a pointer into the data set but 379 * allocate and copy if that is not possible. Returns 0 on success 380 * or an error code on error. 381 */ 382 int 383 jsreadp(struct jstream *js, off_t off, const void **bufp, 384 int bytes) 385 { 386 int error = 0; 387 int n; 388 389 n = jsreadany(js, off, bufp); 390 if (n < bytes) { 391 if (js->js_alloc_size < bytes) { 392 if (js->js_alloc_buf) 393 free(js->js_alloc_buf); 394 js->js_alloc_buf = malloc(bytes); 395 js->js_alloc_size = bytes; 396 if (js->js_alloc_buf == NULL) 397 fprintf(stderr, "attempt to allocate %d bytes failed\n", bytes); 398 assert(js->js_alloc_buf != NULL); 399 } 400 error = jsread(js, off, js->js_alloc_buf, bytes); 401 if (error) { 402 *bufp = NULL; 403 } else { 404 *bufp = js->js_alloc_buf; 405 } 406 } 407 return(error); 408 } 409 410 int 411 jsreadcallback(struct jstream *js, ssize_t (*func)(int, const void *, size_t), 412 int fd, off_t off, int bytes) 413 { 414 const void *bufp; 415 int res; 416 int n; 417 int r; 418 419 res = 0; 420 while (bytes && (n = jsreadany(js, off, &bufp)) > 0) { 421 if (n > bytes) 422 n = bytes; 423 r = func(fd, bufp, n); 424 if (r != n) { 425 if (res == 0) 426 res = -1; 427 } 428 res += n; 429 bytes -= n; 430 off += n; 431 } 432 return(res); 433 } 434 435 /* 436 * Return the largest contiguous buffer starting at the specified offset, 437 * or 0. 438 */ 439 int 440 jsreadany(struct jstream *js, off_t off, const void **bufp) 441 { 442 struct jstream *scan; 443 int n; 444 445 if ((scan = js->js_cache) == NULL || scan->js_normalized_off > off) 446 scan = js; 447 while (scan && scan->js_normalized_off <= off) { 448 js->js_cache = scan; 449 if (scan->js_normalized_off + scan->js_normalized_size > off) { 450 n = (int)(off - scan->js_normalized_off); 451 *bufp = scan->js_normalized_base + n; 452 return(scan->js_normalized_size - n); 453 } 454 scan = scan->js_next; 455 } 456 return(0); 457 } 458 459