package unsnap

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
	"io/ioutil"
	"os"

	"hash/crc32"

	snappy "github.com/golang/snappy"
	// The C library can be used, but that makes the binary depend on
	// lots of extraneous C libraries; it is no longer stand-alone. Yuck.
	//
	// Therefore we comment out the "dgryski/go-csnappy" path and use
	// "github.com/golang/snappy" above instead. If you are
	// performance limited and can deal with distributing more libraries,
	// then this is easy to swap.
	//
	// If you swap, note that some of the tests won't pass
	// because snappy-go produces slightly different (but still
	// conformant) encodings on some data. Here are bindings
	// to the C-snappy:
	// snappy "github.com/dgryski/go-csnappy"
)

// SnappyFile is a drop-in replacement/wrapper for an *os.File that
// un-snappifies the data on the fly as more of it is read.

type SnappyFile struct {
	Fname string

	Reader io.Reader
	Writer io.Writer

	// allow clients to substitute us for an os.File and just switch
	// off compression if they don't want it.
	SnappyEncodeDecodeOff bool // if true, we bypass snappy and read/write Reader/Writer directly

	EncBuf FixedSizeRingBuf // holds any extra that isn't yet returned, encoded
	DecBuf FixedSizeRingBuf // holds any extra that isn't yet returned, decoded

	// for writing to stream-framed snappy
	HeaderChunkWritten bool

	// Sanity check: any one SnappyFile can only be read from, or only
	// written to. EncBuf and DecBuf are used differently in each mode.
	// Verify that we are consistent with this flag.
	Writing bool
}
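
// A minimal reading sketch; it assumes "data.snappy" is an existing
// snappy-framed file (the name is only illustrative). Because SnappyFile
// implements io.Reader via Read below, any io.Reader consumer works:
//
//	sf, err := Open("data.snappy")
//	if err != nil {
//		panic(err)
//	}
//	defer sf.Close()
//	var out bytes.Buffer
//	if _, err := out.ReadFrom(sf); err != nil && err != io.EOF {
//		panic(err)
//	}
//	// out.Bytes() now holds the decoded contents.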

// total counts the decoded bytes handed back by Read, for debugging.
var total int

// Dump shows the state of the buffers, for debugging.
func (f *SnappyFile) Dump() {
	fmt.Printf("EncBuf has length %d and contents:\n%s\n", len(f.EncBuf.Bytes()), string(f.EncBuf.Bytes()))
	fmt.Printf("DecBuf has length %d and contents:\n%s\n", len(f.DecBuf.Bytes()), string(f.DecBuf.Bytes()))
}

func (f *SnappyFile) Read(p []byte) (n int, err error) {

	if f.SnappyEncodeDecodeOff {
		return f.Reader.Read(p)
	}

	if f.Writing {
		panic("Reading on a write-only SnappyFile")
	}

	// before we decode more, try to drain the DecBuf first
	n, _ = f.DecBuf.Read(p)
	if n > 0 {
		total += n
		return n, nil
	}

	//nEncRead, nDecAdded, err := UnsnapOneFrame(f.Filep, &f.EncBuf, &f.DecBuf, f.Fname)
	_, _, err = UnsnapOneFrame(f.Reader, &f.EncBuf, &f.DecBuf, f.Fname)
	if err != nil && err != io.EOF {
		panic(err)
	}

	n, _ = f.DecBuf.Read(p)

	if n > 0 {
		total += n
		return n, nil
	}
	if f.DecBuf.Readable == 0 && f.EncBuf.Readable == 0 {
		// only now (when both DecBuf and EncBuf are empty) can we give io.EOF.
		// Any earlier, and we leave stuff un-decoded!
		return 0, io.EOF
	}
	return 0, nil
}

func Open(name string) (file *SnappyFile, err error) {
	fp, err := os.Open(name)
	if err != nil {
		return nil, err
	}
	// snappy encoding can apparently make data bigger than the original, so
	// we make our buffers big enough: 2 * max snappy chunk => 2 * CHUNK_MAX (65536)

	snap := NewReader(fp)
	snap.Fname = name
	return snap, nil
}

func NewReader(r io.Reader) *SnappyFile {
	return &SnappyFile{
		Reader:  r,
		EncBuf:  *NewFixedSizeRingBuf(CHUNK_MAX * 2), // buffer of snappy encoded bytes
		DecBuf:  *NewFixedSizeRingBuf(CHUNK_MAX * 2), // buffer of snappy decoded bytes
		Writing: false,
	}
}

func NewWriter(w io.Writer) *SnappyFile {
	return &SnappyFile{
		Writer:  w,
		EncBuf:  *NewFixedSizeRingBuf(65536),     // on writing: temp for testing compression
		DecBuf:  *NewFixedSizeRingBuf(65536 * 2), // on writing: final buffer of snappy framed and encoded bytes
		Writing: true,
	}
}

func Create(name string) (file *SnappyFile, err error) {
	fp, err := os.Create(name)
	if err != nil {
		return nil, err
	}
	snap := NewWriter(fp)
	snap.Fname = name
	return snap, nil
}
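
// A minimal writing sketch. It assumes SnappyFile's Write method (defined
// elsewhere in this package) frames and compresses the data; the file name
// below is only illustrative:
//
//	sf, err := Create("out.snappy")
//	if err != nil {
//		panic(err)
//	}
//	if _, err := sf.Write([]byte("hello, snappy framing")); err != nil {
//		panic(err)
//	}
//	sf.Sync()  // flushes the underlying *os.File, if any
//	sf.Close() // closes the underlying writer when it is an io.WriteCloser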

func (f *SnappyFile) Close() error {
	if f.Writing {
		wc, ok := f.Writer.(io.WriteCloser)
		if ok {
			return wc.Close()
		}
		return nil
	}
	rc, ok := f.Reader.(io.ReadCloser)
	if ok {
		return rc.Close()
	}
	return nil
}

func (f *SnappyFile) Sync() error {
	file, ok := f.Writer.(*os.File)
	if ok {
		return file.Sync()
	}
	return nil
}

// UnsnapOneFrame advances by an increment of one frame at a time:
// it reads from r into encBuf (encBuf is still encoded, thus the name), and
// writes un-snappified frames into outDecodedBuf.
// The returned nEnc is the number of bytes read from the encoded encBuf;
// nDec is the number of decoded bytes added to outDecodedBuf.
func UnsnapOneFrame(r io.Reader, encBuf *FixedSizeRingBuf, outDecodedBuf *FixedSizeRingBuf, fname string) (nEnc int64, nDec int64, err error) {
	//	b, err := ioutil.ReadAll(r)
	//	if err != nil {
	//		panic(err)
	//	}

	nEnc = 0
	nDec = 0

	// read up to 65536 bytes from r into encBuf, at least a snappy frame
	nread, err := io.CopyN(encBuf, r, 65536) // returns nwrotebytes, err
	nEnc += nread
	if err != nil {
		if err == io.EOF {
			if nread == 0 {
				if encBuf.Readable == 0 {
					return nEnc, nDec, io.EOF
				}
				// else we have bytes in encBuf, so decode them!
				err = nil
			} else {
				// continue below, processing the nread bytes
				err = nil
			}
		} else {
			panic(err)
		}
	}

	// flag for printing chunk size alignment messages
	verbose := false

	const snappyStreamHeaderSz = 10
	const headerSz = 4
	const crc32Sz = 4
	// the magic 18 bytes accounts for the snappy streaming header and the first chunk's size and checksum
	// http://code.google.com/p/snappy/source/browse/trunk/framing_format.txt
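
	// Per the framing format specification linked above, each framed chunk
	// starts with a 4-byte header:
	//
	//	byte  0      chunk type (0x00 compressed, 0x01 uncompressed,
	//	             0xff stream identifier, 0x80-0xfe skippable/padding)
	//	bytes 1-3    little-endian length of the rest of the chunk
	//
	// For data chunks (0x00 and 0x01) the body is a 4-byte masked CRC32C
	// of the uncompressed data, followed by the payload itself.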

	chunk := (*encBuf).Bytes()

	// however we exit, we must advance encBuf past whatever we consume;
	// this is done with the (*encBuf).Advance() calls below rather than a
	// deferred (*encBuf).Next(N).

	// 65536 is the max size of a snappy framed chunk. See
	// http://code.google.com/p/snappy/source/browse/trunk/framing_format.txt:91
	// buf := make([]byte, 65536)

	//	fmt.Printf("read from file, b is len:%d with value: %#v\n", len(b), b)
	//	fmt.Printf("read from file, bcut is len:%d with value: %#v\n", len(bcut), bcut)

	//fmt.Printf("raw bytes of chunksz are: %v\n", b[11:14])

	fourbytes := make([]byte, 4)
	chunkCount := 0

	for nDec < 65536 {
		if len(chunk) == 0 {
			break
		}
		chunkCount++
		fourbytes[3] = 0
		copy(fourbytes, chunk[1:4])
		chunksz := binary.LittleEndian.Uint32(fourbytes)
		chunk_type := chunk[0]

		switch {
		case chunk_type == 0xff:
			{ // stream identifier

				streamHeader := chunk[:snappyStreamHeaderSz]
				if 0 != bytes.Compare(streamHeader, []byte{0xff, 0x06, 0x00, 0x00, 0x73, 0x4e, 0x61, 0x50, 0x70, 0x59}) {
					panic("file had chunk starting with 0xff but then no magic snappy streaming protocol bytes, aborting.")
				} else {
					//fmt.Printf("got streaming snappy magic header just fine.\n")
				}
				chunk = chunk[snappyStreamHeaderSz:]
				(*encBuf).Advance(snappyStreamHeaderSz)
				nEnc += snappyStreamHeaderSz
				continue
			}
		case chunk_type == 0x00:
			{ // compressed data
				if verbose {
					fmt.Fprintf(os.Stderr, "chunksz is %d  while  total bytes avail are: %d\n", int(chunksz), len(chunk)-4)
				}

				crc := binary.LittleEndian.Uint32(chunk[headerSz:(headerSz + crc32Sz)])
				section := chunk[(headerSz + crc32Sz):(headerSz + chunksz)]

				dec, ok := snappy.Decode(nil, section)
				if ok != nil {
					// we've probably truncated a snappy frame at this point
					// ok=snappy: corrupt input
					// len(dec) == 0
					//
					panic(fmt.Sprintf("could not decode snappy stream: '%s' and len dec=%d and ok=%v\n", fname, len(dec), ok))

					// unreachable while we panic above; if the panic is ever removed,
					// get back to the caller with what we have so far:
					// return nEnc, nDec, nil
				}
				//	fmt.Printf("ok, b is %#v , %#v\n", ok, dec)

				// spit out decoded text
				// n, err := w.Write(dec)
				//fmt.Printf("len(dec) = %d,   outDecodedBuf.Readable=%d\n", len(dec), outDecodedBuf.Readable)
				bnb := bytes.NewBuffer(dec)
				n, err := io.Copy(outDecodedBuf, bnb)
				if err != nil {
					//fmt.Printf("got n=%d, err= %s ; when trying to io.Copy(outDecodedBuf: N=%d, Readable=%d)\n", n, err, outDecodedBuf.N, outDecodedBuf.Readable)
					panic(err)
				}
				if n != int64(len(dec)) {
					panic("could not write all bytes to outDecodedBuf")
				}
				nDec += n

				// verify the crc32 rotated checksum
				m32 := masked_crc32c(dec)
				if m32 != crc {
					panic(fmt.Sprintf("crc32 masked failure. expected: %v but got: %v", crc, m32))
				} else {
					//fmt.Printf("\nchecksums match: %v == %v\n", crc, m32)
				}

				// move to next header
				inc := (headerSz + int(chunksz))
				chunk = chunk[inc:]
				(*encBuf).Advance(inc)
				nEnc += int64(inc)
				continue
			}
		case chunk_type == 0x01:
			{ // uncompressed data

				//n, err := w.Write(chunk[(headerSz+crc32Sz):(headerSz + int(chunksz))])
				n, err := io.Copy(outDecodedBuf, bytes.NewBuffer(chunk[(headerSz+crc32Sz):(headerSz+int(chunksz))]))
				if verbose {
					//fmt.Printf("debug: n=%d  err=%v  chunksz=%d  outDecodedBuf='%v'\n", n, err, chunksz, outDecodedBuf)
				}
				if err != nil {
					panic(err)
				}
				if n != int64(chunksz-crc32Sz) {
					panic("could not write all bytes to outDecodedBuf")
				}
				nDec += n

				inc := (headerSz + int(chunksz))
				chunk = chunk[inc:]
				(*encBuf).Advance(inc)
				nEnc += int64(inc)
				continue
			}
		case chunk_type == 0xfe:
			fallthrough // padding, just skip it
		case chunk_type >= 0x80 && chunk_type <= 0xfd:
			{ //  Reserved skippable chunks
				//fmt.Printf("\nin reserved skippable chunks, at nEnc=%v\n", nEnc)
				inc := (headerSz + int(chunksz))
				chunk = chunk[inc:]
				nEnc += int64(inc)
				(*encBuf).Advance(inc)
				continue
			}

		default:
			panic(fmt.Sprintf("unrecognized/unsupported chunk type %#v", chunk_type))
		}

	} // end for{}

	return nEnc, nDec, err
	//return int64(N), nil
}
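
// A minimal sketch of driving UnsnapOneFrame directly, one frame per call.
// It assumes r is an io.Reader positioned at the start of a snappy-framed
// stream; this is essentially what SnappyFile.Read does, and the buffer
// sizes mirror NewReader:
//
//	enc := NewFixedSizeRingBuf(CHUNK_MAX * 2)
//	dec := NewFixedSizeRingBuf(CHUNK_MAX * 2)
//	for {
//		_, _, err := UnsnapOneFrame(r, enc, dec, "stream")
//		if err == io.EOF {
//			break
//		}
//		if err != nil {
//			panic(err)
//		}
//		// drain dec here with dec.Read(...) before the next call
//	}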

// Unsnappy decodes a whole stream at once:
//
// it reads from r a stream of bytes in the snappy-streaming framed
// format, defined here: http://code.google.com/p/snappy/source/browse/trunk/framing_format.txt
// It grabs each frame, runs it through the snappy decoder, and writes
// the decoded frames, joined back-to-back, to w.
//
func Unsnappy(r io.Reader, w io.Writer) (err error) {
	b, err := ioutil.ReadAll(r)
	if err != nil {
		panic(err)
	}

	// flag for printing chunk size alignment messages
	verbose := false

	const snappyStreamHeaderSz = 10
	const headerSz = 4
	const crc32Sz = 4
	// the magic 18 bytes accounts for the snappy streaming header and the first chunk's size and checksum
	// http://code.google.com/p/snappy/source/browse/trunk/framing_format.txt

	chunk := b[:]

	// 65536 is the max size of a snappy framed chunk. See
	// http://code.google.com/p/snappy/source/browse/trunk/framing_format.txt:91
	//buf := make([]byte, 65536)

	//	fmt.Printf("read from file, b is len:%d with value: %#v\n", len(b), b)
	//	fmt.Printf("read from file, bcut is len:%d with value: %#v\n", len(bcut), bcut)

	//fmt.Printf("raw bytes of chunksz are: %v\n", b[11:14])

	fourbytes := make([]byte, 4)
	chunkCount := 0

	for {
		if len(chunk) == 0 {
			break
		}
		chunkCount++
		fourbytes[3] = 0
		copy(fourbytes, chunk[1:4])
		chunksz := binary.LittleEndian.Uint32(fourbytes)
		chunk_type := chunk[0]

		switch {
		case chunk_type == 0xff:
			{ // stream identifier

				streamHeader := chunk[:snappyStreamHeaderSz]
				if 0 != bytes.Compare(streamHeader, []byte{0xff, 0x06, 0x00, 0x00, 0x73, 0x4e, 0x61, 0x50, 0x70, 0x59}) {
					panic("file had chunk starting with 0xff but then no magic snappy streaming protocol bytes, aborting.")
				} else {
					//fmt.Printf("got streaming snappy magic header just fine.\n")
				}
				chunk = chunk[snappyStreamHeaderSz:]
				continue
			}
		case chunk_type == 0x00:
			{ // compressed data
				if verbose {
					fmt.Fprintf(os.Stderr, "chunksz is %d  while  total bytes avail are: %d\n", int(chunksz), len(chunk)-4)
				}

				//crc := binary.LittleEndian.Uint32(chunk[headerSz:(headerSz + crc32Sz)])
				section := chunk[(headerSz + crc32Sz):(headerSz + chunksz)]

				dec, ok := snappy.Decode(nil, section)
				if ok != nil {
					panic("could not decode snappy stream")
				}
				//	fmt.Printf("ok, b is %#v , %#v\n", ok, dec)

				// spit out decoded text
				n, err := w.Write(dec)
				if err != nil {
					panic(err)
				}
				if n != len(dec) {
					panic("could not write all bytes to w")
				}

				// TODO: verify the crc32 rotated checksum?

				// move to next header
				chunk = chunk[(headerSz + int(chunksz)):]
				continue
			}
		case chunk_type == 0x01:
			{ // uncompressed data

				//crc := binary.LittleEndian.Uint32(chunk[headerSz:(headerSz + crc32Sz)])
				section := chunk[(headerSz + crc32Sz):(headerSz + chunksz)]

				n, err := w.Write(section)
				if err != nil {
					panic(err)
				}
				if n != int(chunksz-crc32Sz) {
					panic("could not write all bytes to w")
				}

				chunk = chunk[(headerSz + int(chunksz)):]
				continue
			}
		case chunk_type == 0xfe:
			fallthrough // padding, just skip it
		case chunk_type >= 0x80 && chunk_type <= 0xfd:
			{ //  Reserved skippable chunks
				chunk = chunk[(headerSz + int(chunksz)):]
				continue
			}

		default:
			panic(fmt.Sprintf("unrecognized/unsupported chunk type %#v", chunk_type))
		}

	} // end for{}

	return nil
}
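
// A minimal sketch of decoding a whole stream at once with Unsnappy
// (the file names below are only illustrative):
//
//	in, err := os.Open("data.snappy")
//	if err != nil {
//		panic(err)
//	}
//	defer in.Close()
//	out, err := os.Create("data.raw")
//	if err != nil {
//		panic(err)
//	}
//	defer out.Close()
//	if err := Unsnappy(in, out); err != nil {
//		panic(err)
//	}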

// 0xff 0x06 0x00 0x00 sNaPpY
var SnappyStreamHeaderMagic = []byte{0xff, 0x06, 0x00, 0x00, 0x73, 0x4e, 0x61, 0x50, 0x70, 0x59}

const CHUNK_MAX = 65536
const _STREAM_TO_STREAM_BLOCK_SIZE = CHUNK_MAX
const _STREAM_IDENTIFIER = `sNaPpY`
const _COMPRESSED_CHUNK = 0x00
const _UNCOMPRESSED_CHUNK = 0x01
const _IDENTIFIER_CHUNK = 0xff
const _RESERVED_UNSKIPPABLE0 = 0x02 // chunk ranges are [inclusive, exclusive)
const _RESERVED_UNSKIPPABLE1 = 0x80
const _RESERVED_SKIPPABLE0 = 0x80
const _RESERVED_SKIPPABLE1 = 0xff

// the minimum fraction of bytes compression must save for it to be used in
// automatic mode
const _COMPRESSION_THRESHOLD = .125

var crctab *crc32.Table

func init() {
	crctab = crc32.MakeTable(crc32.Castagnoli) // this is the correct table; it matches the crc32c.c code used by python
}

// masked_crc32c computes the CRC32-C (Castagnoli) of data and then masks it
// as the framing format requires: the CRC is rotated right by 15 bits and a
// constant is added, so that data containing embedded CRCs still checksums well.
func masked_crc32c(data []byte) uint32 {

	// see the framing format specification, http://code.google.com/p/snappy/source/browse/trunk/framing_format.txt
	var crc uint32 = crc32.Checksum(data, crctab)
	return (uint32((crc>>15)|(crc<<17)) + 0xa282ead8)
}

func ReadSnappyStreamCompressedFile(filename string) ([]byte, error) {

	snappyFile, err := Open(filename)
	if err != nil {
		return []byte{}, err
	}

	var bb bytes.Buffer
	_, err = bb.ReadFrom(snappyFile)
	if err == io.EOF {
		err = nil
	}
	if err != nil {
		panic(err)
	}

	return bb.Bytes(), err
}
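
// A minimal sketch (the file name is only illustrative): slurp an entire
// snappy-framed file into memory, already decoded.
//
//	data, err := ReadSnappyStreamCompressedFile("data.snappy")
//	if err != nil {
//		panic(err)
//	}
//	fmt.Printf("decoded %d bytes\n", len(data))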