1// Copyright 2011 The LevelDB-Go Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5// Taken from: https://code.google.com/p/leveldb-go/source/browse/leveldb/record/record.go?r=1d5ccbe03246da926391ee12d1c6caae054ff4b0
// License, authors and contributors information can be found at the URLs below, respectively:
7// 	https://code.google.com/p/leveldb-go/source/browse/LICENSE
8//	https://code.google.com/p/leveldb-go/source/browse/AUTHORS
9//  https://code.google.com/p/leveldb-go/source/browse/CONTRIBUTORS
10
11// Package journal reads and writes sequences of journals. Each journal is a stream
12// of bytes that completes before the next journal starts.
13//
14// When reading, call Next to obtain an io.Reader for the next journal. Next will
15// return io.EOF when there are no more journals. It is valid to call Next
16// without reading the current journal to exhaustion.
17//
18// When writing, call Next to obtain an io.Writer for the next journal. Calling
19// Next finishes the current journal. Call Close to finish the final journal.
20//
21// Optionally, call Flush to finish the current journal and flush the underlying
22// writer without starting a new journal. To start a new journal after flushing,
23// call Next.
24//
// Neither Readers nor Writers are safe for concurrent use.
26//
27// Example code:
28//	func read(r io.Reader) ([]string, error) {
29//		var ss []string
30//		journals := journal.NewReader(r, nil, true, true)
31//		for {
32//			j, err := journals.Next()
33//			if err == io.EOF {
34//				break
35//			}
36//			if err != nil {
37//				return nil, err
38//			}
39//			s, err := ioutil.ReadAll(j)
40//			if err != nil {
41//				return nil, err
42//			}
43//			ss = append(ss, string(s))
44//		}
45//		return ss, nil
46//	}
47//
48//	func write(w io.Writer, ss []string) error {
49//		journals := journal.NewWriter(w)
50//		for _, s := range ss {
51//			j, err := journals.Next()
52//			if err != nil {
53//				return err
54//			}
//			if _, err := j.Write([]byte(s)); err != nil {
56//				return err
57//			}
58//		}
59//		return journals.Close()
60//	}
61//
62// The wire format is that the stream is divided into 32KiB blocks, and each
63// block contains a number of tightly packed chunks. Chunks cannot cross block
64// boundaries. The last block may be shorter than 32 KiB. Any unused bytes in a
65// block must be zero.
66//
67// A journal maps to one or more chunks. Each chunk has a 7 byte header (a 4
68// byte checksum, a 2 byte little-endian uint16 length, and a 1 byte chunk type)
69// followed by a payload. The checksum is over the chunk type and the payload.
70//
71// There are four chunk types: whether the chunk is the full journal, or the
72// first, middle or last chunk of a multi-chunk journal. A multi-chunk journal
73// has one first chunk, zero or more middle chunks, and one last chunk.
74//
75// The wire format allows for limited recovery in the face of data corruption:
76// on a format error (such as a checksum mismatch), the reader moves to the
77// next block and looks for the next full or first chunk.
78package journal
79
80import (
81	"encoding/binary"
82	"fmt"
83	"io"
84
85	"github.com/syndtr/goleveldb/leveldb/errors"
86	"github.com/syndtr/goleveldb/leveldb/util"
87)
88
// Chunk types. The numeric values are part of the on-disk wire format and
// must never be changed or reordered.
const (
	fullChunkType   = iota + 1 // 1: the chunk holds an entire journal
	firstChunkType             // 2: first chunk of a multi-chunk journal
	middleChunkType            // 3: interior chunk of a multi-chunk journal
	lastChunkType              // 4: final chunk of a multi-chunk journal
)
96
// Block layout constants.
const (
	// blockSize is the fixed size of every block; only the last one may be shorter.
	blockSize = 1 << 15 // 32 KiB
	// headerSize is checksum (4 bytes) + length (2 bytes) + chunk type (1 byte).
	headerSize = 4 + 2 + 1
)
101
// flusher is satisfied by destinations that buffer data and can push it out
// on demand; Writer.Flush forwards to it when the underlying writer has it.
type flusher interface {
	Flush() error
}
105
// ErrCorrupted is the error type generated by a corrupted block or chunk.
// Size is the number of bytes that were dropped (it may be 0) and Reason is
// a short description of the corruption.
type ErrCorrupted struct {
	Size   int
	Reason string
}

// Error implements the error interface.
func (e *ErrCorrupted) Error() string {
	const format = "leveldb/journal: block/chunk corrupted: %s (%d bytes)"
	return fmt.Sprintf(format, e.Reason, e.Size)
}
115
// Dropper is the interface that wraps the simple Drop method. The journal
// reader calls Drop whenever it drops a block or chunk, passing an
// *ErrCorrupted that describes what was dropped and why.
type Dropper interface {
	Drop(err error)
}
121
122// Reader reads journals from an underlying io.Reader.
123type Reader struct {
124	// r is the underlying reader.
125	r io.Reader
126	// the dropper.
127	dropper Dropper
128	// strict flag.
129	strict bool
130	// checksum flag.
131	checksum bool
132	// seq is the sequence number of the current journal.
133	seq int
134	// buf[i:j] is the unread portion of the current chunk's payload.
135	// The low bound, i, excludes the chunk header.
136	i, j int
137	// n is the number of bytes of buf that are valid. Once reading has started,
138	// only the final block can have n < blockSize.
139	n int
140	// last is whether the current chunk is the last chunk of the journal.
141	last bool
142	// err is any accumulated error.
143	err error
144	// buf is the buffer.
145	buf [blockSize]byte
146}
147
148// NewReader returns a new reader. The dropper may be nil, and if
149// strict is true then corrupted or invalid chunk will halt the journal
150// reader entirely.
151func NewReader(r io.Reader, dropper Dropper, strict, checksum bool) *Reader {
152	return &Reader{
153		r:        r,
154		dropper:  dropper,
155		strict:   strict,
156		checksum: checksum,
157		last:     true,
158	}
159}
160
var (
	// errSkip signals that a corrupted or orphan chunk was dropped and the
	// caller may continue scanning.
	errSkip = errors.New("leveldb/journal: skipped")
)
162
163func (r *Reader) corrupt(n int, reason string, skip bool) error {
164	if r.dropper != nil {
165		r.dropper.Drop(&ErrCorrupted{n, reason})
166	}
167	if r.strict && !skip {
168		r.err = errors.NewErrCorrupted(nil, &ErrCorrupted{n, reason})
169		return r.err
170	}
171	return errSkip
172}
173
174// nextChunk sets r.buf[r.i:r.j] to hold the next chunk's payload, reading the
175// next block into the buffer if necessary.
176func (r *Reader) nextChunk(first bool) error {
177	for {
178		if r.j+headerSize <= r.n {
179			checksum := binary.LittleEndian.Uint32(r.buf[r.j+0 : r.j+4])
180			length := binary.LittleEndian.Uint16(r.buf[r.j+4 : r.j+6])
181			chunkType := r.buf[r.j+6]
182
183			if checksum == 0 && length == 0 && chunkType == 0 {
184				// Drop entire block.
185				m := r.n - r.j
186				r.i = r.n
187				r.j = r.n
188				return r.corrupt(m, "zero header", false)
189			} else {
190				m := r.n - r.j
191				r.i = r.j + headerSize
192				r.j = r.j + headerSize + int(length)
193				if r.j > r.n {
194					// Drop entire block.
195					r.i = r.n
196					r.j = r.n
197					return r.corrupt(m, "chunk length overflows block", false)
198				} else if r.checksum && checksum != util.NewCRC(r.buf[r.i-1:r.j]).Value() {
199					// Drop entire block.
200					r.i = r.n
201					r.j = r.n
202					return r.corrupt(m, "checksum mismatch", false)
203				}
204			}
205			if first && chunkType != fullChunkType && chunkType != firstChunkType {
206				m := r.j - r.i
207				r.i = r.j
208				// Report the error, but skip it.
209				return r.corrupt(m+headerSize, "orphan chunk", true)
210			}
211			r.last = chunkType == fullChunkType || chunkType == lastChunkType
212			return nil
213		}
214
215		// The last block.
216		if r.n < blockSize && r.n > 0 {
217			if !first {
218				return r.corrupt(0, "missing chunk part", false)
219			}
220			r.err = io.EOF
221			return r.err
222		}
223
224		// Read block.
225		n, err := io.ReadFull(r.r, r.buf[:])
226		if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
227			return err
228		}
229		if n == 0 {
230			if !first {
231				return r.corrupt(0, "missing chunk part", false)
232			}
233			r.err = io.EOF
234			return r.err
235		}
236		r.i, r.j, r.n = 0, 0, n
237	}
238}
239
240// Next returns a reader for the next journal. It returns io.EOF if there are no
241// more journals. The reader returned becomes stale after the next Next call,
242// and should no longer be used. If strict is false, the reader will returns
243// io.ErrUnexpectedEOF error when found corrupted journal.
244func (r *Reader) Next() (io.Reader, error) {
245	r.seq++
246	if r.err != nil {
247		return nil, r.err
248	}
249	r.i = r.j
250	for {
251		if err := r.nextChunk(true); err == nil {
252			break
253		} else if err != errSkip {
254			return nil, err
255		}
256	}
257	return &singleReader{r, r.seq, nil}, nil
258}
259
260// Reset resets the journal reader, allows reuse of the journal reader. Reset returns
261// last accumulated error.
262func (r *Reader) Reset(reader io.Reader, dropper Dropper, strict, checksum bool) error {
263	r.seq++
264	err := r.err
265	r.r = reader
266	r.dropper = dropper
267	r.strict = strict
268	r.checksum = checksum
269	r.i = 0
270	r.j = 0
271	r.n = 0
272	r.last = true
273	r.err = nil
274	return err
275}
276
277type singleReader struct {
278	r   *Reader
279	seq int
280	err error
281}
282
283func (x *singleReader) Read(p []byte) (int, error) {
284	r := x.r
285	if r.seq != x.seq {
286		return 0, errors.New("leveldb/journal: stale reader")
287	}
288	if x.err != nil {
289		return 0, x.err
290	}
291	if r.err != nil {
292		return 0, r.err
293	}
294	for r.i == r.j {
295		if r.last {
296			return 0, io.EOF
297		}
298		x.err = r.nextChunk(false)
299		if x.err != nil {
300			if x.err == errSkip {
301				x.err = io.ErrUnexpectedEOF
302			}
303			return 0, x.err
304		}
305	}
306	n := copy(p, r.buf[r.i:r.j])
307	r.i += n
308	return n, nil
309}
310
311func (x *singleReader) ReadByte() (byte, error) {
312	r := x.r
313	if r.seq != x.seq {
314		return 0, errors.New("leveldb/journal: stale reader")
315	}
316	if x.err != nil {
317		return 0, x.err
318	}
319	if r.err != nil {
320		return 0, r.err
321	}
322	for r.i == r.j {
323		if r.last {
324			return 0, io.EOF
325		}
326		x.err = r.nextChunk(false)
327		if x.err != nil {
328			if x.err == errSkip {
329				x.err = io.ErrUnexpectedEOF
330			}
331			return 0, x.err
332		}
333	}
334	c := r.buf[r.i]
335	r.i++
336	return c, nil
337}
338
339// Writer writes journals to an underlying io.Writer.
340type Writer struct {
341	// w is the underlying writer.
342	w io.Writer
343	// seq is the sequence number of the current journal.
344	seq int
345	// f is w as a flusher.
346	f flusher
347	// buf[i:j] is the bytes that will become the current chunk.
348	// The low bound, i, includes the chunk header.
349	i, j int
350	// buf[:written] has already been written to w.
351	// written is zero unless Flush has been called.
352	written int
353	// first is whether the current chunk is the first chunk of the journal.
354	first bool
355	// pending is whether a chunk is buffered but not yet written.
356	pending bool
357	// err is any accumulated error.
358	err error
359	// buf is the buffer.
360	buf [blockSize]byte
361}
362
363// NewWriter returns a new Writer.
364func NewWriter(w io.Writer) *Writer {
365	f, _ := w.(flusher)
366	return &Writer{
367		w: w,
368		f: f,
369	}
370}
371
372// fillHeader fills in the header for the pending chunk.
373func (w *Writer) fillHeader(last bool) {
374	if w.i+headerSize > w.j || w.j > blockSize {
375		panic("leveldb/journal: bad writer state")
376	}
377	if last {
378		if w.first {
379			w.buf[w.i+6] = fullChunkType
380		} else {
381			w.buf[w.i+6] = lastChunkType
382		}
383	} else {
384		if w.first {
385			w.buf[w.i+6] = firstChunkType
386		} else {
387			w.buf[w.i+6] = middleChunkType
388		}
389	}
390	binary.LittleEndian.PutUint32(w.buf[w.i+0:w.i+4], util.NewCRC(w.buf[w.i+6:w.j]).Value())
391	binary.LittleEndian.PutUint16(w.buf[w.i+4:w.i+6], uint16(w.j-w.i-headerSize))
392}
393
394// writeBlock writes the buffered block to the underlying writer, and reserves
395// space for the next chunk's header.
396func (w *Writer) writeBlock() {
397	_, w.err = w.w.Write(w.buf[w.written:])
398	w.i = 0
399	w.j = headerSize
400	w.written = 0
401}
402
403// writePending finishes the current journal and writes the buffer to the
404// underlying writer.
405func (w *Writer) writePending() {
406	if w.err != nil {
407		return
408	}
409	if w.pending {
410		w.fillHeader(true)
411		w.pending = false
412	}
413	_, w.err = w.w.Write(w.buf[w.written:w.j])
414	w.written = w.j
415}
416
417// Close finishes the current journal and closes the writer.
418func (w *Writer) Close() error {
419	w.seq++
420	w.writePending()
421	if w.err != nil {
422		return w.err
423	}
424	w.err = errors.New("leveldb/journal: closed Writer")
425	return nil
426}
427
428// Flush finishes the current journal, writes to the underlying writer, and
429// flushes it if that writer implements interface{ Flush() error }.
430func (w *Writer) Flush() error {
431	w.seq++
432	w.writePending()
433	if w.err != nil {
434		return w.err
435	}
436	if w.f != nil {
437		w.err = w.f.Flush()
438		return w.err
439	}
440	return nil
441}
442
443// Reset resets the journal writer, allows reuse of the journal writer. Reset
444// will also closes the journal writer if not already.
445func (w *Writer) Reset(writer io.Writer) (err error) {
446	w.seq++
447	if w.err == nil {
448		w.writePending()
449		err = w.err
450	}
451	w.w = writer
452	w.f, _ = writer.(flusher)
453	w.i = 0
454	w.j = 0
455	w.written = 0
456	w.first = false
457	w.pending = false
458	w.err = nil
459	return
460}
461
462// Next returns a writer for the next journal. The writer returned becomes stale
463// after the next Close, Flush or Next call, and should no longer be used.
464func (w *Writer) Next() (io.Writer, error) {
465	w.seq++
466	if w.err != nil {
467		return nil, w.err
468	}
469	if w.pending {
470		w.fillHeader(true)
471	}
472	w.i = w.j
473	w.j = w.j + headerSize
474	// Check if there is room in the block for the header.
475	if w.j > blockSize {
476		// Fill in the rest of the block with zeroes.
477		for k := w.i; k < blockSize; k++ {
478			w.buf[k] = 0
479		}
480		w.writeBlock()
481		if w.err != nil {
482			return nil, w.err
483		}
484	}
485	w.first = true
486	w.pending = true
487	return singleWriter{w, w.seq}, nil
488}
489
490type singleWriter struct {
491	w   *Writer
492	seq int
493}
494
495func (x singleWriter) Write(p []byte) (int, error) {
496	w := x.w
497	if w.seq != x.seq {
498		return 0, errors.New("leveldb/journal: stale writer")
499	}
500	if w.err != nil {
501		return 0, w.err
502	}
503	n0 := len(p)
504	for len(p) > 0 {
505		// Write a block, if it is full.
506		if w.j == blockSize {
507			w.fillHeader(false)
508			w.writeBlock()
509			if w.err != nil {
510				return 0, w.err
511			}
512			w.first = false
513		}
514		// Copy bytes into the buffer.
515		n := copy(w.buf[w.j:], p)
516		w.j += n
517		p = p[n:]
518	}
519	return n0, nil
520}
521