1 package org.xerial.snappy.pool;
2 
3 import java.lang.ref.SoftReference;
4 import java.nio.ByteBuffer;
5 import java.util.concurrent.ConcurrentHashMap;
6 import java.util.concurrent.ConcurrentLinkedDeque;
7 import java.util.concurrent.ConcurrentMap;
8 
9 /**
10  * A {@link BufferPool} implementation which caches values at fixed sizes.
11  * <p>
12  * Pooled instances are held as {@link SoftReference} to allow GC if necessary.
13  * </p>
14  * <p>
15  * The current fixed sizes are calculated as follows:
16  * <ul>
17  * <li>Values < 4KB return 4KB</li>
18  * <li>4KB - 32KB to 2KB</li>
19  * <li>32KB - 512KB  to 16KB</li>
20  * <li>512KB - 2MB to 128KB</li>
21  * <li>2MB - 16MB to 512KB</li>
22  * <li>16MB - 128MB to 4MB</li>
23  * <li>128MB - 512MB to 16MB</li>
24  * <li>512MB - 1.5 GB to 128MB</li>
25  * <li>Values > 1.5GB return {@link Integer#MAX_VALUE}</li>
26  * </ul>
27  * </p>
28  * @author Brett Okken
29  */
30 public final class CachingBufferPool implements BufferPool {
31 
32     private static interface IntFunction<E> {
create(int size)33         public E create(int size);
34     }
35 
36     private static final IntFunction<byte[]> ARRAY_FUNCTION = new IntFunction<byte[]>() {
37         @Override
38         public byte[] create(int size) {
39             return new byte[size];
40         }
41     };
42 
43     private static final IntFunction<ByteBuffer> DBB_FUNCTION = new IntFunction<ByteBuffer>() {
44         @Override
45         public ByteBuffer create(int size) {
46             return ByteBuffer.allocateDirect(size);
47         }
48     };
49 
50     private static final CachingBufferPool INSTANCE = new CachingBufferPool();
51 
52     private final ConcurrentMap<Integer, ConcurrentLinkedDeque<SoftReference<byte[]>>> bytes = new ConcurrentHashMap<>();
53     private final ConcurrentMap<Integer, ConcurrentLinkedDeque<SoftReference<ByteBuffer>>> buffers = new ConcurrentHashMap<>();
54 
CachingBufferPool()55     private CachingBufferPool() {
56     }
57 
58     /**
59      * Returns instance of {@link CachingBufferPool} for using cached buffers.
60      * @return instance of {@link CachingBufferPool} for using cached buffers.
61      */
getInstance()62     public static BufferPool getInstance() {
63         return INSTANCE;
64     }
65 
66     /**
67      * {@inheritDoc}
68      */
69     @Override
allocateArray(int size)70     public byte[] allocateArray(int size) {
71         if (size <= 0) {
72             throw new IllegalArgumentException("size is invalid: " + size);
73         }
74 
75         return getOrCreate(size, bytes, ARRAY_FUNCTION);
76     }
77 
78     /**
79      * {@inheritDoc}
80      */
81     @Override
releaseArray(byte[] buffer)82     public void releaseArray(byte[] buffer) {
83         if (buffer == null) {
84             throw new IllegalArgumentException("buffer is null");
85         }
86         returnValue(buffer, buffer.length, bytes);
87     }
88 
89     /**
90      * {@inheritDoc}
91      */
92     @Override
allocateDirect(int size)93     public ByteBuffer allocateDirect(int size) {
94         if (size <= 0) {
95             throw new IllegalArgumentException("size is invalid: " + size);
96         }
97 
98         return getOrCreate(size, buffers, DBB_FUNCTION);
99     }
100 
101     /**
102      * {@inheritDoc}
103      */
104     @Override
releaseDirect(ByteBuffer buffer)105     public void releaseDirect(ByteBuffer buffer) {
106         if (buffer == null) {
107             throw new IllegalArgumentException("buffer is null");
108         }
109         buffer.clear();
110         returnValue(buffer, buffer.capacity(), buffers);
111     }
112 
getOrCreate(final int size, final ConcurrentMap<Integer, ConcurrentLinkedDeque<SoftReference<E>>> map, final IntFunction<E> creator)113     private static <E> E getOrCreate(final int size, final ConcurrentMap<Integer, ConcurrentLinkedDeque<SoftReference<E>>> map, final IntFunction<E> creator) {
114         assert size > 0;
115         final int adjustedSize = adjustSize(size);
116         final ConcurrentLinkedDeque<SoftReference<E>> queue = optimisticGetEntry(adjustedSize, map);
117         SoftReference<E> entry;
118         while ((entry = queue.pollFirst()) != null) {
119             final E val = entry.get();
120             if (val != null) {
121                 return val;
122             }
123         }
124 
125         return creator.create(adjustedSize);
126     }
127 
128     /*
129      * This is package scope to allow direct unit testing.
130      */
adjustSize(int size)131     static int adjustSize(int size) {
132         assert size > 0;
133 
134         switch (Integer.numberOfLeadingZeros(size)) {
135             case 1:  // 1GB - 2GB
136             case 2:  // 512MB
137                 //if 512MB - 1.5 GB round to nearest 128 MB (2^27), else Integer.MAX_VALUE
138                 return size <= 0x6000_0000 ? roundToPowers(size, 27) : Integer.MAX_VALUE;
139             case 3:  //256MB
140             case 4:  //128MB
141                 //if 128MB - 512MB, round to nearest 16 MB
142                 return roundToPowers(size, 24);
143             case 5:  // 64MB
144             case 6:  // 32MB
145             case 7:  // 16MB
146                 //if 16MB - 128MB, round to nearest 4MB
147                 return roundToPowers(size, 22);
148             case 8:  //  8MB
149             case 9:  //  4MB
150             case 10: //  2MB
151                 //if 2MB - 16MB, round to nearest 512KB
152                 return roundToPowers(size, 19);
153             case 11: //  1MB
154             case 12: //512KB
155                 //if 512KB - 2MB, round to nearest 128KB
156                 return roundToPowers(size, 17);
157             case 13: //256KB
158             case 14: //128KB
159             case 15: // 64KB
160             case 16: // 32KB
161                 //if 32KB to 512KB, round to nearest 16KB
162                 return roundToPowers(size, 14);
163             case 17: // 16KB
164             case 18: //  8KB
165             case 19: //  4KB
166                 // if 4KB - 32KB, round to nearest 2KB
167                 return roundToPowers(size, 11);
168             default:
169                 return 4 * 1024;
170         }
171     }
172 
roundToPowers(int number, int bits)173     private static int roundToPowers(int number, int bits) {
174         final int mask = (0x7FFF_FFFF >> bits) << bits;
175         final int floor = number & mask;
176         return floor == number ? number : floor + (1 << bits);
177     }
178 
optimisticGetEntry(Integer key, ConcurrentMap<Integer, ConcurrentLinkedDeque<SoftReference<E>>> map)179     private static <E> ConcurrentLinkedDeque<SoftReference<E>> optimisticGetEntry(Integer key, ConcurrentMap<Integer, ConcurrentLinkedDeque<SoftReference<E>>> map) {
180         ConcurrentLinkedDeque<SoftReference<E>> val = map.get(key);
181         if (val == null) {
182             map.putIfAbsent(key, new ConcurrentLinkedDeque<SoftReference<E>>());
183             val = map.get(key);
184         }
185         return val;
186     }
187 
returnValue(E value, Integer size, ConcurrentMap<Integer, ConcurrentLinkedDeque<SoftReference<E>>> map)188     private static <E> void returnValue(E value, Integer size, ConcurrentMap<Integer, ConcurrentLinkedDeque<SoftReference<E>>> map) {
189         final ConcurrentLinkedDeque<SoftReference<E>> queue = map.get(size);
190         //no queue will exist if buffer was not originally obtained from this class
191         if (queue != null) {
192             //push this value onto deque first so that concurrent request can use it
193             queue.addFirst(new SoftReference<E>(value));
194 
195             //purge oldest entries have lost references
196             SoftReference<E> entry;
197             boolean lastEmpty = true;
198             while(lastEmpty && (entry = queue.peekLast()) != null) {
199                 if (entry.get() == null) {
200                     queue.removeLastOccurrence(entry);
201                 } else {
202                     lastEmpty = false;
203                 }
204             }
205         }
206     }
207 
208     /**
209      * {@inheritDoc}
210      */
211     @Override
toString()212     public String toString() {
213         return "CachingBufferPool [bytes=" + this.bytes + ", buffers=" + this.buffers + "]";
214     }
215 }
216 
217