1 /* 2 * Copyright (c) 2015, Oracle and/or its affiliates. All rights reserved. 3 * 4 * This program is free software; you can redistribute it and/or modify 5 * it under the terms of the GNU General Public License, version 2.0, 6 * as published by the Free Software Foundation. 7 * 8 * This program is also distributed with certain software (including 9 * but not limited to OpenSSL) that is licensed under separate terms, 10 * as designated in a particular file or component or in included license 11 * documentation. The authors of MySQL hereby grant you an additional 12 * permission to link the program and your derivative works with the 13 * separately licensed software that they have included with MySQL. 14 * 15 * This program is distributed in the hope that it will be useful, 16 * but WITHOUT ANY WARRANTY; without even the implied warranty of 17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 18 * GNU General Public License, version 2.0, for more details. 19 * 20 * You should have received a copy of the GNU General Public License 21 * along with this program; if not, write to the Free Software 22 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA 23 */ 24 25 package com.mysql.clusterj.tie; 26 27 import java.nio.ByteBuffer; 28 import java.lang.reflect.Field; 29 import java.util.Arrays; 30 import java.util.Map; 31 import java.util.TreeMap; 32 import java.util.concurrent.ConcurrentLinkedQueue; 33 34 import com.mysql.clusterj.ClusterJUserException; 35 import com.mysql.clusterj.core.util.I18NHelper; 36 import com.mysql.clusterj.core.util.Logger; 37 import com.mysql.clusterj.core.util.LoggerFactoryService; 38 39 import sun.misc.Cleaner; 40 41 /** 42 * This class implements a pool consisting of several size-based monotonically-growing queues of ByteBuffer. 43 * The number and sizes of the queues is determined by the constructor parameter int[] which 44 * specifies the sizes. 45 */ 46 class VariableByteBufferPoolImpl { 47 48 /** My message translator */ 49 static final I18NHelper local = I18NHelper 50 .getInstance(VariableByteBufferPoolImpl.class); 51 52 /** My logger */ 53 static final Logger logger = LoggerFactoryService.getFactory() 54 .getInstance(VariableByteBufferPoolImpl.class); 55 56 /** The queues of ByteBuffer */ 57 TreeMap<Integer, ConcurrentLinkedQueue<ByteBuffer>> queues; 58 59 /** The biggest size of any queue */ 60 int biggest = 0; 61 62 /** The cleaner method for non-pooled buffers */ 63 static Field cleanerField = null; 64 static { 65 try { 66 ByteBuffer buffer = ByteBuffer.allocateDirect(1); 67 cleanerField = buffer.getClass().getDeclaredField("cleaner"); 68 cleanerField.setAccessible(true); 69 } catch (Throwable t) { 70 String message = local.message("WARN_Buffer_Cleaning_Unusable", t.getClass().getName(), t.getMessage()); 71 logger.warn(message); 72 // cannot use cleaner 73 cleanerField = null; 74 } 75 } 76 77 /** Clean the non-pooled buffer after use. This frees the memory back to the system. Note that this 78 * only supports the current implementation of DirectByteBuffer and this support may change in future. 79 */ clean(ByteBuffer buffer)80 static void clean(ByteBuffer buffer) { 81 if (cleanerField != null) { 82 try { 83 ((Cleaner)cleanerField.get(buffer)).clean(); 84 } catch (Throwable t) { 85 // oh well 86 } 87 } 88 } 89 90 /** The guard initialization bytes. To enable the guard, change the size of the guard array. */ 91 static byte[] guard = new byte[0]; 92 93 /** Initialize the guard */ 94 static { 95 for (int i = 0; i < guard.length; ++i) { 96 guard[i] = (byte)10; 97 } 98 } 99 100 /** Initialize the guard bytes following the allocated data in the buffer. */ initializeGuard(ByteBuffer buffer)101 void initializeGuard(ByteBuffer buffer) { 102 // the buffer has guard.length extra bytes in it, initialized with the guard bytes 103 buffer.position(buffer.capacity() - guard.length); 104 buffer.put(guard); 105 buffer.clear(); 106 } 107 108 /** Check the guard bytes which immediately follow the data in the buffer. */ checkGuard(ByteBuffer buffer)109 void checkGuard(ByteBuffer buffer) { 110 // only check if there is a direct buffer that is still viable 111 if (buffer.limit() == 0) return; 112 // the buffer has guard.length extra bytes in it, initialized with the guard bytes 113 buffer.position(buffer.capacity() - guard.length); 114 for (int i = 0; i < guard.length; ++i) { 115 if (buffer.get() != guard[i]) { 116 throw new RuntimeException("ByteBufferPool failed guard test with buffer of length " + 117 (buffer.capacity() - guard.length) + ": " + buffer.toString()); 118 } 119 } 120 } 121 122 /** Construct empty queues based on maximum size buffer each queue will handle */ VariableByteBufferPoolImpl(int[] bufferSizes)123 public VariableByteBufferPoolImpl(int[] bufferSizes) { 124 queues = new TreeMap<Integer, ConcurrentLinkedQueue<ByteBuffer>>(); 125 for (int bufferSize: bufferSizes) { 126 queues.put(bufferSize + 1, new ConcurrentLinkedQueue<ByteBuffer>()); 127 if (biggest < bufferSize) { 128 biggest = bufferSize; 129 } 130 } 131 logger.info(local.message("MSG_ByteBuffer_Pools_Initialized", Arrays.toString(bufferSizes))); 132 } 133 134 /** Borrow a buffer from the pool. The pool is the smallest that has buffers of the size needed. 135 * The buffer size is one less than the key because higherEntry is strictly higher. 136 * There is no method that returns the entry equal to or higher which is what we really want. 137 * If no buffer is in the pool, create a new one. 138 */ borrowBuffer(int sizeNeeded)139 public ByteBuffer borrowBuffer(int sizeNeeded) { 140 Map.Entry<Integer, ConcurrentLinkedQueue<ByteBuffer>> entry = queues.higherEntry(sizeNeeded); 141 ByteBuffer buffer = null; 142 if (entry == null) { 143 // oh no, we need a bigger size than any buffer pool, so log a message and direct allocate a buffer 144 if (logger.isDetailEnabled()) 145 logger.detail(local.message("MSG_Cannot_allocate_byte_buffer_from_pool", sizeNeeded, this.biggest)); 146 buffer = ByteBuffer.allocateDirect(sizeNeeded + guard.length); 147 initializeGuard(buffer); 148 return buffer; 149 } 150 ConcurrentLinkedQueue<ByteBuffer>pool = entry.getValue(); 151 int bufferSize = entry.getKey() - 1; 152 buffer = pool.poll(); 153 if (buffer == null) { 154 buffer = ByteBuffer.allocateDirect(bufferSize + guard.length); 155 initializeGuard(buffer); 156 } 157 // reuse buffer without initializing the guard 158 buffer.clear(); 159 return buffer; 160 } 161 162 /** Return a buffer to the pool. */ returnBuffer(int sizeNeeded, ByteBuffer buffer)163 public void returnBuffer(int sizeNeeded, ByteBuffer buffer) { 164 checkGuard(buffer); 165 Map.Entry<Integer, ConcurrentLinkedQueue<ByteBuffer>> entry = this.queues.higherEntry(sizeNeeded); 166 // if this buffer came from a pool, return it 167 if (entry != null) { 168 int bufferSize = entry.getKey() - 1; 169 ConcurrentLinkedQueue<ByteBuffer> pool = entry.getValue(); 170 pool.add(buffer); 171 } else { 172 // mark this buffer as unusable in case we ever see it again 173 buffer.limit(0); 174 // clean (deallocate memory) the buffer 175 clean(buffer); 176 } 177 } 178 179 } 180