1 /**
2  * Copyright The Apache Software Foundation
3  *
4  * Licensed to the Apache Software Foundation (ASF) under one
5  * or more contributor license agreements.  See the NOTICE file
6  * distributed with this work for additional information
7  * regarding copyright ownership.  The ASF licenses this file
8  * to you under the Apache License, Version 2.0 (the
9  * "License"); you may not use this file except in compliance
10  * with the License.  You may obtain a copy of the License at
11  *
12  *     http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  */
20 
21 package org.apache.hadoop.hbase.io.hfile.bucket;
22 
23 import java.util.ArrayList;
24 import java.util.Arrays;
25 import java.util.Map;
26 import java.util.concurrent.atomic.AtomicLong;
27 
28 import org.apache.commons.collections.map.LinkedMap;
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.apache.hadoop.hbase.classification.InterfaceAudience;
32 import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
33 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
34 import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BucketEntry;
35 import org.codehaus.jackson.annotate.JsonIgnoreProperties;
36 
37 import com.google.common.base.Objects;
38 import com.google.common.base.Preconditions;
39 import com.google.common.primitives.Ints;
40 
/**
 * This class is used to allocate a block of a specified size and to free the block
 * when it is evicted. It manages an array of buckets; each bucket is associated with
 * an item size and caches elements up to that size. For a completely empty bucket, this
 * size can be re-specified dynamically.
 *
 * This class is not thread safe.
 */
49 @InterfaceAudience.Private
50 @JsonIgnoreProperties({"indexStatistics", "freeSize", "usedSize"})
51 public final class BucketAllocator {
52   private static final Log LOG = LogFactory.getLog(BucketAllocator.class);
53 
54   @JsonIgnoreProperties({"completelyFree", "uninstantiated"})
55   public final static class Bucket {
56     private long baseOffset;
57     private int itemAllocationSize, sizeIndex;
58     private int itemCount;
59     private int freeList[];
60     private int freeCount, usedCount;
61 
Bucket(long offset)62     public Bucket(long offset) {
63       baseOffset = offset;
64       sizeIndex = -1;
65     }
66 
reconfigure(int sizeIndex, int[] bucketSizes, long bucketCapacity)67     void reconfigure(int sizeIndex, int[] bucketSizes, long bucketCapacity) {
68       Preconditions.checkElementIndex(sizeIndex, bucketSizes.length);
69       this.sizeIndex = sizeIndex;
70       itemAllocationSize = bucketSizes[sizeIndex];
71       itemCount = (int) (bucketCapacity / (long) itemAllocationSize);
72       freeCount = itemCount;
73       usedCount = 0;
74       freeList = new int[itemCount];
75       for (int i = 0; i < freeCount; ++i)
76         freeList[i] = i;
77     }
78 
isUninstantiated()79     public boolean isUninstantiated() {
80       return sizeIndex == -1;
81     }
82 
sizeIndex()83     public int sizeIndex() {
84       return sizeIndex;
85     }
86 
getItemAllocationSize()87     public int getItemAllocationSize() {
88       return itemAllocationSize;
89     }
90 
hasFreeSpace()91     public boolean hasFreeSpace() {
92       return freeCount > 0;
93     }
94 
isCompletelyFree()95     public boolean isCompletelyFree() {
96       return usedCount == 0;
97     }
98 
freeCount()99     public int freeCount() {
100       return freeCount;
101     }
102 
usedCount()103     public int usedCount() {
104       return usedCount;
105     }
106 
getFreeBytes()107     public int getFreeBytes() {
108       return freeCount * itemAllocationSize;
109     }
110 
getUsedBytes()111     public int getUsedBytes() {
112       return usedCount * itemAllocationSize;
113     }
114 
getBaseOffset()115     public long getBaseOffset() {
116       return baseOffset;
117     }
118 
119     /**
120      * Allocate a block in this bucket, return the offset representing the
121      * position in physical space
122      * @return the offset in the IOEngine
123      */
allocate()124     public long allocate() {
125       assert freeCount > 0; // Else should not have been called
126       assert sizeIndex != -1;
127       ++usedCount;
128       long offset = baseOffset + (freeList[--freeCount] * itemAllocationSize);
129       assert offset >= 0;
130       return offset;
131     }
132 
addAllocation(long offset)133     public void addAllocation(long offset) throws BucketAllocatorException {
134       offset -= baseOffset;
135       if (offset < 0 || offset % itemAllocationSize != 0)
136         throw new BucketAllocatorException(
137             "Attempt to add allocation for bad offset: " + offset + " base="
138                 + baseOffset + ", bucket size=" + itemAllocationSize);
139       int idx = (int) (offset / itemAllocationSize);
140       boolean matchFound = false;
141       for (int i = 0; i < freeCount; ++i) {
142         if (matchFound) freeList[i - 1] = freeList[i];
143         else if (freeList[i] == idx) matchFound = true;
144       }
145       if (!matchFound)
146         throw new BucketAllocatorException("Couldn't find match for index "
147             + idx + " in free list");
148       ++usedCount;
149       --freeCount;
150     }
151 
free(long offset)152     private void free(long offset) {
153       offset -= baseOffset;
154       assert offset >= 0;
155       assert offset < itemCount * itemAllocationSize;
156       assert offset % itemAllocationSize == 0;
157       assert usedCount > 0;
158       assert freeCount < itemCount; // Else duplicate free
159       int item = (int) (offset / (long) itemAllocationSize);
160       assert !freeListContains(item);
161       --usedCount;
162       freeList[freeCount++] = item;
163     }
164 
165     private boolean freeListContains(int blockNo) {
166       for (int i = 0; i < freeCount; ++i) {
167         if (freeList[i] == blockNo) return true;
168       }
169       return false;
170     }
171   }
172 
173   final class BucketSizeInfo {
174     // Free bucket means it has space to allocate a block;
175     // Completely free bucket means it has no block.
176     private LinkedMap bucketList, freeBuckets, completelyFreeBuckets;
177     private int sizeIndex;
178 
179     BucketSizeInfo(int sizeIndex) {
180       bucketList = new LinkedMap();
181       freeBuckets = new LinkedMap();
182       completelyFreeBuckets = new LinkedMap();
183       this.sizeIndex = sizeIndex;
184     }
185 
186     public void instantiateBucket(Bucket b) {
187       assert b.isUninstantiated() || b.isCompletelyFree();
188       b.reconfigure(sizeIndex, bucketSizes, bucketCapacity);
189       bucketList.put(b, b);
190       freeBuckets.put(b, b);
191       completelyFreeBuckets.put(b, b);
192     }
193 
194     public int sizeIndex() {
195       return sizeIndex;
196     }
197 
198     /**
199      * Find a bucket to allocate a block
200      * @return the offset in the IOEngine
201      */
202     public long allocateBlock() {
203       Bucket b = null;
204       if (freeBuckets.size() > 0) {
205         // Use up an existing one first...
206         b = (Bucket) freeBuckets.lastKey();
207       }
208       if (b == null) {
209         b = grabGlobalCompletelyFreeBucket();
210         if (b != null) instantiateBucket(b);
211       }
212       if (b == null) return -1;
213       long result = b.allocate();
214       blockAllocated(b);
215       return result;
216     }
217 
218     void blockAllocated(Bucket b) {
219       if (!b.isCompletelyFree()) completelyFreeBuckets.remove(b);
220       if (!b.hasFreeSpace()) freeBuckets.remove(b);
221     }
222 
223     public Bucket findAndRemoveCompletelyFreeBucket() {
224       Bucket b = null;
225       assert bucketList.size() > 0;
226       if (bucketList.size() == 1) {
227         // So we never get complete starvation of a bucket for a size
228         return null;
229       }
230 
231       if (completelyFreeBuckets.size() > 0) {
232         b = (Bucket) completelyFreeBuckets.firstKey();
233         removeBucket(b);
234       }
235       return b;
236     }
237 
removeBucket(Bucket b)238     private void removeBucket(Bucket b) {
239       assert b.isCompletelyFree();
240       bucketList.remove(b);
241       freeBuckets.remove(b);
242       completelyFreeBuckets.remove(b);
243     }
244 
freeBlock(Bucket b, long offset)245     public void freeBlock(Bucket b, long offset) {
246       assert bucketList.containsKey(b);
247       // else we shouldn't have anything to free...
248       assert (!completelyFreeBuckets.containsKey(b));
249       b.free(offset);
250       if (!freeBuckets.containsKey(b)) freeBuckets.put(b, b);
251       if (b.isCompletelyFree()) completelyFreeBuckets.put(b, b);
252     }
253 
statistics()254     public IndexStatistics statistics() {
255       long free = 0, used = 0;
256       for (Object obj : bucketList.keySet()) {
257         Bucket b = (Bucket) obj;
258         free += b.freeCount();
259         used += b.usedCount();
260       }
261       return new IndexStatistics(free, used, bucketSizes[sizeIndex]);
262     }
263 
264     @Override
toString()265     public String toString() {
266       return Objects.toStringHelper(this.getClass())
267         .add("sizeIndex", sizeIndex)
268         .add("bucketSize", bucketSizes[sizeIndex])
269         .toString();
270     }
271   }
272 
273   // Default block size is 64K, so we choose more sizes near 64K, you'd better
274   // reset it according to your cluster's block size distribution
275   // TODO Support the view of block size distribution statistics
276   private static final int DEFAULT_BUCKET_SIZES[] = { 4 * 1024 + 1024, 8 * 1024 + 1024,
277       16 * 1024 + 1024, 32 * 1024 + 1024, 40 * 1024 + 1024, 48 * 1024 + 1024,
278       56 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024, 128 * 1024 + 1024,
279       192 * 1024 + 1024, 256 * 1024 + 1024, 384 * 1024 + 1024,
280       512 * 1024 + 1024 };
281 
282   /**
283    * Round up the given block size to bucket size, and get the corresponding
284    * BucketSizeInfo
285    */
roundUpToBucketSizeInfo(int blockSize)286   public BucketSizeInfo roundUpToBucketSizeInfo(int blockSize) {
287     for (int i = 0; i < bucketSizes.length; ++i)
288       if (blockSize <= bucketSizes[i])
289         return bucketSizeInfos[i];
290     return null;
291   }
292 
293   static public final int FEWEST_ITEMS_IN_BUCKET = 4;
294 
295   private final int[] bucketSizes;
296   private final int bigItemSize;
297   // The capacity size for each bucket
298   private final long bucketCapacity;
299   private Bucket[] buckets;
300   private BucketSizeInfo[] bucketSizeInfos;
301   private final long totalSize;
302   private long usedSize = 0;
303 
BucketAllocator(long availableSpace, int[] bucketSizes)304   BucketAllocator(long availableSpace, int[] bucketSizes)
305       throws BucketAllocatorException {
306     this.bucketSizes = bucketSizes == null ? DEFAULT_BUCKET_SIZES : bucketSizes;
307     Arrays.sort(this.bucketSizes);
308     this.bigItemSize = Ints.max(this.bucketSizes);
309     this.bucketCapacity = FEWEST_ITEMS_IN_BUCKET * bigItemSize;
310     buckets = new Bucket[(int) (availableSpace / bucketCapacity)];
311     if (buckets.length < this.bucketSizes.length)
312       throw new BucketAllocatorException(
313           "Bucket allocator size too small - must have room for at least "
314               + this.bucketSizes.length + " buckets");
315     bucketSizeInfos = new BucketSizeInfo[this.bucketSizes.length];
316     for (int i = 0; i < this.bucketSizes.length; ++i) {
317       bucketSizeInfos[i] = new BucketSizeInfo(i);
318     }
319     for (int i = 0; i < buckets.length; ++i) {
320       buckets[i] = new Bucket(bucketCapacity * i);
321       bucketSizeInfos[i < this.bucketSizes.length ? i : this.bucketSizes.length - 1]
322           .instantiateBucket(buckets[i]);
323     }
324     this.totalSize = ((long) buckets.length) * bucketCapacity;
325   }
326 
327   /**
328    * Rebuild the allocator's data structures from a persisted map.
329    * @param availableSpace capacity of cache
330    * @param map A map stores the block key and BucketEntry(block's meta data
331    *          like offset, length)
332    * @param realCacheSize cached data size statistics for bucket cache
333    * @throws BucketAllocatorException
334    */
BucketAllocator(long availableSpace, int[] bucketSizes, Map<BlockCacheKey, BucketEntry> map, AtomicLong realCacheSize)335   BucketAllocator(long availableSpace, int[] bucketSizes, Map<BlockCacheKey, BucketEntry> map,
336       AtomicLong realCacheSize) throws BucketAllocatorException {
337     this(availableSpace, bucketSizes);
338 
339     // each bucket has an offset, sizeindex. probably the buckets are too big
340     // in our default state. so what we do is reconfigure them according to what
341     // we've found. we can only reconfigure each bucket once; if more than once,
342     // we know there's a bug, so we just log the info, throw, and start again...
343     boolean[] reconfigured = new boolean[buckets.length];
344     for (Map.Entry<BlockCacheKey, BucketEntry> entry : map.entrySet()) {
345       long foundOffset = entry.getValue().offset();
346       int foundLen = entry.getValue().getLength();
347       int bucketSizeIndex = -1;
348       for (int i = 0; i < bucketSizes.length; ++i) {
349         if (foundLen <= bucketSizes[i]) {
350           bucketSizeIndex = i;
351           break;
352         }
353       }
354       if (bucketSizeIndex == -1) {
355         throw new BucketAllocatorException(
356             "Can't match bucket size for the block with size " + foundLen);
357       }
358       int bucketNo = (int) (foundOffset / bucketCapacity);
359       if (bucketNo < 0 || bucketNo >= buckets.length)
360         throw new BucketAllocatorException("Can't find bucket " + bucketNo
361             + ", total buckets=" + buckets.length
362             + "; did you shrink the cache?");
363       Bucket b = buckets[bucketNo];
364       if (reconfigured[bucketNo]) {
365         if (b.sizeIndex() != bucketSizeIndex)
366           throw new BucketAllocatorException(
367               "Inconsistent allocation in bucket map;");
368       } else {
369         if (!b.isCompletelyFree())
370           throw new BucketAllocatorException("Reconfiguring bucket "
371               + bucketNo + " but it's already allocated; corrupt data");
372         // Need to remove the bucket from whichever list it's currently in at
373         // the moment...
374         BucketSizeInfo bsi = bucketSizeInfos[bucketSizeIndex];
375         BucketSizeInfo oldbsi = bucketSizeInfos[b.sizeIndex()];
376         oldbsi.removeBucket(b);
377         bsi.instantiateBucket(b);
378         reconfigured[bucketNo] = true;
379       }
380       realCacheSize.addAndGet(foundLen);
381       buckets[bucketNo].addAllocation(foundOffset);
382       usedSize += buckets[bucketNo].getItemAllocationSize();
383       bucketSizeInfos[bucketSizeIndex].blockAllocated(b);
384     }
385   }
386 
toString()387   public String toString() {
388     StringBuilder sb = new StringBuilder(1024);
389     for (int i = 0; i < buckets.length; ++i) {
390       Bucket b = buckets[i];
391       if (i > 0) sb.append(", ");
392       sb.append("bucket.").append(i).append(": size=").append(b.getItemAllocationSize());
393       sb.append(", freeCount=").append(b.freeCount()).append(", used=").append(b.usedCount());
394     }
395     return sb.toString();
396   }
397 
getUsedSize()398   public long getUsedSize() {
399     return this.usedSize;
400   }
401 
getFreeSize()402   public long getFreeSize() {
403     return this.totalSize - getUsedSize();
404   }
405 
getTotalSize()406   public long getTotalSize() {
407     return this.totalSize;
408   }
409 
410   /**
411    * Allocate a block with specified size. Return the offset
412    * @param blockSize size of block
413    * @throws BucketAllocatorException
414    * @throws CacheFullException
415    * @return the offset in the IOEngine
416    */
allocateBlock(int blockSize)417   public synchronized long allocateBlock(int blockSize) throws CacheFullException,
418       BucketAllocatorException {
419     assert blockSize > 0;
420     BucketSizeInfo bsi = roundUpToBucketSizeInfo(blockSize);
421     if (bsi == null) {
422       throw new BucketAllocatorException("Allocation too big size=" + blockSize +
423         "; adjust BucketCache sizes " + CacheConfig.BUCKET_CACHE_BUCKETS_KEY +
424         " to accomodate if size seems reasonable and you want it cached.");
425     }
426     long offset = bsi.allocateBlock();
427 
428     // Ask caller to free up space and try again!
429     if (offset < 0)
430       throw new CacheFullException(blockSize, bsi.sizeIndex());
431     usedSize += bucketSizes[bsi.sizeIndex()];
432     return offset;
433   }
434 
grabGlobalCompletelyFreeBucket()435   private Bucket grabGlobalCompletelyFreeBucket() {
436     for (BucketSizeInfo bsi : bucketSizeInfos) {
437       Bucket b = bsi.findAndRemoveCompletelyFreeBucket();
438       if (b != null) return b;
439     }
440     return null;
441   }
442 
443   /**
444    * Free a block with the offset
445    * @param offset block's offset
446    * @return size freed
447    */
freeBlock(long offset)448   public synchronized int freeBlock(long offset) {
449     int bucketNo = (int) (offset / bucketCapacity);
450     assert bucketNo >= 0 && bucketNo < buckets.length;
451     Bucket targetBucket = buckets[bucketNo];
452     bucketSizeInfos[targetBucket.sizeIndex()].freeBlock(targetBucket, offset);
453     usedSize -= targetBucket.getItemAllocationSize();
454     return targetBucket.getItemAllocationSize();
455   }
456 
457   public int sizeIndexOfAllocation(long offset) {
458     int bucketNo = (int) (offset / bucketCapacity);
459     assert bucketNo >= 0 && bucketNo < buckets.length;
460     Bucket targetBucket = buckets[bucketNo];
461     return targetBucket.sizeIndex();
462   }
463 
464   public int sizeOfAllocation(long offset) {
465     int bucketNo = (int) (offset / bucketCapacity);
466     assert bucketNo >= 0 && bucketNo < buckets.length;
467     Bucket targetBucket = buckets[bucketNo];
468     return targetBucket.getItemAllocationSize();
469   }
470 
471   static class IndexStatistics {
472     private long freeCount, usedCount, itemSize, totalCount;
473 
474     public long freeCount() {
475       return freeCount;
476     }
477 
478     public long usedCount() {
479       return usedCount;
480     }
481 
482     public long totalCount() {
483       return totalCount;
484     }
485 
486     public long freeBytes() {
487       return freeCount * itemSize;
488     }
489 
490     public long usedBytes() {
491       return usedCount * itemSize;
492     }
493 
494     public long totalBytes() {
495       return totalCount * itemSize;
496     }
497 
498     public long itemSize() {
499       return itemSize;
500     }
501 
502     public IndexStatistics(long free, long used, long itemSize) {
503       setTo(free, used, itemSize);
504     }
505 
506     public IndexStatistics() {
507       setTo(-1, -1, 0);
508     }
509 
510     public void setTo(long free, long used, long itemSize) {
511       this.itemSize = itemSize;
512       this.freeCount = free;
513       this.usedCount = used;
514       this.totalCount = free + used;
515     }
516   }
517 
518   public Bucket [] getBuckets() {
519     return this.buckets;
520   }
521 
522   void logStatistics() {
523     IndexStatistics total = new IndexStatistics();
524     IndexStatistics[] stats = getIndexStatistics(total);
525     LOG.info("Bucket allocator statistics follow:\n");
526     LOG.info("  Free bytes=" + total.freeBytes() + "+; used bytes="
527         + total.usedBytes() + "; total bytes=" + total.totalBytes());
528     for (IndexStatistics s : stats) {
529       LOG.info("  Object size " + s.itemSize() + " used=" + s.usedCount()
530           + "; free=" + s.freeCount() + "; total=" + s.totalCount());
531     }
532   }
533 
534   IndexStatistics[] getIndexStatistics(IndexStatistics grandTotal) {
535     IndexStatistics[] stats = getIndexStatistics();
536     long totalfree = 0, totalused = 0;
537     for (IndexStatistics stat : stats) {
538       totalfree += stat.freeBytes();
539       totalused += stat.usedBytes();
540     }
541     grandTotal.setTo(totalfree, totalused, 1);
542     return stats;
543   }
544 
545   IndexStatistics[] getIndexStatistics() {
546     IndexStatistics[] stats = new IndexStatistics[bucketSizes.length];
547     for (int i = 0; i < stats.length; ++i)
548       stats[i] = bucketSizeInfos[i].statistics();
549     return stats;
550   }
551 
552   public long freeBlock(long freeList[]) {
553     long sz = 0;
554     for (int i = 0; i < freeList.length; ++i)
555       sz += freeBlock(freeList[i]);
556     return sz;
557   }
558 
559 }
560