1 /*
2  * Created: Apr 12, 2013
3  */
4 package org.xerial.snappy;
5 
6 import static org.xerial.snappy.SnappyFramed.COMPRESSED_DATA_FLAG;
7 import static org.xerial.snappy.SnappyFramed.HEADER_BYTES;
8 import static org.xerial.snappy.SnappyFramed.UNCOMPRESSED_DATA_FLAG;
9 import static org.xerial.snappy.SnappyFramed.maskedCrc32c;
10 
11 import java.io.IOException;
12 import java.io.InputStream;
13 import java.io.OutputStream;
14 import java.nio.ByteBuffer;
15 import java.nio.ByteOrder;
16 import java.nio.channels.Channels;
17 import java.nio.channels.ClosedChannelException;
18 import java.nio.channels.ReadableByteChannel;
19 import java.nio.channels.WritableByteChannel;
20 
21 import org.xerial.snappy.pool.BufferPool;
22 import org.xerial.snappy.pool.DefaultPoolFactory;
23 
24 /**
25  * Implements the <a
26  * href="http://snappy.googlecode.com/svn/trunk/framing_format.txt"
27  * >x-snappy-framed</a> as an {@link OutputStream} and
28  * {@link WritableByteChannel}.
29  *
30  * @author Brett Okken
31  * @since 1.1.0
32  */
33 public final class SnappyFramedOutputStream
34         extends OutputStream
35         implements
36         WritableByteChannel
37 {
38 
39     /**
40      * The x-snappy-framed specification allows for a chunk size up to
41      * 16,777,211 bytes in length. However, it also goes on to state:
42      * <p>
43      * <code>
44      * We place an additional restriction that the uncompressed data in a chunk
45      * must be no longer than 65536 bytes. This allows consumers to easily use
46      * small fixed-size buffers.
47      * </code>
48      * </p>
49      */
50     public static final int MAX_BLOCK_SIZE = 64 * 1024;
51 
52     /**
53      * The default block size to use.
54      */
55     public static final int DEFAULT_BLOCK_SIZE = MAX_BLOCK_SIZE;
56 
57     /**
58      * The default min compression ratio to use.
59      */
60     public static final double DEFAULT_MIN_COMPRESSION_RATIO = 0.85d;
61 
62     private final ByteBuffer headerBuffer = ByteBuffer.allocate(8).order(
63             ByteOrder.LITTLE_ENDIAN);
64     private final BufferPool bufferPool;
65     private final int blockSize;
66     private final ByteBuffer buffer;
67     private final ByteBuffer directInputBuffer;
68     private final ByteBuffer outputBuffer;
69     private final double minCompressionRatio;
70 
71     private final WritableByteChannel out;
72 
73     // private int position;
74     private boolean closed;
75 
76     /**
77      * Creates a new {@link SnappyFramedOutputStream} using the {@link #DEFAULT_BLOCK_SIZE}
78      * and {@link #DEFAULT_MIN_COMPRESSION_RATIO}.
79      * <p>
80      * Uses {@link DefaultPoolFactory} to obtain {@link BufferPool} for buffers.
81      * </p>
82      *
83      * @param out The underlying {@link OutputStream} to write to. Must not be
84      * {@code null}.
85      * @throws IOException
86      */
SnappyFramedOutputStream(OutputStream out)87     public SnappyFramedOutputStream(OutputStream out)
88             throws IOException
89     {
90         this(out, DEFAULT_BLOCK_SIZE, DEFAULT_MIN_COMPRESSION_RATIO, DefaultPoolFactory.getDefaultPool());
91     }
92 
93     /**
94      * Creates a new {@link SnappyFramedOutputStream} using the {@link #DEFAULT_BLOCK_SIZE}
95      * and {@link #DEFAULT_MIN_COMPRESSION_RATIO}.
96      *
97      * @param out The underlying {@link OutputStream} to write to. Must not be
98      * {@code null}.
99      * @param bufferPool Used to obtain buffer instances. Must not be {@code null}.
100      * @throws IOException
101      */
SnappyFramedOutputStream(OutputStream out, BufferPool bufferPool)102     public SnappyFramedOutputStream(OutputStream out, BufferPool bufferPool)
103             throws IOException
104     {
105         this(out, DEFAULT_BLOCK_SIZE, DEFAULT_MIN_COMPRESSION_RATIO, bufferPool);
106     }
107 
108     /**
109      * Creates a new {@link SnappyFramedOutputStream} instance.
110      * <p>
111      * Uses {@link DefaultPoolFactory} to obtain {@link BufferPool} for buffers.
112      * </p>
113      *
114      * @param out The underlying {@link OutputStream} to write to. Must not be
115      * {@code null}.
116      * @param blockSize The block size (of raw data) to compress before writing frames
117      * to <i>out</i>. Must be in (0, 65536].
118      * @param minCompressionRatio Defines the minimum compression ratio (
119      * {@code compressedLength / rawLength}) that must be achieved to
120      * write the compressed data. This must be in (0, 1.0].
121      * @throws IOException
122      */
SnappyFramedOutputStream(OutputStream out, int blockSize, double minCompressionRatio)123     public SnappyFramedOutputStream(OutputStream out, int blockSize,
124             double minCompressionRatio)
125             throws IOException
126     {
127         this(Channels.newChannel(out), blockSize, minCompressionRatio, DefaultPoolFactory.getDefaultPool());
128     }
129 
130     /**
131      * Creates a new {@link SnappyFramedOutputStream} instance.
132      *
133      * @param out The underlying {@link OutputStream} to write to. Must not be
134      * {@code null}.
135      * @param blockSize The block size (of raw data) to compress before writing frames
136      * to <i>out</i>. Must be in (0, 65536].
137      * @param minCompressionRatio Defines the minimum compression ratio (
138      * {@code compressedLength / rawLength}) that must be achieved to
139      * write the compressed data. This must be in (0, 1.0].
140      * @param bufferPool Used to obtain buffer instances. Must not be {@code null}.
141      * @throws IOException
142      */
SnappyFramedOutputStream(OutputStream out, int blockSize, double minCompressionRatio, BufferPool bufferPool)143     public SnappyFramedOutputStream(OutputStream out, int blockSize,
144             double minCompressionRatio, BufferPool bufferPool)
145             throws IOException
146     {
147         this(Channels.newChannel(out), blockSize, minCompressionRatio, bufferPool);
148     }
149 
150     /**
151      * Creates a new {@link SnappyFramedOutputStream} using the
152      * {@link #DEFAULT_BLOCK_SIZE} and {@link #DEFAULT_MIN_COMPRESSION_RATIO}.
153      * <p>
154      * Uses {@link DefaultPoolFactory} to obtain {@link BufferPool} for buffers.
155      * </p>
156      *
157      * @param out The underlying {@link WritableByteChannel} to write to. Must
158      * not be {@code null}.
159      * @throws IOException
160      * @since 1.1.1
161      */
SnappyFramedOutputStream(WritableByteChannel out)162     public SnappyFramedOutputStream(WritableByteChannel out)
163             throws IOException
164     {
165         this(out, DEFAULT_BLOCK_SIZE, DEFAULT_MIN_COMPRESSION_RATIO, DefaultPoolFactory.getDefaultPool());
166     }
167 
168     /**
169      * Creates a new {@link SnappyFramedOutputStream} using the
170      * {@link #DEFAULT_BLOCK_SIZE} and {@link #DEFAULT_MIN_COMPRESSION_RATIO}.
171      * <p>
172      * Uses {@link DefaultPoolFactory} to obtain {@link BufferPool} for buffers.
173      * </p>
174      *
175      * @param out The underlying {@link WritableByteChannel} to write to. Must
176      * not be {@code null}.
177      * @param bufferPool Used to obtain buffer instances. Must not be {@code null}.
178      * @throws IOException
179      */
SnappyFramedOutputStream(WritableByteChannel out, BufferPool bufferPool)180     public SnappyFramedOutputStream(WritableByteChannel out, BufferPool bufferPool)
181             throws IOException
182     {
183         this(out, DEFAULT_BLOCK_SIZE, DEFAULT_MIN_COMPRESSION_RATIO, bufferPool);
184     }
185 
186     /**
187      * Creates a new {@link SnappyFramedOutputStream} instance.
188      *
189      * @param out The underlying {@link WritableByteChannel} to write to. Must
190      * not be {@code null}.
191      * @param blockSize The block size (of raw data) to compress before writing frames
192      * to <i>out</i>. Must be in (0, 65536].
193      * @param minCompressionRatio Defines the minimum compression ratio (
194      * {@code compressedLength / rawLength}) that must be achieved to
195      * write the compressed data. This must be in (0, 1.0].
196      * @throws IOException
197      * @since 1.1.1
198      */
SnappyFramedOutputStream(WritableByteChannel out, int blockSize, double minCompressionRatio)199     public SnappyFramedOutputStream(WritableByteChannel out, int blockSize,
200             double minCompressionRatio)
201             throws IOException
202     {
203         this(out, blockSize, minCompressionRatio, DefaultPoolFactory.getDefaultPool());
204     }
205 
206     /**
207      * Creates a new {@link SnappyFramedOutputStream} instance.
208      *
209      * @param out The underlying {@link WritableByteChannel} to write to. Must
210      * not be {@code null}.
211      * @param blockSize The block size (of raw data) to compress before writing frames
212      * to <i>out</i>. Must be in (0, 65536].
213      * @param minCompressionRatio Defines the minimum compression ratio (
214      * {@code compressedLength / rawLength}) that must be achieved to
215      * write the compressed data. This must be in (0, 1.0].
216      * @param bufferPool Used to obtain buffer instances. Must not be {@code null}.
217      * @throws IOException
218      * @since 1.1.1
219      */
SnappyFramedOutputStream(WritableByteChannel out, int blockSize, double minCompressionRatio, BufferPool bufferPool)220     public SnappyFramedOutputStream(WritableByteChannel out, int blockSize,
221             double minCompressionRatio, BufferPool bufferPool)
222             throws IOException
223     {
224         if (out == null) {
225             throw new NullPointerException("out is null");
226         }
227 
228         if (bufferPool == null) {
229             throw new NullPointerException("buffer pool is null");
230         }
231 
232         if (minCompressionRatio <= 0 || minCompressionRatio > 1.0) {
233             throw new IllegalArgumentException("minCompressionRatio "
234                     + minCompressionRatio + " must be in (0,1.0]");
235         }
236 
237         if (blockSize <= 0 || blockSize > MAX_BLOCK_SIZE) {
238             throw new IllegalArgumentException("block size " + blockSize
239                     + " must be in (0, 65536]");
240         }
241         this.blockSize = blockSize;
242         this.out = out;
243         this.minCompressionRatio = minCompressionRatio;
244 
245         this.bufferPool = bufferPool;
246         buffer = ByteBuffer.wrap(bufferPool.allocateArray(blockSize), 0, blockSize);
247         directInputBuffer = bufferPool.allocateDirect(blockSize);
248         outputBuffer = bufferPool.allocateDirect(Snappy
249                 .maxCompressedLength(blockSize));
250 
251         writeHeader(out);
252     }
253 
254     /**
255      * Writes the implementation specific header or "marker bytes" to
256      * <i>out</i>.
257      *
258      * @param out The underlying {@link OutputStream}.
259      * @throws IOException
260      */
writeHeader(WritableByteChannel out)261     private void writeHeader(WritableByteChannel out)
262             throws IOException
263     {
264         out.write(ByteBuffer.wrap(HEADER_BYTES));
265     }
266 
267     /**
268      * {@inheritDoc}
269      */
270     @Override
isOpen()271     public boolean isOpen()
272     {
273         return !closed;
274     }
275 
276     @Override
write(int b)277     public void write(int b)
278             throws IOException
279     {
280         if (closed) {
281             throw new IOException("Stream is closed");
282         }
283         if (buffer.remaining() <= 0) {
284             flushBuffer();
285         }
286         buffer.put((byte) b);
287     }
288 
289     @Override
write(byte[] input, int offset, int length)290     public void write(byte[] input, int offset, int length)
291             throws IOException
292     {
293         if (closed) {
294             throw new IOException("Stream is closed");
295         }
296 
297         if (input == null) {
298             throw new NullPointerException();
299         }
300         else if ((offset < 0) || (offset > input.length) || (length < 0)
301                 || ((offset + length) > input.length)
302                 || ((offset + length) < 0)) {
303             throw new IndexOutOfBoundsException();
304         }
305 
306         while (length > 0) {
307             if (buffer.remaining() <= 0) {
308                 flushBuffer();
309             }
310 
311             final int toPut = Math.min(length, buffer.remaining());
312             buffer.put(input, offset, toPut);
313             offset += toPut;
314             length -= toPut;
315         }
316     }
317 
318     /**
319      * {@inheritDoc}
320      */
321     @Override
write(ByteBuffer src)322     public int write(ByteBuffer src)
323             throws IOException
324     {
325         if (closed) {
326             throw new ClosedChannelException();
327         }
328 
329         if (buffer.remaining() <= 0) {
330             flushBuffer();
331         }
332 
333         final int srcLength = src.remaining();
334 
335         // easy case: enough free space in buffer for entire input
336         if (buffer.remaining() >= src.remaining()) {
337             buffer.put(src);
338             return srcLength;
339         }
340 
341         // store current limit
342         final int srcEnd = src.position() + src.remaining();
343 
344         while ((src.position() + buffer.remaining()) <= srcEnd) {
345             // fill partial buffer as much as possible and flush
346             src.limit(src.position() + buffer.remaining());
347             buffer.put(src);
348             flushBuffer();
349         }
350 
351         // reset original limit
352         src.limit(srcEnd);
353 
354         // copy remaining partial block into now-empty buffer
355         buffer.put(src);
356 
357         return srcLength;
358     }
359 
360     /**
361      * Transfers all the content from <i>is</i> to this {@link OutputStream}.
362      * This potentially limits the amount of buffering required to compress
363      * content.
364      *
365      * @param is The source of data to compress.
366      * @return The number of bytes read from <i>is</i>.
367      * @throws IOException
368      * @since 1.1.1
369      */
transferFrom(InputStream is)370     public long transferFrom(InputStream is)
371             throws IOException
372     {
373         if (closed) {
374             throw new ClosedChannelException();
375         }
376 
377         if (is == null) {
378             throw new NullPointerException();
379         }
380 
381         if (buffer.remaining() == 0) {
382             flushBuffer();
383         }
384 
385         assert buffer.hasArray();
386         final byte[] bytes = buffer.array();
387 
388         final int arrayOffset = buffer.arrayOffset();
389         long totTransfered = 0;
390         int read;
391         while ((read = is.read(bytes, arrayOffset + buffer.position(),
392                 buffer.remaining())) != -1) {
393             buffer.position(buffer.position() + read);
394 
395             if (buffer.remaining() == 0) {
396                 flushBuffer();
397             }
398 
399             totTransfered += read;
400         }
401 
402         return totTransfered;
403     }
404 
405     /**
406      * Transfers all the content from <i>rbc</i> to this
407      * {@link WritableByteChannel}. This potentially limits the amount of
408      * buffering required to compress content.
409      *
410      * @param rbc The source of data to compress.
411      * @return The number of bytes read from <i>rbc</i>.
412      * @throws IOException
413      * @since 1.1.1
414      */
transferFrom(ReadableByteChannel rbc)415     public long transferFrom(ReadableByteChannel rbc)
416             throws IOException
417     {
418         if (closed) {
419             throw new ClosedChannelException();
420         }
421 
422         if (rbc == null) {
423             throw new NullPointerException();
424         }
425 
426         if (buffer.remaining() == 0) {
427             flushBuffer();
428         }
429 
430         long totTransfered = 0;
431         int read;
432         while ((read = rbc.read(buffer)) != -1) {
433             if (buffer.remaining() == 0) {
434                 flushBuffer();
435             }
436 
437             totTransfered += read;
438         }
439 
440         return totTransfered;
441     }
442 
443     @Override
flush()444     public final void flush()
445             throws IOException
446     {
447         if (closed) {
448             throw new IOException("Stream is closed");
449         }
450         flushBuffer();
451     }
452 
453     @Override
close()454     public final void close()
455             throws IOException
456     {
457         if (closed) {
458             return;
459         }
460         try {
461             flush();
462             out.close();
463         }
464         finally {
465             closed = true;
466             bufferPool.releaseArray(buffer.array());
467             bufferPool.releaseDirect(directInputBuffer);
468             bufferPool.releaseDirect(outputBuffer);
469         }
470     }
471 
472     /**
473      * Compresses and writes out any buffered data. This does nothing if there
474      * is no currently buffered data.
475      *
476      * @throws IOException
477      */
flushBuffer()478     private void flushBuffer()
479             throws IOException
480     {
481         if (buffer.position() > 0) {
482             buffer.flip();
483             writeCompressed(buffer);
484             buffer.clear();
485             buffer.limit(blockSize);
486         }
487     }
488 
489     /**
490      * {@link SnappyFramed#maskedCrc32c(byte[], int, int)} the crc, compresses
491      * the data, determines if the compression ratio is acceptable and calls
492      * {@link #writeBlock(java.nio.channels.WritableByteChannel, java.nio.ByteBuffer, boolean, int)} to
493      * actually write the frame.
494      *
495      * @param buffer
496      * @throws IOException
497      */
writeCompressed(ByteBuffer buffer)498     private void writeCompressed(ByteBuffer buffer)
499             throws IOException
500     {
501 
502         final byte[] input = buffer.array();
503         final int length = buffer.remaining();
504 
505         // crc is based on the user supplied input data
506         final int crc32c = maskedCrc32c(input, 0, length);
507 
508         directInputBuffer.clear();
509         directInputBuffer.put(buffer);
510         directInputBuffer.flip();
511 
512         outputBuffer.clear();
513         Snappy.compress(directInputBuffer, outputBuffer);
514 
515         final int compressedLength = outputBuffer.remaining();
516 
517         // only use the compressed data if compression ratio is <= the
518         // minCompressonRatio
519         if (((double) compressedLength / (double) length) <= minCompressionRatio) {
520             writeBlock(out, outputBuffer, true, crc32c);
521         }
522         else {
523             // otherwise use the uncompressed data.
524             buffer.flip();
525             writeBlock(out, buffer, false, crc32c);
526         }
527     }
528 
529     /**
530      * Write a frame (block) to <i>out</i>.
531      *
532      * @param out The {@link OutputStream} to write to.
533      * @param data The data to write.
534      * @param compressed Indicates if <i>data</i> is the compressed or raw content.
535      * This is based on whether the compression ratio desired is
536      * reached.
537      * @param crc32c The calculated checksum.
538      * @throws IOException
539      */
writeBlock(final WritableByteChannel out, ByteBuffer data, boolean compressed, int crc32c)540     private void writeBlock(final WritableByteChannel out, ByteBuffer data,
541             boolean compressed, int crc32c)
542             throws IOException
543     {
544 
545         headerBuffer.clear();
546         headerBuffer.put((byte) (compressed ? COMPRESSED_DATA_FLAG
547                 : UNCOMPRESSED_DATA_FLAG));
548 
549         // the length written out to the header is both the checksum and the
550         // frame
551         final int headerLength = data.remaining() + 4;
552 
553         // write length
554         headerBuffer.put((byte) headerLength);
555         headerBuffer.put((byte) (headerLength >>> 8));
556         headerBuffer.put((byte) (headerLength >>> 16));
557 
558         // write crc32c of user input data
559         headerBuffer.putInt(crc32c);
560 
561         headerBuffer.flip();
562 
563         // write the header
564         out.write(headerBuffer);
565         // write the raw data
566         out.write(data);
567     }
568 }
569