1 /*
2  *  Copyright (c) 2015, Oracle and/or its affiliates. All rights reserved.
3  *
4  *  This program is free software; you can redistribute it and/or modify
5  *  it under the terms of the GNU General Public License, version 2.0,
6  *  as published by the Free Software Foundation.
7  *
8  *  This program is also distributed with certain software (including
9  *  but not limited to OpenSSL) that is licensed under separate terms,
10  *  as designated in a particular file or component or in included license
11  *  documentation.  The authors of MySQL hereby grant you an additional
12  *  permission to link the program and your derivative works with the
13  *  separately licensed software that they have included with MySQL.
14  *
15  *  This program is distributed in the hope that it will be useful,
16  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *  GNU General Public License, version 2.0, for more details.
19  *
20  *  You should have received a copy of the GNU General Public License
21  *  along with this program; if not, write to the Free Software
22  *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23  */
24 
25 package com.mysql.clusterj.tie;
26 
27 import java.nio.ByteBuffer;
28 import java.lang.reflect.Field;
29 import java.util.Arrays;
30 import java.util.Map;
31 import java.util.TreeMap;
32 import java.util.concurrent.ConcurrentLinkedQueue;
33 
34 import com.mysql.clusterj.ClusterJUserException;
35 import com.mysql.clusterj.core.util.I18NHelper;
36 import com.mysql.clusterj.core.util.Logger;
37 import com.mysql.clusterj.core.util.LoggerFactoryService;
38 
39 import sun.misc.Cleaner;
40 
41 /**
42  * This class implements a pool consisting of several size-based monotonically-growing queues of ByteBuffer.
43  * The number and sizes of the queues is determined by the constructor parameter int[] which
44  * specifies the sizes.
45  */
46 class VariableByteBufferPoolImpl {
47 
48     /** My message translator */
49     static final I18NHelper local = I18NHelper
50             .getInstance(VariableByteBufferPoolImpl.class);
51 
52     /** My logger */
53     static final Logger logger = LoggerFactoryService.getFactory()
54             .getInstance(VariableByteBufferPoolImpl.class);
55 
56     /** The queues of ByteBuffer */
57     TreeMap<Integer, ConcurrentLinkedQueue<ByteBuffer>> queues;
58 
59     /** The biggest size of any queue */
60     int biggest = 0;
61 
62     /** The cleaner method for non-pooled buffers */
63     static Field cleanerField = null;
64     static {
65         try {
66             ByteBuffer buffer = ByteBuffer.allocateDirect(1);
67             cleanerField = buffer.getClass().getDeclaredField("cleaner");
68             cleanerField.setAccessible(true);
69         } catch (Throwable t) {
70             String message = local.message("WARN_Buffer_Cleaning_Unusable", t.getClass().getName(), t.getMessage());
71             logger.warn(message);
72             // cannot use cleaner
73             cleanerField = null;
74         }
75     }
76 
77     /** Clean the non-pooled buffer after use. This frees the memory back to the system. Note that this
78      * only supports the current implementation of DirectByteBuffer and this support may change in future.
79      */
clean(ByteBuffer buffer)80     static void clean(ByteBuffer buffer) {
81         if (cleanerField != null) {
82             try {
83                 ((Cleaner)cleanerField.get(buffer)).clean();
84             } catch (Throwable t) {
85                 // oh well
86             }
87         }
88     }
89 
90     /** The guard initialization bytes. To enable the guard, change the size of the guard array. */
91     static byte[] guard = new byte[0];
92 
93     /** Initialize the guard */
94     static {
95         for (int i = 0; i < guard.length; ++i) {
96             guard[i] = (byte)10;
97         }
98     }
99 
100     /** Initialize the guard bytes following the allocated data in the buffer. */
initializeGuard(ByteBuffer buffer)101     void initializeGuard(ByteBuffer buffer) {
102      // the buffer has guard.length extra bytes in it, initialized with the guard bytes
103         buffer.position(buffer.capacity() - guard.length);
104         buffer.put(guard);
105         buffer.clear();
106     }
107 
108     /** Check the guard bytes which immediately follow the data in the buffer. */
checkGuard(ByteBuffer buffer)109     void checkGuard(ByteBuffer buffer) {
110         // only check if there is a direct buffer that is still viable
111         if (buffer.limit() == 0) return;
112         // the buffer has guard.length extra bytes in it, initialized with the guard bytes
113         buffer.position(buffer.capacity() - guard.length);
114         for (int i = 0; i < guard.length; ++i) {
115             if (buffer.get() != guard[i]) {
116                 throw new RuntimeException("ByteBufferPool failed guard test with buffer of length " +
117                         (buffer.capacity() - guard.length) + ": " + buffer.toString());
118             }
119         }
120     }
121 
122     /** Construct empty queues based on maximum size buffer each queue will handle */
VariableByteBufferPoolImpl(int[] bufferSizes)123     public VariableByteBufferPoolImpl(int[] bufferSizes) {
124         queues = new TreeMap<Integer, ConcurrentLinkedQueue<ByteBuffer>>();
125         for (int bufferSize: bufferSizes) {
126             queues.put(bufferSize + 1, new ConcurrentLinkedQueue<ByteBuffer>());
127             if (biggest < bufferSize) {
128                 biggest = bufferSize;
129             }
130         }
131         logger.info(local.message("MSG_ByteBuffer_Pools_Initialized", Arrays.toString(bufferSizes)));
132     }
133 
134     /** Borrow a buffer from the pool. The pool is the smallest that has buffers of the size needed.
135      * The buffer size is one less than the key because higherEntry is strictly higher.
136      * There is no method that returns the entry equal to or higher which is what we really want.
137      * If no buffer is in the pool, create a new one.
138      */
borrowBuffer(int sizeNeeded)139     public ByteBuffer borrowBuffer(int sizeNeeded) {
140         Map.Entry<Integer, ConcurrentLinkedQueue<ByteBuffer>> entry = queues.higherEntry(sizeNeeded);
141         ByteBuffer buffer = null;
142         if (entry == null) {
143             // oh no, we need a bigger size than any buffer pool, so log a message and direct allocate a buffer
144             if (logger.isDetailEnabled())
145                 logger.detail(local.message("MSG_Cannot_allocate_byte_buffer_from_pool", sizeNeeded, this.biggest));
146             buffer = ByteBuffer.allocateDirect(sizeNeeded + guard.length);
147             initializeGuard(buffer);
148             return buffer;
149         }
150         ConcurrentLinkedQueue<ByteBuffer>pool = entry.getValue();
151         int bufferSize = entry.getKey() - 1;
152         buffer = pool.poll();
153         if (buffer == null) {
154             buffer = ByteBuffer.allocateDirect(bufferSize + guard.length);
155             initializeGuard(buffer);
156         }
157         // reuse buffer without initializing the guard
158         buffer.clear();
159         return buffer;
160     }
161 
162     /** Return a buffer to the pool. */
returnBuffer(int sizeNeeded, ByteBuffer buffer)163     public void returnBuffer(int sizeNeeded, ByteBuffer buffer) {
164         checkGuard(buffer);
165         Map.Entry<Integer, ConcurrentLinkedQueue<ByteBuffer>> entry = this.queues.higherEntry(sizeNeeded);
166         // if this buffer came from a pool, return it
167         if (entry != null) {
168             int bufferSize = entry.getKey() - 1;
169             ConcurrentLinkedQueue<ByteBuffer> pool = entry.getValue();
170             pool.add(buffer);
171         } else {
172             // mark this buffer as unusable in case we ever see it again
173             buffer.limit(0);
174             // clean (deallocate memory) the buffer
175             clean(buffer);
176         }
177     }
178 
179 }
180