1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 package org.apache.hadoop.hbase.procedure2.store; 20 21 import java.io.IOException; 22 import java.io.InputStream; 23 import java.io.OutputStream; 24 import java.util.Iterator; 25 import java.util.Map; 26 import java.util.TreeMap; 27 28 import org.apache.hadoop.hbase.classification.InterfaceAudience; 29 import org.apache.hadoop.hbase.classification.InterfaceStability; 30 import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos; 31 32 /** 33 * Keeps track of live procedures. 34 * 35 * It can be used by the ProcedureStore to identify which procedures are already 36 * deleted/completed to avoid the deserialization step on restart. 37 */ 38 @InterfaceAudience.Private 39 @InterfaceStability.Evolving 40 public class ProcedureStoreTracker { 41 private final TreeMap<Long, BitSetNode> map = new TreeMap<Long, BitSetNode>(); 42 43 private boolean keepDeletes = false; 44 private boolean partial = false; 45 46 private long minUpdatedProcId = Long.MAX_VALUE; 47 private long maxUpdatedProcId = Long.MIN_VALUE; 48 49 public enum DeleteState { YES, NO, MAYBE } 50 51 public static class BitSetNode { 52 private final static long WORD_MASK = 0xffffffffffffffffL; 53 private final static int ADDRESS_BITS_PER_WORD = 6; 54 private final static int BITS_PER_WORD = 1 << ADDRESS_BITS_PER_WORD; 55 private final static int MAX_NODE_SIZE = 1 << ADDRESS_BITS_PER_WORD; 56 57 private final boolean partial; 58 private long[] updated; 59 private long[] deleted; 60 private long start; 61 dump()62 public void dump() { 63 System.out.printf("%06d:%06d min=%d max=%d%n", getStart(), getEnd(), 64 getMinProcId(), getMaxProcId()); 65 System.out.println("Update:"); 66 for (int i = 0; i < updated.length; ++i) { 67 for (int j = 0; j < BITS_PER_WORD; ++j) { 68 System.out.print((updated[i] & (1L << j)) != 0 ? "1" : "0"); 69 } 70 System.out.println(" " + i); 71 } 72 System.out.println(); 73 System.out.println("Delete:"); 74 for (int i = 0; i < deleted.length; ++i) { 75 for (int j = 0; j < BITS_PER_WORD; ++j) { 76 System.out.print((deleted[i] & (1L << j)) != 0 ? "1" : "0"); 77 } 78 System.out.println(" " + i); 79 } 80 System.out.println(); 81 } 82 BitSetNode(final long procId, final boolean partial)83 public BitSetNode(final long procId, final boolean partial) { 84 start = alignDown(procId); 85 86 int count = 1; 87 updated = new long[count]; 88 deleted = new long[count]; 89 for (int i = 0; i < count; ++i) { 90 updated[i] = 0; 91 deleted[i] = partial ? 0 : WORD_MASK; 92 } 93 94 this.partial = partial; 95 updateState(procId, false); 96 } 97 BitSetNode(final long start, final long[] updated, final long[] deleted)98 protected BitSetNode(final long start, final long[] updated, final long[] deleted) { 99 this.start = start; 100 this.updated = updated; 101 this.deleted = deleted; 102 this.partial = false; 103 } 104 update(final long procId)105 public void update(final long procId) { 106 updateState(procId, false); 107 } 108 delete(final long procId)109 public void delete(final long procId) { 110 updateState(procId, true); 111 } 112 getStart()113 public Long getStart() { 114 return start; 115 } 116 getEnd()117 public Long getEnd() { 118 return start + (updated.length << ADDRESS_BITS_PER_WORD) - 1; 119 } 120 contains(final long procId)121 public boolean contains(final long procId) { 122 return start <= procId && procId <= getEnd(); 123 } 124 isDeleted(final long procId)125 public DeleteState isDeleted(final long procId) { 126 int bitmapIndex = getBitmapIndex(procId); 127 int wordIndex = bitmapIndex >> ADDRESS_BITS_PER_WORD; 128 if (wordIndex >= deleted.length) { 129 return DeleteState.MAYBE; 130 } 131 return (deleted[wordIndex] & (1L << bitmapIndex)) != 0 ? DeleteState.YES : DeleteState.NO; 132 } 133 isUpdated(final long procId)134 private boolean isUpdated(final long procId) { 135 int bitmapIndex = getBitmapIndex(procId); 136 int wordIndex = bitmapIndex >> ADDRESS_BITS_PER_WORD; 137 if (wordIndex >= updated.length) { 138 return false; 139 } 140 return (updated[wordIndex] & (1L << bitmapIndex)) != 0; 141 } 142 isUpdated()143 public boolean isUpdated() { 144 // TODO: cache the value 145 for (int i = 0; i < updated.length; ++i) { 146 if ((updated[i] | deleted[i]) != WORD_MASK) { 147 return false; 148 } 149 } 150 return true; 151 } 152 isEmpty()153 public boolean isEmpty() { 154 // TODO: cache the value 155 for (int i = 0; i < deleted.length; ++i) { 156 if (deleted[i] != WORD_MASK) { 157 return false; 158 } 159 } 160 return true; 161 } 162 resetUpdates()163 public void resetUpdates() { 164 for (int i = 0; i < updated.length; ++i) { 165 updated[i] = 0; 166 } 167 } 168 undeleteAll()169 public void undeleteAll() { 170 for (int i = 0; i < updated.length; ++i) { 171 deleted[i] = 0; 172 } 173 } 174 unsetPartialFlag()175 public void unsetPartialFlag() { 176 for (int i = 0; i < updated.length; ++i) { 177 for (int j = 0; j < BITS_PER_WORD; ++j) { 178 if ((updated[i] & (1L << j)) == 0) { 179 deleted[i] |= (1L << j); 180 } 181 } 182 } 183 } 184 convert()185 public ProcedureProtos.ProcedureStoreTracker.TrackerNode convert() { 186 ProcedureProtos.ProcedureStoreTracker.TrackerNode.Builder builder = 187 ProcedureProtos.ProcedureStoreTracker.TrackerNode.newBuilder(); 188 builder.setStartId(start); 189 for (int i = 0; i < updated.length; ++i) { 190 builder.addUpdated(updated[i]); 191 builder.addDeleted(deleted[i]); 192 } 193 return builder.build(); 194 } 195 convert(ProcedureProtos.ProcedureStoreTracker.TrackerNode data)196 public static BitSetNode convert(ProcedureProtos.ProcedureStoreTracker.TrackerNode data) { 197 long start = data.getStartId(); 198 int size = data.getUpdatedCount(); 199 long[] updated = new long[size]; 200 long[] deleted = new long[size]; 201 for (int i = 0; i < size; ++i) { 202 updated[i] = data.getUpdated(i); 203 deleted[i] = data.getDeleted(i); 204 } 205 return new BitSetNode(start, updated, deleted); 206 } 207 208 // ======================================================================== 209 // Grow/Merge Helpers 210 // ======================================================================== canGrow(final long procId)211 public boolean canGrow(final long procId) { 212 return Math.abs(procId - start) < MAX_NODE_SIZE; 213 } 214 canMerge(final BitSetNode rightNode)215 public boolean canMerge(final BitSetNode rightNode) { 216 assert start < rightNode.getEnd(); 217 return (rightNode.getEnd() - start) < MAX_NODE_SIZE; 218 } 219 220 public void grow(final long procId) { 221 int delta, offset; 222 223 if (procId < start) { 224 // add to head 225 long newStart = alignDown(procId); 226 delta = (int)(start - newStart) >> ADDRESS_BITS_PER_WORD; 227 offset = delta; 228 start = newStart; 229 } else { 230 // Add to tail 231 long newEnd = alignUp(procId + 1); 232 delta = (int)(newEnd - getEnd()) >> ADDRESS_BITS_PER_WORD; 233 offset = 0; 234 } 235 236 long[] newBitmap; 237 int oldSize = updated.length; 238 239 newBitmap = new long[oldSize + delta]; 240 for (int i = 0; i < newBitmap.length; ++i) { 241 newBitmap[i] = 0; 242 } System.arraycopy(updated, 0, newBitmap, offset, oldSize)243 System.arraycopy(updated, 0, newBitmap, offset, oldSize); 244 updated = newBitmap; 245 246 newBitmap = new long[deleted.length + delta]; 247 for (int i = 0; i < newBitmap.length; ++i) { 248 newBitmap[i] = partial ? 0 : WORD_MASK; 249 } System.arraycopy(deleted, 0, newBitmap, offset, oldSize)250 System.arraycopy(deleted, 0, newBitmap, offset, oldSize); 251 deleted = newBitmap; 252 } 253 merge(final BitSetNode rightNode)254 public void merge(final BitSetNode rightNode) { 255 int delta = (int)(rightNode.getEnd() - getEnd()) >> ADDRESS_BITS_PER_WORD; 256 257 long[] newBitmap; 258 int oldSize = updated.length; 259 int newSize = (delta - rightNode.updated.length); 260 int offset = oldSize + newSize; 261 262 newBitmap = new long[oldSize + delta]; 263 System.arraycopy(updated, 0, newBitmap, 0, oldSize); 264 System.arraycopy(rightNode.updated, 0, newBitmap, offset, rightNode.updated.length); 265 updated = newBitmap; 266 267 newBitmap = new long[oldSize + delta]; 268 System.arraycopy(deleted, 0, newBitmap, 0, oldSize); 269 System.arraycopy(rightNode.deleted, 0, newBitmap, offset, rightNode.deleted.length); 270 deleted = newBitmap; 271 272 for (int i = 0; i < newSize; ++i) { 273 updated[offset + i] = 0; 274 deleted[offset + i] = partial ? 0 : WORD_MASK; 275 } 276 } 277 278 @Override toString()279 public String toString() { 280 return "BitSetNode(" + getStart() + "-" + getEnd() + ")"; 281 } 282 283 // ======================================================================== 284 // Min/Max Helpers 285 // ======================================================================== getMinProcId()286 public long getMinProcId() { 287 long minProcId = start; 288 for (int i = 0; i < deleted.length; ++i) { 289 if (deleted[i] == 0) { 290 return(minProcId); 291 } 292 293 if (deleted[i] != WORD_MASK) { 294 for (int j = 0; j < BITS_PER_WORD; ++j) { 295 if ((deleted[i] & (1L << j)) != 0) { 296 return minProcId + j; 297 } 298 } 299 } 300 301 minProcId += BITS_PER_WORD; 302 } 303 return minProcId; 304 } 305 getMaxProcId()306 public long getMaxProcId() { 307 long maxProcId = getEnd(); 308 for (int i = deleted.length - 1; i >= 0; --i) { 309 if (deleted[i] == 0) { 310 return maxProcId; 311 } 312 313 if (deleted[i] != WORD_MASK) { 314 for (int j = BITS_PER_WORD - 1; j >= 0; --j) { 315 if ((deleted[i] & (1L << j)) == 0) { 316 return maxProcId - (BITS_PER_WORD - 1 - j); 317 } 318 } 319 } 320 maxProcId -= BITS_PER_WORD; 321 } 322 return maxProcId; 323 } 324 325 // ======================================================================== 326 // Bitmap Helpers 327 // ======================================================================== getBitmapIndex(final long procId)328 private int getBitmapIndex(final long procId) { 329 return (int)(procId - start); 330 } 331 updateState(final long procId, final boolean isDeleted)332 private void updateState(final long procId, final boolean isDeleted) { 333 int bitmapIndex = getBitmapIndex(procId); 334 int wordIndex = bitmapIndex >> ADDRESS_BITS_PER_WORD; 335 long value = (1L << bitmapIndex); 336 337 if (isDeleted) { 338 updated[wordIndex] |= value; 339 deleted[wordIndex] |= value; 340 } else { 341 updated[wordIndex] |= value; 342 deleted[wordIndex] &= ~value; 343 } 344 } 345 346 // ======================================================================== 347 // Helpers 348 // ======================================================================== alignUp(final long x)349 private static long alignUp(final long x) { 350 return (x + (BITS_PER_WORD - 1)) & -BITS_PER_WORD; 351 } 352 alignDown(final long x)353 private static long alignDown(final long x) { 354 return x & -BITS_PER_WORD; 355 } 356 } 357 insert(long procId)358 public void insert(long procId) { 359 BitSetNode node = getOrCreateNode(procId); 360 node.update(procId); 361 trackProcIds(procId); 362 } 363 insert(final long procId, final long[] subProcIds)364 public void insert(final long procId, final long[] subProcIds) { 365 update(procId); 366 for (int i = 0; i < subProcIds.length; ++i) { 367 insert(subProcIds[i]); 368 } 369 } 370 update(long procId)371 public void update(long procId) { 372 Map.Entry<Long, BitSetNode> entry = map.floorEntry(procId); 373 assert entry != null : "expected node to update procId=" + procId; 374 375 BitSetNode node = entry.getValue(); 376 assert node.contains(procId); 377 node.update(procId); 378 trackProcIds(procId); 379 } 380 delete(long procId)381 public void delete(long procId) { 382 Map.Entry<Long, BitSetNode> entry = map.floorEntry(procId); 383 assert entry != null : "expected node to delete procId=" + procId; 384 385 BitSetNode node = entry.getValue(); 386 assert node.contains(procId) : "expected procId in the node"; 387 node.delete(procId); 388 389 if (!keepDeletes && node.isEmpty()) { 390 // TODO: RESET if (map.size() == 1) 391 map.remove(entry.getKey()); 392 } 393 394 trackProcIds(procId); 395 } 396 trackProcIds(long procId)397 private void trackProcIds(long procId) { 398 minUpdatedProcId = Math.min(minUpdatedProcId, procId); 399 maxUpdatedProcId = Math.max(maxUpdatedProcId, procId); 400 } 401 getUpdatedMinProcId()402 public long getUpdatedMinProcId() { 403 return minUpdatedProcId; 404 } 405 getUpdatedMaxProcId()406 public long getUpdatedMaxProcId() { 407 return maxUpdatedProcId; 408 } 409 410 @InterfaceAudience.Private setDeleted(final long procId, final boolean isDeleted)411 public void setDeleted(final long procId, final boolean isDeleted) { 412 BitSetNode node = getOrCreateNode(procId); 413 assert node.contains(procId) : "expected procId=" + procId + " in the node=" + node; 414 node.updateState(procId, isDeleted); 415 } 416 reset()417 public void reset() { 418 this.keepDeletes = false; 419 this.partial = false; 420 this.map.clear(); 421 resetUpdates(); 422 } 423 isDeleted(long procId)424 public DeleteState isDeleted(long procId) { 425 Map.Entry<Long, BitSetNode> entry = map.floorEntry(procId); 426 if (entry != null && entry.getValue().contains(procId)) { 427 BitSetNode node = entry.getValue(); 428 DeleteState state = node.isDeleted(procId); 429 return partial && !node.isUpdated(procId) ? DeleteState.MAYBE : state; 430 } 431 return partial ? DeleteState.MAYBE : DeleteState.YES; 432 } 433 getMinProcId()434 public long getMinProcId() { 435 // TODO: Cache? 436 Map.Entry<Long, BitSetNode> entry = map.firstEntry(); 437 return entry == null ? 0 : entry.getValue().getMinProcId(); 438 } 439 setKeepDeletes(boolean keepDeletes)440 public void setKeepDeletes(boolean keepDeletes) { 441 this.keepDeletes = keepDeletes; 442 if (!keepDeletes) { 443 Iterator<Map.Entry<Long, BitSetNode>> it = map.entrySet().iterator(); 444 while (it.hasNext()) { 445 Map.Entry<Long, BitSetNode> entry = it.next(); 446 if (entry.getValue().isEmpty()) { 447 it.remove(); 448 } 449 } 450 } 451 } 452 setPartialFlag(boolean isPartial)453 public void setPartialFlag(boolean isPartial) { 454 if (this.partial && !isPartial) { 455 for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) { 456 entry.getValue().unsetPartialFlag(); 457 } 458 } 459 this.partial = isPartial; 460 } 461 isEmpty()462 public boolean isEmpty() { 463 for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) { 464 if (entry.getValue().isEmpty() == false) { 465 return false; 466 } 467 } 468 return true; 469 } 470 isUpdated()471 public boolean isUpdated() { 472 for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) { 473 if (entry.getValue().isUpdated() == false) { 474 return false; 475 } 476 } 477 return true; 478 } 479 isTracking(long minId, long maxId)480 public boolean isTracking(long minId, long maxId) { 481 // TODO: we can make it more precise, instead of looking just at the block 482 return map.floorEntry(minId) != null || map.floorEntry(maxId) != null; 483 } 484 resetUpdates()485 public void resetUpdates() { 486 for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) { 487 entry.getValue().resetUpdates(); 488 } 489 minUpdatedProcId = Long.MAX_VALUE; 490 maxUpdatedProcId = Long.MIN_VALUE; 491 } 492 undeleteAll()493 public void undeleteAll() { 494 for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) { 495 entry.getValue().undeleteAll(); 496 } 497 } 498 getOrCreateNode(final long procId)499 private BitSetNode getOrCreateNode(final long procId) { 500 // can procId fit in the left node? 501 BitSetNode leftNode = null; 502 boolean leftCanGrow = false; 503 Map.Entry<Long, BitSetNode> leftEntry = map.floorEntry(procId); 504 if (leftEntry != null) { 505 leftNode = leftEntry.getValue(); 506 if (leftNode.contains(procId)) { 507 return leftNode; 508 } 509 leftCanGrow = leftNode.canGrow(procId); 510 } 511 512 BitSetNode rightNode = null; 513 boolean rightCanGrow = false; 514 Map.Entry<Long, BitSetNode> rightEntry = map.ceilingEntry(procId); 515 if (rightEntry != null) { 516 rightNode = rightEntry.getValue(); 517 rightCanGrow = rightNode.canGrow(procId); 518 if (leftNode != null) { 519 if (leftNode.canMerge(rightNode)) { 520 // merge left and right node 521 return mergeNodes(leftNode, rightNode); 522 } 523 524 if (leftCanGrow && rightCanGrow) { 525 if ((procId - leftNode.getEnd()) <= (rightNode.getStart() - procId)) { 526 // grow the left node 527 return growNode(leftNode, procId); 528 } 529 // grow the right node 530 return growNode(rightNode, procId); 531 } 532 } 533 } 534 535 // grow the left node 536 if (leftCanGrow) { 537 return growNode(leftNode, procId); 538 } 539 540 // grow the right node 541 if (rightCanGrow) { 542 return growNode(rightNode, procId); 543 } 544 545 // add new node 546 BitSetNode node = new BitSetNode(procId, partial); 547 map.put(node.getStart(), node); 548 return node; 549 } 550 growNode(BitSetNode node, long procId)551 private BitSetNode growNode(BitSetNode node, long procId) { 552 map.remove(node.getStart()); 553 node.grow(procId); 554 map.put(node.getStart(), node); 555 return node; 556 } 557 mergeNodes(BitSetNode leftNode, BitSetNode rightNode)558 private BitSetNode mergeNodes(BitSetNode leftNode, BitSetNode rightNode) { 559 assert leftNode.getStart() < rightNode.getStart(); 560 leftNode.merge(rightNode); 561 map.remove(rightNode.getStart()); 562 return leftNode; 563 } 564 565 public void dump() { 566 System.out.println("map " + map.size()); 567 System.out.println("isUpdated " + isUpdated()); 568 System.out.println("isEmpty " + isEmpty()); 569 for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) { 570 entry.getValue().dump(); 571 } 572 } 573 574 public void writeTo(final OutputStream stream) throws IOException { 575 ProcedureProtos.ProcedureStoreTracker.Builder builder = 576 ProcedureProtos.ProcedureStoreTracker.newBuilder(); 577 for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) { 578 builder.addNode(entry.getValue().convert()); 579 } 580 builder.build().writeDelimitedTo(stream); 581 } 582 583 public void readFrom(final InputStream stream) throws IOException { 584 reset(); 585 final ProcedureProtos.ProcedureStoreTracker data = 586 ProcedureProtos.ProcedureStoreTracker.parseDelimitedFrom(stream); 587 for (ProcedureProtos.ProcedureStoreTracker.TrackerNode protoNode: data.getNodeList()) { 588 final BitSetNode node = BitSetNode.convert(protoNode); 589 map.put(node.getStart(), node); 590 } 591 } 592 } 593