1 /* 2 * Copyright (c) 2004,2005 The DragonFly Project. All rights reserved. 3 * 4 * This code is derived from software contributed to The DragonFly Project 5 * by Matthew Dillon <dillon@backplane.com> 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * 1. Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * 2. Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * 3. Neither the name of The DragonFly Project nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific, prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 25 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 26 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING, 27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED 29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, 30 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT 31 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 32 * SUCH DAMAGE. 33 * 34 * $DragonFly: src/sbin/jscan/jstream.c,v 1.8 2005/09/07 07:20:23 dillon Exp $ 35 */ 36 37 #include "jscan.h" 38 39 static struct jhash *JHashAry[JHASH_SIZE]; 40 41 static void jnormalize(struct jstream *js); 42 static int jaddrecord_backtrack(struct jsession *ss, struct jdata *jd); 43 44 /* 45 * Integrate a raw record. Deal with the transaction begin and end flags 46 * to create a forward-referenced collection of jstream records. If we are 47 * able to complete a transaction, the first js associated with that 48 * transaction is returned. 49 * 50 * XXX we need to store the data for very large multi-record transactions 51 * separately since it might not fit into memory. 52 * 53 * Note that a transaction might represent a huge I/O operation, resulting 54 * in an overall node structure that spans gigabytes, but individual 55 * subrecord leaf nodes are limited in size and we depend on this to simplify 56 * the handling of leaf records. 57 * 58 * A transaction may cover several raw records. The jstream collection for 59 * a transaction is only returned when the entire transaction has been 60 * successfully scanned. Due to the interleaving of transactions the ordering 61 * of returned JS's may be different (not exactly reversed) when scanning a 62 * journal backwards verses forwards. Since parallel operations are 63 * theoretically non-conflicting, this should not present a problem. 64 */ 65 struct jstream * 66 jaddrecord(struct jsession *ss, struct jdata *jd) 67 { 68 struct journal_rawrecbeg *head; 69 struct jstream *js; 70 struct jhash *jh; 71 struct jhash **jhp; 72 73 js = malloc(sizeof(struct jstream)); 74 bzero(js, sizeof(struct jstream)); 75 js->js_jdata = jref(jd); 76 js->js_head = (void *)jd->jd_data; 77 js->js_session = ss; 78 head = js->js_head; 79 80 /* 81 * Check for a completely self-contained transaction, just return the 82 * js if possible. 83 */ 84 if ((head->streamid & (JREC_STREAMCTL_BEGIN|JREC_STREAMCTL_END)) == 85 (JREC_STREAMCTL_BEGIN|JREC_STREAMCTL_END) 86 ) { 87 jnormalize(js); 88 return (js); 89 } 90 91 retry: 92 /* 93 * Check for an open transaction in the hash table. 94 */ 95 jhp = &JHashAry[head->streamid & JHASH_MASK]; 96 while ((jh = *jhp) != NULL) { 97 if (jh->jh_session == ss && 98 ((jh->jh_transid ^ head->streamid) & JREC_STREAMID_MASK) == 0 99 ) { 100 break; 101 } 102 jhp = &jh->jh_hash; 103 } 104 105 /* 106 * We might have picked up a transaction in the middle, which we 107 * detect by not finding a hash record coupled with a raw record 108 * whos JREC_STREAMCTL_BEGIN bit is not set (or JREC_STREAMCTL_END 109 * bit if we are scanning backwards). 110 * 111 * When this case occurs we have to backtrack to locate the 112 * BEGIN (END if scanning backwards) and collect those records 113 * in order to obtain a complete transaction. 114 * 115 * This case most often occurs when a batch operation runs in 116 * prefix set whos starting raw-record transaction id is in 117 * the middle of one or more meta-transactions. It's a bit of 118 * a tricky situation, but easily resolvable by scanning the 119 * prefix set backwards (forwards if originally scanning backwards) 120 * to locate the raw record representing the start (end) of the 121 * transaction. 122 */ 123 if (jh == NULL) { 124 if (ss->ss_direction == JD_FORWARDS && 125 (head->streamid & JREC_STREAMCTL_BEGIN) == 0 126 ) { 127 if (verbose_opt > 1) 128 fprintf(stderr, "mid-transaction detected transid %016jx " 129 "streamid %04x\n", 130 (uintmax_t)jd->jd_transid, 131 head->streamid & JREC_STREAMID_MASK); 132 if (jaddrecord_backtrack(ss, jd) == 0) { 133 if (verbose_opt) 134 fprintf(stderr, "mid-transaction streamid %04x collection " 135 "succeeded\n", 136 head->streamid & JREC_STREAMID_MASK); 137 goto retry; 138 } 139 fprintf(stderr, "mid-transaction streamid %04x collection failed\n", 140 head->streamid & JREC_STREAMID_MASK); 141 jscan_dispose(js); 142 return(NULL); 143 } else if (ss->ss_direction == JD_BACKWARDS && 144 (head->streamid & JREC_STREAMCTL_END) == 0 145 ) { 146 if (verbose_opt > 1) 147 fprintf(stderr, "mid-transaction detected transid %016jx " 148 "streamid %04x\n", 149 (uintmax_t)jd->jd_transid, 150 head->streamid & JREC_STREAMID_MASK); 151 if (jaddrecord_backtrack(ss, jd) == 0) { 152 if (verbose_opt) 153 fprintf(stderr, "mid-transaction streamid %04x " 154 "collection succeeded\n", 155 head->streamid & JREC_STREAMID_MASK); 156 goto retry; 157 } 158 fprintf(stderr, "mid-transaction streamid %04x collection failed\n", 159 head->streamid & JREC_STREAMID_MASK); 160 jscan_dispose(js); 161 return(NULL); 162 } 163 } 164 165 /* 166 * If we've made it to here and we still don't have a hash record 167 * to track the transaction, create one. 168 */ 169 if (jh == NULL) { 170 jh = malloc(sizeof(*jh)); 171 bzero(jh, sizeof(*jh)); 172 *jhp = jh; 173 jh->jh_first = js; 174 jh->jh_last = js; 175 jh->jh_transid = head->streamid; 176 jh->jh_session = ss; 177 return (NULL); 178 } 179 180 /* 181 * Emplace the stream segment 182 */ 183 jh->jh_transid |= head->streamid & JREC_STREAMCTL_MASK; 184 if (ss->ss_direction == JD_FORWARDS) { 185 jh->jh_last->js_next = js; 186 jh->jh_last = js; 187 } else { 188 js->js_next = jh->jh_first; 189 jh->jh_first = js; 190 } 191 192 /* 193 * If the transaction is complete, remove the hash entry and return the 194 * js representing the beginning of the transaction. Otherwise leave 195 * the hash entry intact and return NULL. 196 */ 197 if ((jh->jh_transid & (JREC_STREAMCTL_BEGIN|JREC_STREAMCTL_END)) == 198 (JREC_STREAMCTL_BEGIN|JREC_STREAMCTL_END) 199 ) { 200 *jhp = jh->jh_hash; 201 js = jh->jh_first; 202 free(jh); 203 204 jnormalize(js); 205 } else { 206 js = NULL; 207 } 208 return (js); 209 } 210 211 /* 212 * Renormalize the jscan list to remove all the meta record headers 213 * and trailers except for the very first one. 214 */ 215 static 216 void 217 jnormalize(struct jstream *js) 218 { 219 struct jstream *jscan; 220 off_t off; 221 222 js->js_normalized_off = 0; 223 js->js_normalized_base = (void *)js->js_head; 224 js->js_normalized_size = js->js_head->recsize - sizeof(struct journal_rawrecend); 225 js->js_normalized_total = js->js_normalized_size; 226 off = js->js_normalized_size; 227 for (jscan = js->js_next; jscan; jscan = jscan->js_next) { 228 jscan->js_normalized_off = off; 229 jscan->js_normalized_base = (char *)jscan->js_head + 230 sizeof(struct journal_rawrecbeg); 231 jscan->js_normalized_size = jscan->js_head->recsize - 232 sizeof(struct journal_rawrecbeg) - 233 sizeof(struct journal_rawrecend); 234 off += jscan->js_normalized_size; 235 js->js_normalized_total += jscan->js_normalized_size; 236 } 237 } 238 239 /* 240 * For sanity's sake I will describe the normal backtracking that must occur, 241 * but this routine must also operate on reverse-scanned (undo) records 242 * by forward tracking. 243 * 244 * A record has been found that represents the middle or end of a transaction 245 * when we were expecting the beginning of a transaction. We must backtrack 246 * to locate the start of the transaction, then process raw records relating 247 * to the transaction until we reach our current point (jd) again. If 248 * we find a matching streamid representing the end of a transaction instead 249 * of the expected start-of-transaction that record belongs to a wholely 250 * different meta-transaction and the record we seek is known to not be 251 * available. 252 * 253 * jd is the current record, directon is the normal scan direction (we have 254 * to scan in the reverse direction). 255 */ 256 static 257 int 258 jaddrecord_backtrack(struct jsession *ss, struct jdata *jd) 259 { 260 struct jfile *jf = ss->ss_jfin; 261 struct jdata *scan; 262 struct jstream *js; 263 u_int16_t streamid; 264 u_int16_t scanid; 265 266 assert(ss->ss_direction == JD_FORWARDS || ss->ss_direction == JD_BACKWARDS); 267 if (jmodes & JMODEF_INPUT_PIPE) 268 return(-1); 269 270 streamid = ((struct journal_rawrecbeg *)jd->jd_data)->streamid & JREC_STREAMID_MASK; 271 272 if (ss->ss_direction == JD_FORWARDS) { 273 /* 274 * Backtrack in the reverse direction looking for the transaction 275 * start bit. If we find an end bit instead it belongs to an 276 * unrelated transaction using the same streamid and there is no 277 * point continuing. 278 */ 279 scan = jref(jd); 280 while ((scan = jread(jf, scan, JD_BACKWARDS)) != NULL) { 281 scanid = ((struct journal_rawrecbeg *)scan->jd_data)->streamid; 282 if ((scanid & JREC_STREAMID_MASK) != streamid) 283 continue; 284 if (scanid & JREC_STREAMCTL_END) { 285 jfree(jf, scan); 286 return(-1); 287 } 288 if (scanid & JREC_STREAMCTL_BEGIN) 289 break; 290 } 291 292 /* 293 * Now jaddrecord the related records. 294 */ 295 while (scan != NULL && scan->jd_transid < jd->jd_transid) { 296 scanid = ((struct journal_rawrecbeg *)scan->jd_data)->streamid; 297 if ((scanid & JREC_STREAMID_MASK) == streamid) { 298 js = jaddrecord(ss, scan); 299 assert(js == NULL); 300 } 301 scan = jread(jf, scan, JD_FORWARDS); 302 } 303 if (scan == NULL) 304 return(-1); 305 jfree(jf, scan); 306 } else { 307 /* 308 * Backtrack in the forwards direction looking for the transaction 309 * end bit. If we find a start bit instead if belongs to an 310 * unrelated transaction using the same streamid and there is no 311 * point continuing. 312 */ 313 scan = jref(jd); 314 while ((scan = jread(jf, scan, JD_FORWARDS)) != NULL) { 315 scanid = ((struct journal_rawrecbeg *)scan->jd_data)->streamid; 316 if ((scanid & JREC_STREAMID_MASK) != streamid) 317 continue; 318 if (scanid & JREC_STREAMCTL_BEGIN) { 319 jfree(jf, scan); 320 return(-1); 321 } 322 if (scanid & JREC_STREAMCTL_END) 323 break; 324 } 325 326 /* 327 * Now jaddrecord the related records. 328 */ 329 while (scan != NULL && scan->jd_transid > jd->jd_transid) { 330 scanid = ((struct journal_rawrecbeg *)scan->jd_data)->streamid; 331 if ((scanid & JREC_STREAMID_MASK) == streamid) { 332 js = jaddrecord(ss, scan); 333 assert(js == NULL); 334 } 335 scan = jread(jf, scan, JD_BACKWARDS); 336 } 337 if (scan == NULL) 338 return(-1); 339 jfree(jf, scan); 340 } 341 return(0); 342 } 343 344 void 345 jscan_dispose(struct jstream *js) 346 { 347 struct jstream *jnext; 348 349 if (js->js_alloc_buf) { 350 free(js->js_alloc_buf); 351 js->js_alloc_buf = NULL; 352 js->js_alloc_size = 0; 353 } 354 355 while (js) { 356 jnext = js->js_next; 357 jfree(js->js_session->ss_jfin, js->js_jdata); 358 js->js_jdata = NULL; 359 free(js); 360 js = jnext; 361 } 362 } 363 364 /* 365 * Read the specified block of data out of a linked set of jstream 366 * structures. Returns 0 on success or an error code on error. 367 */ 368 int 369 jsread(struct jstream *js, off_t off, void *buf, int bytes) 370 { 371 const void *ptr; 372 int n; 373 374 while (bytes) { 375 n = jsreadany(js, off, &ptr); 376 if (n == 0) 377 return (ENOENT); 378 if (n > bytes) 379 n = bytes; 380 bcopy(ptr, buf, n); 381 buf = (char *)buf + n; 382 off += n; 383 bytes -= n; 384 } 385 return(0); 386 } 387 388 /* 389 * Read the specified block of data out of a linked set of jstream 390 * structures. Attempt to return a pointer into the data set but 391 * allocate and copy if that is not possible. Returns 0 on success 392 * or an error code on error. 393 */ 394 int 395 jsreadp(struct jstream *js, off_t off, const void **bufp, 396 int bytes) 397 { 398 int error = 0; 399 int n; 400 401 n = jsreadany(js, off, bufp); 402 if (n < bytes) { 403 if (js->js_alloc_size < bytes) { 404 if (js->js_alloc_buf) 405 free(js->js_alloc_buf); 406 js->js_alloc_buf = malloc(bytes); 407 js->js_alloc_size = bytes; 408 if (js->js_alloc_buf == NULL) 409 fprintf(stderr, "attempt to allocate %d bytes failed\n", bytes); 410 assert(js->js_alloc_buf != NULL); 411 } 412 error = jsread(js, off, js->js_alloc_buf, bytes); 413 if (error) { 414 *bufp = NULL; 415 } else { 416 *bufp = js->js_alloc_buf; 417 } 418 } 419 return(error); 420 } 421 422 int 423 jsreadcallback(struct jstream *js, ssize_t (*func)(int, const void *, size_t), 424 int fd, off_t off, int bytes) 425 { 426 const void *bufp; 427 int res; 428 int n; 429 int r; 430 431 res = 0; 432 while (bytes && (n = jsreadany(js, off, &bufp)) > 0) { 433 if (n > bytes) 434 n = bytes; 435 r = func(fd, bufp, n); 436 if (r != n) { 437 if (res == 0) 438 res = -1; 439 } 440 res += n; 441 bytes -= n; 442 off += n; 443 } 444 return(res); 445 } 446 447 /* 448 * Return the largest contiguous buffer starting at the specified offset, 449 * or 0. 450 */ 451 int 452 jsreadany(struct jstream *js, off_t off, const void **bufp) 453 { 454 struct jstream *scan; 455 int n; 456 457 if ((scan = js->js_cache) == NULL || scan->js_normalized_off > off) 458 scan = js; 459 while (scan && scan->js_normalized_off <= off) { 460 js->js_cache = scan; 461 if (scan->js_normalized_off + scan->js_normalized_size > off) { 462 n = (int)(off - scan->js_normalized_off); 463 *bufp = scan->js_normalized_base + n; 464 return(scan->js_normalized_size - n); 465 } 466 scan = scan->js_next; 467 } 468 return(0); 469 } 470 471