1 /* 2 * Created: Apr 12, 2013 3 */ 4 package org.xerial.snappy; 5 6 import static org.xerial.snappy.SnappyFramed.COMPRESSED_DATA_FLAG; 7 import static org.xerial.snappy.SnappyFramed.HEADER_BYTES; 8 import static org.xerial.snappy.SnappyFramed.UNCOMPRESSED_DATA_FLAG; 9 import static org.xerial.snappy.SnappyFramed.maskedCrc32c; 10 11 import java.io.IOException; 12 import java.io.InputStream; 13 import java.io.OutputStream; 14 import java.nio.ByteBuffer; 15 import java.nio.ByteOrder; 16 import java.nio.channels.Channels; 17 import java.nio.channels.ClosedChannelException; 18 import java.nio.channels.ReadableByteChannel; 19 import java.nio.channels.WritableByteChannel; 20 21 import org.xerial.snappy.pool.BufferPool; 22 import org.xerial.snappy.pool.DefaultPoolFactory; 23 24 /** 25 * Implements the <a 26 * href="http://snappy.googlecode.com/svn/trunk/framing_format.txt" 27 * >x-snappy-framed</a> as an {@link OutputStream} and 28 * {@link WritableByteChannel}. 29 * 30 * @author Brett Okken 31 * @since 1.1.0 32 */ 33 public final class SnappyFramedOutputStream 34 extends OutputStream 35 implements 36 WritableByteChannel 37 { 38 39 /** 40 * The x-snappy-framed specification allows for a chunk size up to 41 * 16,777,211 bytes in length. However, it also goes on to state: 42 * <p> 43 * <code> 44 * We place an additional restriction that the uncompressed data in a chunk 45 * must be no longer than 65536 bytes. This allows consumers to easily use 46 * small fixed-size buffers. 47 * </code> 48 * </p> 49 */ 50 public static final int MAX_BLOCK_SIZE = 64 * 1024; 51 52 /** 53 * The default block size to use. 54 */ 55 public static final int DEFAULT_BLOCK_SIZE = MAX_BLOCK_SIZE; 56 57 /** 58 * The default min compression ratio to use. 59 */ 60 public static final double DEFAULT_MIN_COMPRESSION_RATIO = 0.85d; 61 62 private final ByteBuffer headerBuffer = ByteBuffer.allocate(8).order( 63 ByteOrder.LITTLE_ENDIAN); 64 private final BufferPool bufferPool; 65 private final int blockSize; 66 private final ByteBuffer buffer; 67 private final ByteBuffer directInputBuffer; 68 private final ByteBuffer outputBuffer; 69 private final double minCompressionRatio; 70 71 private final WritableByteChannel out; 72 73 // private int position; 74 private boolean closed; 75 76 /** 77 * Creates a new {@link SnappyFramedOutputStream} using the {@link #DEFAULT_BLOCK_SIZE} 78 * and {@link #DEFAULT_MIN_COMPRESSION_RATIO}. 79 * <p> 80 * Uses {@link DefaultPoolFactory} to obtain {@link BufferPool} for buffers. 81 * </p> 82 * 83 * @param out The underlying {@link OutputStream} to write to. Must not be 84 * {@code null}. 85 * @throws IOException 86 */ SnappyFramedOutputStream(OutputStream out)87 public SnappyFramedOutputStream(OutputStream out) 88 throws IOException 89 { 90 this(out, DEFAULT_BLOCK_SIZE, DEFAULT_MIN_COMPRESSION_RATIO, DefaultPoolFactory.getDefaultPool()); 91 } 92 93 /** 94 * Creates a new {@link SnappyFramedOutputStream} using the {@link #DEFAULT_BLOCK_SIZE} 95 * and {@link #DEFAULT_MIN_COMPRESSION_RATIO}. 96 * 97 * @param out The underlying {@link OutputStream} to write to. Must not be 98 * {@code null}. 99 * @param bufferPool Used to obtain buffer instances. Must not be {@code null}. 100 * @throws IOException 101 */ SnappyFramedOutputStream(OutputStream out, BufferPool bufferPool)102 public SnappyFramedOutputStream(OutputStream out, BufferPool bufferPool) 103 throws IOException 104 { 105 this(out, DEFAULT_BLOCK_SIZE, DEFAULT_MIN_COMPRESSION_RATIO, bufferPool); 106 } 107 108 /** 109 * Creates a new {@link SnappyFramedOutputStream} instance. 110 * <p> 111 * Uses {@link DefaultPoolFactory} to obtain {@link BufferPool} for buffers. 112 * </p> 113 * 114 * @param out The underlying {@link OutputStream} to write to. Must not be 115 * {@code null}. 116 * @param blockSize The block size (of raw data) to compress before writing frames 117 * to <i>out</i>. Must be in (0, 65536]. 118 * @param minCompressionRatio Defines the minimum compression ratio ( 119 * {@code compressedLength / rawLength}) that must be achieved to 120 * write the compressed data. This must be in (0, 1.0]. 121 * @throws IOException 122 */ SnappyFramedOutputStream(OutputStream out, int blockSize, double minCompressionRatio)123 public SnappyFramedOutputStream(OutputStream out, int blockSize, 124 double minCompressionRatio) 125 throws IOException 126 { 127 this(Channels.newChannel(out), blockSize, minCompressionRatio, DefaultPoolFactory.getDefaultPool()); 128 } 129 130 /** 131 * Creates a new {@link SnappyFramedOutputStream} instance. 132 * 133 * @param out The underlying {@link OutputStream} to write to. Must not be 134 * {@code null}. 135 * @param blockSize The block size (of raw data) to compress before writing frames 136 * to <i>out</i>. Must be in (0, 65536]. 137 * @param minCompressionRatio Defines the minimum compression ratio ( 138 * {@code compressedLength / rawLength}) that must be achieved to 139 * write the compressed data. This must be in (0, 1.0]. 140 * @param bufferPool Used to obtain buffer instances. Must not be {@code null}. 141 * @throws IOException 142 */ SnappyFramedOutputStream(OutputStream out, int blockSize, double minCompressionRatio, BufferPool bufferPool)143 public SnappyFramedOutputStream(OutputStream out, int blockSize, 144 double minCompressionRatio, BufferPool bufferPool) 145 throws IOException 146 { 147 this(Channels.newChannel(out), blockSize, minCompressionRatio, bufferPool); 148 } 149 150 /** 151 * Creates a new {@link SnappyFramedOutputStream} using the 152 * {@link #DEFAULT_BLOCK_SIZE} and {@link #DEFAULT_MIN_COMPRESSION_RATIO}. 153 * <p> 154 * Uses {@link DefaultPoolFactory} to obtain {@link BufferPool} for buffers. 155 * </p> 156 * 157 * @param out The underlying {@link WritableByteChannel} to write to. Must 158 * not be {@code null}. 159 * @throws IOException 160 * @since 1.1.1 161 */ SnappyFramedOutputStream(WritableByteChannel out)162 public SnappyFramedOutputStream(WritableByteChannel out) 163 throws IOException 164 { 165 this(out, DEFAULT_BLOCK_SIZE, DEFAULT_MIN_COMPRESSION_RATIO, DefaultPoolFactory.getDefaultPool()); 166 } 167 168 /** 169 * Creates a new {@link SnappyFramedOutputStream} using the 170 * {@link #DEFAULT_BLOCK_SIZE} and {@link #DEFAULT_MIN_COMPRESSION_RATIO}. 171 * <p> 172 * Uses {@link DefaultPoolFactory} to obtain {@link BufferPool} for buffers. 173 * </p> 174 * 175 * @param out The underlying {@link WritableByteChannel} to write to. Must 176 * not be {@code null}. 177 * @param bufferPool Used to obtain buffer instances. Must not be {@code null}. 178 * @throws IOException 179 */ SnappyFramedOutputStream(WritableByteChannel out, BufferPool bufferPool)180 public SnappyFramedOutputStream(WritableByteChannel out, BufferPool bufferPool) 181 throws IOException 182 { 183 this(out, DEFAULT_BLOCK_SIZE, DEFAULT_MIN_COMPRESSION_RATIO, bufferPool); 184 } 185 186 /** 187 * Creates a new {@link SnappyFramedOutputStream} instance. 188 * 189 * @param out The underlying {@link WritableByteChannel} to write to. Must 190 * not be {@code null}. 191 * @param blockSize The block size (of raw data) to compress before writing frames 192 * to <i>out</i>. Must be in (0, 65536]. 193 * @param minCompressionRatio Defines the minimum compression ratio ( 194 * {@code compressedLength / rawLength}) that must be achieved to 195 * write the compressed data. This must be in (0, 1.0]. 196 * @throws IOException 197 * @since 1.1.1 198 */ SnappyFramedOutputStream(WritableByteChannel out, int blockSize, double minCompressionRatio)199 public SnappyFramedOutputStream(WritableByteChannel out, int blockSize, 200 double minCompressionRatio) 201 throws IOException 202 { 203 this(out, blockSize, minCompressionRatio, DefaultPoolFactory.getDefaultPool()); 204 } 205 206 /** 207 * Creates a new {@link SnappyFramedOutputStream} instance. 208 * 209 * @param out The underlying {@link WritableByteChannel} to write to. Must 210 * not be {@code null}. 211 * @param blockSize The block size (of raw data) to compress before writing frames 212 * to <i>out</i>. Must be in (0, 65536]. 213 * @param minCompressionRatio Defines the minimum compression ratio ( 214 * {@code compressedLength / rawLength}) that must be achieved to 215 * write the compressed data. This must be in (0, 1.0]. 216 * @param bufferPool Used to obtain buffer instances. Must not be {@code null}. 217 * @throws IOException 218 * @since 1.1.1 219 */ SnappyFramedOutputStream(WritableByteChannel out, int blockSize, double minCompressionRatio, BufferPool bufferPool)220 public SnappyFramedOutputStream(WritableByteChannel out, int blockSize, 221 double minCompressionRatio, BufferPool bufferPool) 222 throws IOException 223 { 224 if (out == null) { 225 throw new NullPointerException("out is null"); 226 } 227 228 if (bufferPool == null) { 229 throw new NullPointerException("buffer pool is null"); 230 } 231 232 if (minCompressionRatio <= 0 || minCompressionRatio > 1.0) { 233 throw new IllegalArgumentException("minCompressionRatio " 234 + minCompressionRatio + " must be in (0,1.0]"); 235 } 236 237 if (blockSize <= 0 || blockSize > MAX_BLOCK_SIZE) { 238 throw new IllegalArgumentException("block size " + blockSize 239 + " must be in (0, 65536]"); 240 } 241 this.blockSize = blockSize; 242 this.out = out; 243 this.minCompressionRatio = minCompressionRatio; 244 245 this.bufferPool = bufferPool; 246 buffer = ByteBuffer.wrap(bufferPool.allocateArray(blockSize), 0, blockSize); 247 directInputBuffer = bufferPool.allocateDirect(blockSize); 248 outputBuffer = bufferPool.allocateDirect(Snappy 249 .maxCompressedLength(blockSize)); 250 251 writeHeader(out); 252 } 253 254 /** 255 * Writes the implementation specific header or "marker bytes" to 256 * <i>out</i>. 257 * 258 * @param out The underlying {@link OutputStream}. 259 * @throws IOException 260 */ writeHeader(WritableByteChannel out)261 private void writeHeader(WritableByteChannel out) 262 throws IOException 263 { 264 out.write(ByteBuffer.wrap(HEADER_BYTES)); 265 } 266 267 /** 268 * {@inheritDoc} 269 */ 270 @Override isOpen()271 public boolean isOpen() 272 { 273 return !closed; 274 } 275 276 @Override write(int b)277 public void write(int b) 278 throws IOException 279 { 280 if (closed) { 281 throw new IOException("Stream is closed"); 282 } 283 if (buffer.remaining() <= 0) { 284 flushBuffer(); 285 } 286 buffer.put((byte) b); 287 } 288 289 @Override write(byte[] input, int offset, int length)290 public void write(byte[] input, int offset, int length) 291 throws IOException 292 { 293 if (closed) { 294 throw new IOException("Stream is closed"); 295 } 296 297 if (input == null) { 298 throw new NullPointerException(); 299 } 300 else if ((offset < 0) || (offset > input.length) || (length < 0) 301 || ((offset + length) > input.length) 302 || ((offset + length) < 0)) { 303 throw new IndexOutOfBoundsException(); 304 } 305 306 while (length > 0) { 307 if (buffer.remaining() <= 0) { 308 flushBuffer(); 309 } 310 311 final int toPut = Math.min(length, buffer.remaining()); 312 buffer.put(input, offset, toPut); 313 offset += toPut; 314 length -= toPut; 315 } 316 } 317 318 /** 319 * {@inheritDoc} 320 */ 321 @Override write(ByteBuffer src)322 public int write(ByteBuffer src) 323 throws IOException 324 { 325 if (closed) { 326 throw new ClosedChannelException(); 327 } 328 329 if (buffer.remaining() <= 0) { 330 flushBuffer(); 331 } 332 333 final int srcLength = src.remaining(); 334 335 // easy case: enough free space in buffer for entire input 336 if (buffer.remaining() >= src.remaining()) { 337 buffer.put(src); 338 return srcLength; 339 } 340 341 // store current limit 342 final int srcEnd = src.position() + src.remaining(); 343 344 while ((src.position() + buffer.remaining()) <= srcEnd) { 345 // fill partial buffer as much as possible and flush 346 src.limit(src.position() + buffer.remaining()); 347 buffer.put(src); 348 flushBuffer(); 349 } 350 351 // reset original limit 352 src.limit(srcEnd); 353 354 // copy remaining partial block into now-empty buffer 355 buffer.put(src); 356 357 return srcLength; 358 } 359 360 /** 361 * Transfers all the content from <i>is</i> to this {@link OutputStream}. 362 * This potentially limits the amount of buffering required to compress 363 * content. 364 * 365 * @param is The source of data to compress. 366 * @return The number of bytes read from <i>is</i>. 367 * @throws IOException 368 * @since 1.1.1 369 */ transferFrom(InputStream is)370 public long transferFrom(InputStream is) 371 throws IOException 372 { 373 if (closed) { 374 throw new ClosedChannelException(); 375 } 376 377 if (is == null) { 378 throw new NullPointerException(); 379 } 380 381 if (buffer.remaining() == 0) { 382 flushBuffer(); 383 } 384 385 assert buffer.hasArray(); 386 final byte[] bytes = buffer.array(); 387 388 final int arrayOffset = buffer.arrayOffset(); 389 long totTransfered = 0; 390 int read; 391 while ((read = is.read(bytes, arrayOffset + buffer.position(), 392 buffer.remaining())) != -1) { 393 buffer.position(buffer.position() + read); 394 395 if (buffer.remaining() == 0) { 396 flushBuffer(); 397 } 398 399 totTransfered += read; 400 } 401 402 return totTransfered; 403 } 404 405 /** 406 * Transfers all the content from <i>rbc</i> to this 407 * {@link WritableByteChannel}. This potentially limits the amount of 408 * buffering required to compress content. 409 * 410 * @param rbc The source of data to compress. 411 * @return The number of bytes read from <i>rbc</i>. 412 * @throws IOException 413 * @since 1.1.1 414 */ transferFrom(ReadableByteChannel rbc)415 public long transferFrom(ReadableByteChannel rbc) 416 throws IOException 417 { 418 if (closed) { 419 throw new ClosedChannelException(); 420 } 421 422 if (rbc == null) { 423 throw new NullPointerException(); 424 } 425 426 if (buffer.remaining() == 0) { 427 flushBuffer(); 428 } 429 430 long totTransfered = 0; 431 int read; 432 while ((read = rbc.read(buffer)) != -1) { 433 if (buffer.remaining() == 0) { 434 flushBuffer(); 435 } 436 437 totTransfered += read; 438 } 439 440 return totTransfered; 441 } 442 443 @Override flush()444 public final void flush() 445 throws IOException 446 { 447 if (closed) { 448 throw new IOException("Stream is closed"); 449 } 450 flushBuffer(); 451 } 452 453 @Override close()454 public final void close() 455 throws IOException 456 { 457 if (closed) { 458 return; 459 } 460 try { 461 flush(); 462 out.close(); 463 } 464 finally { 465 closed = true; 466 bufferPool.releaseArray(buffer.array()); 467 bufferPool.releaseDirect(directInputBuffer); 468 bufferPool.releaseDirect(outputBuffer); 469 } 470 } 471 472 /** 473 * Compresses and writes out any buffered data. This does nothing if there 474 * is no currently buffered data. 475 * 476 * @throws IOException 477 */ flushBuffer()478 private void flushBuffer() 479 throws IOException 480 { 481 if (buffer.position() > 0) { 482 buffer.flip(); 483 writeCompressed(buffer); 484 buffer.clear(); 485 buffer.limit(blockSize); 486 } 487 } 488 489 /** 490 * {@link SnappyFramed#maskedCrc32c(byte[], int, int)} the crc, compresses 491 * the data, determines if the compression ratio is acceptable and calls 492 * {@link #writeBlock(java.nio.channels.WritableByteChannel, java.nio.ByteBuffer, boolean, int)} to 493 * actually write the frame. 494 * 495 * @param buffer 496 * @throws IOException 497 */ writeCompressed(ByteBuffer buffer)498 private void writeCompressed(ByteBuffer buffer) 499 throws IOException 500 { 501 502 final byte[] input = buffer.array(); 503 final int length = buffer.remaining(); 504 505 // crc is based on the user supplied input data 506 final int crc32c = maskedCrc32c(input, 0, length); 507 508 directInputBuffer.clear(); 509 directInputBuffer.put(buffer); 510 directInputBuffer.flip(); 511 512 outputBuffer.clear(); 513 Snappy.compress(directInputBuffer, outputBuffer); 514 515 final int compressedLength = outputBuffer.remaining(); 516 517 // only use the compressed data if compression ratio is <= the 518 // minCompressonRatio 519 if (((double) compressedLength / (double) length) <= minCompressionRatio) { 520 writeBlock(out, outputBuffer, true, crc32c); 521 } 522 else { 523 // otherwise use the uncompressed data. 524 buffer.flip(); 525 writeBlock(out, buffer, false, crc32c); 526 } 527 } 528 529 /** 530 * Write a frame (block) to <i>out</i>. 531 * 532 * @param out The {@link OutputStream} to write to. 533 * @param data The data to write. 534 * @param compressed Indicates if <i>data</i> is the compressed or raw content. 535 * This is based on whether the compression ratio desired is 536 * reached. 537 * @param crc32c The calculated checksum. 538 * @throws IOException 539 */ writeBlock(final WritableByteChannel out, ByteBuffer data, boolean compressed, int crc32c)540 private void writeBlock(final WritableByteChannel out, ByteBuffer data, 541 boolean compressed, int crc32c) 542 throws IOException 543 { 544 545 headerBuffer.clear(); 546 headerBuffer.put((byte) (compressed ? COMPRESSED_DATA_FLAG 547 : UNCOMPRESSED_DATA_FLAG)); 548 549 // the length written out to the header is both the checksum and the 550 // frame 551 final int headerLength = data.remaining() + 4; 552 553 // write length 554 headerBuffer.put((byte) headerLength); 555 headerBuffer.put((byte) (headerLength >>> 8)); 556 headerBuffer.put((byte) (headerLength >>> 16)); 557 558 // write crc32c of user input data 559 headerBuffer.putInt(crc32c); 560 561 headerBuffer.flip(); 562 563 // write the header 564 out.write(headerBuffer); 565 // write the raw data 566 out.write(data); 567 } 568 } 569