1 // This file is part of OpenTSDB. 2 // Copyright (C) 2010-2012 The OpenTSDB Authors. 3 // 4 // This program is free software: you can redistribute it and/or modify it 5 // under the terms of the GNU Lesser General Public License as published by 6 // the Free Software Foundation, either version 2.1 of the License, or (at your 7 // option) any later version. This program is distributed in the hope that it 8 // will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty 9 // of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser 10 // General Public License for more details. You should have received a copy 11 // of the GNU Lesser General Public License along with this program. If not, 12 // see <http://www.gnu.org/licenses/>. 13 package net.opentsdb.uid; 14 15 import java.nio.charset.Charset; 16 import java.util.ArrayList; 17 import java.util.Arrays; 18 import java.util.Collections; 19 import java.util.HashMap; 20 import java.util.HashSet; 21 import java.util.LinkedList; 22 import java.util.List; 23 import java.util.Map; 24 import java.util.Set; 25 import java.util.concurrent.ConcurrentHashMap; 26 27 import javax.xml.bind.DatatypeConverter; 28 29 import com.stumbleupon.async.Callback; 30 import com.stumbleupon.async.Deferred; 31 32 import org.hbase.async.AtomicIncrementRequest; 33 import org.hbase.async.Bytes; 34 import org.hbase.async.DeleteRequest; 35 import org.hbase.async.GetRequest; 36 import org.hbase.async.HBaseClient; 37 import org.hbase.async.HBaseException; 38 import org.hbase.async.KeyValue; 39 import org.hbase.async.PutRequest; 40 import org.hbase.async.Scanner; 41 import org.hbase.async.Bytes.ByteMap; 42 import org.slf4j.Logger; 43 import org.slf4j.LoggerFactory; 44 45 import net.opentsdb.core.Const; 46 import net.opentsdb.core.Internal; 47 import net.opentsdb.core.TSDB; 48 import net.opentsdb.meta.UIDMeta; 49 50 /** 51 * Represents a table of Unique IDs, manages the lookup and creation of IDs. 
 * <p>
 * Don't attempt to use {@code equals()} or {@code hashCode()} on
 * this class.
 * @see UniqueIdInterface
 */
@SuppressWarnings("deprecation")  // Dunno why even with this, compiler warns.
public final class UniqueId implements UniqueIdInterface {
  private static final Logger LOG = LoggerFactory.getLogger(UniqueId.class);

  /**
   * Enumerator for the different types of UIDs.
   * @since 2.0
   */
  public enum UniqueIdType {
    METRIC,
    TAGK,
    TAGV
  }

  /** Charset used to convert Strings to byte arrays and back. */
  private static final Charset CHARSET = Charset.forName("ISO-8859-1");
  /** Column family holding the forward mappings (name to ID) and the
   * max ID counter. */
  private static final byte[] ID_FAMILY = toBytes("id");
  /** Column family holding the reverse mappings (ID to name). */
  private static final byte[] NAME_FAMILY = toBytes("name");
  /** Row key of the special row used to track the max ID already assigned. */
  private static final byte[] MAXID_ROW = { 0 };
  /** How many times do we try to assign an ID before giving up. */
  private static final short MAX_ATTEMPTS_ASSIGN_ID = 3;
  /** How many times do we try to apply an edit before giving up. */
  private static final short MAX_ATTEMPTS_PUT = 6;
  /** How many times do we try to assign a random ID before giving up. */
  private static final short MAX_ATTEMPTS_ASSIGN_RANDOM_ID = 10;
  /** Initial delay in ms for exponential backoff to retry failed RPCs. */
  private static final short INITIAL_EXP_BACKOFF_DELAY = 800;
  /** Maximum number of results to return in suggest(). */
  private static final short MAX_SUGGESTIONS = 25;

  /** HBase client to use.  */
  private final HBaseClient client;
  /** Table where IDs are stored.  */
  private final byte[] table;
  /** The kind of UniqueId, used as the column qualifier. */
  private final byte[] kind;
  /** The type of UID represented by this cache */
  private final UniqueIdType type;
  /** Number of bytes on which each ID is encoded. */
  private final short id_width;
  /** Whether or not to randomize new IDs */
  private final boolean randomize_id;

  /** Cache for forward mappings (name to ID). */
  private final ConcurrentHashMap<String, byte[]> name_cache =
    new ConcurrentHashMap<String, byte[]>();
  /** Cache for backward mappings (ID to name).
   * The ID in the key is a byte[] converted to a String to be Comparable. */
  private final ConcurrentHashMap<String, String> id_cache =
    new ConcurrentHashMap<String, String>();
  /** Map of pending UID assignments, keyed by the name being assigned.
   * This is a plain HashMap: code touching it synchronizes on the map. */
  private final HashMap<String, Deferred<byte[]>> pending_assignments =
    new HashMap<String, Deferred<byte[]>>();
  /** Set of names involved in a UID rename.
   * NOTE(review): presumably the names currently being renamed; confirm
   * against the rename code. */
  private final Set<String> renaming_id_names =
    Collections.synchronizedSet(new HashSet<String>());

  // The counters below are volatile ints bumped with ++, which is not an
  // atomic operation, so concurrent updates may occasionally be lost.

  /** Number of times we avoided reading from HBase thanks to the cache. */
  private volatile int cache_hits;
  /** Number of times we had to read from HBase and populate the cache. */
  private volatile int cache_misses;
  /** How many times we collided with an existing ID when attempting to
   * generate a new UID */
  private volatile int random_id_collisions;
  /** How many times assignments have been rejected by the UID filter */
  private volatile int rejected_assignments;

  /** TSDB object used for filtering and/or meta generation.
   * May be null; callers check for null before using it. */
  private TSDB tsdb;

  /**
   * Constructor.
   * @param client The HBase client to use.
   * @param table The name of the HBase table to use.
   * @param kind The kind of Unique ID this instance will deal with.
   * @param width The number of bytes on which Unique IDs should be encoded.
   * @throws IllegalArgumentException if width is negative or too small/large
   * or if kind is an empty string.
135 */ UniqueId(final HBaseClient client, final byte[] table, final String kind, final int width)136 public UniqueId(final HBaseClient client, final byte[] table, final String kind, 137 final int width) { 138 this(client, table, kind, width, false); 139 } 140 141 /** 142 * Constructor. 143 * @param client The HBase client to use. 144 * @param table The name of the HBase table to use. 145 * @param kind The kind of Unique ID this instance will deal with. 146 * @param width The number of bytes on which Unique IDs should be encoded. 147 * @param Whether or not to randomize new UIDs 148 * @throws IllegalArgumentException if width is negative or too small/large 149 * or if kind is an empty string. 150 * @since 2.2 151 */ UniqueId(final HBaseClient client, final byte[] table, final String kind, final int width, final boolean randomize_id)152 public UniqueId(final HBaseClient client, final byte[] table, final String kind, 153 final int width, final boolean randomize_id) { 154 this.client = client; 155 this.table = table; 156 if (kind.isEmpty()) { 157 throw new IllegalArgumentException("Empty string as 'kind' argument!"); 158 } 159 this.kind = toBytes(kind); 160 type = stringToUniqueIdType(kind); 161 if (width < 1 || width > 8) { 162 throw new IllegalArgumentException("Invalid width: " + width); 163 } 164 this.id_width = (short) width; 165 this.randomize_id = randomize_id; 166 } 167 168 /** 169 * Constructor. 170 * @param tsdb The TSDB this UID object belongs to 171 * @param table The name of the HBase table to use. 172 * @param kind The kind of Unique ID this instance will deal with. 173 * @param width The number of bytes on which Unique IDs should be encoded. 174 * @param Whether or not to randomize new UIDs 175 * @throws IllegalArgumentException if width is negative or too small/large 176 * or if kind is an empty string. 
177 * @since 2.3 178 */ UniqueId(final TSDB tsdb, final byte[] table, final String kind, final int width, final boolean randomize_id)179 public UniqueId(final TSDB tsdb, final byte[] table, final String kind, 180 final int width, final boolean randomize_id) { 181 this.client = tsdb.getClient(); 182 this.tsdb = tsdb; 183 this.table = table; 184 if (kind.isEmpty()) { 185 throw new IllegalArgumentException("Empty string as 'kind' argument!"); 186 } 187 this.kind = toBytes(kind); 188 type = stringToUniqueIdType(kind); 189 if (width < 1 || width > 8) { 190 throw new IllegalArgumentException("Invalid width: " + width); 191 } 192 this.id_width = (short) width; 193 this.randomize_id = randomize_id; 194 } 195 196 /** The number of times we avoided reading from HBase thanks to the cache. */ cacheHits()197 public int cacheHits() { 198 return cache_hits; 199 } 200 201 /** The number of times we had to read from HBase and populate the cache. */ cacheMisses()202 public int cacheMisses() { 203 return cache_misses; 204 } 205 206 /** Returns the number of elements stored in the internal cache. */ cacheSize()207 public int cacheSize() { 208 return name_cache.size() + id_cache.size(); 209 } 210 211 /** Returns the number of random UID collisions */ randomIdCollisions()212 public int randomIdCollisions() { 213 return random_id_collisions; 214 } 215 216 /** Returns the number of UID assignments rejected by the filter */ rejectedAssignments()217 public int rejectedAssignments() { 218 return rejected_assignments; 219 } 220 kind()221 public String kind() { 222 return fromBytes(kind); 223 } 224 width()225 public short width() { 226 return id_width; 227 } 228 229 /** @param tsdb Whether or not to track new UIDMeta objects */ setTSDB(final TSDB tsdb)230 public void setTSDB(final TSDB tsdb) { 231 this.tsdb = tsdb; 232 } 233 234 /** The largest possible ID given the number of bytes the IDs are 235 * represented on. 
236 * @deprecated Use {@link Internal.getMaxUnsignedValueOnBytes} 237 */ maxPossibleId()238 public long maxPossibleId() { 239 return Internal.getMaxUnsignedValueOnBytes(id_width); 240 } 241 242 /** 243 * Causes this instance to discard all its in-memory caches. 244 * @since 1.1 245 */ dropCaches()246 public void dropCaches() { 247 name_cache.clear(); 248 id_cache.clear(); 249 } 250 251 /** 252 * Finds the name associated with a given ID. 253 * <p> 254 * <strong>This method is blocking.</strong> Its use within OpenTSDB itself 255 * is discouraged, please use {@link #getNameAsync} instead. 256 * @param id The ID associated with that name. 257 * @see #getId(String) 258 * @see #getOrCreateId(String) 259 * @throws NoSuchUniqueId if the given ID is not assigned. 260 * @throws HBaseException if there is a problem communicating with HBase. 261 * @throws IllegalArgumentException if the ID given in argument is encoded 262 * on the wrong number of bytes. 263 */ getName(final byte[] id)264 public String getName(final byte[] id) throws NoSuchUniqueId, HBaseException { 265 try { 266 return getNameAsync(id).joinUninterruptibly(); 267 } catch (RuntimeException e) { 268 throw e; 269 } catch (Exception e) { 270 throw new RuntimeException("Should never be here", e); 271 } 272 } 273 274 /** 275 * Finds the name associated with a given ID. 276 * 277 * @param id The ID associated with that name. 278 * @see #getId(String) 279 * @see #getOrCreateIdAsync(String) 280 * @throws NoSuchUniqueId if the given ID is not assigned. 281 * @throws HBaseException if there is a problem communicating with HBase. 282 * @throws IllegalArgumentException if the ID given in argument is encoded 283 * on the wrong number of bytes. 
284 * @since 1.1 285 */ getNameAsync(final byte[] id)286 public Deferred<String> getNameAsync(final byte[] id) { 287 if (id.length != id_width) { 288 throw new IllegalArgumentException("Wrong id.length = " + id.length 289 + " which is != " + id_width 290 + " required for '" + kind() + '\''); 291 } 292 final String name = getNameFromCache(id); 293 if (name != null) { 294 cache_hits++; 295 return Deferred.fromResult(name); 296 } 297 cache_misses++; 298 class GetNameCB implements Callback<String, String> { 299 public String call(final String name) { 300 if (name == null) { 301 throw new NoSuchUniqueId(kind(), id); 302 } 303 addNameToCache(id, name); 304 addIdToCache(name, id); 305 return name; 306 } 307 } 308 return getNameFromHBase(id).addCallback(new GetNameCB()); 309 } 310 getNameFromCache(final byte[] id)311 private String getNameFromCache(final byte[] id) { 312 return id_cache.get(fromBytes(id)); 313 } 314 getNameFromHBase(final byte[] id)315 private Deferred<String> getNameFromHBase(final byte[] id) { 316 class NameFromHBaseCB implements Callback<String, byte[]> { 317 public String call(final byte[] name) { 318 return name == null ? 
null : fromBytes(name); 319 } 320 } 321 return hbaseGet(id, NAME_FAMILY).addCallback(new NameFromHBaseCB()); 322 } 323 addNameToCache(final byte[] id, final String name)324 private void addNameToCache(final byte[] id, final String name) { 325 final String key = fromBytes(id); 326 String found = id_cache.get(key); 327 if (found == null) { 328 found = id_cache.putIfAbsent(key, name); 329 } 330 if (found != null && !found.equals(name)) { 331 throw new IllegalStateException("id=" + Arrays.toString(id) + " => name=" 332 + name + ", already mapped to " + found); 333 } 334 } 335 getId(final String name)336 public byte[] getId(final String name) throws NoSuchUniqueName, HBaseException { 337 try { 338 return getIdAsync(name).joinUninterruptibly(); 339 } catch (RuntimeException e) { 340 throw e; 341 } catch (Exception e) { 342 throw new RuntimeException("Should never be here", e); 343 } 344 } 345 getIdAsync(final String name)346 public Deferred<byte[]> getIdAsync(final String name) { 347 final byte[] id = getIdFromCache(name); 348 if (id != null) { 349 cache_hits++; 350 return Deferred.fromResult(id); 351 } 352 cache_misses++; 353 class GetIdCB implements Callback<byte[], byte[]> { 354 public byte[] call(final byte[] id) { 355 if (id == null) { 356 throw new NoSuchUniqueName(kind(), name); 357 } 358 if (id.length != id_width) { 359 throw new IllegalStateException("Found id.length = " + id.length 360 + " which is != " + id_width 361 + " required for '" + kind() + '\''); 362 } 363 addIdToCache(name, id); 364 addNameToCache(id, name); 365 return id; 366 } 367 } 368 Deferred<byte[]> d = getIdFromHBase(name).addCallback(new GetIdCB()); 369 return d; 370 } 371 getIdFromCache(final String name)372 private byte[] getIdFromCache(final String name) { 373 return name_cache.get(name); 374 } 375 getIdFromHBase(final String name)376 private Deferred<byte[]> getIdFromHBase(final String name) { 377 return hbaseGet(toBytes(name), ID_FAMILY); 378 } 379 addIdToCache(final String name, final 
byte[] id)380 private void addIdToCache(final String name, final byte[] id) { 381 byte[] found = name_cache.get(name); 382 if (found == null) { 383 found = name_cache.putIfAbsent(name, 384 // Must make a defensive copy to be immune 385 // to any changes the caller may do on the 386 // array later on. 387 Arrays.copyOf(id, id.length)); 388 } 389 if (found != null && !Arrays.equals(found, id)) { 390 throw new IllegalStateException("name=" + name + " => id=" 391 + Arrays.toString(id) + ", already mapped to " 392 + Arrays.toString(found)); 393 } 394 } 395 396 /** 397 * Implements the process to allocate a new UID. 398 * This callback is re-used multiple times in a four step process: 399 * 1. Allocate a new UID via atomic increment. 400 * 2. Create the reverse mapping (ID to name). 401 * 3. Create the forward mapping (name to ID). 402 * 4. Return the new UID to the caller. 403 */ 404 private final class UniqueIdAllocator implements Callback<Object, Object> { 405 private final String name; // What we're trying to allocate an ID for. 406 private final Deferred<byte[]> assignment; // deferred to call back 407 private short attempt = randomize_id ? // Give up when zero. 408 MAX_ATTEMPTS_ASSIGN_RANDOM_ID : MAX_ATTEMPTS_ASSIGN_ID; 409 410 private HBaseException hbe = null; // Last exception caught. 411 // TODO(manolama) - right now if we retry the assignment it will create a 412 // callback chain MAX_ATTEMPTS_* long and call the ErrBack that many times. 413 // This can be cleaned up a fair amount but it may require changing the 414 // public behavior a bit. For now, the flag will prevent multiple attempts 415 // to execute the callback. 416 private boolean called = false; // whether we called the deferred or not 417 418 private long id = -1; // The ID we'll grab with an atomic increment. 419 private byte row[]; // The same ID, as a byte array. 
420 421 private static final byte ALLOCATE_UID = 0; 422 private static final byte CREATE_REVERSE_MAPPING = 1; 423 private static final byte CREATE_FORWARD_MAPPING = 2; 424 private static final byte DONE = 3; 425 private byte state = ALLOCATE_UID; // Current state of the process. 426 UniqueIdAllocator(final String name, final Deferred<byte[]> assignment)427 UniqueIdAllocator(final String name, final Deferred<byte[]> assignment) { 428 this.name = name; 429 this.assignment = assignment; 430 } 431 tryAllocate()432 Deferred<byte[]> tryAllocate() { 433 attempt--; 434 state = ALLOCATE_UID; 435 call(null); 436 return assignment; 437 } 438 439 @SuppressWarnings("unchecked") call(final Object arg)440 public Object call(final Object arg) { 441 if (attempt == 0) { 442 if (hbe == null && !randomize_id) { 443 throw new IllegalStateException("Should never happen!"); 444 } 445 LOG.error("Failed to assign an ID for kind='" + kind() 446 + "' name='" + name + "'", hbe); 447 if (hbe == null) { 448 throw new FailedToAssignUniqueIdException(kind(), name, 449 MAX_ATTEMPTS_ASSIGN_RANDOM_ID); 450 } 451 throw hbe; 452 } 453 454 if (arg instanceof Exception) { 455 final String msg = ("Failed attempt #" + (randomize_id 456 ? (MAX_ATTEMPTS_ASSIGN_RANDOM_ID - attempt) 457 : (MAX_ATTEMPTS_ASSIGN_ID - attempt)) 458 + " to assign an UID for " + kind() + ':' + name 459 + " at step #" + state); 460 if (arg instanceof HBaseException) { 461 LOG.error(msg, (Exception) arg); 462 hbe = (HBaseException) arg; 463 attempt--; 464 state = ALLOCATE_UID;; // Retry from the beginning. 465 } else { 466 LOG.error("WTF? Unexpected exception! " + msg, (Exception) arg); 467 return arg; // Unexpected exception, let it bubble up. 
468 } 469 } 470 471 class ErrBack implements Callback<Object, Exception> { 472 public Object call(final Exception e) throws Exception { 473 if (!called) { 474 LOG.warn("Failed pending assignment for: " + name, e); 475 assignment.callback(e); 476 called = true; 477 } 478 return assignment; 479 } 480 } 481 482 final Deferred d; 483 switch (state) { 484 case ALLOCATE_UID: 485 d = allocateUid(); 486 break; 487 case CREATE_REVERSE_MAPPING: 488 d = createReverseMapping(arg); 489 break; 490 case CREATE_FORWARD_MAPPING: 491 d = createForwardMapping(arg); 492 break; 493 case DONE: 494 return done(arg); 495 default: 496 throw new AssertionError("Should never be here!"); 497 } 498 return d.addBoth(this).addErrback(new ErrBack()); 499 } 500 501 /** Generates either a random or a serial ID. If random, we need to 502 * make sure that there isn't a UID collision. 503 */ allocateUid()504 private Deferred<Long> allocateUid() { 505 LOG.info("Creating " + (randomize_id ? "a random " : "an ") + 506 "ID for kind='" + kind() + "' name='" + name + '\''); 507 508 state = CREATE_REVERSE_MAPPING; 509 if (randomize_id) { 510 return Deferred.fromResult(RandomUniqueId.getRandomUID()); 511 } else { 512 return client.atomicIncrement(new AtomicIncrementRequest(table, 513 MAXID_ROW, ID_FAMILY, kind)); 514 } 515 } 516 517 /** 518 * Create the reverse mapping. 519 * We do this before the forward one so that if we die before creating 520 * the forward mapping we don't run the risk of "publishing" a 521 * partially assigned ID. The reverse mapping on its own is harmless 522 * but the forward mapping without reverse mapping is bad as it would 523 * point to an ID that cannot be resolved. 
524 */ createReverseMapping(final Object arg)525 private Deferred<Boolean> createReverseMapping(final Object arg) { 526 if (!(arg instanceof Long)) { 527 throw new IllegalStateException("Expected a Long but got " + arg); 528 } 529 id = (Long) arg; 530 if (id <= 0) { 531 throw new IllegalStateException("Got a negative ID from HBase: " + id); 532 } 533 LOG.info("Got ID=" + id 534 + " for kind='" + kind() + "' name='" + name + "'"); 535 row = Bytes.fromLong(id); 536 // row.length should actually be 8. 537 if (row.length < id_width) { 538 throw new IllegalStateException("OMG, row.length = " + row.length 539 + " which is less than " + id_width 540 + " for id=" + id 541 + " row=" + Arrays.toString(row)); 542 } 543 // Verify that we're going to drop bytes that are 0. 544 for (int i = 0; i < row.length - id_width; i++) { 545 if (row[i] != 0) { 546 final String message = "All Unique IDs for " + kind() 547 + " on " + id_width + " bytes are already assigned!"; 548 LOG.error("OMG " + message); 549 throw new IllegalStateException(message); 550 } 551 } 552 // Shrink the ID on the requested number of bytes. 553 row = Arrays.copyOfRange(row, row.length - id_width, row.length); 554 555 state = CREATE_FORWARD_MAPPING; 556 // We are CAS'ing the KV into existence -- the second argument is how 557 // we tell HBase we want to atomically create the KV, so that if there 558 // is already a KV in this cell, we'll fail. Technically we could do 559 // just a `put' here, as we have a freshly allocated UID, so there is 560 // not reason why a KV should already exist for this UID, but just to 561 // err on the safe side and catch really weird corruption cases, we do 562 // a CAS instead to create the KV. 
563 return client.compareAndSet(reverseMapping(), HBaseClient.EMPTY_ARRAY); 564 } 565 reverseMapping()566 private PutRequest reverseMapping() { 567 return new PutRequest(table, row, NAME_FAMILY, kind, toBytes(name)); 568 } 569 createForwardMapping(final Object arg)570 private Deferred<?> createForwardMapping(final Object arg) { 571 if (!(arg instanceof Boolean)) { 572 throw new IllegalStateException("Expected a Boolean but got " + arg); 573 } 574 if (!((Boolean) arg)) { // Previous CAS failed. 575 if (randomize_id) { 576 // This random Id is already used by another row 577 LOG.warn("Detected random id collision and retrying kind='" + 578 kind() + "' name='" + name + "'"); 579 random_id_collisions++; 580 } else { 581 // something is really messed up then 582 LOG.error("WTF! Failed to CAS reverse mapping: " + reverseMapping() 583 + " -- run an fsck against the UID table!"); 584 } 585 attempt--; 586 state = ALLOCATE_UID; 587 return Deferred.fromResult(false); 588 } 589 590 state = DONE; 591 return client.compareAndSet(forwardMapping(), HBaseClient.EMPTY_ARRAY); 592 } 593 forwardMapping()594 private PutRequest forwardMapping() { 595 return new PutRequest(table, toBytes(name), ID_FAMILY, kind, row); 596 } 597 done(final Object arg)598 private Deferred<byte[]> done(final Object arg) { 599 if (!(arg instanceof Boolean)) { 600 throw new IllegalStateException("Expected a Boolean but got " + arg); 601 } 602 if (!((Boolean) arg)) { // Previous CAS failed. We lost a race. 603 LOG.warn("Race condition: tried to assign ID " + id + " to " 604 + kind() + ":" + name + ", but CAS failed on " 605 + forwardMapping() + ", which indicates this UID must have" 606 + " been allocated concurrently by another TSD or thread. 
" 607 + "So ID " + id + " was leaked."); 608 // If two TSDs attempted to allocate a UID for the same name at the 609 // same time, they would both have allocated a UID, and created a 610 // reverse mapping, and upon getting here, only one of them would 611 // manage to CAS this KV into existence. The one that loses the 612 // race will retry and discover the UID assigned by the winner TSD, 613 // and a UID will have been wasted in the process. No big deal. 614 if (randomize_id) { 615 // This random Id is already used by another row 616 LOG.warn("Detected random id collision between two tsdb " 617 + "servers kind='" + kind() + "' name='" + name + "'"); 618 random_id_collisions++; 619 } 620 621 class GetIdCB implements Callback<Object, byte[]> { 622 public Object call(final byte[] row) throws Exception { 623 assignment.callback(row); 624 return null; 625 } 626 } 627 getIdAsync(name).addCallback(new GetIdCB()); 628 return assignment; 629 } 630 631 cacheMapping(name, row); 632 633 if (tsdb != null && tsdb.getConfig().enable_realtime_uid()) { 634 final UIDMeta meta = new UIDMeta(type, row, name); 635 meta.storeNew(tsdb); 636 LOG.info("Wrote UIDMeta for: " + name); 637 tsdb.indexUIDMeta(meta); 638 } 639 640 synchronized(pending_assignments) { 641 if (pending_assignments.remove(name) != null) { 642 LOG.info("Completed pending assignment for: " + name); 643 } 644 } 645 assignment.callback(row); 646 return assignment; 647 } 648 649 } 650 651 /** Adds the bidirectional mapping in the cache. */ cacheMapping(final String name, final byte[] id)652 private void cacheMapping(final String name, final byte[] id) { 653 addIdToCache(name, id); 654 addNameToCache(id, name); 655 } 656 657 /** 658 * Finds the ID associated with a given name or creates it. 659 * <p> 660 * <strong>This method is blocking.</strong> Its use within OpenTSDB itself 661 * is discouraged, please use {@link #getOrCreateIdAsync} instead. 
662 * <p> 663 * The length of the byte array is fixed in advance by the implementation. 664 * 665 * @param name The name to lookup in the table or to assign an ID to. 666 * @throws HBaseException if there is a problem communicating with HBase. 667 * @throws IllegalStateException if all possible IDs are already assigned. 668 * @throws IllegalStateException if the ID found in HBase is encoded on the 669 * wrong number of bytes. 670 */ getOrCreateId(final String name)671 public byte[] getOrCreateId(final String name) throws HBaseException { 672 try { 673 return getIdAsync(name).joinUninterruptibly(); 674 } catch (NoSuchUniqueName e) { 675 if (tsdb != null && tsdb.getUidFilter() != null && 676 tsdb.getUidFilter().fillterUIDAssignments()) { 677 try { 678 if (!tsdb.getUidFilter().allowUIDAssignment(type, name, null, null) 679 .join()) { 680 rejected_assignments++; 681 throw new FailedToAssignUniqueIdException(new String(kind), name, 0, 682 "Blocked by UID filter."); 683 } 684 } catch (FailedToAssignUniqueIdException e1) { 685 throw e1; 686 } catch (InterruptedException e1) { 687 LOG.error("Interrupted", e1); 688 Thread.currentThread().interrupt(); 689 } catch (Exception e1) { 690 throw new RuntimeException("Should never be here", e1); 691 } 692 } 693 694 Deferred<byte[]> assignment = null; 695 boolean pending = false; 696 synchronized (pending_assignments) { 697 assignment = pending_assignments.get(name); 698 if (assignment == null) { 699 // to prevent UID leaks that can be caused when multiple time 700 // series for the same metric or tags arrive, we need to write a 701 // deferred to the pending map as quickly as possible. 
Then we can 702 // start the assignment process after we've stashed the deferred 703 // and released the lock 704 assignment = new Deferred<byte[]>(); 705 pending_assignments.put(name, assignment); 706 } else { 707 pending = true; 708 } 709 } 710 711 if (pending) { 712 LOG.info("Already waiting for UID assignment: " + name); 713 try { 714 return assignment.joinUninterruptibly(); 715 } catch (Exception e1) { 716 throw new RuntimeException("Should never be here", e1); 717 } 718 } 719 720 // start the assignment dance after stashing the deferred 721 byte[] uid = null; 722 try { 723 uid = new UniqueIdAllocator(name, assignment).tryAllocate().joinUninterruptibly(); 724 } catch (RuntimeException e1) { 725 throw e1; 726 } catch (Exception e1) { 727 throw new RuntimeException("Should never be here", e); 728 } finally { 729 synchronized (pending_assignments) { 730 if (pending_assignments.remove(name) != null) { 731 LOG.info("Completed pending assignment for: " + name); 732 } 733 } 734 } 735 return uid; 736 } catch (Exception e) { 737 throw new RuntimeException("Should never be here", e); 738 } 739 } 740 741 /** 742 * Finds the ID associated with a given name or creates it. 743 * <p> 744 * The length of the byte array is fixed in advance by the implementation. 745 * 746 * @param name The name to lookup in the table or to assign an ID to. 747 * @throws HBaseException if there is a problem communicating with HBase. 748 * @throws IllegalStateException if all possible IDs are already assigned. 749 * @throws IllegalStateException if the ID found in HBase is encoded on the 750 * wrong number of bytes. 751 * @since 1.2 752 */ getOrCreateIdAsync(final String name)753 public Deferred<byte[]> getOrCreateIdAsync(final String name) { 754 return getOrCreateIdAsync(name, null, null); 755 } 756 757 /** 758 * Finds the ID associated with a given name or creates it. 759 * <p> 760 * The length of the byte array is fixed in advance by the implementation. 
761 * 762 * @param name The name to lookup in the table or to assign an ID to. 763 * @param metric Name of the metric associated with the UID for filtering. 764 * @param tags Tag set associated with the UID for filtering. 765 * @throws HBaseException if there is a problem communicating with HBase. 766 * @throws IllegalStateException if all possible IDs are already assigned. 767 * @throws IllegalStateException if the ID found in HBase is encoded on the 768 * wrong number of bytes. 769 * @since 2.3 770 */ getOrCreateIdAsync(final String name, final String metric, final Map<String, String> tags)771 public Deferred<byte[]> getOrCreateIdAsync(final String name, 772 final String metric, final Map<String, String> tags) { 773 // Look in the cache first. 774 final byte[] id = getIdFromCache(name); 775 if (id != null) { 776 cache_hits++; 777 return Deferred.fromResult(id); 778 } 779 // Not found in our cache, so look in HBase instead. 780 781 /** Triggers the assignment if allowed through the filter */ 782 class AssignmentAllowedCB implements Callback<Deferred<byte[]>, Boolean> { 783 @Override 784 public Deferred<byte[]> call(final Boolean allowed) throws Exception { 785 if (!allowed) { 786 rejected_assignments++; 787 return Deferred.fromError(new FailedToAssignUniqueIdException( 788 new String(kind), name, 0, "Blocked by UID filter.")); 789 } 790 791 Deferred<byte[]> assignment = null; 792 synchronized (pending_assignments) { 793 assignment = pending_assignments.get(name); 794 if (assignment == null) { 795 // to prevent UID leaks that can be caused when multiple time 796 // series for the same metric or tags arrive, we need to write a 797 // deferred to the pending map as quickly as possible. 
Then we can 798 // start the assignment process after we've stashed the deferred 799 // and released the lock 800 assignment = new Deferred<byte[]>(); 801 pending_assignments.put(name, assignment); 802 } else { 803 LOG.info("Already waiting for UID assignment: " + name); 804 return assignment; 805 } 806 } 807 808 // start the assignment dance after stashing the deferred 809 if (metric != null && LOG.isDebugEnabled()) { 810 LOG.debug("Assigning UID for '" + name + "' of type '" + type + 811 "' for series '" + metric + ", " + tags + "'"); 812 } 813 814 // start the assignment dance after stashing the deferred 815 return new UniqueIdAllocator(name, assignment).tryAllocate(); 816 } 817 @Override 818 public String toString() { 819 return "AssignmentAllowedCB"; 820 } 821 } 822 823 /** Triggers an assignment (possibly through the filter) if the exception 824 * returned was a NoSuchUniqueName. */ 825 class HandleNoSuchUniqueNameCB implements Callback<Object, Exception> { 826 public Object call(final Exception e) { 827 if (e instanceof NoSuchUniqueName) { 828 if (tsdb != null && tsdb.getUidFilter() != null && 829 tsdb.getUidFilter().fillterUIDAssignments()) { 830 return tsdb.getUidFilter() 831 .allowUIDAssignment(type, name, metric, tags) 832 .addCallbackDeferring(new AssignmentAllowedCB()); 833 } else { 834 return Deferred.fromResult(true) 835 .addCallbackDeferring(new AssignmentAllowedCB()); 836 } 837 } 838 return e; // Other unexpected exception, let it bubble up. 839 } 840 } 841 842 // Kick off the HBase lookup, and if we don't find it there either, start 843 // the process to allocate a UID. 844 return getIdAsync(name).addErrback(new HandleNoSuchUniqueNameCB()); 845 } 846 847 /** 848 * Attempts to find suggestions of names given a search term. 849 * <p> 850 * <strong>This method is blocking.</strong> Its use within OpenTSDB itself 851 * is discouraged, please use {@link #suggestAsync} instead. 852 * @param search The search term (possibly empty). 
853 * @return A list of known valid names that have UIDs that sort of match 854 * the search term. If the search term is empty, returns the first few 855 * terms. 856 * @throws HBaseException if there was a problem getting suggestions from 857 * HBase. 858 */ suggest(final String search)859 public List<String> suggest(final String search) throws HBaseException { 860 return suggest(search, MAX_SUGGESTIONS); 861 } 862 863 /** 864 * Attempts to find suggestions of names given a search term. 865 * @param search The search term (possibly empty). 866 * @param max_results The number of results to return. Must be 1 or greater 867 * @return A list of known valid names that have UIDs that sort of match 868 * the search term. If the search term is empty, returns the first few 869 * terms. 870 * @throws HBaseException if there was a problem getting suggestions from 871 * HBase. 872 * @throws IllegalArgumentException if the count was less than 1 873 * @since 2.0 874 */ suggest(final String search, final int max_results)875 public List<String> suggest(final String search, final int max_results) 876 throws HBaseException { 877 if (max_results < 1) { 878 throw new IllegalArgumentException("Count must be greater than 0"); 879 } 880 try { 881 return suggestAsync(search, max_results).joinUninterruptibly(); 882 } catch (HBaseException e) { 883 throw e; 884 } catch (Exception e) { // Should never happen. 885 final String msg = "Unexpected exception caught by " 886 + this + ".suggest(" + search + ')'; 887 LOG.error(msg, e); 888 throw new RuntimeException(msg, e); // Should never happen. 889 } 890 } 891 892 /** 893 * Attempts to find suggestions of names given a search term. 894 * @param search The search term (possibly empty). 895 * @return A list of known valid names that have UIDs that sort of match 896 * the search term. If the search term is empty, returns the first few 897 * terms. 898 * @throws HBaseException if there was a problem getting suggestions from 899 * HBase. 
900 * @since 1.1 901 */ suggestAsync(final String search, final int max_results)902 public Deferred<List<String>> suggestAsync(final String search, 903 final int max_results) { 904 return new SuggestCB(search, max_results).search(); 905 } 906 907 /** 908 * Helper callback to asynchronously scan HBase for suggestions. 909 */ 910 private final class SuggestCB 911 implements Callback<Object, ArrayList<ArrayList<KeyValue>>> { 912 private final LinkedList<String> suggestions = new LinkedList<String>(); 913 private final Scanner scanner; 914 private final int max_results; 915 SuggestCB(final String search, final int max_results)916 SuggestCB(final String search, final int max_results) { 917 this.max_results = max_results; 918 this.scanner = getSuggestScanner(client, table, search, kind, max_results); 919 } 920 921 @SuppressWarnings("unchecked") search()922 Deferred<List<String>> search() { 923 return (Deferred) scanner.nextRows().addCallback(this); 924 } 925 call(final ArrayList<ArrayList<KeyValue>> rows)926 public Object call(final ArrayList<ArrayList<KeyValue>> rows) { 927 if (rows == null) { // We're done scanning. 928 return suggestions; 929 } 930 931 for (final ArrayList<KeyValue> row : rows) { 932 if (row.size() != 1) { 933 LOG.error("WTF shouldn't happen! Scanner " + scanner + " returned" 934 + " a row that doesn't have exactly 1 KeyValue: " + row); 935 if (row.isEmpty()) { 936 continue; 937 } 938 } 939 final byte[] key = row.get(0).key(); 940 final String name = fromBytes(key); 941 final byte[] id = row.get(0).value(); 942 final byte[] cached_id = name_cache.get(name); 943 if (cached_id == null) { 944 cacheMapping(name, id); 945 } else if (!Arrays.equals(id, cached_id)) { 946 throw new IllegalStateException("WTF? 
For kind=" + kind() 947 + " name=" + name + ", we have id=" + Arrays.toString(cached_id) 948 + " in cache, but just scanned id=" + Arrays.toString(id)); 949 } 950 suggestions.add(name); 951 if ((short) suggestions.size() >= max_results) { // We have enough. 952 return scanner.close().addCallback(new Callback<Object, Object>() { 953 @Override 954 public Object call(Object ignored) throws Exception { 955 return suggestions; 956 } 957 }); 958 } 959 row.clear(); // free() 960 } 961 return search(); // Get more suggestions. 962 } 963 } 964 965 /** 966 * Reassigns the UID to a different name (non-atomic). 967 * <p> 968 * Whatever was the UID of {@code oldname} will be given to {@code newname}. 969 * {@code oldname} will no longer be assigned a UID. 970 * <p> 971 * Beware that the assignment change is <b>not atommic</b>. If two threads 972 * or processes attempt to rename the same UID differently, the result is 973 * unspecified and might even be inconsistent. This API is only here for 974 * administrative purposes, not for normal programmatic interactions. 975 * @param oldname The old name to rename. 976 * @param newname The new name. 977 * @throws NoSuchUniqueName if {@code oldname} wasn't assigned. 978 * @throws IllegalArgumentException if {@code newname} was already assigned. 979 * @throws HBaseException if there was a problem with HBase while trying to 980 * update the mapping. 981 */ 982 public void rename(final String oldname, final String newname) { 983 final byte[] row = getId(oldname); 984 final String row_string = fromBytes(row); 985 { 986 byte[] id = null; 987 try { 988 id = getId(newname); 989 } catch (NoSuchUniqueName e) { 990 // OK, we don't want the new name to be assigned. 
991 } 992 if (id != null) { 993 throw new IllegalArgumentException("When trying rename(\"" + oldname 994 + "\", \"" + newname + "\") on " + this + ": new name already" 995 + " assigned ID=" + Arrays.toString(id)); 996 } 997 } 998 999 if (renaming_id_names.contains(row_string) 1000 || renaming_id_names.contains(newname)) { 1001 throw new IllegalArgumentException("Ongoing rename on the same ID(\"" 1002 + Arrays.toString(row) + "\") or an identical new name(\"" + newname 1003 + "\")"); 1004 } 1005 renaming_id_names.add(row_string); 1006 renaming_id_names.add(newname); 1007 1008 final byte[] newnameb = toBytes(newname); 1009 1010 // Update the reverse mapping first, so that if we die before updating 1011 // the forward mapping we don't run the risk of "publishing" a 1012 // partially assigned ID. The reverse mapping on its own is harmless 1013 // but the forward mapping without reverse mapping is bad. 1014 try { 1015 final PutRequest reverse_mapping = new PutRequest( 1016 table, row, NAME_FAMILY, kind, newnameb); 1017 hbasePutWithRetry(reverse_mapping, MAX_ATTEMPTS_PUT, 1018 INITIAL_EXP_BACKOFF_DELAY); 1019 } catch (HBaseException e) { 1020 LOG.error("When trying rename(\"" + oldname 1021 + "\", \"" + newname + "\") on " + this + ": Failed to update reverse" 1022 + " mapping for ID=" + Arrays.toString(row), e); 1023 renaming_id_names.remove(row_string); 1024 renaming_id_names.remove(newname); 1025 throw e; 1026 } 1027 1028 // Now create the new forward mapping. 
1029 try { 1030 final PutRequest forward_mapping = new PutRequest( 1031 table, newnameb, ID_FAMILY, kind, row); 1032 hbasePutWithRetry(forward_mapping, MAX_ATTEMPTS_PUT, 1033 INITIAL_EXP_BACKOFF_DELAY); 1034 } catch (HBaseException e) { 1035 LOG.error("When trying rename(\"" + oldname 1036 + "\", \"" + newname + "\") on " + this + ": Failed to create the" 1037 + " new forward mapping with ID=" + Arrays.toString(row), e); 1038 renaming_id_names.remove(row_string); 1039 renaming_id_names.remove(newname); 1040 throw e; 1041 } 1042 1043 // Update cache. 1044 addIdToCache(newname, row); // add new name -> ID 1045 id_cache.put(fromBytes(row), newname); // update ID -> new name 1046 name_cache.remove(oldname); // remove old name -> ID 1047 1048 // Delete the old forward mapping. 1049 try { 1050 final DeleteRequest old_forward_mapping = new DeleteRequest( 1051 table, toBytes(oldname), ID_FAMILY, kind); 1052 client.delete(old_forward_mapping).joinUninterruptibly(); 1053 } catch (HBaseException e) { 1054 LOG.error("When trying rename(\"" + oldname 1055 + "\", \"" + newname + "\") on " + this + ": Failed to remove the" 1056 + " old forward mapping for ID=" + Arrays.toString(row), e); 1057 throw e; 1058 } catch (Exception e) { 1059 final String msg = "Unexpected exception when trying rename(\"" + oldname 1060 + "\", \"" + newname + "\") on " + this + ": Failed to remove the" 1061 + " old forward mapping for ID=" + Arrays.toString(row); 1062 LOG.error("WTF? " + msg, e); 1063 throw new RuntimeException(msg, e); 1064 } finally { 1065 renaming_id_names.remove(row_string); 1066 renaming_id_names.remove(newname); 1067 } 1068 // Success! 1069 } 1070 1071 /** 1072 * Attempts to remove the mappings for the given string from the UID table 1073 * as well as the cache. If used, the caller should remove the entry from all 1074 * TSD caches as well. 
1075 * <p> 1076 * WARNING: This is a best attempt only method in that we'll lookup the UID 1077 * for the given string, then issue two delete requests, one for each mapping. 1078 * If either mapping fails then the cache can be re-populated later on with 1079 * stale data. In that case, please run the FSCK utility. 1080 * <p> 1081 * WARNING 2: This method will NOT delete time series data or TSMeta data 1082 * associated with the UIDs. It only removes them from the UID table. Deleting 1083 * a metric is generally safe as you won't query over it in the future. But 1084 * deleting tag keys or values can cause queries to fail if they find data 1085 * without a corresponding name. 1086 * 1087 * @param name The name of the UID to delete 1088 * @return A deferred to wait on for completion. The result will be null if 1089 * successful, an exception otherwise. 1090 * @throws NoSuchUniqueName if the UID string did not exist in storage 1091 * @throws IllegalStateException if the TSDB wasn't set for this UID object 1092 * @since 2.2 1093 */ 1094 public Deferred<Object> deleteAsync(final String name) { 1095 if (tsdb == null) { 1096 throw new IllegalStateException("The TSDB is null for this UID object."); 1097 } 1098 final byte[] uid = new byte[id_width]; 1099 final ArrayList<Deferred<Object>> deferreds = 1100 new ArrayList<Deferred<Object>>(2); 1101 1102 /** Catches errors and still cleans out the cache */ 1103 class ErrCB implements Callback<Object, Exception> { 1104 @Override 1105 public Object call(final Exception ex) throws Exception { 1106 name_cache.remove(name); 1107 id_cache.remove(fromBytes(uid)); 1108 LOG.error("Failed to delete " + fromBytes(kind) + " UID " + name 1109 + " but still cleared the cache", ex); 1110 return ex; 1111 } 1112 } 1113 1114 /** Used to wait on the group of delete requests */ 1115 class GroupCB implements Callback<Deferred<Object>, ArrayList<Object>> { 1116 @Override 1117 public Deferred<Object> call(final ArrayList<Object> response) 1118 throws 
Exception { 1119 name_cache.remove(name); 1120 id_cache.remove(fromBytes(uid)); 1121 LOG.info("Successfully deleted " + fromBytes(kind) + " UID " + name); 1122 return Deferred.fromResult(null); 1123 } 1124 } 1125 1126 /** Called after fetching the UID from storage */ 1127 class LookupCB implements Callback<Deferred<Object>, byte[]> { 1128 @Override 1129 public Deferred<Object> call(final byte[] stored_uid) throws Exception { 1130 if (stored_uid == null) { 1131 return Deferred.fromError(new NoSuchUniqueName(kind(), name)); 1132 } 1133 System.arraycopy(stored_uid, 0, uid, 0, id_width); 1134 final DeleteRequest forward = 1135 new DeleteRequest(table, toBytes(name), ID_FAMILY, kind); 1136 deferreds.add(tsdb.getClient().delete(forward)); 1137 1138 final DeleteRequest reverse = 1139 new DeleteRequest(table, uid, NAME_FAMILY, kind); 1140 deferreds.add(tsdb.getClient().delete(reverse)); 1141 1142 final DeleteRequest meta = new DeleteRequest(table, uid, NAME_FAMILY, 1143 toBytes((type.toString().toLowerCase() + "_meta"))); 1144 deferreds.add(tsdb.getClient().delete(meta)); 1145 return Deferred.group(deferreds).addCallbackDeferring(new GroupCB()); 1146 } 1147 } 1148 1149 final byte[] cached_uid = name_cache.get(name); 1150 if (cached_uid == null) { 1151 return getIdFromHBase(name).addCallbackDeferring(new LookupCB()) 1152 .addErrback(new ErrCB()); 1153 } 1154 System.arraycopy(cached_uid, 0, uid, 0, id_width); 1155 final DeleteRequest forward = 1156 new DeleteRequest(table, toBytes(name), ID_FAMILY, kind); 1157 deferreds.add(tsdb.getClient().delete(forward)); 1158 1159 final DeleteRequest reverse = 1160 new DeleteRequest(table, uid, NAME_FAMILY, kind); 1161 deferreds.add(tsdb.getClient().delete(reverse)); 1162 1163 final DeleteRequest meta = new DeleteRequest(table, uid, NAME_FAMILY, 1164 toBytes((type.toString().toLowerCase() + "_meta"))); 1165 deferreds.add(tsdb.getClient().delete(meta)); 1166 return Deferred.group(deferreds).addCallbackDeferring(new GroupCB()) 1167 
.addErrback(new ErrCB()); 1168 } 1169 1170 /** The start row to scan on empty search strings. `!' = first ASCII char. */ 1171 private static final byte[] START_ROW = new byte[] { '!' }; 1172 1173 /** The end row to scan on empty search strings. `~' = last ASCII char. */ 1174 private static final byte[] END_ROW = new byte[] { '~' }; 1175 1176 /** 1177 * Creates a scanner that scans the right range of rows for suggestions. 1178 * @param client The HBase client to use. 1179 * @param tsd_uid_table Table where IDs are stored. 1180 * @param search The string to start searching at 1181 * @param kind_or_null The kind of UID to search or null for any kinds. 1182 * @param max_results The max number of results to return 1183 */ 1184 private static Scanner getSuggestScanner(final HBaseClient client, 1185 final byte[] tsd_uid_table, final String search, 1186 final byte[] kind_or_null, final int max_results) { 1187 final byte[] start_row; 1188 final byte[] end_row; 1189 if (search.isEmpty()) { 1190 start_row = START_ROW; 1191 end_row = END_ROW; 1192 } else { 1193 start_row = toBytes(search); 1194 end_row = Arrays.copyOf(start_row, start_row.length); 1195 end_row[start_row.length - 1]++; 1196 } 1197 final Scanner scanner = client.newScanner(tsd_uid_table); 1198 scanner.setStartKey(start_row); 1199 scanner.setStopKey(end_row); 1200 scanner.setFamily(ID_FAMILY); 1201 if (kind_or_null != null) { 1202 scanner.setQualifier(kind_or_null); 1203 } 1204 scanner.setMaxNumRows(max_results <= 4096 ? max_results : 4096); 1205 return scanner; 1206 } 1207 1208 /** Returns the cell of the specified row key, using family:kind. 
*/ 1209 private Deferred<byte[]> hbaseGet(final byte[] key, final byte[] family) { 1210 final GetRequest get = new GetRequest(table, key); 1211 get.family(family).qualifier(kind); 1212 class GetCB implements Callback<byte[], ArrayList<KeyValue>> { 1213 public byte[] call(final ArrayList<KeyValue> row) { 1214 if (row == null || row.isEmpty()) { 1215 return null; 1216 } 1217 return row.get(0).value(); 1218 } 1219 } 1220 return client.get(get).addCallback(new GetCB()); 1221 } 1222 1223 /** 1224 * Attempts to run the PutRequest given in argument, retrying if needed. 1225 * 1226 * Puts are synchronized. 1227 * 1228 * @param put The PutRequest to execute. 1229 * @param attempts The maximum number of attempts. 1230 * @param wait The initial amount of time in ms to sleep for after a 1231 * failure. This amount is doubled after each failed attempt. 1232 * @throws HBaseException if all the attempts have failed. This exception 1233 * will be the exception of the last attempt. 1234 */ 1235 private void hbasePutWithRetry(final PutRequest put, short attempts, short wait) 1236 throws HBaseException { 1237 put.setBufferable(false); // TODO(tsuna): Remove once this code is async. 1238 while (attempts-- > 0) { 1239 try { 1240 client.put(put).joinUninterruptibly(); 1241 return; 1242 } catch (HBaseException e) { 1243 if (attempts > 0) { 1244 LOG.error("Put failed, attempts left=" + attempts 1245 + " (retrying in " + wait + " ms), put=" + put, e); 1246 try { 1247 Thread.sleep(wait); 1248 } catch (InterruptedException ie) { 1249 throw new RuntimeException("interrupted", ie); 1250 } 1251 wait *= 2; 1252 } else { 1253 throw e; 1254 } 1255 } catch (Exception e) { 1256 LOG.error("WTF? 
Unexpected exception type, put=" + put, e); 1257 } 1258 } 1259 throw new IllegalStateException("This code should never be reached!"); 1260 } 1261 1262 private static byte[] toBytes(final String s) { 1263 return s.getBytes(CHARSET); 1264 } 1265 1266 private static String fromBytes(final byte[] b) { 1267 return new String(b, CHARSET); 1268 } 1269 1270 /** Returns a human readable string representation of the object. */ 1271 public String toString() { 1272 return "UniqueId(" + fromBytes(table) + ", " + kind() + ", " + id_width + ")"; 1273 } 1274 1275 /** 1276 * Converts a byte array to a hex encoded, upper case string with padding 1277 * @param uid The ID to convert 1278 * @return the UID as a hex string 1279 * @throws NullPointerException if the ID was null 1280 * @since 2.0 1281 */ 1282 public static String uidToString(final byte[] uid) { 1283 return DatatypeConverter.printHexBinary(uid); 1284 } 1285 1286 /** 1287 * Converts a hex string to a byte array 1288 * If the {@code uid} is less than {@code uid_length * 2} characters wide, it 1289 * will be padded with 0s to conform to the spec. E.g. if the tagk width is 3 1290 * and the given {@code uid} string is "1", the string will be padded to 1291 * "000001" and then converted to a byte array to reach 3 bytes. 1292 * All {@code uid}s are padded to 1 byte. If given "1", and {@code uid_length} 1293 * is 0, the uid will be padded to "01" then converted. 1294 * @param uid The UID to convert 1295 * @return The UID as a byte array 1296 * @throws NullPointerException if the ID was null 1297 * @throws IllegalArgumentException if the string is not valid hex 1298 * @since 2.0 1299 */ 1300 public static byte[] stringToUid(final String uid) { 1301 return stringToUid(uid, (short)0); 1302 } 1303 1304 /** 1305 * Converts a UID to an integer value. The array must be the same length as 1306 * uid_length or an exception will be thrown. 
1307 * @param uid The hex encoded UID to convert 1308 * @param uid_length Length the array SHOULD be according to the UID config 1309 * @return The UID converted to an integer 1310 * @throws IllegalArgumentException if the length of the byte array does not 1311 * match the uid_length value 1312 * @since 2.1 1313 */ 1314 public static long uidToLong(final String uid, final short uid_length) { 1315 return uidToLong(stringToUid(uid), uid_length); 1316 } 1317 1318 /** 1319 * Converts a UID to an integer value. The array must be the same length as 1320 * uid_length or an exception will be thrown. 1321 * @param uid The byte array to convert 1322 * @param uid_length Length the array SHOULD be according to the UID config 1323 * @return The UID converted to an integer 1324 * @throws IllegalArgumentException if the length of the byte array does not 1325 * match the uid_length value 1326 * @since 2.1 1327 */ 1328 public static long uidToLong(final byte[] uid, final short uid_length) { 1329 if (uid.length != uid_length) { 1330 throw new IllegalArgumentException("UID was " + uid.length 1331 + " bytes long but expected to be " + uid_length); 1332 } 1333 1334 final byte[] uid_raw = new byte[8]; 1335 System.arraycopy(uid, 0, uid_raw, 8 - uid_length, uid_length); 1336 return Bytes.getLong(uid_raw); 1337 } 1338 1339 /** 1340 * Converts a Long to a byte array with the proper UID width 1341 * @param uid The UID to convert 1342 * @param width The width of the UID in bytes 1343 * @return The UID as a byte array 1344 * @throws IllegalStateException if the UID is larger than the width would 1345 * allow 1346 * @since 2.1 1347 */ 1348 public static byte[] longToUID(final long uid, final short width) { 1349 // Verify that we're going to drop bytes that are 0. 
1350 final byte[] padded = Bytes.fromLong(uid); 1351 for (int i = 0; i < padded.length - width; i++) { 1352 if (padded[i] != 0) { 1353 final String message = "UID " + Long.toString(uid) + 1354 " was too large for " + width + " bytes"; 1355 LOG.error("OMG " + message); 1356 throw new IllegalStateException(message); 1357 } 1358 } 1359 // Shrink the ID on the requested number of bytes. 1360 return Arrays.copyOfRange(padded, padded.length - width, padded.length); 1361 } 1362 1363 /** 1364 * Appends the given UID to the given string buffer, followed by "\\E". 1365 * @param buf The buffer to append 1366 * @param id The UID to add as a binary regex pattern 1367 * @since 2.1 1368 */ 1369 public static void addIdToRegexp(final StringBuilder buf, final byte[] id) { 1370 boolean backslash = false; 1371 for (final byte b : id) { 1372 buf.append((char) (b & 0xFF)); 1373 if (b == 'E' && backslash) { // If we saw a `\' and now we have a `E'. 1374 // So we just terminated the quoted section because we just added \E 1375 // to `buf'. So let's put a literal \E now and start quoting again. 
1376 buf.append("\\\\E\\Q"); 1377 } else { 1378 backslash = b == '\\'; 1379 } 1380 } 1381 buf.append("\\E"); 1382 } 1383 1384 /** 1385 * Attempts to convert the given string to a type enumerator 1386 * @param type The string to convert 1387 * @return a valid UniqueIdType if matched 1388 * @throws IllegalArgumentException if the string did not match a type 1389 * @since 2.0 1390 */ 1391 public static UniqueIdType stringToUniqueIdType(final String type) { 1392 if (type.toLowerCase().equals("metric") || 1393 type.toLowerCase().equals("metrics")) { 1394 return UniqueIdType.METRIC; 1395 } else if (type.toLowerCase().equals("tagk")) { 1396 return UniqueIdType.TAGK; 1397 } else if (type.toLowerCase().equals("tagv")) { 1398 return UniqueIdType.TAGV; 1399 } else { 1400 throw new IllegalArgumentException("Invalid type requested: " + type); 1401 } 1402 } 1403 1404 /** 1405 * Converts a hex string to a byte array 1406 * If the {@code uid} is less than {@code uid_length * 2} characters wide, it 1407 * will be padded with 0s to conform to the spec. E.g. if the tagk width is 3 1408 * and the given {@code uid} string is "1", the string will be padded to 1409 * "000001" and then converted to a byte array to reach 3 bytes. 1410 * All {@code uid}s are padded to 1 byte. If given "1", and {@code uid_length} 1411 * is 0, the uid will be padded to "01" then converted. 1412 * @param uid The UID to convert 1413 * @param uid_length An optional length, in bytes, that the UID must conform 1414 * to. Set to 0 if not used. 
1415 * @return The UID as a byte array 1416 * @throws NullPointerException if the ID was null 1417 * @throws IllegalArgumentException if the string is not valid hex 1418 * @since 2.0 1419 */ 1420 public static byte[] stringToUid(final String uid, final short uid_length) { 1421 if (uid == null || uid.isEmpty()) { 1422 throw new IllegalArgumentException("UID was empty"); 1423 } 1424 String id = uid; 1425 if (uid_length > 0) { 1426 while (id.length() < uid_length * 2) { 1427 id = "0" + id; 1428 } 1429 } else { 1430 if (id.length() % 2 > 0) { 1431 id = "0" + id; 1432 } 1433 } 1434 return DatatypeConverter.parseHexBinary(id); 1435 } 1436 1437 /** 1438 * Extracts the TSUID from a storage row key that includes the timestamp. 1439 * @param row_key The row key to process 1440 * @param metric_width The width of the metric 1441 * @param timestamp_width The width of the timestamp 1442 * @return The TSUID as a byte array 1443 * @throws IllegalArgumentException if the row key is missing tags or it is 1444 * corrupt such as a salted key when salting is disabled or vice versa. 
*/
  public static byte[] getTSUIDFromKey(final byte[] row_key,
      final short metric_width, final short timestamp_width) {
    final int salt_width = Const.SALT_WIDTH();
    // validation
    final int tag_pair_width = TSDB.tagk_width() + TSDB.tagv_width();
    final int tags_offset = salt_width + metric_width + timestamp_width;
    final int tags_length = row_key.length - tags_offset;
    if (tags_length < tag_pair_width || (tags_length % tag_pair_width) != 0) {
      throw new IllegalArgumentException(
          "Row key is missing tags or it is corrupted " + Arrays.toString(row_key));
    }
    // TSUID = metric UID followed by the tag pairs, i.e. the row key without
    // its salt prefix and without the timestamp in the middle.
    final byte[] tsuid = new byte[metric_width + tags_length];
    System.arraycopy(row_key, salt_width, tsuid, 0, metric_width);
    System.arraycopy(row_key, tags_offset, tsuid, metric_width, tags_length);
    return tsuid;
  }

  /**
   * Extracts a list of tagks and tagvs as individual values in a list
   * @param tsuid The tsuid to parse
   * @return A list of tagk/tagv UIDs alternating with tagk, tagv, tagk, tagv
   * @throws IllegalArgumentException if the TSUID is malformed
   * @since 2.1
   */
  public static List<byte[]> getTagsFromTSUID(final String tsuid) {
    if (tsuid == null || tsuid.isEmpty()) {
      throw new IllegalArgumentException("Missing TSUID");
    }
    // Widths are doubled throughout: two hex characters encode one byte.
    final int metric_chars = TSDB.metrics_width() * 2;
    if (tsuid.length() <= metric_chars) {
      throw new IllegalArgumentException(
          "TSUID is too short, may be missing tags");
    }

    final List<byte[]> tags = new ArrayList<byte[]>();
    final int tagk_chars = TSDB.tagk_width() * 2;
    final int pair_width = tagk_chars + (TSDB.tagv_width() * 2);

    // start after the metric then iterate over each tagk/tagv pair
    for (int i = metric_chars; i < tsuid.length(); i += pair_width) {
      if (i + pair_width > tsuid.length()) {
        throw new IllegalArgumentException(
            "The TSUID appears to be malformed, improper tag width");
      }
      tags.add(UniqueId.stringToUid(tsuid.substring(i, i + tagk_chars)));
      tags.add(UniqueId.stringToUid(
          tsuid.substring(i + tagk_chars, i + pair_width)));
    }
    return tags;
  }

  /**
   * Extracts a list of tagk/tagv pairs from a tsuid
   * @param tsuid The hex encoded tsuid to parse
   * @return A list of tagk/tagv UID pairs
   * @throws IllegalArgumentException if the TSUID is malformed
   * @since 2.0
   */
  public static List<byte[]> getTagPairsFromTSUID(final String tsuid) {
    if (tsuid == null || tsuid.isEmpty()) {
      throw new IllegalArgumentException("Missing TSUID");
    }
    final int metric_chars = TSDB.metrics_width() * 2;
    if (tsuid.length() <= metric_chars) {
      throw new IllegalArgumentException(
          "TSUID is too short, may be missing tags");
    }

    final List<byte[]> tags = new ArrayList<byte[]>();
    final int pair_width = (TSDB.tagk_width() * 2) + (TSDB.tagv_width() * 2);

    // start after the metric then iterate over each tagk/tagv pair
    for (int i = metric_chars; i < tsuid.length(); i += pair_width) {
      if (i + pair_width > tsuid.length()) {
        throw new IllegalArgumentException(
            "The TSUID appears to be malformed, improper tag width");
      }
      tags.add(UniqueId.stringToUid(tsuid.substring(i, i + pair_width)));
    }
    return tags;
  }

  /**
   * Extracts a list of tagk/tagv pairs from a tsuid
   * @param tsuid The binary tsuid to parse
   * @return A list of tagk/tagv UID pairs
   * @throws IllegalArgumentException if the TSUID is malformed
   * @since 2.0
   */
  public static List<byte[]> getTagPairsFromTSUID(final byte[] tsuid) {
    if (tsuid == null) {
      throw new IllegalArgumentException("Missing TSUID");
    }
    if (tsuid.length <= TSDB.metrics_width()) {
      throw new IllegalArgumentException(
          "TSUID is too short, may be missing tags");
    }

    final List<byte[]> tags = new ArrayList<byte[]>();
    final int pair_width = TSDB.tagk_width() + TSDB.tagv_width();

    // start after the metric then iterate over each tagk/tagv pair
    for (int i = TSDB.metrics_width(); i < tsuid.length; i += pair_width) {
      if (i + pair_width > tsuid.length) {
        throw new IllegalArgumentException(
            "The TSUID appears to be malformed, improper tag width");
      }
      tags.add(Arrays.copyOfRange(tsuid, i, i + pair_width));
    }
    return tags;
  }

  /**
   * Returns a map of max UIDs from storage for the given list of UID types
   * @param tsdb The TSDB to which we belong
   * @param kinds A list of qualifiers to fetch
   * @return A map with the "kind" as the key and the maximum assigned UID as
   * the value
   * @since 2.0
   */
  public static Deferred<Map<String, Long>> getUsedUIDs(final TSDB tsdb,
      final byte[][] kinds) {

    /**
     * Returns a map with 0 if the max ID row hasn't been initialized yet,
     * otherwise the map has actual data
     */
    final class GetCB implements Callback<Map<String, Long>,
      ArrayList<KeyValue>> {

      @Override
      public Map<String, Long> call(final ArrayList<KeyValue> row)
          throws Exception {

        final Map<String, Long> results = new HashMap<String, Long>(3);
        if (row == null || row.isEmpty()) {
          // it could be the case that this is the first time the TSD has run
          // and the user hasn't put any metrics in, so log and return 0s
          LOG.info("Could not find the UID assignment row");
        } else {
          for (final KeyValue column : row) {
            results.put(fromBytes(column.qualifier()),
                Bytes.getLong(column.value()));
          }
        }

        // if the user is starting with a fresh UID table, we need to account
        // for missing columns
        for (final byte[] kind : kinds) {
          final String kind_name = fromBytes(kind);
          if (results.get(kind_name) == null) {
            results.put(kind_name, 0L);
          }
        }
        return results;
      }

    }

    final GetRequest get = new GetRequest(tsdb.uidTable(), MAXID_ROW);
    get.family(ID_FAMILY);
    get.qualifiers(kinds);
    return tsdb.getClient().get(get).addCallback(new GetCB());
  }

  /**
   * Pre-load UID caches, scanning up to "tsd.core.preload_uid_cache.max_entries"
   * rows from the UID table.
   * @param tsdb The TSDB to use
   * @param uid_cache_map A map of {@link UniqueId} objects keyed on the kind.
   * @throws HBaseException Passes any HBaseException from HBase scanner.
   * @throws RuntimeException Wraps any non HBaseException from HBase scanner.
   * @since 2.1
   */
  public static void preloadUidCache(final TSDB tsdb,
      final ByteMap<UniqueId> uid_cache_map) throws HBaseException {
    int max_results = tsdb.getConfig().getInt(
        "tsd.core.preload_uid_cache.max_entries");
    LOG.info("Preloading uid cache with max_results=" + max_results);
    if (max_results <= 0) {
      return;
    }
    Scanner scanner = null;
    try {
      int num_rows = 0;
      scanner = getSuggestScanner(tsdb.getClient(), tsdb.uidTable(), "", null,
          max_results);
      ArrayList<ArrayList<KeyValue>> rows;
      scan:
      while ((rows = scanner.nextRows().join()) != null) {
        for (final ArrayList<KeyValue> row : rows) {
          for (KeyValue kv: row) {
            final String name = fromBytes(kv.key());
            final byte[] kind = kv.qualifier();
            final byte[] id = kv.value();
            if (LOG.isDebugEnabled()) {
              LOG.debug("id='{}', name='{}', kind='{}'", Arrays.toString(id),
                  name, fromBytes(kind));
            }
            UniqueId uid_cache = uid_cache_map.get(kind);
            if (uid_cache != null) {
              uid_cache.cacheMapping(name, id);
            }
          }
          num_rows += row.size();
          row.clear();  // free()
          if (num_rows >= max_results) {
            // Leave BOTH loops: a plain break would only end this batch and
            // the scan would carry on over the rest of the table.
            break scan;
          }
        }
      }
      for (UniqueId unique_id_table : uid_cache_map.values()) {
        LOG.info("After preloading, uid cache '{}' has {} ids and {} names.",
                 unique_id_table.kind(),
                 unique_id_table.id_cache.size(),
                 unique_id_table.name_cache.size());
      }
    } catch (RuntimeException e) {
      // Includes HBaseException: unchecked failures are passed through as is.
      throw e;
    } catch (InterruptedException e) {
      // Keep the interrupt visible to callers further up the stack.
      Thread.currentThread().interrupt();
      throw new RuntimeException("Error while preloading IDs", e);
    } catch (Exception e) {
      throw new RuntimeException("Error while preloading IDs", e);
    } finally {
      if (scanner != null) {
        scanner.close();
      }
    }
  }
}