1 package org.xerial.snappy.pool; 2 3 import java.lang.ref.SoftReference; 4 import java.nio.ByteBuffer; 5 import java.util.concurrent.ConcurrentHashMap; 6 import java.util.concurrent.ConcurrentLinkedDeque; 7 import java.util.concurrent.ConcurrentMap; 8 9 /** 10 * A {@link BufferPool} implementation which caches values at fixed sizes. 11 * <p> 12 * Pooled instances are held as {@link SoftReference} to allow GC if necessary. 13 * </p> 14 * <p> 15 * The current fixed sizes are calculated as follows: 16 * <ul> 17 * <li>Values < 4KB return 4KB</li> 18 * <li>4KB - 32KB to 2KB</li> 19 * <li>32KB - 512KB to 16KB</li> 20 * <li>512KB - 2MB to 128KB</li> 21 * <li>2MB - 16MB to 512KB</li> 22 * <li>16MB - 128MB to 4MB</li> 23 * <li>128MB - 512MB to 16MB</li> 24 * <li>512MB - 1.5 GB to 128MB</li> 25 * <li>Values > 1.5GB return {@link Integer#MAX_VALUE}</li> 26 * </ul> 27 * </p> 28 * @author Brett Okken 29 */ 30 public final class CachingBufferPool implements BufferPool { 31 32 private static interface IntFunction<E> { create(int size)33 public E create(int size); 34 } 35 36 private static final IntFunction<byte[]> ARRAY_FUNCTION = new IntFunction<byte[]>() { 37 @Override 38 public byte[] create(int size) { 39 return new byte[size]; 40 } 41 }; 42 43 private static final IntFunction<ByteBuffer> DBB_FUNCTION = new IntFunction<ByteBuffer>() { 44 @Override 45 public ByteBuffer create(int size) { 46 return ByteBuffer.allocateDirect(size); 47 } 48 }; 49 50 private static final CachingBufferPool INSTANCE = new CachingBufferPool(); 51 52 private final ConcurrentMap<Integer, ConcurrentLinkedDeque<SoftReference<byte[]>>> bytes = new ConcurrentHashMap<>(); 53 private final ConcurrentMap<Integer, ConcurrentLinkedDeque<SoftReference<ByteBuffer>>> buffers = new ConcurrentHashMap<>(); 54 CachingBufferPool()55 private CachingBufferPool() { 56 } 57 58 /** 59 * Returns instance of {@link CachingBufferPool} for using cached buffers. 60 * @return instance of {@link CachingBufferPool} for using cached buffers. 61 */ getInstance()62 public static BufferPool getInstance() { 63 return INSTANCE; 64 } 65 66 /** 67 * {@inheritDoc} 68 */ 69 @Override allocateArray(int size)70 public byte[] allocateArray(int size) { 71 if (size <= 0) { 72 throw new IllegalArgumentException("size is invalid: " + size); 73 } 74 75 return getOrCreate(size, bytes, ARRAY_FUNCTION); 76 } 77 78 /** 79 * {@inheritDoc} 80 */ 81 @Override releaseArray(byte[] buffer)82 public void releaseArray(byte[] buffer) { 83 if (buffer == null) { 84 throw new IllegalArgumentException("buffer is null"); 85 } 86 returnValue(buffer, buffer.length, bytes); 87 } 88 89 /** 90 * {@inheritDoc} 91 */ 92 @Override allocateDirect(int size)93 public ByteBuffer allocateDirect(int size) { 94 if (size <= 0) { 95 throw new IllegalArgumentException("size is invalid: " + size); 96 } 97 98 return getOrCreate(size, buffers, DBB_FUNCTION); 99 } 100 101 /** 102 * {@inheritDoc} 103 */ 104 @Override releaseDirect(ByteBuffer buffer)105 public void releaseDirect(ByteBuffer buffer) { 106 if (buffer == null) { 107 throw new IllegalArgumentException("buffer is null"); 108 } 109 buffer.clear(); 110 returnValue(buffer, buffer.capacity(), buffers); 111 } 112 getOrCreate(final int size, final ConcurrentMap<Integer, ConcurrentLinkedDeque<SoftReference<E>>> map, final IntFunction<E> creator)113 private static <E> E getOrCreate(final int size, final ConcurrentMap<Integer, ConcurrentLinkedDeque<SoftReference<E>>> map, final IntFunction<E> creator) { 114 assert size > 0; 115 final int adjustedSize = adjustSize(size); 116 final ConcurrentLinkedDeque<SoftReference<E>> queue = optimisticGetEntry(adjustedSize, map); 117 SoftReference<E> entry; 118 while ((entry = queue.pollFirst()) != null) { 119 final E val = entry.get(); 120 if (val != null) { 121 return val; 122 } 123 } 124 125 return creator.create(adjustedSize); 126 } 127 128 /* 129 * This is package scope to allow direct unit testing. 130 */ adjustSize(int size)131 static int adjustSize(int size) { 132 assert size > 0; 133 134 switch (Integer.numberOfLeadingZeros(size)) { 135 case 1: // 1GB - 2GB 136 case 2: // 512MB 137 //if 512MB - 1.5 GB round to nearest 128 MB (2^27), else Integer.MAX_VALUE 138 return size <= 0x6000_0000 ? roundToPowers(size, 27) : Integer.MAX_VALUE; 139 case 3: //256MB 140 case 4: //128MB 141 //if 128MB - 512MB, round to nearest 16 MB 142 return roundToPowers(size, 24); 143 case 5: // 64MB 144 case 6: // 32MB 145 case 7: // 16MB 146 //if 16MB - 128MB, round to nearest 4MB 147 return roundToPowers(size, 22); 148 case 8: // 8MB 149 case 9: // 4MB 150 case 10: // 2MB 151 //if 2MB - 16MB, round to nearest 512KB 152 return roundToPowers(size, 19); 153 case 11: // 1MB 154 case 12: //512KB 155 //if 512KB - 2MB, round to nearest 128KB 156 return roundToPowers(size, 17); 157 case 13: //256KB 158 case 14: //128KB 159 case 15: // 64KB 160 case 16: // 32KB 161 //if 32KB to 512KB, round to nearest 16KB 162 return roundToPowers(size, 14); 163 case 17: // 16KB 164 case 18: // 8KB 165 case 19: // 4KB 166 // if 4KB - 32KB, round to nearest 2KB 167 return roundToPowers(size, 11); 168 default: 169 return 4 * 1024; 170 } 171 } 172 roundToPowers(int number, int bits)173 private static int roundToPowers(int number, int bits) { 174 final int mask = (0x7FFF_FFFF >> bits) << bits; 175 final int floor = number & mask; 176 return floor == number ? number : floor + (1 << bits); 177 } 178 optimisticGetEntry(Integer key, ConcurrentMap<Integer, ConcurrentLinkedDeque<SoftReference<E>>> map)179 private static <E> ConcurrentLinkedDeque<SoftReference<E>> optimisticGetEntry(Integer key, ConcurrentMap<Integer, ConcurrentLinkedDeque<SoftReference<E>>> map) { 180 ConcurrentLinkedDeque<SoftReference<E>> val = map.get(key); 181 if (val == null) { 182 map.putIfAbsent(key, new ConcurrentLinkedDeque<SoftReference<E>>()); 183 val = map.get(key); 184 } 185 return val; 186 } 187 returnValue(E value, Integer size, ConcurrentMap<Integer, ConcurrentLinkedDeque<SoftReference<E>>> map)188 private static <E> void returnValue(E value, Integer size, ConcurrentMap<Integer, ConcurrentLinkedDeque<SoftReference<E>>> map) { 189 final ConcurrentLinkedDeque<SoftReference<E>> queue = map.get(size); 190 //no queue will exist if buffer was not originally obtained from this class 191 if (queue != null) { 192 //push this value onto deque first so that concurrent request can use it 193 queue.addFirst(new SoftReference<E>(value)); 194 195 //purge oldest entries have lost references 196 SoftReference<E> entry; 197 boolean lastEmpty = true; 198 while(lastEmpty && (entry = queue.peekLast()) != null) { 199 if (entry.get() == null) { 200 queue.removeLastOccurrence(entry); 201 } else { 202 lastEmpty = false; 203 } 204 } 205 } 206 } 207 208 /** 209 * {@inheritDoc} 210 */ 211 @Override toString()212 public String toString() { 213 return "CachingBufferPool [bytes=" + this.bytes + ", buffers=" + this.buffers + "]"; 214 } 215 } 216 217