1// Copyright 2011 The LevelDB-Go Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5// Taken from: https://code.google.com/p/leveldb-go/source/browse/leveldb/record/record.go?r=1d5ccbe03246da926391ee12d1c6caae054ff4b0
// License, authors and contributors information can be found at the URLs below, respectively:
//	https://code.google.com/p/leveldb-go/source/browse/LICENSE
//	https://code.google.com/p/leveldb-go/source/browse/AUTHORS
//	https://code.google.com/p/leveldb-go/source/browse/CONTRIBUTORS
10
11// Package journal reads and writes sequences of journals. Each journal is a stream
12// of bytes that completes before the next journal starts.
13//
14// When reading, call Next to obtain an io.Reader for the next journal. Next will
15// return io.EOF when there are no more journals. It is valid to call Next
16// without reading the current journal to exhaustion.
17//
18// When writing, call Next to obtain an io.Writer for the next journal. Calling
19// Next finishes the current journal. Call Close to finish the final journal.
20//
21// Optionally, call Flush to finish the current journal and flush the underlying
22// writer without starting a new journal. To start a new journal after flushing,
23// call Next.
24//
// Neither Readers nor Writers are safe to use concurrently.
26//
27// Example code:
28//	func read(r io.Reader) ([]string, error) {
29//		var ss []string
30//		journals := journal.NewReader(r, nil, true, true)
31//		for {
32//			j, err := journals.Next()
33//			if err == io.EOF {
34//				break
35//			}
36//			if err != nil {
37//				return nil, err
38//			}
39//			s, err := ioutil.ReadAll(j)
40//			if err != nil {
41//				return nil, err
42//			}
43//			ss = append(ss, string(s))
44//		}
45//		return ss, nil
46//	}
47//
48//	func write(w io.Writer, ss []string) error {
49//		journals := journal.NewWriter(w)
50//		for _, s := range ss {
51//			j, err := journals.Next()
52//			if err != nil {
53//				return err
54//			}
//			if _, err := j.Write([]byte(s)); err != nil {
56//				return err
57//			}
58//		}
59//		return journals.Close()
60//	}
61//
62// The wire format is that the stream is divided into 32KiB blocks, and each
63// block contains a number of tightly packed chunks. Chunks cannot cross block
64// boundaries. The last block may be shorter than 32 KiB. Any unused bytes in a
65// block must be zero.
66//
67// A journal maps to one or more chunks. Each chunk has a 7 byte header (a 4
68// byte checksum, a 2 byte little-endian uint16 length, and a 1 byte chunk type)
69// followed by a payload. The checksum is over the chunk type and the payload.
70//
71// There are four chunk types: whether the chunk is the full journal, or the
72// first, middle or last chunk of a multi-chunk journal. A multi-chunk journal
73// has one first chunk, zero or more middle chunks, and one last chunk.
74//
75// The wire format allows for limited recovery in the face of data corruption:
76// on a format error (such as a checksum mismatch), the reader moves to the
77// next block and looks for the next full or first chunk.
78package journal
79
80import (
81	"encoding/binary"
82	"fmt"
83	"io"
84
85	"github.com/syndtr/goleveldb/leveldb/errors"
86	"github.com/syndtr/goleveldb/leveldb/storage"
87	"github.com/syndtr/goleveldb/leveldb/util"
88)
89
// Chunk type tags stored in the last byte of every chunk header. They are
// part of the wire format: never reorder, renumber or insert entries.
const (
	fullChunkType   = iota + 1 // 1: the chunk holds a whole journal
	firstChunkType             // 2: first chunk of a multi-chunk journal
	middleChunkType            // 3: interior chunk of a multi-chunk journal
	lastChunkType              // 4: final chunk of a multi-chunk journal
)
97
// Sizes fixed by the wire format.
const (
	// blockSize is the size of one block: 32 KiB.
	blockSize = 32 << 10
	// headerSize is the size of a chunk header: a 4 byte checksum, a 2 byte
	// length and a 1 byte chunk type.
	headerSize = 4 + 2 + 1
)
102
// flusher is the optional interface an underlying writer may implement.
// NewWriter and Writer.Reset detect it with a type assertion, and
// Writer.Flush calls Flush after the buffered data has been written.
type flusher interface {
	Flush() error
}
106
// ErrCorrupted is the error type generated for a corrupted block or chunk.
type ErrCorrupted struct {
	// Size is the number of bytes affected by the corruption.
	Size int
	// Reason is a short human-readable description of the corruption.
	Reason string
}

// Error implements the error interface, reporting the reason followed by the
// number of affected bytes.
func (e *ErrCorrupted) Error() string {
	const format = "leveldb/journal: block/chunk corrupted: %s (%d bytes)"
	return fmt.Sprintf(format, e.Reason, e.Size)
}
116
// Dropper is the interface that wraps the Drop method. The journal reader
// calls Drop with an *ErrCorrupted describing the damage whenever it drops a
// block or chunk.
type Dropper interface {
	Drop(err error)
}
122
123// Reader reads journals from an underlying io.Reader.
124type Reader struct {
125	// r is the underlying reader.
126	r io.Reader
127	// the dropper.
128	dropper Dropper
129	// strict flag.
130	strict bool
131	// checksum flag.
132	checksum bool
133	// seq is the sequence number of the current journal.
134	seq int
135	// buf[i:j] is the unread portion of the current chunk's payload.
136	// The low bound, i, excludes the chunk header.
137	i, j int
138	// n is the number of bytes of buf that are valid. Once reading has started,
139	// only the final block can have n < blockSize.
140	n int
141	// last is whether the current chunk is the last chunk of the journal.
142	last bool
143	// err is any accumulated error.
144	err error
145	// buf is the buffer.
146	buf [blockSize]byte
147}
148
149// NewReader returns a new reader. The dropper may be nil, and if
150// strict is true then corrupted or invalid chunk will halt the journal
151// reader entirely.
152func NewReader(r io.Reader, dropper Dropper, strict, checksum bool) *Reader {
153	return &Reader{
154		r:        r,
155		dropper:  dropper,
156		strict:   strict,
157		checksum: checksum,
158		last:     true,
159	}
160}
161
// errSkip is the internal sentinel returned by Reader.corrupt when damaged
// data is to be skipped rather than treated as fatal. Reader.Next retries on
// it, and singleReader converts it into io.ErrUnexpectedEOF.
var errSkip = errors.New("leveldb/journal: skipped")
163
164func (r *Reader) corrupt(n int, reason string, skip bool) error {
165	if r.dropper != nil {
166		r.dropper.Drop(&ErrCorrupted{n, reason})
167	}
168	if r.strict && !skip {
169		r.err = errors.NewErrCorrupted(storage.FileDesc{}, &ErrCorrupted{n, reason})
170		return r.err
171	}
172	return errSkip
173}
174
175// nextChunk sets r.buf[r.i:r.j] to hold the next chunk's payload, reading the
176// next block into the buffer if necessary.
177func (r *Reader) nextChunk(first bool) error {
178	for {
179		if r.j+headerSize <= r.n {
180			checksum := binary.LittleEndian.Uint32(r.buf[r.j+0 : r.j+4])
181			length := binary.LittleEndian.Uint16(r.buf[r.j+4 : r.j+6])
182			chunkType := r.buf[r.j+6]
183			unprocBlock := r.n - r.j
184			if checksum == 0 && length == 0 && chunkType == 0 {
185				// Drop entire block.
186				r.i = r.n
187				r.j = r.n
188				return r.corrupt(unprocBlock, "zero header", false)
189			}
190			if chunkType < fullChunkType || chunkType > lastChunkType {
191				// Drop entire block.
192				r.i = r.n
193				r.j = r.n
194				return r.corrupt(unprocBlock, fmt.Sprintf("invalid chunk type %#x", chunkType), false)
195			}
196			r.i = r.j + headerSize
197			r.j = r.j + headerSize + int(length)
198			if r.j > r.n {
199				// Drop entire block.
200				r.i = r.n
201				r.j = r.n
202				return r.corrupt(unprocBlock, "chunk length overflows block", false)
203			} else if r.checksum && checksum != util.NewCRC(r.buf[r.i-1:r.j]).Value() {
204				// Drop entire block.
205				r.i = r.n
206				r.j = r.n
207				return r.corrupt(unprocBlock, "checksum mismatch", false)
208			}
209			if first && chunkType != fullChunkType && chunkType != firstChunkType {
210				chunkLength := (r.j - r.i) + headerSize
211				r.i = r.j
212				// Report the error, but skip it.
213				return r.corrupt(chunkLength, "orphan chunk", true)
214			}
215			r.last = chunkType == fullChunkType || chunkType == lastChunkType
216			return nil
217		}
218
219		// The last block.
220		if r.n < blockSize && r.n > 0 {
221			if !first {
222				return r.corrupt(0, "missing chunk part", false)
223			}
224			r.err = io.EOF
225			return r.err
226		}
227
228		// Read block.
229		n, err := io.ReadFull(r.r, r.buf[:])
230		if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
231			return err
232		}
233		if n == 0 {
234			if !first {
235				return r.corrupt(0, "missing chunk part", false)
236			}
237			r.err = io.EOF
238			return r.err
239		}
240		r.i, r.j, r.n = 0, 0, n
241	}
242}
243
244// Next returns a reader for the next journal. It returns io.EOF if there are no
245// more journals. The reader returned becomes stale after the next Next call,
246// and should no longer be used. If strict is false, the reader will returns
247// io.ErrUnexpectedEOF error when found corrupted journal.
248func (r *Reader) Next() (io.Reader, error) {
249	r.seq++
250	if r.err != nil {
251		return nil, r.err
252	}
253	r.i = r.j
254	for {
255		if err := r.nextChunk(true); err == nil {
256			break
257		} else if err != errSkip {
258			return nil, err
259		}
260	}
261	return &singleReader{r, r.seq, nil}, nil
262}
263
264// Reset resets the journal reader, allows reuse of the journal reader. Reset returns
265// last accumulated error.
266func (r *Reader) Reset(reader io.Reader, dropper Dropper, strict, checksum bool) error {
267	r.seq++
268	err := r.err
269	r.r = reader
270	r.dropper = dropper
271	r.strict = strict
272	r.checksum = checksum
273	r.i = 0
274	r.j = 0
275	r.n = 0
276	r.last = true
277	r.err = nil
278	return err
279}
280
// singleReader reads a single journal; it is the io.Reader handed out by
// Reader.Next.
type singleReader struct {
	// r is the Reader that produced this journal reader.
	r *Reader
	// seq is the value of r.seq when this reader was created. Once r.seq
	// has moved on, this reader is stale and refuses to read.
	seq int
	// err is the sticky error recorded when advancing to the next chunk fails.
	err error
}
286
287func (x *singleReader) Read(p []byte) (int, error) {
288	r := x.r
289	if r.seq != x.seq {
290		return 0, errors.New("leveldb/journal: stale reader")
291	}
292	if x.err != nil {
293		return 0, x.err
294	}
295	if r.err != nil {
296		return 0, r.err
297	}
298	for r.i == r.j {
299		if r.last {
300			return 0, io.EOF
301		}
302		x.err = r.nextChunk(false)
303		if x.err != nil {
304			if x.err == errSkip {
305				x.err = io.ErrUnexpectedEOF
306			}
307			return 0, x.err
308		}
309	}
310	n := copy(p, r.buf[r.i:r.j])
311	r.i += n
312	return n, nil
313}
314
315func (x *singleReader) ReadByte() (byte, error) {
316	r := x.r
317	if r.seq != x.seq {
318		return 0, errors.New("leveldb/journal: stale reader")
319	}
320	if x.err != nil {
321		return 0, x.err
322	}
323	if r.err != nil {
324		return 0, r.err
325	}
326	for r.i == r.j {
327		if r.last {
328			return 0, io.EOF
329		}
330		x.err = r.nextChunk(false)
331		if x.err != nil {
332			if x.err == errSkip {
333				x.err = io.ErrUnexpectedEOF
334			}
335			return 0, x.err
336		}
337	}
338	c := r.buf[r.i]
339	r.i++
340	return c, nil
341}
342
343// Writer writes journals to an underlying io.Writer.
344type Writer struct {
345	// w is the underlying writer.
346	w io.Writer
347	// seq is the sequence number of the current journal.
348	seq int
349	// f is w as a flusher.
350	f flusher
351	// buf[i:j] is the bytes that will become the current chunk.
352	// The low bound, i, includes the chunk header.
353	i, j int
354	// buf[:written] has already been written to w.
355	// written is zero unless Flush has been called.
356	written int
357	// first is whether the current chunk is the first chunk of the journal.
358	first bool
359	// pending is whether a chunk is buffered but not yet written.
360	pending bool
361	// err is any accumulated error.
362	err error
363	// buf is the buffer.
364	buf [blockSize]byte
365}
366
367// NewWriter returns a new Writer.
368func NewWriter(w io.Writer) *Writer {
369	f, _ := w.(flusher)
370	return &Writer{
371		w: w,
372		f: f,
373	}
374}
375
376// fillHeader fills in the header for the pending chunk.
377func (w *Writer) fillHeader(last bool) {
378	if w.i+headerSize > w.j || w.j > blockSize {
379		panic("leveldb/journal: bad writer state")
380	}
381	if last {
382		if w.first {
383			w.buf[w.i+6] = fullChunkType
384		} else {
385			w.buf[w.i+6] = lastChunkType
386		}
387	} else {
388		if w.first {
389			w.buf[w.i+6] = firstChunkType
390		} else {
391			w.buf[w.i+6] = middleChunkType
392		}
393	}
394	binary.LittleEndian.PutUint32(w.buf[w.i+0:w.i+4], util.NewCRC(w.buf[w.i+6:w.j]).Value())
395	binary.LittleEndian.PutUint16(w.buf[w.i+4:w.i+6], uint16(w.j-w.i-headerSize))
396}
397
398// writeBlock writes the buffered block to the underlying writer, and reserves
399// space for the next chunk's header.
400func (w *Writer) writeBlock() {
401	_, w.err = w.w.Write(w.buf[w.written:])
402	w.i = 0
403	w.j = headerSize
404	w.written = 0
405}
406
407// writePending finishes the current journal and writes the buffer to the
408// underlying writer.
409func (w *Writer) writePending() {
410	if w.err != nil {
411		return
412	}
413	if w.pending {
414		w.fillHeader(true)
415		w.pending = false
416	}
417	_, w.err = w.w.Write(w.buf[w.written:w.j])
418	w.written = w.j
419}
420
421// Close finishes the current journal and closes the writer.
422func (w *Writer) Close() error {
423	w.seq++
424	w.writePending()
425	if w.err != nil {
426		return w.err
427	}
428	w.err = errors.New("leveldb/journal: closed Writer")
429	return nil
430}
431
432// Flush finishes the current journal, writes to the underlying writer, and
433// flushes it if that writer implements interface{ Flush() error }.
434func (w *Writer) Flush() error {
435	w.seq++
436	w.writePending()
437	if w.err != nil {
438		return w.err
439	}
440	if w.f != nil {
441		w.err = w.f.Flush()
442		return w.err
443	}
444	return nil
445}
446
447// Reset resets the journal writer, allows reuse of the journal writer. Reset
448// will also closes the journal writer if not already.
449func (w *Writer) Reset(writer io.Writer) (err error) {
450	w.seq++
451	if w.err == nil {
452		w.writePending()
453		err = w.err
454	}
455	w.w = writer
456	w.f, _ = writer.(flusher)
457	w.i = 0
458	w.j = 0
459	w.written = 0
460	w.first = false
461	w.pending = false
462	w.err = nil
463	return
464}
465
466// Next returns a writer for the next journal. The writer returned becomes stale
467// after the next Close, Flush or Next call, and should no longer be used.
468func (w *Writer) Next() (io.Writer, error) {
469	w.seq++
470	if w.err != nil {
471		return nil, w.err
472	}
473	if w.pending {
474		w.fillHeader(true)
475	}
476	w.i = w.j
477	w.j = w.j + headerSize
478	// Check if there is room in the block for the header.
479	if w.j > blockSize {
480		// Fill in the rest of the block with zeroes.
481		for k := w.i; k < blockSize; k++ {
482			w.buf[k] = 0
483		}
484		w.writeBlock()
485		if w.err != nil {
486			return nil, w.err
487		}
488	}
489	w.first = true
490	w.pending = true
491	return singleWriter{w, w.seq}, nil
492}
493
// singleWriter writes a single journal; it is the io.Writer handed out by
// Writer.Next.
type singleWriter struct {
	// w is the Writer that produced this journal writer.
	w *Writer
	// seq is the value of w.seq when this writer was created. Once w.seq
	// has moved on, this writer is stale and refuses to write.
	seq int
}
498
499func (x singleWriter) Write(p []byte) (int, error) {
500	w := x.w
501	if w.seq != x.seq {
502		return 0, errors.New("leveldb/journal: stale writer")
503	}
504	if w.err != nil {
505		return 0, w.err
506	}
507	n0 := len(p)
508	for len(p) > 0 {
509		// Write a block, if it is full.
510		if w.j == blockSize {
511			w.fillHeader(false)
512			w.writeBlock()
513			if w.err != nil {
514				return 0, w.err
515			}
516			w.first = false
517		}
518		// Copy bytes into the buffer.
519		n := copy(w.buf[w.j:], p)
520		w.j += n
521		p = p[n:]
522	}
523	return n0, nil
524}
525