1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 package org.apache.hadoop.crypto; 19 20 import java.io.FileDescriptor; 21 import java.io.FileInputStream; 22 import java.io.FilterInputStream; 23 import java.io.IOException; 24 import java.io.InputStream; 25 import java.nio.ByteBuffer; 26 import java.nio.channels.ReadableByteChannel; 27 import java.security.GeneralSecurityException; 28 import java.util.EnumSet; 29 import java.util.Queue; 30 import java.util.concurrent.ConcurrentLinkedQueue; 31 32 import org.apache.hadoop.classification.InterfaceAudience; 33 import org.apache.hadoop.classification.InterfaceStability; 34 import org.apache.hadoop.fs.ByteBufferReadable; 35 import org.apache.hadoop.fs.CanSetDropBehind; 36 import org.apache.hadoop.fs.CanSetReadahead; 37 import org.apache.hadoop.fs.HasEnhancedByteBufferAccess; 38 import org.apache.hadoop.fs.HasFileDescriptor; 39 import org.apache.hadoop.fs.PositionedReadable; 40 import org.apache.hadoop.fs.ReadOption; 41 import org.apache.hadoop.fs.Seekable; 42 import org.apache.hadoop.io.ByteBufferPool; 43 44 import com.google.common.base.Preconditions; 45 46 /** 47 * CryptoInputStream decrypts data. It is not thread-safe. AES CTR mode is 48 * required in order to ensure that the plain text and cipher text have a 1:1 49 * mapping. The decryption is buffer based. The key points of the decryption 50 * are (1) calculating the counter and (2) padding through stream position: 51 * <p/> 52 * counter = base + pos/(algorithm blocksize); 53 * padding = pos%(algorithm blocksize); 54 * <p/> 55 * The underlying stream offset is maintained as state. 56 */ 57 @InterfaceAudience.Private 58 @InterfaceStability.Evolving 59 public class CryptoInputStream extends FilterInputStream implements 60 Seekable, PositionedReadable, ByteBufferReadable, HasFileDescriptor, 61 CanSetDropBehind, CanSetReadahead, HasEnhancedByteBufferAccess, 62 ReadableByteChannel { 63 private final byte[] oneByteBuf = new byte[1]; 64 private final CryptoCodec codec; 65 private final Decryptor decryptor; 66 private final int bufferSize; 67 68 /** 69 * Input data buffer. The data starts at inBuffer.position() and ends at 70 * to inBuffer.limit(). 71 */ 72 private ByteBuffer inBuffer; 73 74 /** 75 * The decrypted data buffer. The data starts at outBuffer.position() and 76 * ends at outBuffer.limit(); 77 */ 78 private ByteBuffer outBuffer; 79 private long streamOffset = 0; // Underlying stream offset. 80 81 /** 82 * Whether the underlying stream supports 83 * {@link org.apache.hadoop.fs.ByteBufferReadable} 84 */ 85 private Boolean usingByteBufferRead = null; 86 87 /** 88 * Padding = pos%(algorithm blocksize); Padding is put into {@link #inBuffer} 89 * before any other data goes in. The purpose of padding is to put the input 90 * data at proper position. 91 */ 92 private byte padding; 93 private boolean closed; 94 private final byte[] key; 95 private final byte[] initIV; 96 private byte[] iv; 97 private final boolean isByteBufferReadable; 98 private final boolean isReadableByteChannel; 99 100 /** DirectBuffer pool */ 101 private final Queue<ByteBuffer> bufferPool = 102 new ConcurrentLinkedQueue<ByteBuffer>(); 103 /** Decryptor pool */ 104 private final Queue<Decryptor> decryptorPool = 105 new ConcurrentLinkedQueue<Decryptor>(); 106 CryptoInputStream(InputStream in, CryptoCodec codec, int bufferSize, byte[] key, byte[] iv)107 public CryptoInputStream(InputStream in, CryptoCodec codec, 108 int bufferSize, byte[] key, byte[] iv) throws IOException { 109 this(in, codec, bufferSize, key, iv, 110 CryptoStreamUtils.getInputStreamOffset(in)); 111 } 112 CryptoInputStream(InputStream in, CryptoCodec codec, int bufferSize, byte[] key, byte[] iv, long streamOffset)113 public CryptoInputStream(InputStream in, CryptoCodec codec, 114 int bufferSize, byte[] key, byte[] iv, long streamOffset) throws IOException { 115 super(in); 116 CryptoStreamUtils.checkCodec(codec); 117 this.bufferSize = CryptoStreamUtils.checkBufferSize(codec, bufferSize); 118 this.codec = codec; 119 this.key = key.clone(); 120 this.initIV = iv.clone(); 121 this.iv = iv.clone(); 122 this.streamOffset = streamOffset; 123 isByteBufferReadable = in instanceof ByteBufferReadable; 124 isReadableByteChannel = in instanceof ReadableByteChannel; 125 inBuffer = ByteBuffer.allocateDirect(this.bufferSize); 126 outBuffer = ByteBuffer.allocateDirect(this.bufferSize); 127 decryptor = getDecryptor(); 128 resetStreamOffset(streamOffset); 129 } 130 CryptoInputStream(InputStream in, CryptoCodec codec, byte[] key, byte[] iv)131 public CryptoInputStream(InputStream in, CryptoCodec codec, 132 byte[] key, byte[] iv) throws IOException { 133 this(in, codec, CryptoStreamUtils.getBufferSize(codec.getConf()), key, iv); 134 } 135 getWrappedStream()136 public InputStream getWrappedStream() { 137 return in; 138 } 139 140 /** 141 * Decryption is buffer based. 142 * If there is data in {@link #outBuffer}, then read it out of this buffer. 143 * If there is no data in {@link #outBuffer}, then read more from the 144 * underlying stream and do the decryption. 145 * @param b the buffer into which the decrypted data is read. 146 * @param off the buffer offset. 147 * @param len the maximum number of decrypted data bytes to read. 148 * @return int the total number of decrypted data bytes read into the buffer. 149 * @throws IOException 150 */ 151 @Override read(byte[] b, int off, int len)152 public int read(byte[] b, int off, int len) throws IOException { 153 checkStream(); 154 if (b == null) { 155 throw new NullPointerException(); 156 } else if (off < 0 || len < 0 || len > b.length - off) { 157 throw new IndexOutOfBoundsException(); 158 } else if (len == 0) { 159 return 0; 160 } 161 162 final int remaining = outBuffer.remaining(); 163 if (remaining > 0) { 164 int n = Math.min(len, remaining); 165 outBuffer.get(b, off, n); 166 return n; 167 } else { 168 int n = 0; 169 170 /* 171 * Check whether the underlying stream is {@link ByteBufferReadable}, 172 * it can avoid bytes copy. 173 */ 174 if (usingByteBufferRead == null) { 175 if (isByteBufferReadable || isReadableByteChannel) { 176 try { 177 n = isByteBufferReadable ? 178 ((ByteBufferReadable) in).read(inBuffer) : 179 ((ReadableByteChannel) in).read(inBuffer); 180 usingByteBufferRead = Boolean.TRUE; 181 } catch (UnsupportedOperationException e) { 182 usingByteBufferRead = Boolean.FALSE; 183 } 184 } else { 185 usingByteBufferRead = Boolean.FALSE; 186 } 187 if (!usingByteBufferRead) { 188 n = readFromUnderlyingStream(inBuffer); 189 } 190 } else { 191 if (usingByteBufferRead) { 192 n = isByteBufferReadable ? ((ByteBufferReadable) in).read(inBuffer) : 193 ((ReadableByteChannel) in).read(inBuffer); 194 } else { 195 n = readFromUnderlyingStream(inBuffer); 196 } 197 } 198 if (n <= 0) { 199 return n; 200 } 201 202 streamOffset += n; // Read n bytes 203 decrypt(decryptor, inBuffer, outBuffer, padding); 204 padding = afterDecryption(decryptor, inBuffer, streamOffset, iv); 205 n = Math.min(len, outBuffer.remaining()); 206 outBuffer.get(b, off, n); 207 return n; 208 } 209 } 210 211 /** Read data from underlying stream. */ readFromUnderlyingStream(ByteBuffer inBuffer)212 private int readFromUnderlyingStream(ByteBuffer inBuffer) throws IOException { 213 final int toRead = inBuffer.remaining(); 214 final byte[] tmp = getTmpBuf(); 215 final int n = in.read(tmp, 0, toRead); 216 if (n > 0) { 217 inBuffer.put(tmp, 0, n); 218 } 219 return n; 220 } 221 222 private byte[] tmpBuf; getTmpBuf()223 private byte[] getTmpBuf() { 224 if (tmpBuf == null) { 225 tmpBuf = new byte[bufferSize]; 226 } 227 return tmpBuf; 228 } 229 230 /** 231 * Do the decryption using inBuffer as input and outBuffer as output. 232 * Upon return, inBuffer is cleared; the decrypted data starts at 233 * outBuffer.position() and ends at outBuffer.limit(); 234 */ decrypt(Decryptor decryptor, ByteBuffer inBuffer, ByteBuffer outBuffer, byte padding)235 private void decrypt(Decryptor decryptor, ByteBuffer inBuffer, 236 ByteBuffer outBuffer, byte padding) throws IOException { 237 Preconditions.checkState(inBuffer.position() >= padding); 238 if(inBuffer.position() == padding) { 239 // There is no real data in inBuffer. 240 return; 241 } 242 inBuffer.flip(); 243 outBuffer.clear(); 244 decryptor.decrypt(inBuffer, outBuffer); 245 inBuffer.clear(); 246 outBuffer.flip(); 247 if (padding > 0) { 248 /* 249 * The plain text and cipher text have a 1:1 mapping, they start at the 250 * same position. 251 */ 252 outBuffer.position(padding); 253 } 254 } 255 256 /** 257 * This method is executed immediately after decryption. Check whether 258 * decryptor should be updated and recalculate padding if needed. 259 */ afterDecryption(Decryptor decryptor, ByteBuffer inBuffer, long position, byte[] iv)260 private byte afterDecryption(Decryptor decryptor, ByteBuffer inBuffer, 261 long position, byte[] iv) throws IOException { 262 byte padding = 0; 263 if (decryptor.isContextReset()) { 264 /* 265 * This code is generally not executed since the decryptor usually 266 * maintains decryption context (e.g. the counter) internally. However, 267 * some implementations can't maintain context so a re-init is necessary 268 * after each decryption call. 269 */ 270 updateDecryptor(decryptor, position, iv); 271 padding = getPadding(position); 272 inBuffer.position(padding); 273 } 274 return padding; 275 } 276 getCounter(long position)277 private long getCounter(long position) { 278 return position / codec.getCipherSuite().getAlgorithmBlockSize(); 279 } 280 getPadding(long position)281 private byte getPadding(long position) { 282 return (byte)(position % codec.getCipherSuite().getAlgorithmBlockSize()); 283 } 284 285 /** Calculate the counter and iv, update the decryptor. */ updateDecryptor(Decryptor decryptor, long position, byte[] iv)286 private void updateDecryptor(Decryptor decryptor, long position, byte[] iv) 287 throws IOException { 288 final long counter = getCounter(position); 289 codec.calculateIV(initIV, counter, iv); 290 decryptor.init(key, iv); 291 } 292 293 /** 294 * Reset the underlying stream offset; clear {@link #inBuffer} and 295 * {@link #outBuffer}. This Typically happens during {@link #seek(long)} 296 * or {@link #skip(long)}. 297 */ resetStreamOffset(long offset)298 private void resetStreamOffset(long offset) throws IOException { 299 streamOffset = offset; 300 inBuffer.clear(); 301 outBuffer.clear(); 302 outBuffer.limit(0); 303 updateDecryptor(decryptor, offset, iv); 304 padding = getPadding(offset); 305 inBuffer.position(padding); // Set proper position for input data. 306 } 307 308 @Override close()309 public void close() throws IOException { 310 if (closed) { 311 return; 312 } 313 314 super.close(); 315 freeBuffers(); 316 closed = true; 317 } 318 319 /** Positioned read. It is thread-safe */ 320 @Override read(long position, byte[] buffer, int offset, int length)321 public int read(long position, byte[] buffer, int offset, int length) 322 throws IOException { 323 checkStream(); 324 try { 325 final int n = ((PositionedReadable) in).read(position, buffer, offset, 326 length); 327 if (n > 0) { 328 // This operation does not change the current offset of the file 329 decrypt(position, buffer, offset, n); 330 } 331 332 return n; 333 } catch (ClassCastException e) { 334 throw new UnsupportedOperationException("This stream does not support " + 335 "positioned read."); 336 } 337 } 338 339 /** 340 * Decrypt length bytes in buffer starting at offset. Output is also put 341 * into buffer starting at offset. It is thread-safe. 342 */ decrypt(long position, byte[] buffer, int offset, int length)343 private void decrypt(long position, byte[] buffer, int offset, int length) 344 throws IOException { 345 ByteBuffer inBuffer = getBuffer(); 346 ByteBuffer outBuffer = getBuffer(); 347 Decryptor decryptor = null; 348 try { 349 decryptor = getDecryptor(); 350 byte[] iv = initIV.clone(); 351 updateDecryptor(decryptor, position, iv); 352 byte padding = getPadding(position); 353 inBuffer.position(padding); // Set proper position for input data. 354 355 int n = 0; 356 while (n < length) { 357 int toDecrypt = Math.min(length - n, inBuffer.remaining()); 358 inBuffer.put(buffer, offset + n, toDecrypt); 359 // Do decryption 360 decrypt(decryptor, inBuffer, outBuffer, padding); 361 362 outBuffer.get(buffer, offset + n, toDecrypt); 363 n += toDecrypt; 364 padding = afterDecryption(decryptor, inBuffer, position + n, iv); 365 } 366 } finally { 367 returnBuffer(inBuffer); 368 returnBuffer(outBuffer); 369 returnDecryptor(decryptor); 370 } 371 } 372 373 /** Positioned read fully. It is thread-safe */ 374 @Override readFully(long position, byte[] buffer, int offset, int length)375 public void readFully(long position, byte[] buffer, int offset, int length) 376 throws IOException { 377 checkStream(); 378 try { 379 ((PositionedReadable) in).readFully(position, buffer, offset, length); 380 if (length > 0) { 381 // This operation does not change the current offset of the file 382 decrypt(position, buffer, offset, length); 383 } 384 } catch (ClassCastException e) { 385 throw new UnsupportedOperationException("This stream does not support " + 386 "positioned readFully."); 387 } 388 } 389 390 @Override readFully(long position, byte[] buffer)391 public void readFully(long position, byte[] buffer) throws IOException { 392 readFully(position, buffer, 0, buffer.length); 393 } 394 395 /** Seek to a position. */ 396 @Override seek(long pos)397 public void seek(long pos) throws IOException { 398 Preconditions.checkArgument(pos >= 0, "Cannot seek to negative offset."); 399 checkStream(); 400 try { 401 /* 402 * If data of target pos in the underlying stream has already been read 403 * and decrypted in outBuffer, we just need to re-position outBuffer. 404 */ 405 if (pos <= streamOffset && pos >= (streamOffset - outBuffer.remaining())) { 406 int forward = (int) (pos - (streamOffset - outBuffer.remaining())); 407 if (forward > 0) { 408 outBuffer.position(outBuffer.position() + forward); 409 } 410 } else { 411 ((Seekable) in).seek(pos); 412 resetStreamOffset(pos); 413 } 414 } catch (ClassCastException e) { 415 throw new UnsupportedOperationException("This stream does not support " + 416 "seek."); 417 } 418 } 419 420 /** Skip n bytes */ 421 @Override skip(long n)422 public long skip(long n) throws IOException { 423 Preconditions.checkArgument(n >= 0, "Negative skip length."); 424 checkStream(); 425 426 if (n == 0) { 427 return 0; 428 } else if (n <= outBuffer.remaining()) { 429 int pos = outBuffer.position() + (int) n; 430 outBuffer.position(pos); 431 return n; 432 } else { 433 /* 434 * Subtract outBuffer.remaining() to see how many bytes we need to 435 * skip in the underlying stream. Add outBuffer.remaining() to the 436 * actual number of skipped bytes in the underlying stream to get the 437 * number of skipped bytes from the user's point of view. 438 */ 439 n -= outBuffer.remaining(); 440 long skipped = in.skip(n); 441 if (skipped < 0) { 442 skipped = 0; 443 } 444 long pos = streamOffset + skipped; 445 skipped += outBuffer.remaining(); 446 resetStreamOffset(pos); 447 return skipped; 448 } 449 } 450 451 /** Get underlying stream position. */ 452 @Override getPos()453 public long getPos() throws IOException { 454 checkStream(); 455 // Equals: ((Seekable) in).getPos() - outBuffer.remaining() 456 return streamOffset - outBuffer.remaining(); 457 } 458 459 /** ByteBuffer read. */ 460 @Override read(ByteBuffer buf)461 public int read(ByteBuffer buf) throws IOException { 462 checkStream(); 463 if (isByteBufferReadable || isReadableByteChannel) { 464 final int unread = outBuffer.remaining(); 465 if (unread > 0) { // Have unread decrypted data in buffer. 466 int toRead = buf.remaining(); 467 if (toRead <= unread) { 468 final int limit = outBuffer.limit(); 469 outBuffer.limit(outBuffer.position() + toRead); 470 buf.put(outBuffer); 471 outBuffer.limit(limit); 472 return toRead; 473 } else { 474 buf.put(outBuffer); 475 } 476 } 477 478 final int pos = buf.position(); 479 final int n = isByteBufferReadable ? ((ByteBufferReadable) in).read(buf) : 480 ((ReadableByteChannel) in).read(buf); 481 if (n > 0) { 482 streamOffset += n; // Read n bytes 483 decrypt(buf, n, pos); 484 } 485 486 if (n >= 0) { 487 return unread + n; 488 } else { 489 if (unread == 0) { 490 return -1; 491 } else { 492 return unread; 493 } 494 } 495 } else { 496 int n = 0; 497 if (buf.hasArray()) { 498 n = read(buf.array(), buf.position(), buf.remaining()); 499 if (n > 0) { 500 buf.position(buf.position() + n); 501 } 502 } else { 503 byte[] tmp = new byte[buf.remaining()]; 504 n = read(tmp); 505 if (n > 0) { 506 buf.put(tmp, 0, n); 507 } 508 } 509 return n; 510 } 511 } 512 513 /** 514 * Decrypt all data in buf: total n bytes from given start position. 515 * Output is also buf and same start position. 516 * buf.position() and buf.limit() should be unchanged after decryption. 517 */ decrypt(ByteBuffer buf, int n, int start)518 private void decrypt(ByteBuffer buf, int n, int start) 519 throws IOException { 520 final int pos = buf.position(); 521 final int limit = buf.limit(); 522 int len = 0; 523 while (len < n) { 524 buf.position(start + len); 525 buf.limit(start + len + Math.min(n - len, inBuffer.remaining())); 526 inBuffer.put(buf); 527 // Do decryption 528 try { 529 decrypt(decryptor, inBuffer, outBuffer, padding); 530 buf.position(start + len); 531 buf.limit(limit); 532 len += outBuffer.remaining(); 533 buf.put(outBuffer); 534 } finally { 535 padding = afterDecryption(decryptor, inBuffer, streamOffset - (n - len), iv); 536 } 537 } 538 buf.position(pos); 539 } 540 541 @Override available()542 public int available() throws IOException { 543 checkStream(); 544 545 return in.available() + outBuffer.remaining(); 546 } 547 548 @Override markSupported()549 public boolean markSupported() { 550 return false; 551 } 552 553 @Override mark(int readLimit)554 public void mark(int readLimit) { 555 } 556 557 @Override reset()558 public void reset() throws IOException { 559 throw new IOException("Mark/reset not supported"); 560 } 561 562 @Override seekToNewSource(long targetPos)563 public boolean seekToNewSource(long targetPos) throws IOException { 564 Preconditions.checkArgument(targetPos >= 0, 565 "Cannot seek to negative offset."); 566 checkStream(); 567 try { 568 boolean result = ((Seekable) in).seekToNewSource(targetPos); 569 resetStreamOffset(targetPos); 570 return result; 571 } catch (ClassCastException e) { 572 throw new UnsupportedOperationException("This stream does not support " + 573 "seekToNewSource."); 574 } 575 } 576 577 @Override read(ByteBufferPool bufferPool, int maxLength, EnumSet<ReadOption> opts)578 public ByteBuffer read(ByteBufferPool bufferPool, int maxLength, 579 EnumSet<ReadOption> opts) throws IOException, 580 UnsupportedOperationException { 581 checkStream(); 582 try { 583 if (outBuffer.remaining() > 0) { 584 // Have some decrypted data unread, need to reset. 585 ((Seekable) in).seek(getPos()); 586 resetStreamOffset(getPos()); 587 } 588 final ByteBuffer buffer = ((HasEnhancedByteBufferAccess) in). 589 read(bufferPool, maxLength, opts); 590 if (buffer != null) { 591 final int n = buffer.remaining(); 592 if (n > 0) { 593 streamOffset += buffer.remaining(); // Read n bytes 594 final int pos = buffer.position(); 595 decrypt(buffer, n, pos); 596 } 597 } 598 return buffer; 599 } catch (ClassCastException e) { 600 throw new UnsupportedOperationException("This stream does not support " + 601 "enhanced byte buffer access."); 602 } 603 } 604 605 @Override releaseBuffer(ByteBuffer buffer)606 public void releaseBuffer(ByteBuffer buffer) { 607 try { 608 ((HasEnhancedByteBufferAccess) in).releaseBuffer(buffer); 609 } catch (ClassCastException e) { 610 throw new UnsupportedOperationException("This stream does not support " + 611 "release buffer."); 612 } 613 } 614 615 @Override setReadahead(Long readahead)616 public void setReadahead(Long readahead) throws IOException, 617 UnsupportedOperationException { 618 try { 619 ((CanSetReadahead) in).setReadahead(readahead); 620 } catch (ClassCastException e) { 621 throw new UnsupportedOperationException("This stream does not support " + 622 "setting the readahead caching strategy."); 623 } 624 } 625 626 @Override setDropBehind(Boolean dropCache)627 public void setDropBehind(Boolean dropCache) throws IOException, 628 UnsupportedOperationException { 629 try { 630 ((CanSetDropBehind) in).setDropBehind(dropCache); 631 } catch (ClassCastException e) { 632 throw new UnsupportedOperationException("This stream does not " + 633 "support setting the drop-behind caching setting."); 634 } 635 } 636 637 @Override getFileDescriptor()638 public FileDescriptor getFileDescriptor() throws IOException { 639 if (in instanceof HasFileDescriptor) { 640 return ((HasFileDescriptor) in).getFileDescriptor(); 641 } else if (in instanceof FileInputStream) { 642 return ((FileInputStream) in).getFD(); 643 } else { 644 return null; 645 } 646 } 647 648 @Override read()649 public int read() throws IOException { 650 return (read(oneByteBuf, 0, 1) == -1) ? -1 : (oneByteBuf[0] & 0xff); 651 } 652 checkStream()653 private void checkStream() throws IOException { 654 if (closed) { 655 throw new IOException("Stream closed"); 656 } 657 } 658 659 /** Get direct buffer from pool */ getBuffer()660 private ByteBuffer getBuffer() { 661 ByteBuffer buffer = bufferPool.poll(); 662 if (buffer == null) { 663 buffer = ByteBuffer.allocateDirect(bufferSize); 664 } 665 666 return buffer; 667 } 668 669 /** Return direct buffer to pool */ returnBuffer(ByteBuffer buf)670 private void returnBuffer(ByteBuffer buf) { 671 if (buf != null) { 672 buf.clear(); 673 bufferPool.add(buf); 674 } 675 } 676 677 /** Forcibly free the direct buffers. */ freeBuffers()678 private void freeBuffers() { 679 CryptoStreamUtils.freeDB(inBuffer); 680 CryptoStreamUtils.freeDB(outBuffer); 681 cleanBufferPool(); 682 } 683 684 /** Clean direct buffer pool */ cleanBufferPool()685 private void cleanBufferPool() { 686 ByteBuffer buf; 687 while ((buf = bufferPool.poll()) != null) { 688 CryptoStreamUtils.freeDB(buf); 689 } 690 } 691 692 /** Get decryptor from pool */ getDecryptor()693 private Decryptor getDecryptor() throws IOException { 694 Decryptor decryptor = decryptorPool.poll(); 695 if (decryptor == null) { 696 try { 697 decryptor = codec.createDecryptor(); 698 } catch (GeneralSecurityException e) { 699 throw new IOException(e); 700 } 701 } 702 703 return decryptor; 704 } 705 706 /** Return decryptor to pool */ returnDecryptor(Decryptor decryptor)707 private void returnDecryptor(Decryptor decryptor) { 708 if (decryptor != null) { 709 decryptorPool.add(decryptor); 710 } 711 } 712 713 @Override isOpen()714 public boolean isOpen() { 715 return !closed; 716 } 717 } 718