// Copyright 2011 The LevelDB-Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Taken from: https://code.google.com/p/leveldb-go/source/browse/leveldb/record/record.go?r=1d5ccbe03246da926391ee12d1c6caae054ff4b0
// License, authors and contributors information can be found at the URLs below, respectively:
// 	https://code.google.com/p/leveldb-go/source/browse/LICENSE
// 	https://code.google.com/p/leveldb-go/source/browse/AUTHORS
// 	https://code.google.com/p/leveldb-go/source/browse/CONTRIBUTORS

// Package journal reads and writes sequences of journals. Each journal is a stream
// of bytes that completes before the next journal starts.
//
// When reading, call Next to obtain an io.Reader for the next journal. Next will
// return io.EOF when there are no more journals. It is valid to call Next
// without reading the current journal to exhaustion.
//
// When writing, call Next to obtain an io.Writer for the next journal. Calling
// Next finishes the current journal. Call Close to finish the final journal.
//
// Optionally, call Flush to finish the current journal and flush the underlying
// writer without starting a new journal. To start a new journal after flushing,
// call Next.
//
// Neither Readers nor Writers are safe to use concurrently.
//
// Example code:
//	func read(r io.Reader) ([]string, error) {
//		var ss []string
//		journals := journal.NewReader(r, nil, true, true)
//		for {
//			j, err := journals.Next()
//			if err == io.EOF {
//				break
//			}
//			if err != nil {
//				return nil, err
//			}
//			s, err := ioutil.ReadAll(j)
//			if err != nil {
//				return nil, err
//			}
//			ss = append(ss, string(s))
//		}
//		return ss, nil
//	}
//
//	func write(w io.Writer, ss []string) error {
//		journals := journal.NewWriter(w)
//		for _, s := range ss {
//			j, err := journals.Next()
//			if err != nil {
//				return err
//			}
//			if _, err := j.Write([]byte(s)); err != nil {
//				return err
//			}
//		}
//		return journals.Close()
//	}
//
// The wire format is that the stream is divided into 32KiB blocks, and each
// block contains a number of tightly packed chunks. Chunks cannot cross block
// boundaries. The last block may be shorter than 32 KiB. Any unused bytes in a
// block must be zero.
//
// A journal maps to one or more chunks. Each chunk has a 7 byte header (a 4
// byte checksum, a 2 byte little-endian uint16 length, and a 1 byte chunk type)
// followed by a payload. The checksum is over the chunk type and the payload.
//
// There are four chunk types: whether the chunk is the full journal, or the
// first, middle or last chunk of a multi-chunk journal. A multi-chunk journal
// has one first chunk, zero or more middle chunks, and one last chunk.
//
// The wire format allows for limited recovery in the face of data corruption:
// on a format error (such as a checksum mismatch), the reader moves to the
// next block and looks for the next full or first chunk.
package journal

import (
	"encoding/binary"
	"fmt"
	"io"

	"github.com/syndtr/goleveldb/leveldb/errors"
	"github.com/syndtr/goleveldb/leveldb/storage"
	"github.com/syndtr/goleveldb/leveldb/util"
)

// These constants are part of the wire format and should not be changed.
// They are the values stored in the chunk type byte of a chunk header: a full
// chunk holds a whole journal, while first, middle and last chunks are the
// pieces of a journal that spans several chunks.
const (
	fullChunkType   = 1
	firstChunkType  = 2
	middleChunkType = 3
	lastChunkType   = 4
)

// blockSize is the size in bytes of a block, and of the reader's and writer's
// buffers. headerSize is the size in bytes of a chunk header: a 4 byte
// checksum, a 2 byte length and a 1 byte chunk type.
const (
	blockSize  = 32 * 1024
	headerSize = 7
)

// flusher is implemented by underlying writers that buffer data. The Writer
// calls Flush on such a writer from its own Flush method.
type flusher interface {
	Flush() error
}

// ErrCorrupted is the error type generated by a corrupted block or chunk.
type ErrCorrupted struct {
	// Size is the byte count the reader reported for the corrupted data.
	Size int
	// Reason is a short description of the corruption.
	Reason string
}

// Error implements the error interface, formatting the reason and the size.
func (e *ErrCorrupted) Error() string {
	return fmt.Sprintf("leveldb/journal: block/chunk corrupted: %s (%d bytes)", e.Reason, e.Size)
}

// Dropper is the interface that wraps the simple Drop method. The Drop
// method is called when the journal reader drops a block or chunk.
type Dropper interface {
	Drop(err error)
}

// Reader reads journals from an underlying io.Reader.
type Reader struct {
	// r is the underlying reader.
	r io.Reader
	// dropper, if not nil, is notified of every dropped block or chunk.
	dropper Dropper
	// strict makes a corrupted chunk a permanent error instead of a skip.
	strict bool
	// checksum enables verification of each chunk's checksum.
	checksum bool
	// seq is the sequence number of the current journal.
	seq int
	// buf[i:j] is the unread portion of the current chunk's payload.
	// The low bound, i, excludes the chunk header.
	i, j int
	// n is the number of bytes of buf that are valid. Once reading has started,
	// only the final block can have n < blockSize.
	n int
	// last is whether the current chunk is the last chunk of the journal.
	last bool
	// err is any accumulated error.
	err error
	// buf is the buffer.
	buf [blockSize]byte
}

// NewReader returns a new reader. The dropper may be nil. If strict is true,
// a corrupted or invalid chunk halts the journal reader entirely. If checksum
// is true, the checksum stored in each chunk header is verified.
152func NewReader(r io.Reader, dropper Dropper, strict, checksum bool) *Reader { 153 return &Reader{ 154 r: r, 155 dropper: dropper, 156 strict: strict, 157 checksum: checksum, 158 last: true, 159 } 160} 161 162var errSkip = errors.New("leveldb/journal: skipped") 163 164func (r *Reader) corrupt(n int, reason string, skip bool) error { 165 if r.dropper != nil { 166 r.dropper.Drop(&ErrCorrupted{n, reason}) 167 } 168 if r.strict && !skip { 169 r.err = errors.NewErrCorrupted(storage.FileDesc{}, &ErrCorrupted{n, reason}) 170 return r.err 171 } 172 return errSkip 173} 174 175// nextChunk sets r.buf[r.i:r.j] to hold the next chunk's payload, reading the 176// next block into the buffer if necessary. 177func (r *Reader) nextChunk(first bool) error { 178 for { 179 if r.j+headerSize <= r.n { 180 checksum := binary.LittleEndian.Uint32(r.buf[r.j+0 : r.j+4]) 181 length := binary.LittleEndian.Uint16(r.buf[r.j+4 : r.j+6]) 182 chunkType := r.buf[r.j+6] 183 unprocBlock := r.n - r.j 184 if checksum == 0 && length == 0 && chunkType == 0 { 185 // Drop entire block. 186 r.i = r.n 187 r.j = r.n 188 return r.corrupt(unprocBlock, "zero header", false) 189 } 190 if chunkType < fullChunkType || chunkType > lastChunkType { 191 // Drop entire block. 192 r.i = r.n 193 r.j = r.n 194 return r.corrupt(unprocBlock, fmt.Sprintf("invalid chunk type %#x", chunkType), false) 195 } 196 r.i = r.j + headerSize 197 r.j = r.j + headerSize + int(length) 198 if r.j > r.n { 199 // Drop entire block. 200 r.i = r.n 201 r.j = r.n 202 return r.corrupt(unprocBlock, "chunk length overflows block", false) 203 } else if r.checksum && checksum != util.NewCRC(r.buf[r.i-1:r.j]).Value() { 204 // Drop entire block. 205 r.i = r.n 206 r.j = r.n 207 return r.corrupt(unprocBlock, "checksum mismatch", false) 208 } 209 if first && chunkType != fullChunkType && chunkType != firstChunkType { 210 chunkLength := (r.j - r.i) + headerSize 211 r.i = r.j 212 // Report the error, but skip it. 
213 return r.corrupt(chunkLength, "orphan chunk", true) 214 } 215 r.last = chunkType == fullChunkType || chunkType == lastChunkType 216 return nil 217 } 218 219 // The last block. 220 if r.n < blockSize && r.n > 0 { 221 if !first { 222 return r.corrupt(0, "missing chunk part", false) 223 } 224 r.err = io.EOF 225 return r.err 226 } 227 228 // Read block. 229 n, err := io.ReadFull(r.r, r.buf[:]) 230 if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF { 231 return err 232 } 233 if n == 0 { 234 if !first { 235 return r.corrupt(0, "missing chunk part", false) 236 } 237 r.err = io.EOF 238 return r.err 239 } 240 r.i, r.j, r.n = 0, 0, n 241 } 242} 243 244// Next returns a reader for the next journal. It returns io.EOF if there are no 245// more journals. The reader returned becomes stale after the next Next call, 246// and should no longer be used. If strict is false, the reader will returns 247// io.ErrUnexpectedEOF error when found corrupted journal. 248func (r *Reader) Next() (io.Reader, error) { 249 r.seq++ 250 if r.err != nil { 251 return nil, r.err 252 } 253 r.i = r.j 254 for { 255 if err := r.nextChunk(true); err == nil { 256 break 257 } else if err != errSkip { 258 return nil, err 259 } 260 } 261 return &singleReader{r, r.seq, nil}, nil 262} 263 264// Reset resets the journal reader, allows reuse of the journal reader. Reset returns 265// last accumulated error. 
266func (r *Reader) Reset(reader io.Reader, dropper Dropper, strict, checksum bool) error { 267 r.seq++ 268 err := r.err 269 r.r = reader 270 r.dropper = dropper 271 r.strict = strict 272 r.checksum = checksum 273 r.i = 0 274 r.j = 0 275 r.n = 0 276 r.last = true 277 r.err = nil 278 return err 279} 280 281type singleReader struct { 282 r *Reader 283 seq int 284 err error 285} 286 287func (x *singleReader) Read(p []byte) (int, error) { 288 r := x.r 289 if r.seq != x.seq { 290 return 0, errors.New("leveldb/journal: stale reader") 291 } 292 if x.err != nil { 293 return 0, x.err 294 } 295 if r.err != nil { 296 return 0, r.err 297 } 298 for r.i == r.j { 299 if r.last { 300 return 0, io.EOF 301 } 302 x.err = r.nextChunk(false) 303 if x.err != nil { 304 if x.err == errSkip { 305 x.err = io.ErrUnexpectedEOF 306 } 307 return 0, x.err 308 } 309 } 310 n := copy(p, r.buf[r.i:r.j]) 311 r.i += n 312 return n, nil 313} 314 315func (x *singleReader) ReadByte() (byte, error) { 316 r := x.r 317 if r.seq != x.seq { 318 return 0, errors.New("leveldb/journal: stale reader") 319 } 320 if x.err != nil { 321 return 0, x.err 322 } 323 if r.err != nil { 324 return 0, r.err 325 } 326 for r.i == r.j { 327 if r.last { 328 return 0, io.EOF 329 } 330 x.err = r.nextChunk(false) 331 if x.err != nil { 332 if x.err == errSkip { 333 x.err = io.ErrUnexpectedEOF 334 } 335 return 0, x.err 336 } 337 } 338 c := r.buf[r.i] 339 r.i++ 340 return c, nil 341} 342 343// Writer writes journals to an underlying io.Writer. 344type Writer struct { 345 // w is the underlying writer. 346 w io.Writer 347 // seq is the sequence number of the current journal. 348 seq int 349 // f is w as a flusher. 350 f flusher 351 // buf[i:j] is the bytes that will become the current chunk. 352 // The low bound, i, includes the chunk header. 353 i, j int 354 // buf[:written] has already been written to w. 355 // written is zero unless Flush has been called. 
356 written int 357 // first is whether the current chunk is the first chunk of the journal. 358 first bool 359 // pending is whether a chunk is buffered but not yet written. 360 pending bool 361 // err is any accumulated error. 362 err error 363 // buf is the buffer. 364 buf [blockSize]byte 365} 366 367// NewWriter returns a new Writer. 368func NewWriter(w io.Writer) *Writer { 369 f, _ := w.(flusher) 370 return &Writer{ 371 w: w, 372 f: f, 373 } 374} 375 376// fillHeader fills in the header for the pending chunk. 377func (w *Writer) fillHeader(last bool) { 378 if w.i+headerSize > w.j || w.j > blockSize { 379 panic("leveldb/journal: bad writer state") 380 } 381 if last { 382 if w.first { 383 w.buf[w.i+6] = fullChunkType 384 } else { 385 w.buf[w.i+6] = lastChunkType 386 } 387 } else { 388 if w.first { 389 w.buf[w.i+6] = firstChunkType 390 } else { 391 w.buf[w.i+6] = middleChunkType 392 } 393 } 394 binary.LittleEndian.PutUint32(w.buf[w.i+0:w.i+4], util.NewCRC(w.buf[w.i+6:w.j]).Value()) 395 binary.LittleEndian.PutUint16(w.buf[w.i+4:w.i+6], uint16(w.j-w.i-headerSize)) 396} 397 398// writeBlock writes the buffered block to the underlying writer, and reserves 399// space for the next chunk's header. 400func (w *Writer) writeBlock() { 401 _, w.err = w.w.Write(w.buf[w.written:]) 402 w.i = 0 403 w.j = headerSize 404 w.written = 0 405} 406 407// writePending finishes the current journal and writes the buffer to the 408// underlying writer. 409func (w *Writer) writePending() { 410 if w.err != nil { 411 return 412 } 413 if w.pending { 414 w.fillHeader(true) 415 w.pending = false 416 } 417 _, w.err = w.w.Write(w.buf[w.written:w.j]) 418 w.written = w.j 419} 420 421// Close finishes the current journal and closes the writer. 
422func (w *Writer) Close() error { 423 w.seq++ 424 w.writePending() 425 if w.err != nil { 426 return w.err 427 } 428 w.err = errors.New("leveldb/journal: closed Writer") 429 return nil 430} 431 432// Flush finishes the current journal, writes to the underlying writer, and 433// flushes it if that writer implements interface{ Flush() error }. 434func (w *Writer) Flush() error { 435 w.seq++ 436 w.writePending() 437 if w.err != nil { 438 return w.err 439 } 440 if w.f != nil { 441 w.err = w.f.Flush() 442 return w.err 443 } 444 return nil 445} 446 447// Reset resets the journal writer, allows reuse of the journal writer. Reset 448// will also closes the journal writer if not already. 449func (w *Writer) Reset(writer io.Writer) (err error) { 450 w.seq++ 451 if w.err == nil { 452 w.writePending() 453 err = w.err 454 } 455 w.w = writer 456 w.f, _ = writer.(flusher) 457 w.i = 0 458 w.j = 0 459 w.written = 0 460 w.first = false 461 w.pending = false 462 w.err = nil 463 return 464} 465 466// Next returns a writer for the next journal. The writer returned becomes stale 467// after the next Close, Flush or Next call, and should no longer be used. 468func (w *Writer) Next() (io.Writer, error) { 469 w.seq++ 470 if w.err != nil { 471 return nil, w.err 472 } 473 if w.pending { 474 w.fillHeader(true) 475 } 476 w.i = w.j 477 w.j = w.j + headerSize 478 // Check if there is room in the block for the header. 479 if w.j > blockSize { 480 // Fill in the rest of the block with zeroes. 
481 for k := w.i; k < blockSize; k++ { 482 w.buf[k] = 0 483 } 484 w.writeBlock() 485 if w.err != nil { 486 return nil, w.err 487 } 488 } 489 w.first = true 490 w.pending = true 491 return singleWriter{w, w.seq}, nil 492} 493 494type singleWriter struct { 495 w *Writer 496 seq int 497} 498 499func (x singleWriter) Write(p []byte) (int, error) { 500 w := x.w 501 if w.seq != x.seq { 502 return 0, errors.New("leveldb/journal: stale writer") 503 } 504 if w.err != nil { 505 return 0, w.err 506 } 507 n0 := len(p) 508 for len(p) > 0 { 509 // Write a block, if it is full. 510 if w.j == blockSize { 511 w.fillHeader(false) 512 w.writeBlock() 513 if w.err != nil { 514 return 0, w.err 515 } 516 w.first = false 517 } 518 // Copy bytes into the buffer. 519 n := copy(w.buf[w.j:], p) 520 w.j += n 521 p = p[n:] 522 } 523 return n0, nil 524} 525