1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 package org.apache.hadoop.crypto;
19 
20 import java.io.FileDescriptor;
21 import java.io.FileInputStream;
22 import java.io.FilterInputStream;
23 import java.io.IOException;
24 import java.io.InputStream;
25 import java.nio.ByteBuffer;
26 import java.nio.channels.ReadableByteChannel;
27 import java.security.GeneralSecurityException;
28 import java.util.EnumSet;
29 import java.util.Queue;
30 import java.util.concurrent.ConcurrentLinkedQueue;
31 
32 import org.apache.hadoop.classification.InterfaceAudience;
33 import org.apache.hadoop.classification.InterfaceStability;
34 import org.apache.hadoop.fs.ByteBufferReadable;
35 import org.apache.hadoop.fs.CanSetDropBehind;
36 import org.apache.hadoop.fs.CanSetReadahead;
37 import org.apache.hadoop.fs.HasEnhancedByteBufferAccess;
38 import org.apache.hadoop.fs.HasFileDescriptor;
39 import org.apache.hadoop.fs.PositionedReadable;
40 import org.apache.hadoop.fs.ReadOption;
41 import org.apache.hadoop.fs.Seekable;
42 import org.apache.hadoop.io.ByteBufferPool;
43 
44 import com.google.common.base.Preconditions;
45 
46 /**
47  * CryptoInputStream decrypts data. It is not thread-safe. AES CTR mode is
48  * required in order to ensure that the plain text and cipher text have a 1:1
49  * mapping. The decryption is buffer based. The key points of the decryption
50  * are (1) calculating the counter and (2) padding through stream position:
51  * <p/>
52  * counter = base + pos/(algorithm blocksize);
53  * padding = pos%(algorithm blocksize);
54  * <p/>
55  * The underlying stream offset is maintained as state.
56  */
57 @InterfaceAudience.Private
58 @InterfaceStability.Evolving
59 public class CryptoInputStream extends FilterInputStream implements
60     Seekable, PositionedReadable, ByteBufferReadable, HasFileDescriptor,
61     CanSetDropBehind, CanSetReadahead, HasEnhancedByteBufferAccess,
62     ReadableByteChannel {
63   private final byte[] oneByteBuf = new byte[1];
64   private final CryptoCodec codec;
65   private final Decryptor decryptor;
66   private final int bufferSize;
67 
68   /**
69    * Input data buffer. The data starts at inBuffer.position() and ends at
70    * to inBuffer.limit().
71    */
72   private ByteBuffer inBuffer;
73 
74   /**
75    * The decrypted data buffer. The data starts at outBuffer.position() and
76    * ends at outBuffer.limit();
77    */
78   private ByteBuffer outBuffer;
79   private long streamOffset = 0; // Underlying stream offset.
80 
81   /**
82    * Whether the underlying stream supports
83    * {@link org.apache.hadoop.fs.ByteBufferReadable}
84    */
85   private Boolean usingByteBufferRead = null;
86 
87   /**
88    * Padding = pos%(algorithm blocksize); Padding is put into {@link #inBuffer}
89    * before any other data goes in. The purpose of padding is to put the input
90    * data at proper position.
91    */
92   private byte padding;
93   private boolean closed;
94   private final byte[] key;
95   private final byte[] initIV;
96   private byte[] iv;
97   private final boolean isByteBufferReadable;
98   private final boolean isReadableByteChannel;
99 
100   /** DirectBuffer pool */
101   private final Queue<ByteBuffer> bufferPool =
102       new ConcurrentLinkedQueue<ByteBuffer>();
103   /** Decryptor pool */
104   private final Queue<Decryptor> decryptorPool =
105       new ConcurrentLinkedQueue<Decryptor>();
106 
CryptoInputStream(InputStream in, CryptoCodec codec, int bufferSize, byte[] key, byte[] iv)107   public CryptoInputStream(InputStream in, CryptoCodec codec,
108       int bufferSize, byte[] key, byte[] iv) throws IOException {
109     this(in, codec, bufferSize, key, iv,
110         CryptoStreamUtils.getInputStreamOffset(in));
111   }
112 
CryptoInputStream(InputStream in, CryptoCodec codec, int bufferSize, byte[] key, byte[] iv, long streamOffset)113   public CryptoInputStream(InputStream in, CryptoCodec codec,
114       int bufferSize, byte[] key, byte[] iv, long streamOffset) throws IOException {
115     super(in);
116     CryptoStreamUtils.checkCodec(codec);
117     this.bufferSize = CryptoStreamUtils.checkBufferSize(codec, bufferSize);
118     this.codec = codec;
119     this.key = key.clone();
120     this.initIV = iv.clone();
121     this.iv = iv.clone();
122     this.streamOffset = streamOffset;
123     isByteBufferReadable = in instanceof ByteBufferReadable;
124     isReadableByteChannel = in instanceof ReadableByteChannel;
125     inBuffer = ByteBuffer.allocateDirect(this.bufferSize);
126     outBuffer = ByteBuffer.allocateDirect(this.bufferSize);
127     decryptor = getDecryptor();
128     resetStreamOffset(streamOffset);
129   }
130 
CryptoInputStream(InputStream in, CryptoCodec codec, byte[] key, byte[] iv)131   public CryptoInputStream(InputStream in, CryptoCodec codec,
132       byte[] key, byte[] iv) throws IOException {
133     this(in, codec, CryptoStreamUtils.getBufferSize(codec.getConf()), key, iv);
134   }
135 
getWrappedStream()136   public InputStream getWrappedStream() {
137     return in;
138   }
139 
140   /**
141    * Decryption is buffer based.
142    * If there is data in {@link #outBuffer}, then read it out of this buffer.
143    * If there is no data in {@link #outBuffer}, then read more from the
144    * underlying stream and do the decryption.
145    * @param b the buffer into which the decrypted data is read.
146    * @param off the buffer offset.
147    * @param len the maximum number of decrypted data bytes to read.
148    * @return int the total number of decrypted data bytes read into the buffer.
149    * @throws IOException
150    */
151   @Override
read(byte[] b, int off, int len)152   public int read(byte[] b, int off, int len) throws IOException {
153     checkStream();
154     if (b == null) {
155       throw new NullPointerException();
156     } else if (off < 0 || len < 0 || len > b.length - off) {
157       throw new IndexOutOfBoundsException();
158     } else if (len == 0) {
159       return 0;
160     }
161 
162     final int remaining = outBuffer.remaining();
163     if (remaining > 0) {
164       int n = Math.min(len, remaining);
165       outBuffer.get(b, off, n);
166       return n;
167     } else {
168       int n = 0;
169 
170       /*
171        * Check whether the underlying stream is {@link ByteBufferReadable},
172        * it can avoid bytes copy.
173        */
174       if (usingByteBufferRead == null) {
175         if (isByteBufferReadable || isReadableByteChannel) {
176           try {
177             n = isByteBufferReadable ?
178                 ((ByteBufferReadable) in).read(inBuffer) :
179                   ((ReadableByteChannel) in).read(inBuffer);
180             usingByteBufferRead = Boolean.TRUE;
181           } catch (UnsupportedOperationException e) {
182             usingByteBufferRead = Boolean.FALSE;
183           }
184         } else {
185           usingByteBufferRead = Boolean.FALSE;
186         }
187         if (!usingByteBufferRead) {
188           n = readFromUnderlyingStream(inBuffer);
189         }
190       } else {
191         if (usingByteBufferRead) {
192           n = isByteBufferReadable ? ((ByteBufferReadable) in).read(inBuffer) :
193                 ((ReadableByteChannel) in).read(inBuffer);
194         } else {
195           n = readFromUnderlyingStream(inBuffer);
196         }
197       }
198       if (n <= 0) {
199         return n;
200       }
201 
202       streamOffset += n; // Read n bytes
203       decrypt(decryptor, inBuffer, outBuffer, padding);
204       padding = afterDecryption(decryptor, inBuffer, streamOffset, iv);
205       n = Math.min(len, outBuffer.remaining());
206       outBuffer.get(b, off, n);
207       return n;
208     }
209   }
210 
211   /** Read data from underlying stream. */
readFromUnderlyingStream(ByteBuffer inBuffer)212   private int readFromUnderlyingStream(ByteBuffer inBuffer) throws IOException {
213     final int toRead = inBuffer.remaining();
214     final byte[] tmp = getTmpBuf();
215     final int n = in.read(tmp, 0, toRead);
216     if (n > 0) {
217       inBuffer.put(tmp, 0, n);
218     }
219     return n;
220   }
221 
222   private byte[] tmpBuf;
getTmpBuf()223   private byte[] getTmpBuf() {
224     if (tmpBuf == null) {
225       tmpBuf = new byte[bufferSize];
226     }
227     return tmpBuf;
228   }
229 
230   /**
231    * Do the decryption using inBuffer as input and outBuffer as output.
232    * Upon return, inBuffer is cleared; the decrypted data starts at
233    * outBuffer.position() and ends at outBuffer.limit();
234    */
decrypt(Decryptor decryptor, ByteBuffer inBuffer, ByteBuffer outBuffer, byte padding)235   private void decrypt(Decryptor decryptor, ByteBuffer inBuffer,
236       ByteBuffer outBuffer, byte padding) throws IOException {
237     Preconditions.checkState(inBuffer.position() >= padding);
238     if(inBuffer.position() == padding) {
239       // There is no real data in inBuffer.
240       return;
241     }
242     inBuffer.flip();
243     outBuffer.clear();
244     decryptor.decrypt(inBuffer, outBuffer);
245     inBuffer.clear();
246     outBuffer.flip();
247     if (padding > 0) {
248       /*
249        * The plain text and cipher text have a 1:1 mapping, they start at the
250        * same position.
251        */
252       outBuffer.position(padding);
253     }
254   }
255 
256   /**
257    * This method is executed immediately after decryption. Check whether
258    * decryptor should be updated and recalculate padding if needed.
259    */
afterDecryption(Decryptor decryptor, ByteBuffer inBuffer, long position, byte[] iv)260   private byte afterDecryption(Decryptor decryptor, ByteBuffer inBuffer,
261       long position, byte[] iv) throws IOException {
262     byte padding = 0;
263     if (decryptor.isContextReset()) {
264       /*
265        * This code is generally not executed since the decryptor usually
266        * maintains decryption context (e.g. the counter) internally. However,
267        * some implementations can't maintain context so a re-init is necessary
268        * after each decryption call.
269        */
270       updateDecryptor(decryptor, position, iv);
271       padding = getPadding(position);
272       inBuffer.position(padding);
273     }
274     return padding;
275   }
276 
getCounter(long position)277   private long getCounter(long position) {
278     return position / codec.getCipherSuite().getAlgorithmBlockSize();
279   }
280 
getPadding(long position)281   private byte getPadding(long position) {
282     return (byte)(position % codec.getCipherSuite().getAlgorithmBlockSize());
283   }
284 
285   /** Calculate the counter and iv, update the decryptor. */
updateDecryptor(Decryptor decryptor, long position, byte[] iv)286   private void updateDecryptor(Decryptor decryptor, long position, byte[] iv)
287       throws IOException {
288     final long counter = getCounter(position);
289     codec.calculateIV(initIV, counter, iv);
290     decryptor.init(key, iv);
291   }
292 
293   /**
294    * Reset the underlying stream offset; clear {@link #inBuffer} and
295    * {@link #outBuffer}. This Typically happens during {@link #seek(long)}
296    * or {@link #skip(long)}.
297    */
resetStreamOffset(long offset)298   private void resetStreamOffset(long offset) throws IOException {
299     streamOffset = offset;
300     inBuffer.clear();
301     outBuffer.clear();
302     outBuffer.limit(0);
303     updateDecryptor(decryptor, offset, iv);
304     padding = getPadding(offset);
305     inBuffer.position(padding); // Set proper position for input data.
306   }
307 
308   @Override
close()309   public void close() throws IOException {
310     if (closed) {
311       return;
312     }
313 
314     super.close();
315     freeBuffers();
316     closed = true;
317   }
318 
319   /** Positioned read. It is thread-safe */
320   @Override
read(long position, byte[] buffer, int offset, int length)321   public int read(long position, byte[] buffer, int offset, int length)
322       throws IOException {
323     checkStream();
324     try {
325       final int n = ((PositionedReadable) in).read(position, buffer, offset,
326           length);
327       if (n > 0) {
328         // This operation does not change the current offset of the file
329         decrypt(position, buffer, offset, n);
330       }
331 
332       return n;
333     } catch (ClassCastException e) {
334       throw new UnsupportedOperationException("This stream does not support " +
335           "positioned read.");
336     }
337   }
338 
339   /**
340    * Decrypt length bytes in buffer starting at offset. Output is also put
341    * into buffer starting at offset. It is thread-safe.
342    */
decrypt(long position, byte[] buffer, int offset, int length)343   private void decrypt(long position, byte[] buffer, int offset, int length)
344       throws IOException {
345     ByteBuffer inBuffer = getBuffer();
346     ByteBuffer outBuffer = getBuffer();
347     Decryptor decryptor = null;
348     try {
349       decryptor = getDecryptor();
350       byte[] iv = initIV.clone();
351       updateDecryptor(decryptor, position, iv);
352       byte padding = getPadding(position);
353       inBuffer.position(padding); // Set proper position for input data.
354 
355       int n = 0;
356       while (n < length) {
357         int toDecrypt = Math.min(length - n, inBuffer.remaining());
358         inBuffer.put(buffer, offset + n, toDecrypt);
359         // Do decryption
360         decrypt(decryptor, inBuffer, outBuffer, padding);
361 
362         outBuffer.get(buffer, offset + n, toDecrypt);
363         n += toDecrypt;
364         padding = afterDecryption(decryptor, inBuffer, position + n, iv);
365       }
366     } finally {
367       returnBuffer(inBuffer);
368       returnBuffer(outBuffer);
369       returnDecryptor(decryptor);
370     }
371   }
372 
373   /** Positioned read fully. It is thread-safe */
374   @Override
readFully(long position, byte[] buffer, int offset, int length)375   public void readFully(long position, byte[] buffer, int offset, int length)
376       throws IOException {
377     checkStream();
378     try {
379       ((PositionedReadable) in).readFully(position, buffer, offset, length);
380       if (length > 0) {
381         // This operation does not change the current offset of the file
382         decrypt(position, buffer, offset, length);
383       }
384     } catch (ClassCastException e) {
385       throw new UnsupportedOperationException("This stream does not support " +
386           "positioned readFully.");
387     }
388   }
389 
390   @Override
readFully(long position, byte[] buffer)391   public void readFully(long position, byte[] buffer) throws IOException {
392     readFully(position, buffer, 0, buffer.length);
393   }
394 
395   /** Seek to a position. */
396   @Override
seek(long pos)397   public void seek(long pos) throws IOException {
398     Preconditions.checkArgument(pos >= 0, "Cannot seek to negative offset.");
399     checkStream();
400     try {
401       /*
402        * If data of target pos in the underlying stream has already been read
403        * and decrypted in outBuffer, we just need to re-position outBuffer.
404        */
405       if (pos <= streamOffset && pos >= (streamOffset - outBuffer.remaining())) {
406         int forward = (int) (pos - (streamOffset - outBuffer.remaining()));
407         if (forward > 0) {
408           outBuffer.position(outBuffer.position() + forward);
409         }
410       } else {
411         ((Seekable) in).seek(pos);
412         resetStreamOffset(pos);
413       }
414     } catch (ClassCastException e) {
415       throw new UnsupportedOperationException("This stream does not support " +
416           "seek.");
417     }
418   }
419 
420   /** Skip n bytes */
421   @Override
skip(long n)422   public long skip(long n) throws IOException {
423     Preconditions.checkArgument(n >= 0, "Negative skip length.");
424     checkStream();
425 
426     if (n == 0) {
427       return 0;
428     } else if (n <= outBuffer.remaining()) {
429       int pos = outBuffer.position() + (int) n;
430       outBuffer.position(pos);
431       return n;
432     } else {
433       /*
434        * Subtract outBuffer.remaining() to see how many bytes we need to
435        * skip in the underlying stream. Add outBuffer.remaining() to the
436        * actual number of skipped bytes in the underlying stream to get the
437        * number of skipped bytes from the user's point of view.
438        */
439       n -= outBuffer.remaining();
440       long skipped = in.skip(n);
441       if (skipped < 0) {
442         skipped = 0;
443       }
444       long pos = streamOffset + skipped;
445       skipped += outBuffer.remaining();
446       resetStreamOffset(pos);
447       return skipped;
448     }
449   }
450 
451   /** Get underlying stream position. */
452   @Override
getPos()453   public long getPos() throws IOException {
454     checkStream();
455     // Equals: ((Seekable) in).getPos() - outBuffer.remaining()
456     return streamOffset - outBuffer.remaining();
457   }
458 
459   /** ByteBuffer read. */
460   @Override
read(ByteBuffer buf)461   public int read(ByteBuffer buf) throws IOException {
462     checkStream();
463     if (isByteBufferReadable || isReadableByteChannel) {
464       final int unread = outBuffer.remaining();
465       if (unread > 0) { // Have unread decrypted data in buffer.
466         int toRead = buf.remaining();
467         if (toRead <= unread) {
468           final int limit = outBuffer.limit();
469           outBuffer.limit(outBuffer.position() + toRead);
470           buf.put(outBuffer);
471           outBuffer.limit(limit);
472           return toRead;
473         } else {
474           buf.put(outBuffer);
475         }
476       }
477 
478       final int pos = buf.position();
479       final int n = isByteBufferReadable ? ((ByteBufferReadable) in).read(buf) :
480             ((ReadableByteChannel) in).read(buf);
481       if (n > 0) {
482         streamOffset += n; // Read n bytes
483         decrypt(buf, n, pos);
484       }
485 
486       if (n >= 0) {
487         return unread + n;
488       } else {
489         if (unread == 0) {
490           return -1;
491         } else {
492           return unread;
493         }
494       }
495     } else {
496       int n = 0;
497       if (buf.hasArray()) {
498         n = read(buf.array(), buf.position(), buf.remaining());
499         if (n > 0) {
500           buf.position(buf.position() + n);
501         }
502       } else {
503         byte[] tmp = new byte[buf.remaining()];
504         n = read(tmp);
505         if (n > 0) {
506           buf.put(tmp, 0, n);
507         }
508       }
509       return n;
510     }
511   }
512 
513   /**
514    * Decrypt all data in buf: total n bytes from given start position.
515    * Output is also buf and same start position.
516    * buf.position() and buf.limit() should be unchanged after decryption.
517    */
decrypt(ByteBuffer buf, int n, int start)518   private void decrypt(ByteBuffer buf, int n, int start)
519       throws IOException {
520     final int pos = buf.position();
521     final int limit = buf.limit();
522     int len = 0;
523     while (len < n) {
524       buf.position(start + len);
525       buf.limit(start + len + Math.min(n - len, inBuffer.remaining()));
526       inBuffer.put(buf);
527       // Do decryption
528       try {
529         decrypt(decryptor, inBuffer, outBuffer, padding);
530         buf.position(start + len);
531         buf.limit(limit);
532         len += outBuffer.remaining();
533         buf.put(outBuffer);
534       } finally {
535         padding = afterDecryption(decryptor, inBuffer, streamOffset - (n - len), iv);
536       }
537     }
538     buf.position(pos);
539   }
540 
541   @Override
available()542   public int available() throws IOException {
543     checkStream();
544 
545     return in.available() + outBuffer.remaining();
546   }
547 
548   @Override
markSupported()549   public boolean markSupported() {
550     return false;
551   }
552 
553   @Override
mark(int readLimit)554   public void mark(int readLimit) {
555   }
556 
557   @Override
reset()558   public void reset() throws IOException {
559     throw new IOException("Mark/reset not supported");
560   }
561 
562   @Override
seekToNewSource(long targetPos)563   public boolean seekToNewSource(long targetPos) throws IOException {
564     Preconditions.checkArgument(targetPos >= 0,
565         "Cannot seek to negative offset.");
566     checkStream();
567     try {
568       boolean result = ((Seekable) in).seekToNewSource(targetPos);
569       resetStreamOffset(targetPos);
570       return result;
571     } catch (ClassCastException e) {
572       throw new UnsupportedOperationException("This stream does not support " +
573           "seekToNewSource.");
574     }
575   }
576 
577   @Override
read(ByteBufferPool bufferPool, int maxLength, EnumSet<ReadOption> opts)578   public ByteBuffer read(ByteBufferPool bufferPool, int maxLength,
579       EnumSet<ReadOption> opts) throws IOException,
580       UnsupportedOperationException {
581     checkStream();
582     try {
583       if (outBuffer.remaining() > 0) {
584         // Have some decrypted data unread, need to reset.
585         ((Seekable) in).seek(getPos());
586         resetStreamOffset(getPos());
587       }
588       final ByteBuffer buffer = ((HasEnhancedByteBufferAccess) in).
589           read(bufferPool, maxLength, opts);
590       if (buffer != null) {
591         final int n = buffer.remaining();
592         if (n > 0) {
593           streamOffset += buffer.remaining(); // Read n bytes
594           final int pos = buffer.position();
595           decrypt(buffer, n, pos);
596         }
597       }
598       return buffer;
599     } catch (ClassCastException e) {
600       throw new UnsupportedOperationException("This stream does not support " +
601           "enhanced byte buffer access.");
602     }
603   }
604 
605   @Override
releaseBuffer(ByteBuffer buffer)606   public void releaseBuffer(ByteBuffer buffer) {
607     try {
608       ((HasEnhancedByteBufferAccess) in).releaseBuffer(buffer);
609     } catch (ClassCastException e) {
610       throw new UnsupportedOperationException("This stream does not support " +
611           "release buffer.");
612     }
613   }
614 
615   @Override
setReadahead(Long readahead)616   public void setReadahead(Long readahead) throws IOException,
617       UnsupportedOperationException {
618     try {
619       ((CanSetReadahead) in).setReadahead(readahead);
620     } catch (ClassCastException e) {
621       throw new UnsupportedOperationException("This stream does not support " +
622           "setting the readahead caching strategy.");
623     }
624   }
625 
626   @Override
setDropBehind(Boolean dropCache)627   public void setDropBehind(Boolean dropCache) throws IOException,
628       UnsupportedOperationException {
629     try {
630       ((CanSetDropBehind) in).setDropBehind(dropCache);
631     } catch (ClassCastException e) {
632       throw new UnsupportedOperationException("This stream does not " +
633           "support setting the drop-behind caching setting.");
634     }
635   }
636 
637   @Override
getFileDescriptor()638   public FileDescriptor getFileDescriptor() throws IOException {
639     if (in instanceof HasFileDescriptor) {
640       return ((HasFileDescriptor) in).getFileDescriptor();
641     } else if (in instanceof FileInputStream) {
642       return ((FileInputStream) in).getFD();
643     } else {
644       return null;
645     }
646   }
647 
648   @Override
read()649   public int read() throws IOException {
650     return (read(oneByteBuf, 0, 1) == -1) ? -1 : (oneByteBuf[0] & 0xff);
651   }
652 
checkStream()653   private void checkStream() throws IOException {
654     if (closed) {
655       throw new IOException("Stream closed");
656     }
657   }
658 
659   /** Get direct buffer from pool */
getBuffer()660   private ByteBuffer getBuffer() {
661     ByteBuffer buffer = bufferPool.poll();
662     if (buffer == null) {
663       buffer = ByteBuffer.allocateDirect(bufferSize);
664     }
665 
666     return buffer;
667   }
668 
669   /** Return direct buffer to pool */
returnBuffer(ByteBuffer buf)670   private void returnBuffer(ByteBuffer buf) {
671     if (buf != null) {
672       buf.clear();
673       bufferPool.add(buf);
674     }
675   }
676 
677   /** Forcibly free the direct buffers. */
freeBuffers()678   private void freeBuffers() {
679     CryptoStreamUtils.freeDB(inBuffer);
680     CryptoStreamUtils.freeDB(outBuffer);
681     cleanBufferPool();
682   }
683 
684   /** Clean direct buffer pool */
cleanBufferPool()685   private void cleanBufferPool() {
686     ByteBuffer buf;
687     while ((buf = bufferPool.poll()) != null) {
688       CryptoStreamUtils.freeDB(buf);
689     }
690   }
691 
692   /** Get decryptor from pool */
getDecryptor()693   private Decryptor getDecryptor() throws IOException {
694     Decryptor decryptor = decryptorPool.poll();
695     if (decryptor == null) {
696       try {
697         decryptor = codec.createDecryptor();
698       } catch (GeneralSecurityException e) {
699         throw new IOException(e);
700       }
701     }
702 
703     return decryptor;
704   }
705 
706   /** Return decryptor to pool */
returnDecryptor(Decryptor decryptor)707   private void returnDecryptor(Decryptor decryptor) {
708     if (decryptor != null) {
709       decryptorPool.add(decryptor);
710     }
711   }
712 
713   @Override
isOpen()714   public boolean isOpen() {
715     return !closed;
716   }
717 }
718