1 /** 2 * Copyright The Apache Software Foundation 3 * 4 * Licensed to the Apache Software Foundation (ASF) under one 5 * or more contributor license agreements. See the NOTICE file 6 * distributed with this work for additional information 7 * regarding copyright ownership. The ASF licenses this file 8 * to you under the Apache License, Version 2.0 (the 9 * "License"); you may not use this file except in compliance 10 * with the License. You may obtain a copy of the License at 11 * 12 * http://www.apache.org/licenses/LICENSE-2.0 13 * 14 * Unless required by applicable law or agreed to in writing, software 15 * distributed under the License is distributed on an "AS IS" BASIS, 16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 17 * See the License for the specific language governing permissions and 18 * limitations under the License. 19 */ 20 21 package org.apache.hadoop.hbase.io.hfile.bucket; 22 23 import java.util.ArrayList; 24 import java.util.Arrays; 25 import java.util.Map; 26 import java.util.concurrent.atomic.AtomicLong; 27 28 import org.apache.commons.collections.map.LinkedMap; 29 import org.apache.commons.logging.Log; 30 import org.apache.commons.logging.LogFactory; 31 import org.apache.hadoop.hbase.classification.InterfaceAudience; 32 import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; 33 import org.apache.hadoop.hbase.io.hfile.CacheConfig; 34 import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BucketEntry; 35 import org.codehaus.jackson.annotate.JsonIgnoreProperties; 36 37 import com.google.common.base.Objects; 38 import com.google.common.base.Preconditions; 39 import com.google.common.primitives.Ints; 40 41 /** 42 * This class is used to allocate a block with specified size and free the block 43 * when evicting. It manages an array of buckets, each bucket is associated with 44 * a size and caches elements up to this size. For a completely empty bucket, this 45 * size could be re-specified dynamically. 46 * 47 * This class is not thread safe. 48 */ 49 @InterfaceAudience.Private 50 @JsonIgnoreProperties({"indexStatistics", "freeSize", "usedSize"}) 51 public final class BucketAllocator { 52 private static final Log LOG = LogFactory.getLog(BucketAllocator.class); 53 54 @JsonIgnoreProperties({"completelyFree", "uninstantiated"}) 55 public final static class Bucket { 56 private long baseOffset; 57 private int itemAllocationSize, sizeIndex; 58 private int itemCount; 59 private int freeList[]; 60 private int freeCount, usedCount; 61 Bucket(long offset)62 public Bucket(long offset) { 63 baseOffset = offset; 64 sizeIndex = -1; 65 } 66 reconfigure(int sizeIndex, int[] bucketSizes, long bucketCapacity)67 void reconfigure(int sizeIndex, int[] bucketSizes, long bucketCapacity) { 68 Preconditions.checkElementIndex(sizeIndex, bucketSizes.length); 69 this.sizeIndex = sizeIndex; 70 itemAllocationSize = bucketSizes[sizeIndex]; 71 itemCount = (int) (bucketCapacity / (long) itemAllocationSize); 72 freeCount = itemCount; 73 usedCount = 0; 74 freeList = new int[itemCount]; 75 for (int i = 0; i < freeCount; ++i) 76 freeList[i] = i; 77 } 78 isUninstantiated()79 public boolean isUninstantiated() { 80 return sizeIndex == -1; 81 } 82 sizeIndex()83 public int sizeIndex() { 84 return sizeIndex; 85 } 86 getItemAllocationSize()87 public int getItemAllocationSize() { 88 return itemAllocationSize; 89 } 90 hasFreeSpace()91 public boolean hasFreeSpace() { 92 return freeCount > 0; 93 } 94 isCompletelyFree()95 public boolean isCompletelyFree() { 96 return usedCount == 0; 97 } 98 freeCount()99 public int freeCount() { 100 return freeCount; 101 } 102 usedCount()103 public int usedCount() { 104 return usedCount; 105 } 106 getFreeBytes()107 public int getFreeBytes() { 108 return freeCount * itemAllocationSize; 109 } 110 getUsedBytes()111 public int getUsedBytes() { 112 return usedCount * itemAllocationSize; 113 } 114 getBaseOffset()115 public long getBaseOffset() { 116 return baseOffset; 117 } 118 119 /** 120 * Allocate a block in this bucket, return the offset representing the 121 * position in physical space 122 * @return the offset in the IOEngine 123 */ allocate()124 public long allocate() { 125 assert freeCount > 0; // Else should not have been called 126 assert sizeIndex != -1; 127 ++usedCount; 128 long offset = baseOffset + (freeList[--freeCount] * itemAllocationSize); 129 assert offset >= 0; 130 return offset; 131 } 132 addAllocation(long offset)133 public void addAllocation(long offset) throws BucketAllocatorException { 134 offset -= baseOffset; 135 if (offset < 0 || offset % itemAllocationSize != 0) 136 throw new BucketAllocatorException( 137 "Attempt to add allocation for bad offset: " + offset + " base=" 138 + baseOffset + ", bucket size=" + itemAllocationSize); 139 int idx = (int) (offset / itemAllocationSize); 140 boolean matchFound = false; 141 for (int i = 0; i < freeCount; ++i) { 142 if (matchFound) freeList[i - 1] = freeList[i]; 143 else if (freeList[i] == idx) matchFound = true; 144 } 145 if (!matchFound) 146 throw new BucketAllocatorException("Couldn't find match for index " 147 + idx + " in free list"); 148 ++usedCount; 149 --freeCount; 150 } 151 free(long offset)152 private void free(long offset) { 153 offset -= baseOffset; 154 assert offset >= 0; 155 assert offset < itemCount * itemAllocationSize; 156 assert offset % itemAllocationSize == 0; 157 assert usedCount > 0; 158 assert freeCount < itemCount; // Else duplicate free 159 int item = (int) (offset / (long) itemAllocationSize); 160 assert !freeListContains(item); 161 --usedCount; 162 freeList[freeCount++] = item; 163 } 164 165 private boolean freeListContains(int blockNo) { 166 for (int i = 0; i < freeCount; ++i) { 167 if (freeList[i] == blockNo) return true; 168 } 169 return false; 170 } 171 } 172 173 final class BucketSizeInfo { 174 // Free bucket means it has space to allocate a block; 175 // Completely free bucket means it has no block. 176 private LinkedMap bucketList, freeBuckets, completelyFreeBuckets; 177 private int sizeIndex; 178 179 BucketSizeInfo(int sizeIndex) { 180 bucketList = new LinkedMap(); 181 freeBuckets = new LinkedMap(); 182 completelyFreeBuckets = new LinkedMap(); 183 this.sizeIndex = sizeIndex; 184 } 185 186 public void instantiateBucket(Bucket b) { 187 assert b.isUninstantiated() || b.isCompletelyFree(); 188 b.reconfigure(sizeIndex, bucketSizes, bucketCapacity); 189 bucketList.put(b, b); 190 freeBuckets.put(b, b); 191 completelyFreeBuckets.put(b, b); 192 } 193 194 public int sizeIndex() { 195 return sizeIndex; 196 } 197 198 /** 199 * Find a bucket to allocate a block 200 * @return the offset in the IOEngine 201 */ 202 public long allocateBlock() { 203 Bucket b = null; 204 if (freeBuckets.size() > 0) { 205 // Use up an existing one first... 206 b = (Bucket) freeBuckets.lastKey(); 207 } 208 if (b == null) { 209 b = grabGlobalCompletelyFreeBucket(); 210 if (b != null) instantiateBucket(b); 211 } 212 if (b == null) return -1; 213 long result = b.allocate(); 214 blockAllocated(b); 215 return result; 216 } 217 218 void blockAllocated(Bucket b) { 219 if (!b.isCompletelyFree()) completelyFreeBuckets.remove(b); 220 if (!b.hasFreeSpace()) freeBuckets.remove(b); 221 } 222 223 public Bucket findAndRemoveCompletelyFreeBucket() { 224 Bucket b = null; 225 assert bucketList.size() > 0; 226 if (bucketList.size() == 1) { 227 // So we never get complete starvation of a bucket for a size 228 return null; 229 } 230 231 if (completelyFreeBuckets.size() > 0) { 232 b = (Bucket) completelyFreeBuckets.firstKey(); 233 removeBucket(b); 234 } 235 return b; 236 } 237 removeBucket(Bucket b)238 private void removeBucket(Bucket b) { 239 assert b.isCompletelyFree(); 240 bucketList.remove(b); 241 freeBuckets.remove(b); 242 completelyFreeBuckets.remove(b); 243 } 244 freeBlock(Bucket b, long offset)245 public void freeBlock(Bucket b, long offset) { 246 assert bucketList.containsKey(b); 247 // else we shouldn't have anything to free... 248 assert (!completelyFreeBuckets.containsKey(b)); 249 b.free(offset); 250 if (!freeBuckets.containsKey(b)) freeBuckets.put(b, b); 251 if (b.isCompletelyFree()) completelyFreeBuckets.put(b, b); 252 } 253 statistics()254 public IndexStatistics statistics() { 255 long free = 0, used = 0; 256 for (Object obj : bucketList.keySet()) { 257 Bucket b = (Bucket) obj; 258 free += b.freeCount(); 259 used += b.usedCount(); 260 } 261 return new IndexStatistics(free, used, bucketSizes[sizeIndex]); 262 } 263 264 @Override toString()265 public String toString() { 266 return Objects.toStringHelper(this.getClass()) 267 .add("sizeIndex", sizeIndex) 268 .add("bucketSize", bucketSizes[sizeIndex]) 269 .toString(); 270 } 271 } 272 273 // Default block size is 64K, so we choose more sizes near 64K, you'd better 274 // reset it according to your cluster's block size distribution 275 // TODO Support the view of block size distribution statistics 276 private static final int DEFAULT_BUCKET_SIZES[] = { 4 * 1024 + 1024, 8 * 1024 + 1024, 277 16 * 1024 + 1024, 32 * 1024 + 1024, 40 * 1024 + 1024, 48 * 1024 + 1024, 278 56 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024, 128 * 1024 + 1024, 279 192 * 1024 + 1024, 256 * 1024 + 1024, 384 * 1024 + 1024, 280 512 * 1024 + 1024 }; 281 282 /** 283 * Round up the given block size to bucket size, and get the corresponding 284 * BucketSizeInfo 285 */ roundUpToBucketSizeInfo(int blockSize)286 public BucketSizeInfo roundUpToBucketSizeInfo(int blockSize) { 287 for (int i = 0; i < bucketSizes.length; ++i) 288 if (blockSize <= bucketSizes[i]) 289 return bucketSizeInfos[i]; 290 return null; 291 } 292 293 static public final int FEWEST_ITEMS_IN_BUCKET = 4; 294 295 private final int[] bucketSizes; 296 private final int bigItemSize; 297 // The capacity size for each bucket 298 private final long bucketCapacity; 299 private Bucket[] buckets; 300 private BucketSizeInfo[] bucketSizeInfos; 301 private final long totalSize; 302 private long usedSize = 0; 303 BucketAllocator(long availableSpace, int[] bucketSizes)304 BucketAllocator(long availableSpace, int[] bucketSizes) 305 throws BucketAllocatorException { 306 this.bucketSizes = bucketSizes == null ? DEFAULT_BUCKET_SIZES : bucketSizes; 307 Arrays.sort(this.bucketSizes); 308 this.bigItemSize = Ints.max(this.bucketSizes); 309 this.bucketCapacity = FEWEST_ITEMS_IN_BUCKET * bigItemSize; 310 buckets = new Bucket[(int) (availableSpace / bucketCapacity)]; 311 if (buckets.length < this.bucketSizes.length) 312 throw new BucketAllocatorException( 313 "Bucket allocator size too small - must have room for at least " 314 + this.bucketSizes.length + " buckets"); 315 bucketSizeInfos = new BucketSizeInfo[this.bucketSizes.length]; 316 for (int i = 0; i < this.bucketSizes.length; ++i) { 317 bucketSizeInfos[i] = new BucketSizeInfo(i); 318 } 319 for (int i = 0; i < buckets.length; ++i) { 320 buckets[i] = new Bucket(bucketCapacity * i); 321 bucketSizeInfos[i < this.bucketSizes.length ? i : this.bucketSizes.length - 1] 322 .instantiateBucket(buckets[i]); 323 } 324 this.totalSize = ((long) buckets.length) * bucketCapacity; 325 } 326 327 /** 328 * Rebuild the allocator's data structures from a persisted map. 329 * @param availableSpace capacity of cache 330 * @param map A map stores the block key and BucketEntry(block's meta data 331 * like offset, length) 332 * @param realCacheSize cached data size statistics for bucket cache 333 * @throws BucketAllocatorException 334 */ BucketAllocator(long availableSpace, int[] bucketSizes, Map<BlockCacheKey, BucketEntry> map, AtomicLong realCacheSize)335 BucketAllocator(long availableSpace, int[] bucketSizes, Map<BlockCacheKey, BucketEntry> map, 336 AtomicLong realCacheSize) throws BucketAllocatorException { 337 this(availableSpace, bucketSizes); 338 339 // each bucket has an offset, sizeindex. probably the buckets are too big 340 // in our default state. so what we do is reconfigure them according to what 341 // we've found. we can only reconfigure each bucket once; if more than once, 342 // we know there's a bug, so we just log the info, throw, and start again... 343 boolean[] reconfigured = new boolean[buckets.length]; 344 for (Map.Entry<BlockCacheKey, BucketEntry> entry : map.entrySet()) { 345 long foundOffset = entry.getValue().offset(); 346 int foundLen = entry.getValue().getLength(); 347 int bucketSizeIndex = -1; 348 for (int i = 0; i < bucketSizes.length; ++i) { 349 if (foundLen <= bucketSizes[i]) { 350 bucketSizeIndex = i; 351 break; 352 } 353 } 354 if (bucketSizeIndex == -1) { 355 throw new BucketAllocatorException( 356 "Can't match bucket size for the block with size " + foundLen); 357 } 358 int bucketNo = (int) (foundOffset / bucketCapacity); 359 if (bucketNo < 0 || bucketNo >= buckets.length) 360 throw new BucketAllocatorException("Can't find bucket " + bucketNo 361 + ", total buckets=" + buckets.length 362 + "; did you shrink the cache?"); 363 Bucket b = buckets[bucketNo]; 364 if (reconfigured[bucketNo]) { 365 if (b.sizeIndex() != bucketSizeIndex) 366 throw new BucketAllocatorException( 367 "Inconsistent allocation in bucket map;"); 368 } else { 369 if (!b.isCompletelyFree()) 370 throw new BucketAllocatorException("Reconfiguring bucket " 371 + bucketNo + " but it's already allocated; corrupt data"); 372 // Need to remove the bucket from whichever list it's currently in at 373 // the moment... 374 BucketSizeInfo bsi = bucketSizeInfos[bucketSizeIndex]; 375 BucketSizeInfo oldbsi = bucketSizeInfos[b.sizeIndex()]; 376 oldbsi.removeBucket(b); 377 bsi.instantiateBucket(b); 378 reconfigured[bucketNo] = true; 379 } 380 realCacheSize.addAndGet(foundLen); 381 buckets[bucketNo].addAllocation(foundOffset); 382 usedSize += buckets[bucketNo].getItemAllocationSize(); 383 bucketSizeInfos[bucketSizeIndex].blockAllocated(b); 384 } 385 } 386 toString()387 public String toString() { 388 StringBuilder sb = new StringBuilder(1024); 389 for (int i = 0; i < buckets.length; ++i) { 390 Bucket b = buckets[i]; 391 if (i > 0) sb.append(", "); 392 sb.append("bucket.").append(i).append(": size=").append(b.getItemAllocationSize()); 393 sb.append(", freeCount=").append(b.freeCount()).append(", used=").append(b.usedCount()); 394 } 395 return sb.toString(); 396 } 397 getUsedSize()398 public long getUsedSize() { 399 return this.usedSize; 400 } 401 getFreeSize()402 public long getFreeSize() { 403 return this.totalSize - getUsedSize(); 404 } 405 getTotalSize()406 public long getTotalSize() { 407 return this.totalSize; 408 } 409 410 /** 411 * Allocate a block with specified size. Return the offset 412 * @param blockSize size of block 413 * @throws BucketAllocatorException 414 * @throws CacheFullException 415 * @return the offset in the IOEngine 416 */ allocateBlock(int blockSize)417 public synchronized long allocateBlock(int blockSize) throws CacheFullException, 418 BucketAllocatorException { 419 assert blockSize > 0; 420 BucketSizeInfo bsi = roundUpToBucketSizeInfo(blockSize); 421 if (bsi == null) { 422 throw new BucketAllocatorException("Allocation too big size=" + blockSize + 423 "; adjust BucketCache sizes " + CacheConfig.BUCKET_CACHE_BUCKETS_KEY + 424 " to accomodate if size seems reasonable and you want it cached."); 425 } 426 long offset = bsi.allocateBlock(); 427 428 // Ask caller to free up space and try again! 429 if (offset < 0) 430 throw new CacheFullException(blockSize, bsi.sizeIndex()); 431 usedSize += bucketSizes[bsi.sizeIndex()]; 432 return offset; 433 } 434 grabGlobalCompletelyFreeBucket()435 private Bucket grabGlobalCompletelyFreeBucket() { 436 for (BucketSizeInfo bsi : bucketSizeInfos) { 437 Bucket b = bsi.findAndRemoveCompletelyFreeBucket(); 438 if (b != null) return b; 439 } 440 return null; 441 } 442 443 /** 444 * Free a block with the offset 445 * @param offset block's offset 446 * @return size freed 447 */ freeBlock(long offset)448 public synchronized int freeBlock(long offset) { 449 int bucketNo = (int) (offset / bucketCapacity); 450 assert bucketNo >= 0 && bucketNo < buckets.length; 451 Bucket targetBucket = buckets[bucketNo]; 452 bucketSizeInfos[targetBucket.sizeIndex()].freeBlock(targetBucket, offset); 453 usedSize -= targetBucket.getItemAllocationSize(); 454 return targetBucket.getItemAllocationSize(); 455 } 456 457 public int sizeIndexOfAllocation(long offset) { 458 int bucketNo = (int) (offset / bucketCapacity); 459 assert bucketNo >= 0 && bucketNo < buckets.length; 460 Bucket targetBucket = buckets[bucketNo]; 461 return targetBucket.sizeIndex(); 462 } 463 464 public int sizeOfAllocation(long offset) { 465 int bucketNo = (int) (offset / bucketCapacity); 466 assert bucketNo >= 0 && bucketNo < buckets.length; 467 Bucket targetBucket = buckets[bucketNo]; 468 return targetBucket.getItemAllocationSize(); 469 } 470 471 static class IndexStatistics { 472 private long freeCount, usedCount, itemSize, totalCount; 473 474 public long freeCount() { 475 return freeCount; 476 } 477 478 public long usedCount() { 479 return usedCount; 480 } 481 482 public long totalCount() { 483 return totalCount; 484 } 485 486 public long freeBytes() { 487 return freeCount * itemSize; 488 } 489 490 public long usedBytes() { 491 return usedCount * itemSize; 492 } 493 494 public long totalBytes() { 495 return totalCount * itemSize; 496 } 497 498 public long itemSize() { 499 return itemSize; 500 } 501 502 public IndexStatistics(long free, long used, long itemSize) { 503 setTo(free, used, itemSize); 504 } 505 506 public IndexStatistics() { 507 setTo(-1, -1, 0); 508 } 509 510 public void setTo(long free, long used, long itemSize) { 511 this.itemSize = itemSize; 512 this.freeCount = free; 513 this.usedCount = used; 514 this.totalCount = free + used; 515 } 516 } 517 518 public Bucket [] getBuckets() { 519 return this.buckets; 520 } 521 522 void logStatistics() { 523 IndexStatistics total = new IndexStatistics(); 524 IndexStatistics[] stats = getIndexStatistics(total); 525 LOG.info("Bucket allocator statistics follow:\n"); 526 LOG.info(" Free bytes=" + total.freeBytes() + "+; used bytes=" 527 + total.usedBytes() + "; total bytes=" + total.totalBytes()); 528 for (IndexStatistics s : stats) { 529 LOG.info(" Object size " + s.itemSize() + " used=" + s.usedCount() 530 + "; free=" + s.freeCount() + "; total=" + s.totalCount()); 531 } 532 } 533 534 IndexStatistics[] getIndexStatistics(IndexStatistics grandTotal) { 535 IndexStatistics[] stats = getIndexStatistics(); 536 long totalfree = 0, totalused = 0; 537 for (IndexStatistics stat : stats) { 538 totalfree += stat.freeBytes(); 539 totalused += stat.usedBytes(); 540 } 541 grandTotal.setTo(totalfree, totalused, 1); 542 return stats; 543 } 544 545 IndexStatistics[] getIndexStatistics() { 546 IndexStatistics[] stats = new IndexStatistics[bucketSizes.length]; 547 for (int i = 0; i < stats.length; ++i) 548 stats[i] = bucketSizeInfos[i].statistics(); 549 return stats; 550 } 551 552 public long freeBlock(long freeList[]) { 553 long sz = 0; 554 for (int i = 0; i < freeList.length; ++i) 555 sz += freeBlock(freeList[i]); 556 return sz; 557 } 558 559 } 560