1package unsnap 2 3import ( 4 "bytes" 5 "encoding/binary" 6 "fmt" 7 "io" 8 "io/ioutil" 9 "os" 10 11 "hash/crc32" 12 13 snappy "github.com/golang/snappy" 14 // The C library can be used, but this makes the binary dependent 15 // lots of extraneous c-libraries; it is no longer stand-alone. Yuck. 16 // 17 // Therefore we comment out the "dgryski/go-csnappy" path and use the 18 // "github.com/golang/snappy/snappy" above instead. If you are 19 // performance limited and can deal with distributing more libraries, 20 // then this is easy to swap. 21 // 22 // If you swap, note that some of the tests won't pass 23 // because snappy-go produces slightly different (but still 24 // conformant) encodings on some data. Here are bindings 25 // to the C-snappy: 26 // snappy "github.com/dgryski/go-csnappy" 27) 28 29// SnappyFile: create a drop-in-replacement/wrapper for an *os.File that handles doing the unsnappification online as more is read from it 30 31type SnappyFile struct { 32 Fname string 33 34 Reader io.Reader 35 Writer io.Writer 36 37 // allow clients to substitute us for an os.File and just switch 38 // off compression if they don't want it. 39 SnappyEncodeDecodeOff bool // if true, we bypass straight to Filep 40 41 EncBuf FixedSizeRingBuf // holds any extra that isn't yet returned, encoded 42 DecBuf FixedSizeRingBuf // holds any extra that isn't yet returned, decoded 43 44 // for writing to stream-framed snappy 45 HeaderChunkWritten bool 46 47 // Sanity check: we can only read, or only write, to one SnappyFile. 48 // EncBuf and DecBuf are used differently in each mode. Verify 49 // that we are consistent with this flag. 50 Writing bool 51} 52 53var total int 54 55// for debugging, show state of buffers 56func (f *SnappyFile) Dump() { 57 fmt.Printf("EncBuf has length %d and contents:\n%s\n", len(f.EncBuf.Bytes()), string(f.EncBuf.Bytes())) 58 fmt.Printf("DecBuf has length %d and contents:\n%s\n", len(f.DecBuf.Bytes()), string(f.DecBuf.Bytes())) 59} 60 61func (f *SnappyFile) Read(p []byte) (n int, err error) { 62 63 if f.SnappyEncodeDecodeOff { 64 return f.Reader.Read(p) 65 } 66 67 if f.Writing { 68 panic("Reading on a write-only SnappyFile") 69 } 70 71 // before we unencrypt more, try to drain the DecBuf first 72 n, _ = f.DecBuf.Read(p) 73 if n > 0 { 74 total += n 75 return n, nil 76 } 77 78 //nEncRead, nDecAdded, err := UnsnapOneFrame(f.Filep, &f.EncBuf, &f.DecBuf, f.Fname) 79 _, _, err = UnsnapOneFrame(f.Reader, &f.EncBuf, &f.DecBuf, f.Fname) 80 if err != nil && err != io.EOF { 81 panic(err) 82 } 83 84 n, _ = f.DecBuf.Read(p) 85 86 if n > 0 { 87 total += n 88 return n, nil 89 } 90 if f.DecBuf.Readable == 0 { 91 if f.DecBuf.Readable == 0 && f.EncBuf.Readable == 0 { 92 // only now (when EncBuf is empty) can we give io.EOF. 93 // Any earlier, and we leave stuff un-decoded! 94 return 0, io.EOF 95 } 96 } 97 return 0, nil 98} 99 100func Open(name string) (file *SnappyFile, err error) { 101 fp, err := os.Open(name) 102 if err != nil { 103 return nil, err 104 } 105 // encoding in snappy can apparently go beyond the original size, so 106 // we make our buffers big enough, 2*max snappy chunk => 2 * CHUNK_MAX(65536) 107 108 snap := NewReader(fp) 109 snap.Fname = name 110 return snap, nil 111} 112 113func NewReader(r io.Reader) *SnappyFile { 114 return &SnappyFile{ 115 Reader: r, 116 EncBuf: *NewFixedSizeRingBuf(CHUNK_MAX * 2), // buffer of snappy encoded bytes 117 DecBuf: *NewFixedSizeRingBuf(CHUNK_MAX * 2), // buffer of snapppy decoded bytes 118 Writing: false, 119 } 120} 121 122func NewWriter(w io.Writer) *SnappyFile { 123 return &SnappyFile{ 124 Writer: w, 125 EncBuf: *NewFixedSizeRingBuf(65536), // on writing: temp for testing compression 126 DecBuf: *NewFixedSizeRingBuf(65536 * 2), // on writing: final buffer of snappy framed and encoded bytes 127 Writing: true, 128 } 129} 130 131func Create(name string) (file *SnappyFile, err error) { 132 fp, err := os.Create(name) 133 if err != nil { 134 return nil, err 135 } 136 snap := NewWriter(fp) 137 snap.Fname = name 138 return snap, nil 139} 140 141func (f *SnappyFile) Close() error { 142 if f.Writing { 143 wc, ok := f.Writer.(io.WriteCloser) 144 if ok { 145 return wc.Close() 146 } 147 return nil 148 } 149 rc, ok := f.Reader.(io.ReadCloser) 150 if ok { 151 return rc.Close() 152 } 153 return nil 154} 155 156func (f *SnappyFile) Sync() error { 157 file, ok := f.Writer.(*os.File) 158 if ok { 159 return file.Sync() 160 } 161 return nil 162} 163 164// for an increment of a frame at a time: 165// read from r into encBuf (encBuf is still encoded, thus the name), and write unsnappified frames into outDecodedBuf 166// the returned n: number of bytes read from the encrypted encBuf 167func UnsnapOneFrame(r io.Reader, encBuf *FixedSizeRingBuf, outDecodedBuf *FixedSizeRingBuf, fname string) (nEnc int64, nDec int64, err error) { 168 // b, err := ioutil.ReadAll(r) 169 // if err != nil { 170 // panic(err) 171 // } 172 173 nEnc = 0 174 nDec = 0 175 176 // read up to 65536 bytes from r into encBuf, at least a snappy frame 177 nread, err := io.CopyN(encBuf, r, 65536) // returns nwrotebytes, err 178 nEnc += nread 179 if err != nil { 180 if err == io.EOF { 181 if nread == 0 { 182 if encBuf.Readable == 0 { 183 return nEnc, nDec, io.EOF 184 } 185 // else we have bytes in encBuf, so decode them! 186 err = nil 187 } else { 188 // continue below, processing the nread bytes 189 err = nil 190 } 191 } else { 192 panic(err) 193 } 194 } 195 196 // flag for printing chunk size alignment messages 197 verbose := false 198 199 const snappyStreamHeaderSz = 10 200 const headerSz = 4 201 const crc32Sz = 4 202 // the magic 18 bytes accounts for the snappy streaming header and the first chunks size and checksum 203 // http://code.google.com/p/snappy/source/browse/trunk/framing_format.txt 204 205 chunk := (*encBuf).Bytes() 206 207 // however we exit, advance as 208 // defer func() { (*encBuf).Next(N) }() 209 210 // 65536 is the max size of a snappy framed chunk. See 211 // http://code.google.com/p/snappy/source/browse/trunk/framing_format.txt:91 212 // buf := make([]byte, 65536) 213 214 // fmt.Printf("read from file, b is len:%d with value: %#v\n", len(b), b) 215 // fmt.Printf("read from file, bcut is len:%d with value: %#v\n", len(bcut), bcut) 216 217 //fmt.Printf("raw bytes of chunksz are: %v\n", b[11:14]) 218 219 fourbytes := make([]byte, 4) 220 chunkCount := 0 221 222 for nDec < 65536 { 223 if len(chunk) == 0 { 224 break 225 } 226 chunkCount++ 227 fourbytes[3] = 0 228 copy(fourbytes, chunk[1:4]) 229 chunksz := binary.LittleEndian.Uint32(fourbytes) 230 chunk_type := chunk[0] 231 232 switch true { 233 case chunk_type == 0xff: 234 { // stream identifier 235 236 streamHeader := chunk[:snappyStreamHeaderSz] 237 if 0 != bytes.Compare(streamHeader, []byte{0xff, 0x06, 0x00, 0x00, 0x73, 0x4e, 0x61, 0x50, 0x70, 0x59}) { 238 panic("file had chunk starting with 0xff but then no magic snappy streaming protocol bytes, aborting.") 239 } else { 240 //fmt.Printf("got streaming snappy magic header just fine.\n") 241 } 242 chunk = chunk[snappyStreamHeaderSz:] 243 (*encBuf).Advance(snappyStreamHeaderSz) 244 nEnc += snappyStreamHeaderSz 245 continue 246 } 247 case chunk_type == 0x00: 248 { // compressed data 249 if verbose { 250 fmt.Fprintf(os.Stderr, "chunksz is %d while total bytes avail are: %d\n", int(chunksz), len(chunk)-4) 251 } 252 253 crc := binary.LittleEndian.Uint32(chunk[headerSz:(headerSz + crc32Sz)]) 254 section := chunk[(headerSz + crc32Sz):(headerSz + chunksz)] 255 256 dec, ok := snappy.Decode(nil, section) 257 if ok != nil { 258 // we've probably truncated a snappy frame at this point 259 // ok=snappy: corrupt input 260 // len(dec) == 0 261 // 262 panic(fmt.Sprintf("could not decode snappy stream: '%s' and len dec=%d and ok=%v\n", fname, len(dec), ok)) 263 264 // get back to caller with what we've got so far 265 return nEnc, nDec, nil 266 } 267 // fmt.Printf("ok, b is %#v , %#v\n", ok, dec) 268 269 // spit out decoded text 270 // n, err := w.Write(dec) 271 //fmt.Printf("len(dec) = %d, outDecodedBuf.Readable=%d\n", len(dec), outDecodedBuf.Readable) 272 bnb := bytes.NewBuffer(dec) 273 n, err := io.Copy(outDecodedBuf, bnb) 274 if err != nil { 275 //fmt.Printf("got n=%d, err= %s ; when trying to io.Copy(outDecodedBuf: N=%d, Readable=%d)\n", n, err, outDecodedBuf.N, outDecodedBuf.Readable) 276 panic(err) 277 } 278 if n != int64(len(dec)) { 279 panic("could not write all bytes to outDecodedBuf") 280 } 281 nDec += n 282 283 // verify the crc32 rotated checksum 284 m32 := masked_crc32c(dec) 285 if m32 != crc { 286 panic(fmt.Sprintf("crc32 masked failiure. expected: %v but got: %v", crc, m32)) 287 } else { 288 //fmt.Printf("\nchecksums match: %v == %v\n", crc, m32) 289 } 290 291 // move to next header 292 inc := (headerSz + int(chunksz)) 293 chunk = chunk[inc:] 294 (*encBuf).Advance(inc) 295 nEnc += int64(inc) 296 continue 297 } 298 case chunk_type == 0x01: 299 { // uncompressed data 300 301 //n, err := w.Write(chunk[(headerSz+crc32Sz):(headerSz + int(chunksz))]) 302 n, err := io.Copy(outDecodedBuf, bytes.NewBuffer(chunk[(headerSz+crc32Sz):(headerSz+int(chunksz))])) 303 if verbose { 304 //fmt.Printf("debug: n=%d err=%v chunksz=%d outDecodedBuf='%v'\n", n, err, chunksz, outDecodedBuf) 305 } 306 if err != nil { 307 panic(err) 308 } 309 if n != int64(chunksz-crc32Sz) { 310 panic("could not write all bytes to stdout") 311 } 312 nDec += n 313 314 inc := (headerSz + int(chunksz)) 315 chunk = chunk[inc:] 316 (*encBuf).Advance(inc) 317 nEnc += int64(inc) 318 continue 319 } 320 case chunk_type == 0xfe: 321 fallthrough // padding, just skip it 322 case chunk_type >= 0x80 && chunk_type <= 0xfd: 323 { // Reserved skippable chunks 324 //fmt.Printf("\nin reserved skippable chunks, at nEnc=%v\n", nEnc) 325 inc := (headerSz + int(chunksz)) 326 chunk = chunk[inc:] 327 nEnc += int64(inc) 328 (*encBuf).Advance(inc) 329 continue 330 } 331 332 default: 333 panic(fmt.Sprintf("unrecognized/unsupported chunk type %#v", chunk_type)) 334 } 335 336 } // end for{} 337 338 return nEnc, nDec, err 339 //return int64(N), nil 340} 341 342// for whole file at once: 343// 344// receive on stdin a stream of bytes in the snappy-streaming framed 345// format, defined here: http://code.google.com/p/snappy/source/browse/trunk/framing_format.txt 346// Grab each frame, run it through the snappy decoder, and spit out 347// each frame all joined back-to-back on stdout. 348// 349func Unsnappy(r io.Reader, w io.Writer) (err error) { 350 b, err := ioutil.ReadAll(r) 351 if err != nil { 352 panic(err) 353 } 354 355 // flag for printing chunk size alignment messages 356 verbose := false 357 358 const snappyStreamHeaderSz = 10 359 const headerSz = 4 360 const crc32Sz = 4 361 // the magic 18 bytes accounts for the snappy streaming header and the first chunks size and checksum 362 // http://code.google.com/p/snappy/source/browse/trunk/framing_format.txt 363 364 chunk := b[:] 365 366 // 65536 is the max size of a snappy framed chunk. See 367 // http://code.google.com/p/snappy/source/browse/trunk/framing_format.txt:91 368 //buf := make([]byte, 65536) 369 370 // fmt.Printf("read from file, b is len:%d with value: %#v\n", len(b), b) 371 // fmt.Printf("read from file, bcut is len:%d with value: %#v\n", len(bcut), bcut) 372 373 //fmt.Printf("raw bytes of chunksz are: %v\n", b[11:14]) 374 375 fourbytes := make([]byte, 4) 376 chunkCount := 0 377 378 for { 379 if len(chunk) == 0 { 380 break 381 } 382 chunkCount++ 383 fourbytes[3] = 0 384 copy(fourbytes, chunk[1:4]) 385 chunksz := binary.LittleEndian.Uint32(fourbytes) 386 chunk_type := chunk[0] 387 388 switch true { 389 case chunk_type == 0xff: 390 { // stream identifier 391 392 streamHeader := chunk[:snappyStreamHeaderSz] 393 if 0 != bytes.Compare(streamHeader, []byte{0xff, 0x06, 0x00, 0x00, 0x73, 0x4e, 0x61, 0x50, 0x70, 0x59}) { 394 panic("file had chunk starting with 0xff but then no magic snappy streaming protocol bytes, aborting.") 395 } else { 396 //fmt.Printf("got streaming snappy magic header just fine.\n") 397 } 398 chunk = chunk[snappyStreamHeaderSz:] 399 continue 400 } 401 case chunk_type == 0x00: 402 { // compressed data 403 if verbose { 404 fmt.Fprintf(os.Stderr, "chunksz is %d while total bytes avail are: %d\n", int(chunksz), len(chunk)-4) 405 } 406 407 //crc := binary.LittleEndian.Uint32(chunk[headerSz:(headerSz + crc32Sz)]) 408 section := chunk[(headerSz + crc32Sz):(headerSz + chunksz)] 409 410 dec, ok := snappy.Decode(nil, section) 411 if ok != nil { 412 panic("could not decode snappy stream") 413 } 414 // fmt.Printf("ok, b is %#v , %#v\n", ok, dec) 415 416 // spit out decoded text 417 n, err := w.Write(dec) 418 if err != nil { 419 panic(err) 420 } 421 if n != len(dec) { 422 panic("could not write all bytes to stdout") 423 } 424 425 // TODO: verify the crc32 rotated checksum? 426 427 // move to next header 428 chunk = chunk[(headerSz + int(chunksz)):] 429 continue 430 } 431 case chunk_type == 0x01: 432 { // uncompressed data 433 434 //crc := binary.LittleEndian.Uint32(chunk[headerSz:(headerSz + crc32Sz)]) 435 section := chunk[(headerSz + crc32Sz):(headerSz + chunksz)] 436 437 n, err := w.Write(section) 438 if err != nil { 439 panic(err) 440 } 441 if n != int(chunksz-crc32Sz) { 442 panic("could not write all bytes to stdout") 443 } 444 445 chunk = chunk[(headerSz + int(chunksz)):] 446 continue 447 } 448 case chunk_type == 0xfe: 449 fallthrough // padding, just skip it 450 case chunk_type >= 0x80 && chunk_type <= 0xfd: 451 { // Reserved skippable chunks 452 chunk = chunk[(headerSz + int(chunksz)):] 453 continue 454 } 455 456 default: 457 panic(fmt.Sprintf("unrecognized/unsupported chunk type %#v", chunk_type)) 458 } 459 460 } // end for{} 461 462 return nil 463} 464 465// 0xff 0x06 0x00 0x00 sNaPpY 466var SnappyStreamHeaderMagic = []byte{0xff, 0x06, 0x00, 0x00, 0x73, 0x4e, 0x61, 0x50, 0x70, 0x59} 467 468const CHUNK_MAX = 65536 469const _STREAM_TO_STREAM_BLOCK_SIZE = CHUNK_MAX 470const _STREAM_IDENTIFIER = `sNaPpY` 471const _COMPRESSED_CHUNK = 0x00 472const _UNCOMPRESSED_CHUNK = 0x01 473const _IDENTIFIER_CHUNK = 0xff 474const _RESERVED_UNSKIPPABLE0 = 0x02 // chunk ranges are [inclusive, exclusive) 475const _RESERVED_UNSKIPPABLE1 = 0x80 476const _RESERVED_SKIPPABLE0 = 0x80 477const _RESERVED_SKIPPABLE1 = 0xff 478 479// the minimum percent of bytes compression must save to be enabled in automatic 480// mode 481const _COMPRESSION_THRESHOLD = .125 482 483var crctab *crc32.Table 484 485func init() { 486 crctab = crc32.MakeTable(crc32.Castagnoli) // this is correct table, matches the crc32c.c code used by python 487} 488 489func masked_crc32c(data []byte) uint32 { 490 491 // see the framing format specification, http://code.google.com/p/snappy/source/browse/trunk/framing_format.txt 492 var crc uint32 = crc32.Checksum(data, crctab) 493 return (uint32((crc>>15)|(crc<<17)) + 0xa282ead8) 494} 495 496func ReadSnappyStreamCompressedFile(filename string) ([]byte, error) { 497 498 snappyFile, err := Open(filename) 499 if err != nil { 500 return []byte{}, err 501 } 502 503 var bb bytes.Buffer 504 _, err = bb.ReadFrom(snappyFile) 505 if err == io.EOF { 506 err = nil 507 } 508 if err != nil { 509 panic(err) 510 } 511 512 return bb.Bytes(), err 513} 514