// Copyright 2011 The LevelDB-Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Taken from: https://code.google.com/p/leveldb-go/source/browse/leveldb/record/record.go?r=1d5ccbe03246da926391ee12d1c6caae054ff4b0
// License, authors and contributors information can be found at the URLs below, respectively:
//   https://code.google.com/p/leveldb-go/source/browse/LICENSE
//   https://code.google.com/p/leveldb-go/source/browse/AUTHORS
//   https://code.google.com/p/leveldb-go/source/browse/CONTRIBUTORS

// Package journal reads and writes sequences of journals. Each journal is a stream
// of bytes that completes before the next journal starts.
//
// When reading, call Next to obtain an io.Reader for the next journal. Next will
// return io.EOF when there are no more journals. It is valid to call Next
// without reading the current journal to exhaustion.
//
// When writing, call Next to obtain an io.Writer for the next journal. Calling
// Next finishes the current journal. Call Close to finish the final journal.
//
// Optionally, call Flush to finish the current journal and flush the underlying
// writer without starting a new journal. To start a new journal after flushing,
// call Next.
//
// Neither Readers nor Writers are safe to use concurrently.
//
// Example code:
//	func read(r io.Reader) ([]string, error) {
//		var ss []string
//		journals := journal.NewReader(r, nil, true, true)
//		for {
//			j, err := journals.Next()
//			if err == io.EOF {
//				break
//			}
//			if err != nil {
//				return nil, err
//			}
//			s, err := ioutil.ReadAll(j)
//			if err != nil {
//				return nil, err
//			}
//			ss = append(ss, string(s))
//		}
//		return ss, nil
//	}
//
//	func write(w io.Writer, ss []string) error {
//		journals := journal.NewWriter(w)
//		for _, s := range ss {
//			j, err := journals.Next()
//			if err != nil {
//				return err
//			}
//			if _, err := j.Write([]byte(s)); err != nil {
//				return err
//			}
//		}
//		return journals.Close()
//	}
//
// The wire format is that the stream is divided into 32 KiB blocks, and each
// block contains a number of tightly packed chunks. Chunks cannot cross block
// boundaries. The last block may be shorter than 32 KiB. Any unused bytes in a
// block must be zero.
//
// A journal maps to one or more chunks. Each chunk has a 7 byte header (a 4
// byte checksum, a 2 byte little-endian uint16 length, and a 1 byte chunk type)
// followed by a payload. The checksum is over the chunk type and the payload.
//
// There are four chunk types: whether the chunk is the full journal, or the
// first, middle or last chunk of a multi-chunk journal. A multi-chunk journal
// has one first chunk, zero or more middle chunks, and one last chunk.
//
// The wire format allows for limited recovery in the face of data corruption:
// on a format error (such as a checksum mismatch), the reader moves to the
// next block and looks for the next full or first chunk.
package journal

import (
	"encoding/binary"
	"fmt"
	"io"

	"github.com/syndtr/goleveldb/leveldb/errors"
	"github.com/syndtr/goleveldb/leveldb/util"
)

// These constants are part of the wire format and should not be changed.
const (
	// fullChunkType marks a chunk that holds an entire journal.
	fullChunkType = 1
	// firstChunkType marks the first chunk of a multi-chunk journal.
	firstChunkType = 2
	// middleChunkType marks an interior chunk of a multi-chunk journal.
	middleChunkType = 3
	// lastChunkType marks the final chunk of a multi-chunk journal.
	lastChunkType = 4
)

const (
	// blockSize is the size in bytes of one block of the stream (32 KiB).
	blockSize = 32 * 1024
	// headerSize is the size in bytes of a chunk header: a 4 byte checksum,
	// a 2 byte length and a 1 byte chunk type.
	headerSize = 7
)

// flusher is the optional interface an underlying writer may implement; the
// journal Writer calls Flush on it when its own Flush method is invoked.
type flusher interface {
	Flush() error
}

// ErrCorrupted is the error type generated for a corrupted block or chunk.
type ErrCorrupted struct {
	// Size is the byte count reported together with the corruption.
	Size int
	// Reason is a short description of what was wrong.
	Reason string
}

// Error implements the error interface, formatting the reason and size.
func (e *ErrCorrupted) Error() string {
	return fmt.Sprintf("leveldb/journal: block/chunk corrupted: %s (%d bytes)", e.Reason, e.Size)
}

// Dropper is the interface that wraps the simple Drop method. The Drop
// method is called when the journal reader drops a block or chunk.
type Dropper interface {
	Drop(err error)
}

// Reader reads journals from an underlying io.Reader.
type Reader struct {
	// r is the underlying reader.
	r io.Reader
	// dropper, if non-nil, is notified of every dropped block or chunk.
	dropper Dropper
	// strict is whether a corrupted or invalid chunk halts the reader.
	strict bool
	// checksum is whether chunk checksums are verified while reading.
	checksum bool
	// seq is the sequence number of the current journal.
	seq int
	// buf[i:j] is the unread portion of the current chunk's payload.
	// The low bound, i, excludes the chunk header.
	i, j int
	// n is the number of bytes of buf that are valid. Once reading has started,
	// only the final block can have n < blockSize.
	n int
	// last is whether the current chunk is the last chunk of the journal.
	last bool
	// err is any accumulated error.
	err error
	// buf is the buffer.
	buf [blockSize]byte
}

// NewReader returns a new reader. The dropper may be nil, and if
// strict is true then a corrupted or invalid chunk will halt the journal
// reader entirely.
151func NewReader(r io.Reader, dropper Dropper, strict, checksum bool) *Reader { 152 return &Reader{ 153 r: r, 154 dropper: dropper, 155 strict: strict, 156 checksum: checksum, 157 last: true, 158 } 159} 160 161var errSkip = errors.New("leveldb/journal: skipped") 162 163func (r *Reader) corrupt(n int, reason string, skip bool) error { 164 if r.dropper != nil { 165 r.dropper.Drop(&ErrCorrupted{n, reason}) 166 } 167 if r.strict && !skip { 168 r.err = errors.NewErrCorrupted(nil, &ErrCorrupted{n, reason}) 169 return r.err 170 } 171 return errSkip 172} 173 174// nextChunk sets r.buf[r.i:r.j] to hold the next chunk's payload, reading the 175// next block into the buffer if necessary. 176func (r *Reader) nextChunk(first bool) error { 177 for { 178 if r.j+headerSize <= r.n { 179 checksum := binary.LittleEndian.Uint32(r.buf[r.j+0 : r.j+4]) 180 length := binary.LittleEndian.Uint16(r.buf[r.j+4 : r.j+6]) 181 chunkType := r.buf[r.j+6] 182 183 if checksum == 0 && length == 0 && chunkType == 0 { 184 // Drop entire block. 185 m := r.n - r.j 186 r.i = r.n 187 r.j = r.n 188 return r.corrupt(m, "zero header", false) 189 } else { 190 m := r.n - r.j 191 r.i = r.j + headerSize 192 r.j = r.j + headerSize + int(length) 193 if r.j > r.n { 194 // Drop entire block. 195 r.i = r.n 196 r.j = r.n 197 return r.corrupt(m, "chunk length overflows block", false) 198 } else if r.checksum && checksum != util.NewCRC(r.buf[r.i-1:r.j]).Value() { 199 // Drop entire block. 200 r.i = r.n 201 r.j = r.n 202 return r.corrupt(m, "checksum mismatch", false) 203 } 204 } 205 if first && chunkType != fullChunkType && chunkType != firstChunkType { 206 m := r.j - r.i 207 r.i = r.j 208 // Report the error, but skip it. 209 return r.corrupt(m+headerSize, "orphan chunk", true) 210 } 211 r.last = chunkType == fullChunkType || chunkType == lastChunkType 212 return nil 213 } 214 215 // The last block. 
216 if r.n < blockSize && r.n > 0 { 217 if !first { 218 return r.corrupt(0, "missing chunk part", false) 219 } 220 r.err = io.EOF 221 return r.err 222 } 223 224 // Read block. 225 n, err := io.ReadFull(r.r, r.buf[:]) 226 if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF { 227 return err 228 } 229 if n == 0 { 230 if !first { 231 return r.corrupt(0, "missing chunk part", false) 232 } 233 r.err = io.EOF 234 return r.err 235 } 236 r.i, r.j, r.n = 0, 0, n 237 } 238} 239 240// Next returns a reader for the next journal. It returns io.EOF if there are no 241// more journals. The reader returned becomes stale after the next Next call, 242// and should no longer be used. If strict is false, the reader will returns 243// io.ErrUnexpectedEOF error when found corrupted journal. 244func (r *Reader) Next() (io.Reader, error) { 245 r.seq++ 246 if r.err != nil { 247 return nil, r.err 248 } 249 r.i = r.j 250 for { 251 if err := r.nextChunk(true); err == nil { 252 break 253 } else if err != errSkip { 254 return nil, err 255 } 256 } 257 return &singleReader{r, r.seq, nil}, nil 258} 259 260// Reset resets the journal reader, allows reuse of the journal reader. Reset returns 261// last accumulated error. 
262func (r *Reader) Reset(reader io.Reader, dropper Dropper, strict, checksum bool) error { 263 r.seq++ 264 err := r.err 265 r.r = reader 266 r.dropper = dropper 267 r.strict = strict 268 r.checksum = checksum 269 r.i = 0 270 r.j = 0 271 r.n = 0 272 r.last = true 273 r.err = nil 274 return err 275} 276 277type singleReader struct { 278 r *Reader 279 seq int 280 err error 281} 282 283func (x *singleReader) Read(p []byte) (int, error) { 284 r := x.r 285 if r.seq != x.seq { 286 return 0, errors.New("leveldb/journal: stale reader") 287 } 288 if x.err != nil { 289 return 0, x.err 290 } 291 if r.err != nil { 292 return 0, r.err 293 } 294 for r.i == r.j { 295 if r.last { 296 return 0, io.EOF 297 } 298 x.err = r.nextChunk(false) 299 if x.err != nil { 300 if x.err == errSkip { 301 x.err = io.ErrUnexpectedEOF 302 } 303 return 0, x.err 304 } 305 } 306 n := copy(p, r.buf[r.i:r.j]) 307 r.i += n 308 return n, nil 309} 310 311func (x *singleReader) ReadByte() (byte, error) { 312 r := x.r 313 if r.seq != x.seq { 314 return 0, errors.New("leveldb/journal: stale reader") 315 } 316 if x.err != nil { 317 return 0, x.err 318 } 319 if r.err != nil { 320 return 0, r.err 321 } 322 for r.i == r.j { 323 if r.last { 324 return 0, io.EOF 325 } 326 x.err = r.nextChunk(false) 327 if x.err != nil { 328 if x.err == errSkip { 329 x.err = io.ErrUnexpectedEOF 330 } 331 return 0, x.err 332 } 333 } 334 c := r.buf[r.i] 335 r.i++ 336 return c, nil 337} 338 339// Writer writes journals to an underlying io.Writer. 340type Writer struct { 341 // w is the underlying writer. 342 w io.Writer 343 // seq is the sequence number of the current journal. 344 seq int 345 // f is w as a flusher. 346 f flusher 347 // buf[i:j] is the bytes that will become the current chunk. 348 // The low bound, i, includes the chunk header. 349 i, j int 350 // buf[:written] has already been written to w. 351 // written is zero unless Flush has been called. 
352 written int 353 // first is whether the current chunk is the first chunk of the journal. 354 first bool 355 // pending is whether a chunk is buffered but not yet written. 356 pending bool 357 // err is any accumulated error. 358 err error 359 // buf is the buffer. 360 buf [blockSize]byte 361} 362 363// NewWriter returns a new Writer. 364func NewWriter(w io.Writer) *Writer { 365 f, _ := w.(flusher) 366 return &Writer{ 367 w: w, 368 f: f, 369 } 370} 371 372// fillHeader fills in the header for the pending chunk. 373func (w *Writer) fillHeader(last bool) { 374 if w.i+headerSize > w.j || w.j > blockSize { 375 panic("leveldb/journal: bad writer state") 376 } 377 if last { 378 if w.first { 379 w.buf[w.i+6] = fullChunkType 380 } else { 381 w.buf[w.i+6] = lastChunkType 382 } 383 } else { 384 if w.first { 385 w.buf[w.i+6] = firstChunkType 386 } else { 387 w.buf[w.i+6] = middleChunkType 388 } 389 } 390 binary.LittleEndian.PutUint32(w.buf[w.i+0:w.i+4], util.NewCRC(w.buf[w.i+6:w.j]).Value()) 391 binary.LittleEndian.PutUint16(w.buf[w.i+4:w.i+6], uint16(w.j-w.i-headerSize)) 392} 393 394// writeBlock writes the buffered block to the underlying writer, and reserves 395// space for the next chunk's header. 396func (w *Writer) writeBlock() { 397 _, w.err = w.w.Write(w.buf[w.written:]) 398 w.i = 0 399 w.j = headerSize 400 w.written = 0 401} 402 403// writePending finishes the current journal and writes the buffer to the 404// underlying writer. 405func (w *Writer) writePending() { 406 if w.err != nil { 407 return 408 } 409 if w.pending { 410 w.fillHeader(true) 411 w.pending = false 412 } 413 _, w.err = w.w.Write(w.buf[w.written:w.j]) 414 w.written = w.j 415} 416 417// Close finishes the current journal and closes the writer. 
418func (w *Writer) Close() error { 419 w.seq++ 420 w.writePending() 421 if w.err != nil { 422 return w.err 423 } 424 w.err = errors.New("leveldb/journal: closed Writer") 425 return nil 426} 427 428// Flush finishes the current journal, writes to the underlying writer, and 429// flushes it if that writer implements interface{ Flush() error }. 430func (w *Writer) Flush() error { 431 w.seq++ 432 w.writePending() 433 if w.err != nil { 434 return w.err 435 } 436 if w.f != nil { 437 w.err = w.f.Flush() 438 return w.err 439 } 440 return nil 441} 442 443// Reset resets the journal writer, allows reuse of the journal writer. Reset 444// will also closes the journal writer if not already. 445func (w *Writer) Reset(writer io.Writer) (err error) { 446 w.seq++ 447 if w.err == nil { 448 w.writePending() 449 err = w.err 450 } 451 w.w = writer 452 w.f, _ = writer.(flusher) 453 w.i = 0 454 w.j = 0 455 w.written = 0 456 w.first = false 457 w.pending = false 458 w.err = nil 459 return 460} 461 462// Next returns a writer for the next journal. The writer returned becomes stale 463// after the next Close, Flush or Next call, and should no longer be used. 464func (w *Writer) Next() (io.Writer, error) { 465 w.seq++ 466 if w.err != nil { 467 return nil, w.err 468 } 469 if w.pending { 470 w.fillHeader(true) 471 } 472 w.i = w.j 473 w.j = w.j + headerSize 474 // Check if there is room in the block for the header. 475 if w.j > blockSize { 476 // Fill in the rest of the block with zeroes. 
477 for k := w.i; k < blockSize; k++ { 478 w.buf[k] = 0 479 } 480 w.writeBlock() 481 if w.err != nil { 482 return nil, w.err 483 } 484 } 485 w.first = true 486 w.pending = true 487 return singleWriter{w, w.seq}, nil 488} 489 490type singleWriter struct { 491 w *Writer 492 seq int 493} 494 495func (x singleWriter) Write(p []byte) (int, error) { 496 w := x.w 497 if w.seq != x.seq { 498 return 0, errors.New("leveldb/journal: stale writer") 499 } 500 if w.err != nil { 501 return 0, w.err 502 } 503 n0 := len(p) 504 for len(p) > 0 { 505 // Write a block, if it is full. 506 if w.j == blockSize { 507 w.fillHeader(false) 508 w.writeBlock() 509 if w.err != nil { 510 return 0, w.err 511 } 512 w.first = false 513 } 514 // Copy bytes into the buffer. 515 n := copy(w.buf[w.j:], p) 516 w.j += n 517 p = p[n:] 518 } 519 return n0, nil 520} 521