1 // This file is part of OpenTSDB.
2 // Copyright (C) 2010-2012  The OpenTSDB Authors.
3 //
4 // This program is free software: you can redistribute it and/or modify it
5 // under the terms of the GNU Lesser General Public License as published by
6 // the Free Software Foundation, either version 2.1 of the License, or (at your
7 // option) any later version.  This program is distributed in the hope that it
8 // will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty
9 // of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser
10 // General Public License for more details.  You should have received a copy
11 // of the GNU Lesser General Public License along with this program.  If not,
12 // see <http://www.gnu.org/licenses/>.
13 package net.opentsdb.uid;
14 
15 import java.nio.charset.Charset;
16 import java.util.ArrayList;
17 import java.util.Arrays;
18 import java.util.Collections;
19 import java.util.HashMap;
20 import java.util.HashSet;
21 import java.util.LinkedList;
22 import java.util.List;
23 import java.util.Map;
24 import java.util.Set;
25 import java.util.concurrent.ConcurrentHashMap;
26 
27 import javax.xml.bind.DatatypeConverter;
28 
29 import com.stumbleupon.async.Callback;
30 import com.stumbleupon.async.Deferred;
31 
32 import org.hbase.async.AtomicIncrementRequest;
33 import org.hbase.async.Bytes;
34 import org.hbase.async.DeleteRequest;
35 import org.hbase.async.GetRequest;
36 import org.hbase.async.HBaseClient;
37 import org.hbase.async.HBaseException;
38 import org.hbase.async.KeyValue;
39 import org.hbase.async.PutRequest;
40 import org.hbase.async.Scanner;
41 import org.hbase.async.Bytes.ByteMap;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
44 
45 import net.opentsdb.core.Const;
46 import net.opentsdb.core.Internal;
47 import net.opentsdb.core.TSDB;
48 import net.opentsdb.meta.UIDMeta;
49 
50 /**
51  * Represents a table of Unique IDs, manages the lookup and creation of IDs.
52  * <p>
53  * Don't attempt to use {@code equals()} or {@code hashCode()} on
54  * this class.
55  * @see UniqueIdInterface
56  */
57 @SuppressWarnings("deprecation")  // Dunno why even with this, compiler warns.
58 public final class UniqueId implements UniqueIdInterface {
59   private static final Logger LOG = LoggerFactory.getLogger(UniqueId.class);
60 
  /**
   * The different types of UIDs handled by this class.
   * @since 2.0
   */
  public enum UniqueIdType {
    METRIC,
    TAGK,
    TAGV
  }
67 
  /** Charset used to convert Strings to byte arrays and back. */
  private static final Charset CHARSET = Charset.forName("ISO-8859-1");
  /**
   * Column family holding the forward mappings (name to ID).  The max-ID
   * counter in {@link #MAXID_ROW} is also kept in this family.
   */
  private static final byte[] ID_FAMILY = toBytes("id");
  /** Column family holding the reverse mappings (ID to name). */
  private static final byte[] NAME_FAMILY = toBytes("name");
  /** Row key of the special row used to track the max ID already assigned. */
  private static final byte[] MAXID_ROW = { 0 };
  /** How many times we try to assign a sequential ID before giving up. */
  private static final short MAX_ATTEMPTS_ASSIGN_ID = 3;
  /** How many times we try to apply an edit before giving up. */
  private static final short MAX_ATTEMPTS_PUT = 6;
  /** How many times we try to assign a random ID before giving up. */
  private static final short MAX_ATTEMPTS_ASSIGN_RANDOM_ID = 10;
  /** Initial delay in ms for exponential backoff to retry failed RPCs. */
  private static final short INITIAL_EXP_BACKOFF_DELAY = 800;
  /** Maximum number of results to return in suggest(). */
  private static final short MAX_SUGGESTIONS = 25;
86 
  /** HBase client to use.  */
  private final HBaseClient client;
  /** Table where IDs are stored.  */
  private final byte[] table;
  /** The kind of UniqueId, used as the column qualifier. */
  private final byte[] kind;
  /** The type of UID represented by this cache */
  private final UniqueIdType type;
  /** Number of bytes on which each ID is encoded. */
  private final short id_width;
  /** Whether or not to randomize new IDs */
  private final boolean randomize_id;

  /** Cache for forward mappings (name to ID). */
  private final ConcurrentHashMap<String, byte[]> name_cache =
    new ConcurrentHashMap<String, byte[]>();
  /** Cache for backward mappings (ID to name).
   * The ID in the key is a byte[] converted to a String to be Comparable. */
  private final ConcurrentHashMap<String, String> id_cache =
    new ConcurrentHashMap<String, String>();
  /**
   * Map of pending UID assignments, keyed by the name being assigned.
   * This is a plain HashMap: every access must synchronize on the map itself.
   */
  private final HashMap<String, Deferred<byte[]>> pending_assignments =
    new HashMap<String, Deferred<byte[]>>();
  /** Set of UID names involved in a rename. */
  private final Set<String> renaming_id_names =
    Collections.synchronizedSet(new HashSet<String>());

  // NOTE(review): the counters below are volatile ints bumped with `++`,
  // which is a non-atomic read-modify-write.  Concurrent updates can be
  // lost, so the values should be treated as approximate statistics.

  /** Number of times we avoided reading from HBase thanks to the cache. */
  private volatile int cache_hits;
  /** Number of times we had to read from HBase and populate the cache. */
  private volatile int cache_misses;
  /** How many times we collided with an existing ID when attempting to
   * generate a new UID */
  private volatile int random_id_collisions;
  /** How many times assignments have been rejected by the UID filter */
  private volatile int rejected_assignments;

  /**
   * TSDB object used for filtering and/or meta generation.
   * May be {@code null}; users of this field check for that before use.
   */
  private TSDB tsdb;
126 
  /**
   * Constructor for an instance that does not randomize new UIDs.
   * Delegates to the five-argument constructor with {@code randomize_id}
   * set to {@code false}.
   * @param client The HBase client to use.
   * @param table The name of the HBase table to use.
   * @param kind The kind of Unique ID this instance will deal with.
   * @param width The number of bytes on which Unique IDs should be encoded.
   * @throws IllegalArgumentException if width is negative or too small/large
   * or if kind is an empty string.
   */
  public UniqueId(final HBaseClient client, final byte[] table, final String kind,
                  final int width) {
    this(client, table, kind, width, false);
  }
140 
141   /**
142    * Constructor.
143    * @param client The HBase client to use.
144    * @param table The name of the HBase table to use.
145    * @param kind The kind of Unique ID this instance will deal with.
146    * @param width The number of bytes on which Unique IDs should be encoded.
147    * @param Whether or not to randomize new UIDs
148    * @throws IllegalArgumentException if width is negative or too small/large
149    * or if kind is an empty string.
150    * @since 2.2
151    */
UniqueId(final HBaseClient client, final byte[] table, final String kind, final int width, final boolean randomize_id)152   public UniqueId(final HBaseClient client, final byte[] table, final String kind,
153                   final int width, final boolean randomize_id) {
154     this.client = client;
155     this.table = table;
156     if (kind.isEmpty()) {
157       throw new IllegalArgumentException("Empty string as 'kind' argument!");
158     }
159     this.kind = toBytes(kind);
160     type = stringToUniqueIdType(kind);
161     if (width < 1 || width > 8) {
162       throw new IllegalArgumentException("Invalid width: " + width);
163     }
164     this.id_width = (short) width;
165     this.randomize_id = randomize_id;
166   }
167 
168   /**
169    * Constructor.
170    * @param tsdb The TSDB this UID object belongs to
171    * @param table The name of the HBase table to use.
172    * @param kind The kind of Unique ID this instance will deal with.
173    * @param width The number of bytes on which Unique IDs should be encoded.
174    * @param Whether or not to randomize new UIDs
175    * @throws IllegalArgumentException if width is negative or too small/large
176    * or if kind is an empty string.
177    * @since 2.3
178    */
UniqueId(final TSDB tsdb, final byte[] table, final String kind, final int width, final boolean randomize_id)179   public UniqueId(final TSDB tsdb, final byte[] table, final String kind,
180                   final int width, final boolean randomize_id) {
181     this.client = tsdb.getClient();
182     this.tsdb = tsdb;
183     this.table = table;
184     if (kind.isEmpty()) {
185       throw new IllegalArgumentException("Empty string as 'kind' argument!");
186     }
187     this.kind = toBytes(kind);
188     type = stringToUniqueIdType(kind);
189     if (width < 1 || width > 8) {
190       throw new IllegalArgumentException("Invalid width: " + width);
191     }
192     this.id_width = (short) width;
193     this.randomize_id = randomize_id;
194   }
195 
  /**
   * The number of times we avoided reading from HBase thanks to the cache.
   * @return The current value of the cache hit counter.
   */
  public int cacheHits() {
    return cache_hits;
  }
200 
  /**
   * The number of times we had to read from HBase and populate the cache.
   * @return The current value of the cache miss counter.
   */
  public int cacheMisses() {
    return cache_misses;
  }
205 
206   /** Returns the number of elements stored in the internal cache. */
cacheSize()207   public int cacheSize() {
208     return name_cache.size() + id_cache.size();
209   }
210 
  /**
   * Returns the number of random UID collisions.
   * @return The current value of the collision counter.
   */
  public int randomIdCollisions() {
    return random_id_collisions;
  }
215 
  /**
   * Returns the number of UID assignments rejected by the filter.
   * @return The current value of the rejection counter.
   */
  public int rejectedAssignments() {
    return rejected_assignments;
  }
220 
  /**
   * Returns the kind of Unique ID handled by this instance.
   * @return The kind, decoded from the bytes used as the column qualifier.
   */
  public String kind() {
    return fromBytes(kind);
  }
224 
  /**
   * Returns the number of bytes on which each ID is encoded.
   * @return The width of the IDs, in bytes.
   */
  public short width() {
    return id_width;
  }
228 
  /**
   * Sets the TSDB object used for filtering and/or meta generation.
   * @param tsdb The TSDB to use, replacing any previously set one.
   */
  public void setTSDB(final TSDB tsdb) {
    this.tsdb = tsdb;
  }
233 
234   /** The largest possible ID given the number of bytes the IDs are
235    * represented on.
236    * @deprecated Use {@link Internal.getMaxUnsignedValueOnBytes}
237    */
maxPossibleId()238   public long maxPossibleId() {
239     return Internal.getMaxUnsignedValueOnBytes(id_width);
240   }
241 
  /**
   * Causes this instance to discard all its in-memory caches.
   * <p>
   * Both the forward (name to ID) and backward (ID to name) caches are
   * emptied.  The hit/miss counters are left untouched.
   * @since 1.1
   */
  public void dropCaches() {
    name_cache.clear();
    id_cache.clear();
  }
250 
251   /**
252    * Finds the name associated with a given ID.
253    * <p>
254    * <strong>This method is blocking.</strong>  Its use within OpenTSDB itself
255    * is discouraged, please use {@link #getNameAsync} instead.
256    * @param id The ID associated with that name.
257    * @see #getId(String)
258    * @see #getOrCreateId(String)
259    * @throws NoSuchUniqueId if the given ID is not assigned.
260    * @throws HBaseException if there is a problem communicating with HBase.
261    * @throws IllegalArgumentException if the ID given in argument is encoded
262    * on the wrong number of bytes.
263    */
getName(final byte[] id)264   public String getName(final byte[] id) throws NoSuchUniqueId, HBaseException {
265     try {
266       return getNameAsync(id).joinUninterruptibly();
267     } catch (RuntimeException e) {
268       throw e;
269     } catch (Exception e) {
270       throw new RuntimeException("Should never be here", e);
271     }
272   }
273 
274   /**
275    * Finds the name associated with a given ID.
276    *
277    * @param id The ID associated with that name.
278    * @see #getId(String)
279    * @see #getOrCreateIdAsync(String)
280    * @throws NoSuchUniqueId if the given ID is not assigned.
281    * @throws HBaseException if there is a problem communicating with HBase.
282    * @throws IllegalArgumentException if the ID given in argument is encoded
283    * on the wrong number of bytes.
284    * @since 1.1
285    */
getNameAsync(final byte[] id)286   public Deferred<String> getNameAsync(final byte[] id) {
287     if (id.length != id_width) {
288       throw new IllegalArgumentException("Wrong id.length = " + id.length
289                                          + " which is != " + id_width
290                                          + " required for '" + kind() + '\'');
291     }
292     final String name = getNameFromCache(id);
293     if (name != null) {
294       cache_hits++;
295       return Deferred.fromResult(name);
296     }
297     cache_misses++;
298     class GetNameCB implements Callback<String, String> {
299       public String call(final String name) {
300         if (name == null) {
301           throw new NoSuchUniqueId(kind(), id);
302         }
303         addNameToCache(id, name);
304         addIdToCache(name, id);
305         return name;
306       }
307     }
308     return getNameFromHBase(id).addCallback(new GetNameCB());
309   }
310 
getNameFromCache(final byte[] id)311   private String getNameFromCache(final byte[] id) {
312     return id_cache.get(fromBytes(id));
313   }
314 
getNameFromHBase(final byte[] id)315   private Deferred<String> getNameFromHBase(final byte[] id) {
316     class NameFromHBaseCB implements Callback<String, byte[]> {
317       public String call(final byte[] name) {
318         return name == null ? null : fromBytes(name);
319       }
320     }
321     return hbaseGet(id, NAME_FAMILY).addCallback(new NameFromHBaseCB());
322   }
323 
addNameToCache(final byte[] id, final String name)324   private void addNameToCache(final byte[] id, final String name) {
325     final String key = fromBytes(id);
326     String found = id_cache.get(key);
327     if (found == null) {
328       found = id_cache.putIfAbsent(key, name);
329     }
330     if (found != null && !found.equals(name)) {
331       throw new IllegalStateException("id=" + Arrays.toString(id) + " => name="
332           + name + ", already mapped to " + found);
333     }
334   }
335 
getId(final String name)336   public byte[] getId(final String name) throws NoSuchUniqueName, HBaseException {
337     try {
338       return getIdAsync(name).joinUninterruptibly();
339     } catch (RuntimeException e) {
340       throw e;
341     } catch (Exception e) {
342       throw new RuntimeException("Should never be here", e);
343     }
344   }
345 
getIdAsync(final String name)346   public Deferred<byte[]> getIdAsync(final String name) {
347     final byte[] id = getIdFromCache(name);
348     if (id != null) {
349       cache_hits++;
350       return Deferred.fromResult(id);
351     }
352     cache_misses++;
353     class GetIdCB implements Callback<byte[], byte[]> {
354       public byte[] call(final byte[] id) {
355         if (id == null) {
356           throw new NoSuchUniqueName(kind(), name);
357         }
358         if (id.length != id_width) {
359           throw new IllegalStateException("Found id.length = " + id.length
360                                           + " which is != " + id_width
361                                           + " required for '" + kind() + '\'');
362         }
363         addIdToCache(name, id);
364         addNameToCache(id, name);
365         return id;
366       }
367     }
368     Deferred<byte[]> d = getIdFromHBase(name).addCallback(new GetIdCB());
369     return d;
370   }
371 
  /**
   * Looks up an ID in the forward (name to ID) cache.
   * @param name The name to look up.
   * @return The cached array itself (not a copy) or {@code null} if the name
   * isn't cached.
   */
  private byte[] getIdFromCache(final String name) {
    return name_cache.get(name);
  }
375 
getIdFromHBase(final String name)376   private Deferred<byte[]> getIdFromHBase(final String name) {
377     return hbaseGet(toBytes(name), ID_FAMILY);
378   }
379 
addIdToCache(final String name, final byte[] id)380   private void addIdToCache(final String name, final byte[] id) {
381     byte[] found = name_cache.get(name);
382     if (found == null) {
383       found = name_cache.putIfAbsent(name,
384                                     // Must make a defensive copy to be immune
385                                     // to any changes the caller may do on the
386                                     // array later on.
387                                     Arrays.copyOf(id, id.length));
388     }
389     if (found != null && !Arrays.equals(found, id)) {
390       throw new IllegalStateException("name=" + name + " => id="
391           + Arrays.toString(id) + ", already mapped to "
392           + Arrays.toString(found));
393     }
394   }
395 
  /**
   * Implements the process to allocate a new UID.
   * This callback is re-used multiple times in a four step process:
   *   1. Allocate a new UID, either via atomic increment or, when
   *      {@code randomize_id} is set, by generating a random one.
   *   2. Create the reverse mapping (ID to name).
   *   3. Create the forward mapping (name to ID).
   *   4. Return the new UID to the caller.
   * <p>
   * Each step returns a Deferred on which this same object is registered
   * (see the end of {@link #call}), so {@code call} is re-entered with the
   * result, or the exception, of the previous step and dispatches on
   * {@code state}.
   */
  private final class UniqueIdAllocator implements Callback<Object, Object> {
    private final String name;  // What we're trying to allocate an ID for.
    private final Deferred<byte[]> assignment; // deferred to call back
    private short attempt = randomize_id ?     // Give up when zero.
        MAX_ATTEMPTS_ASSIGN_RANDOM_ID : MAX_ATTEMPTS_ASSIGN_ID;

    private HBaseException hbe = null;  // Last exception caught.
    // TODO(manolama) - right now if we retry the assignment it will create a
    // callback chain MAX_ATTEMPTS_* long and call the ErrBack that many times.
    // This can be cleaned up a fair amount but it may require changing the
    // public behavior a bit. For now, the flag will prevent multiple attempts
    // to execute the callback.
    private boolean called = false; // whether we called the deferred or not

    private long id = -1;  // The ID we'll grab with an atomic increment.
    private byte row[];    // The same ID, as a byte array.

    // The states name the step that the next invocation of call() will run.
    private static final byte ALLOCATE_UID = 0;
    private static final byte CREATE_REVERSE_MAPPING = 1;
    private static final byte CREATE_FORWARD_MAPPING = 2;
    private static final byte DONE = 3;
    private byte state = ALLOCATE_UID;  // Current state of the process.

    /**
     * @param name The name to allocate an ID for.
     * @param assignment The deferred to call back with the ID or the error.
     */
    UniqueIdAllocator(final String name, final Deferred<byte[]> assignment) {
      this.name = name;
      this.assignment = assignment;
    }

    /**
     * Starts the allocation process from the first step.
     * Note that this consumes one attempt before anything is tried.
     * @return The deferred given to the constructor.
     */
    Deferred<byte[]> tryAllocate() {
      attempt--;
      state = ALLOCATE_UID;
      call(null);
      return assignment;
    }

    /**
     * Runs the step selected by {@code state}.
     * @param arg The result of the previous step, or the exception it failed
     * with, or {@code null} on the very first invocation.
     * @return The deferred result of the step that was run, or {@code arg}
     * itself when it's an exception we don't know how to handle.
     */
    @SuppressWarnings("unchecked")
    public Object call(final Object arg) {
      // NOTE(review): this check runs before `arg` is inspected, so once the
      // counter reaches zero even a successful result from the previous step
      // appears to be discarded -- confirm this is intended.
      if (attempt == 0) {
        if (hbe == null && !randomize_id) {
          throw new IllegalStateException("Should never happen!");
        }
        LOG.error("Failed to assign an ID for kind='" + kind()
                  + "' name='" + name + "'", hbe);
        // Without an HBase error to report, we ran out of attempts because
        // of CAS failures.
        if (hbe == null) {
          throw new FailedToAssignUniqueIdException(kind(), name,
              MAX_ATTEMPTS_ASSIGN_RANDOM_ID);
        }
        throw hbe;
      }

      if (arg instanceof Exception) {
        final String msg = ("Failed attempt #" + (randomize_id
                         ? (MAX_ATTEMPTS_ASSIGN_RANDOM_ID - attempt)
                         : (MAX_ATTEMPTS_ASSIGN_ID - attempt))
                         + " to assign an UID for " + kind() + ':' + name
                         + " at step #" + state);
        if (arg instanceof HBaseException) {
          LOG.error(msg, (Exception) arg);
          hbe = (HBaseException) arg;
          attempt--;
          state = ALLOCATE_UID;;  // Retry from the beginning.
        } else {
          LOG.error("WTF?  Unexpected exception!  " + msg, (Exception) arg);
          return arg;  // Unexpected exception, let it bubble up.
        }
      }

      /**
       * Forwards a failure to the assignment deferred, at most once.
       * The {@code called} flag guards against the repeated invocations
       * described in the TODO above.
       */
      class ErrBack implements Callback<Object, Exception> {
        public Object call(final Exception e) throws Exception {
          if (!called) {
            LOG.warn("Failed pending assignment for: " + name, e);
            assignment.callback(e);
            called = true;
          }
          return assignment;
        }
      }

      // Each step advances `state` itself before returning its deferred.
      final Deferred d;
      switch (state) {
        case ALLOCATE_UID:
          d = allocateUid();
          break;
        case CREATE_REVERSE_MAPPING:
          d = createReverseMapping(arg);
          break;
        case CREATE_FORWARD_MAPPING:
          d = createForwardMapping(arg);
          break;
        case DONE:
          return done(arg);
        default:
          throw new AssertionError("Should never be here!");
      }
      // Re-enter this callback with the outcome of the step, good or bad.
      return d.addBoth(this).addErrback(new ErrBack());
    }

    /** Generates either a random or a serial ID. If random, we need to
     * make sure that there isn't a UID collision.
     * The collision check itself is the CAS done for the reverse mapping.
     * @return A deferred ID, as a Long.
     */
    private Deferred<Long> allocateUid() {
      LOG.info("Creating " + (randomize_id ? "a random " : "an ") +
          "ID for kind='" + kind() + "' name='" + name + '\'');

      state = CREATE_REVERSE_MAPPING;
      if (randomize_id) {
        return Deferred.fromResult(RandomUniqueId.getRandomUID());
      } else {
        return client.atomicIncrement(new AtomicIncrementRequest(table,
                                      MAXID_ROW, ID_FAMILY, kind));
      }
    }

    /**
     * Create the reverse mapping.
     * We do this before the forward one so that if we die before creating
     * the forward mapping we don't run the risk of "publishing" a
     * partially assigned ID.  The reverse mapping on its own is harmless
     * but the forward mapping without reverse mapping is bad as it would
     * point to an ID that cannot be resolved.
     * @param arg The ID allocated by the previous step, must be a Long.
     * @return A deferred boolean telling whether the CAS succeeded.
     * @throws IllegalStateException if {@code arg} isn't a Long, if the ID
     * isn't strictly positive or if it doesn't fit on {@code id_width} bytes.
     */
    private Deferred<Boolean> createReverseMapping(final Object arg) {
      if (!(arg instanceof Long)) {
        throw new IllegalStateException("Expected a Long but got " + arg);
      }
      id = (Long) arg;
      // Zero is rejected too, despite what the message says.
      if (id <= 0) {
        throw new IllegalStateException("Got a negative ID from HBase: " + id);
      }
      LOG.info("Got ID=" + id
               + " for kind='" + kind() + "' name='" + name + "'");
      row = Bytes.fromLong(id);
      // row.length should actually be 8.
      if (row.length < id_width) {
        throw new IllegalStateException("OMG, row.length = " + row.length
                                        + " which is less than " + id_width
                                        + " for id=" + id
                                        + " row=" + Arrays.toString(row));
      }
      // Verify that we're going to drop bytes that are 0.
      for (int i = 0; i < row.length - id_width; i++) {
        if (row[i] != 0) {
          final String message = "All Unique IDs for " + kind()
            + " on " + id_width + " bytes are already assigned!";
          LOG.error("OMG " + message);
          throw new IllegalStateException(message);
        }
      }
      // Shrink the ID on the requested number of bytes.
      row = Arrays.copyOfRange(row, row.length - id_width, row.length);

      state = CREATE_FORWARD_MAPPING;
      // We are CAS'ing the KV into existence -- the second argument is how
      // we tell HBase we want to atomically create the KV, so that if there
      // is already a KV in this cell, we'll fail.  Technically we could do
      // just a `put' here, as we have a freshly allocated UID, so there is
      // not reason why a KV should already exist for this UID, but just to
      // err on the safe side and catch really weird corruption cases, we do
      // a CAS instead to create the KV.
      return client.compareAndSet(reverseMapping(), HBaseClient.EMPTY_ARRAY);
    }

    /** Builds the put for the reverse mapping: row=ID, value=name. */
    private PutRequest reverseMapping() {
      return new PutRequest(table, row, NAME_FAMILY, kind, toBytes(name));
    }

    /**
     * Create the forward mapping, provided the reverse one was created.
     * @param arg The outcome of the reverse mapping CAS, must be a Boolean.
     * @return A deferred boolean: the outcome of the forward mapping CAS or,
     * if the reverse mapping CAS failed, a placeholder {@code false} that
     * merely triggers another allocation.
     * @throws IllegalStateException if {@code arg} isn't a Boolean.
     */
    private Deferred<?> createForwardMapping(final Object arg) {
      if (!(arg instanceof Boolean)) {
        throw new IllegalStateException("Expected a Boolean but got " + arg);
      }
      if (!((Boolean) arg)) {  // Previous CAS failed.
        if (randomize_id) {
          // This random Id is already used by another row
          LOG.warn("Detected random id collision and retrying kind='" +
              kind() + "' name='" + name + "'");
          random_id_collisions++;
        } else {
          // something is really messed up then
          LOG.error("WTF!  Failed to CAS reverse mapping: " + reverseMapping()
              + " -- run an fsck against the UID table!");
        }
        // Start over with a new ID.  The value of the result below doesn't
        // matter as the allocation step takes no argument.
        attempt--;
        state = ALLOCATE_UID;
        return Deferred.fromResult(false);
      }

      state = DONE;
      return client.compareAndSet(forwardMapping(), HBaseClient.EMPTY_ARRAY);
    }

    /** Builds the put for the forward mapping: row=name, value=ID. */
    private PutRequest forwardMapping() {
        return new PutRequest(table, toBytes(name), ID_FAMILY, kind, row);
    }

    /**
     * Final step: publishes the new UID or, if the forward mapping CAS
     * failed, looks up the UID assigned by whoever won the race.
     * @param arg The outcome of the forward mapping CAS, must be a Boolean.
     * @return The assignment deferred given to the constructor.
     * @throws IllegalStateException if {@code arg} isn't a Boolean.
     */
    private Deferred<byte[]> done(final Object arg) {
      if (!(arg instanceof Boolean)) {
        throw new IllegalStateException("Expected a Boolean but got " + arg);
      }
      if (!((Boolean) arg)) {  // Previous CAS failed.  We lost a race.
        LOG.warn("Race condition: tried to assign ID " + id + " to "
                 + kind() + ":" + name + ", but CAS failed on "
                 + forwardMapping() + ", which indicates this UID must have"
                 + " been allocated concurrently by another TSD or thread. "
                 + "So ID " + id + " was leaked.");
        // If two TSDs attempted to allocate a UID for the same name at the
        // same time, they would both have allocated a UID, and created a
        // reverse mapping, and upon getting here, only one of them would
        // manage to CAS this KV into existence.  The one that loses the
        // race will retry and discover the UID assigned by the winner TSD,
        // and a UID will have been wasted in the process.  No big deal.
        if (randomize_id) {
          // This random Id is already used by another row
          LOG.warn("Detected random id collision between two tsdb "
              + "servers kind='" + kind() + "' name='" + name + "'");
          random_id_collisions++;
        }

        /** Completes the assignment with the UID found for the name. */
        class GetIdCB implements Callback<Object, byte[]> {
          public Object call(final byte[] row) throws Exception {
            assignment.callback(row);
            return null;
          }
        }
        // NOTE(review): only a callback is registered here, no errback.  If
        // this lookup fails it looks like the assignment deferred would
        // never be called back -- confirm.
        getIdAsync(name).addCallback(new GetIdCB());
        return assignment;
      }

      cacheMapping(name, row);

      // Only generate the meta data when a TSDB was provided and realtime
      // UID meta tracking is enabled in its config.
      if (tsdb != null && tsdb.getConfig().enable_realtime_uid()) {
        final UIDMeta meta = new UIDMeta(type, row, name);
        meta.storeNew(tsdb);
        LOG.info("Wrote UIDMeta for: " + name);
        tsdb.indexUIDMeta(meta);
      }

      synchronized(pending_assignments) {
        if (pending_assignments.remove(name) != null) {
          LOG.info("Completed pending assignment for: " + name);
        }
      }
      assignment.callback(row);
      return assignment;
    }

  }
650 
  /**
   * Adds the bidirectional mapping in the cache.
   * The forward (name to ID) entry is added first, then the backward one.
   * @param name The name of the mapping.
   * @param id The ID of the mapping.
   * @throws IllegalStateException if either cache already holds a
   * conflicting entry.
   */
  private void cacheMapping(final String name, final byte[] id) {
    addIdToCache(name, id);
    addNameToCache(id, name);
  }
656 
657   /**
658    * Finds the ID associated with a given name or creates it.
659    * <p>
660    * <strong>This method is blocking.</strong>  Its use within OpenTSDB itself
661    * is discouraged, please use {@link #getOrCreateIdAsync} instead.
662    * <p>
663    * The length of the byte array is fixed in advance by the implementation.
664    *
665    * @param name The name to lookup in the table or to assign an ID to.
666    * @throws HBaseException if there is a problem communicating with HBase.
667    * @throws IllegalStateException if all possible IDs are already assigned.
668    * @throws IllegalStateException if the ID found in HBase is encoded on the
669    * wrong number of bytes.
670    */
getOrCreateId(final String name)671   public byte[] getOrCreateId(final String name) throws HBaseException {
672     try {
673       return getIdAsync(name).joinUninterruptibly();
674     } catch (NoSuchUniqueName e) {
675       if (tsdb != null && tsdb.getUidFilter() != null &&
676           tsdb.getUidFilter().fillterUIDAssignments()) {
677         try {
678           if (!tsdb.getUidFilter().allowUIDAssignment(type, name, null, null)
679                 .join()) {
680             rejected_assignments++;
681             throw new FailedToAssignUniqueIdException(new String(kind), name, 0,
682                 "Blocked by UID filter.");
683           }
684         } catch (FailedToAssignUniqueIdException e1) {
685           throw e1;
686         } catch (InterruptedException e1) {
687           LOG.error("Interrupted", e1);
688           Thread.currentThread().interrupt();
689         } catch (Exception e1) {
690           throw new RuntimeException("Should never be here", e1);
691         }
692       }
693 
694       Deferred<byte[]> assignment = null;
695       boolean pending = false;
696       synchronized (pending_assignments) {
697         assignment = pending_assignments.get(name);
698         if (assignment == null) {
699           // to prevent UID leaks that can be caused when multiple time
700           // series for the same metric or tags arrive, we need to write a
701           // deferred to the pending map as quickly as possible. Then we can
702           // start the assignment process after we've stashed the deferred
703           // and released the lock
704           assignment = new Deferred<byte[]>();
705           pending_assignments.put(name, assignment);
706         } else {
707           pending = true;
708         }
709       }
710 
711       if (pending) {
712         LOG.info("Already waiting for UID assignment: " + name);
713         try {
714           return assignment.joinUninterruptibly();
715         } catch (Exception e1) {
716           throw new RuntimeException("Should never be here", e1);
717         }
718       }
719 
720       // start the assignment dance after stashing the deferred
721       byte[] uid = null;
722       try {
723         uid = new UniqueIdAllocator(name, assignment).tryAllocate().joinUninterruptibly();
724       } catch (RuntimeException e1) {
725         throw e1;
726       } catch (Exception e1) {
727         throw new RuntimeException("Should never be here", e);
728       } finally {
729         synchronized (pending_assignments) {
730           if (pending_assignments.remove(name) != null) {
731             LOG.info("Completed pending assignment for: " + name);
732           }
733         }
734       }
735       return uid;
736     } catch (Exception e) {
737       throw new RuntimeException("Should never be here", e);
738     }
739   }
740 
  /**
   * Finds the ID associated with a given name or creates it.
   * <p>
   * The length of the byte array is fixed in advance by the implementation.
   * <p>
   * Delegates to the three-argument overload with no metric and no tags.
   *
   * @param name The name to lookup in the table or to assign an ID to.
   * @return A deferred ID, existing or newly assigned.
   * @throws HBaseException if there is a problem communicating with HBase.
   * @throws IllegalStateException if all possible IDs are already assigned.
   * @throws IllegalStateException if the ID found in HBase is encoded on the
   * wrong number of bytes.
   * @since 1.2
   */
  public Deferred<byte[]> getOrCreateIdAsync(final String name) {
    return getOrCreateIdAsync(name, null, null);
  }
756 
757   /**
758    * Finds the ID associated with a given name or creates it.
759    * <p>
760    * The length of the byte array is fixed in advance by the implementation.
761    *
762    * @param name The name to lookup in the table or to assign an ID to.
763    * @param metric Name of the metric associated with the UID for filtering.
764    * @param tags Tag set associated with the UID for filtering.
765    * @throws HBaseException if there is a problem communicating with HBase.
766    * @throws IllegalStateException if all possible IDs are already assigned.
767    * @throws IllegalStateException if the ID found in HBase is encoded on the
768    * wrong number of bytes.
769    * @since 2.3
770    */
getOrCreateIdAsync(final String name, final String metric, final Map<String, String> tags)771   public Deferred<byte[]> getOrCreateIdAsync(final String name,
772       final String metric, final Map<String, String> tags) {
773     // Look in the cache first.
774     final byte[] id = getIdFromCache(name);
775     if (id != null) {
776       cache_hits++;
777       return Deferred.fromResult(id);
778     }
779     // Not found in our cache, so look in HBase instead.
780 
781     /** Triggers the assignment if allowed through the filter */
782     class AssignmentAllowedCB implements  Callback<Deferred<byte[]>, Boolean> {
783       @Override
784       public Deferred<byte[]> call(final Boolean allowed) throws Exception {
785         if (!allowed) {
786           rejected_assignments++;
787           return Deferred.fromError(new FailedToAssignUniqueIdException(
788               new String(kind), name, 0, "Blocked by UID filter."));
789         }
790 
791         Deferred<byte[]> assignment = null;
792         synchronized (pending_assignments) {
793           assignment = pending_assignments.get(name);
794           if (assignment == null) {
795             // to prevent UID leaks that can be caused when multiple time
796             // series for the same metric or tags arrive, we need to write a
797             // deferred to the pending map as quickly as possible. Then we can
798             // start the assignment process after we've stashed the deferred
799             // and released the lock
800             assignment = new Deferred<byte[]>();
801             pending_assignments.put(name, assignment);
802           } else {
803             LOG.info("Already waiting for UID assignment: " + name);
804             return assignment;
805           }
806         }
807 
808         // start the assignment dance after stashing the deferred
809         if (metric != null && LOG.isDebugEnabled()) {
810           LOG.debug("Assigning UID for '" + name + "' of type '" + type +
811               "' for series '" + metric + ", " + tags + "'");
812         }
813 
814         // start the assignment dance after stashing the deferred
815         return new UniqueIdAllocator(name, assignment).tryAllocate();
816       }
817       @Override
818       public String toString() {
819         return "AssignmentAllowedCB";
820       }
821     }
822 
823     /** Triggers an assignment (possibly through the filter) if the exception
824      * returned was a NoSuchUniqueName. */
825     class HandleNoSuchUniqueNameCB implements Callback<Object, Exception> {
826       public Object call(final Exception e) {
827         if (e instanceof NoSuchUniqueName) {
828           if (tsdb != null && tsdb.getUidFilter() != null &&
829               tsdb.getUidFilter().fillterUIDAssignments()) {
830             return tsdb.getUidFilter()
831                 .allowUIDAssignment(type, name, metric, tags)
832                 .addCallbackDeferring(new AssignmentAllowedCB());
833           } else {
834             return Deferred.fromResult(true)
835                 .addCallbackDeferring(new AssignmentAllowedCB());
836           }
837         }
838         return e;  // Other unexpected exception, let it bubble up.
839       }
840     }
841 
842     // Kick off the HBase lookup, and if we don't find it there either, start
843     // the process to allocate a UID.
844     return getIdAsync(name).addErrback(new HandleNoSuchUniqueNameCB());
845   }
846 
847   /**
848    * Attempts to find suggestions of names given a search term.
849    * <p>
850    * <strong>This method is blocking.</strong>  Its use within OpenTSDB itself
851    * is discouraged, please use {@link #suggestAsync} instead.
852    * @param search The search term (possibly empty).
853    * @return A list of known valid names that have UIDs that sort of match
854    * the search term.  If the search term is empty, returns the first few
855    * terms.
856    * @throws HBaseException if there was a problem getting suggestions from
857    * HBase.
858    */
suggest(final String search)859   public List<String> suggest(final String search) throws HBaseException {
860     return suggest(search, MAX_SUGGESTIONS);
861   }
862 
863   /**
864    * Attempts to find suggestions of names given a search term.
865    * @param search The search term (possibly empty).
866    * @param max_results The number of results to return. Must be 1 or greater
867    * @return A list of known valid names that have UIDs that sort of match
868    * the search term.  If the search term is empty, returns the first few
869    * terms.
870    * @throws HBaseException if there was a problem getting suggestions from
871    * HBase.
872    * @throws IllegalArgumentException if the count was less than 1
873    * @since 2.0
874    */
suggest(final String search, final int max_results)875   public List<String> suggest(final String search, final int max_results)
876     throws HBaseException {
877     if (max_results < 1) {
878       throw new IllegalArgumentException("Count must be greater than 0");
879     }
880     try {
881       return suggestAsync(search, max_results).joinUninterruptibly();
882     } catch (HBaseException e) {
883       throw e;
884     } catch (Exception e) {  // Should never happen.
885       final String msg = "Unexpected exception caught by "
886         + this + ".suggest(" + search + ')';
887       LOG.error(msg, e);
888       throw new RuntimeException(msg, e);  // Should never happen.
889     }
890   }
891 
892   /**
893    * Attempts to find suggestions of names given a search term.
894    * @param search The search term (possibly empty).
895    * @return A list of known valid names that have UIDs that sort of match
896    * the search term.  If the search term is empty, returns the first few
897    * terms.
898    * @throws HBaseException if there was a problem getting suggestions from
899    * HBase.
900    * @since 1.1
901    */
suggestAsync(final String search, final int max_results)902   public Deferred<List<String>> suggestAsync(final String search,
903       final int max_results) {
904     return new SuggestCB(search, max_results).search();
905   }
906 
907   /**
908    * Helper callback to asynchronously scan HBase for suggestions.
909    */
910   private final class SuggestCB
911     implements Callback<Object, ArrayList<ArrayList<KeyValue>>> {
912     private final LinkedList<String> suggestions = new LinkedList<String>();
913     private final Scanner scanner;
914     private final int max_results;
915 
SuggestCB(final String search, final int max_results)916     SuggestCB(final String search, final int max_results) {
917       this.max_results = max_results;
918       this.scanner = getSuggestScanner(client, table, search, kind, max_results);
919     }
920 
921     @SuppressWarnings("unchecked")
search()922     Deferred<List<String>> search() {
923       return (Deferred) scanner.nextRows().addCallback(this);
924     }
925 
call(final ArrayList<ArrayList<KeyValue>> rows)926     public Object call(final ArrayList<ArrayList<KeyValue>> rows) {
927       if (rows == null) {  // We're done scanning.
928         return suggestions;
929       }
930 
931       for (final ArrayList<KeyValue> row : rows) {
932         if (row.size() != 1) {
933           LOG.error("WTF shouldn't happen!  Scanner " + scanner + " returned"
934                     + " a row that doesn't have exactly 1 KeyValue: " + row);
935           if (row.isEmpty()) {
936             continue;
937           }
938         }
939         final byte[] key = row.get(0).key();
940         final String name = fromBytes(key);
941         final byte[] id = row.get(0).value();
942         final byte[] cached_id = name_cache.get(name);
943         if (cached_id == null) {
944           cacheMapping(name, id);
945         } else if (!Arrays.equals(id, cached_id)) {
946           throw new IllegalStateException("WTF?  For kind=" + kind()
947             + " name=" + name + ", we have id=" + Arrays.toString(cached_id)
948             + " in cache, but just scanned id=" + Arrays.toString(id));
949         }
950         suggestions.add(name);
951         if ((short) suggestions.size() >= max_results) {  // We have enough.
952           return scanner.close().addCallback(new Callback<Object, Object>() {
953             @Override
954             public Object call(Object ignored) throws Exception {
955               return suggestions;
956             }
957           });
958         }
959         row.clear();  // free()
960       }
961       return search();  // Get more suggestions.
962     }
963   }
964 
965   /**
966    * Reassigns the UID to a different name (non-atomic).
967    * <p>
968    * Whatever was the UID of {@code oldname} will be given to {@code newname}.
969    * {@code oldname} will no longer be assigned a UID.
970    * <p>
971    * Beware that the assignment change is <b>not atommic</b>.  If two threads
972    * or processes attempt to rename the same UID differently, the result is
973    * unspecified and might even be inconsistent.  This API is only here for
974    * administrative purposes, not for normal programmatic interactions.
975    * @param oldname The old name to rename.
976    * @param newname The new name.
977    * @throws NoSuchUniqueName if {@code oldname} wasn't assigned.
978    * @throws IllegalArgumentException if {@code newname} was already assigned.
979    * @throws HBaseException if there was a problem with HBase while trying to
980    * update the mapping.
981    */
982   public void rename(final String oldname, final String newname) {
983     final byte[] row = getId(oldname);
984     final String row_string = fromBytes(row);
985     {
986       byte[] id = null;
987       try {
988         id = getId(newname);
989       } catch (NoSuchUniqueName e) {
990         // OK, we don't want the new name to be assigned.
991       }
992       if (id != null) {
993         throw new IllegalArgumentException("When trying rename(\"" + oldname
994           + "\", \"" + newname + "\") on " + this + ": new name already"
995           + " assigned ID=" + Arrays.toString(id));
996       }
997     }
998 
999     if (renaming_id_names.contains(row_string)
1000         || renaming_id_names.contains(newname)) {
1001       throw new IllegalArgumentException("Ongoing rename on the same ID(\""
1002         + Arrays.toString(row) + "\") or an identical new name(\"" + newname
1003         + "\")");
1004     }
1005     renaming_id_names.add(row_string);
1006     renaming_id_names.add(newname);
1007 
1008     final byte[] newnameb = toBytes(newname);
1009 
1010     // Update the reverse mapping first, so that if we die before updating
1011     // the forward mapping we don't run the risk of "publishing" a
1012     // partially assigned ID.  The reverse mapping on its own is harmless
1013     // but the forward mapping without reverse mapping is bad.
1014     try {
1015       final PutRequest reverse_mapping = new PutRequest(
1016         table, row, NAME_FAMILY, kind, newnameb);
1017       hbasePutWithRetry(reverse_mapping, MAX_ATTEMPTS_PUT,
1018                         INITIAL_EXP_BACKOFF_DELAY);
1019     } catch (HBaseException e) {
1020       LOG.error("When trying rename(\"" + oldname
1021         + "\", \"" + newname + "\") on " + this + ": Failed to update reverse"
1022         + " mapping for ID=" + Arrays.toString(row), e);
1023       renaming_id_names.remove(row_string);
1024       renaming_id_names.remove(newname);
1025       throw e;
1026     }
1027 
1028     // Now create the new forward mapping.
1029     try {
1030       final PutRequest forward_mapping = new PutRequest(
1031         table, newnameb, ID_FAMILY, kind, row);
1032       hbasePutWithRetry(forward_mapping, MAX_ATTEMPTS_PUT,
1033                         INITIAL_EXP_BACKOFF_DELAY);
1034     } catch (HBaseException e) {
1035       LOG.error("When trying rename(\"" + oldname
1036         + "\", \"" + newname + "\") on " + this + ": Failed to create the"
1037         + " new forward mapping with ID=" + Arrays.toString(row), e);
1038       renaming_id_names.remove(row_string);
1039       renaming_id_names.remove(newname);
1040       throw e;
1041     }
1042 
1043     // Update cache.
1044     addIdToCache(newname, row);            // add     new name -> ID
1045     id_cache.put(fromBytes(row), newname);  // update  ID -> new name
1046     name_cache.remove(oldname);             // remove  old name -> ID
1047 
1048     // Delete the old forward mapping.
1049     try {
1050       final DeleteRequest old_forward_mapping = new DeleteRequest(
1051         table, toBytes(oldname), ID_FAMILY, kind);
1052       client.delete(old_forward_mapping).joinUninterruptibly();
1053     } catch (HBaseException e) {
1054       LOG.error("When trying rename(\"" + oldname
1055         + "\", \"" + newname + "\") on " + this + ": Failed to remove the"
1056         + " old forward mapping for ID=" + Arrays.toString(row), e);
1057       throw e;
1058     } catch (Exception e) {
1059       final String msg = "Unexpected exception when trying rename(\"" + oldname
1060         + "\", \"" + newname + "\") on " + this + ": Failed to remove the"
1061         + " old forward mapping for ID=" + Arrays.toString(row);
1062       LOG.error("WTF?  " + msg, e);
1063       throw new RuntimeException(msg, e);
1064     } finally {
1065       renaming_id_names.remove(row_string);
1066       renaming_id_names.remove(newname);
1067     }
1068     // Success!
1069   }
1070 
1071   /**
1072    * Attempts to remove the mappings for the given string from the UID table
1073    * as well as the cache. If used, the caller should remove the entry from all
1074    * TSD caches as well.
1075    * <p>
1076    * WARNING: This is a best attempt only method in that we'll lookup the UID
1077    * for the given string, then issue two delete requests, one for each mapping.
1078    * If either mapping fails then the cache can be re-populated later on with
1079    * stale data. In that case, please run the FSCK utility.
1080    * <p>
1081    * WARNING 2: This method will NOT delete time series data or TSMeta data
1082    * associated with the UIDs. It only removes them from the UID table. Deleting
1083    * a metric is generally safe as you won't query over it in the future. But
1084    * deleting tag keys or values can cause queries to fail if they find data
1085    * without a corresponding name.
1086    *
1087    * @param name The name of the UID to delete
1088    * @return A deferred to wait on for completion. The result will be null if
1089    * successful, an exception otherwise.
1090    * @throws NoSuchUniqueName if the UID string did not exist in storage
1091    * @throws IllegalStateException if the TSDB wasn't set for this UID object
1092    * @since 2.2
1093    */
1094   public Deferred<Object> deleteAsync(final String name) {
1095     if (tsdb == null) {
1096       throw new IllegalStateException("The TSDB is null for this UID object.");
1097     }
1098     final byte[] uid = new byte[id_width];
1099     final ArrayList<Deferred<Object>> deferreds =
1100         new ArrayList<Deferred<Object>>(2);
1101 
1102     /** Catches errors and still cleans out the cache */
1103     class ErrCB implements Callback<Object, Exception> {
1104       @Override
1105       public Object call(final Exception ex) throws Exception {
1106         name_cache.remove(name);
1107         id_cache.remove(fromBytes(uid));
1108         LOG.error("Failed to delete " + fromBytes(kind) + " UID " + name
1109             + " but still cleared the cache", ex);
1110         return ex;
1111       }
1112     }
1113 
1114     /** Used to wait on the group of delete requests */
1115     class GroupCB implements Callback<Deferred<Object>, ArrayList<Object>> {
1116       @Override
1117       public Deferred<Object> call(final ArrayList<Object> response)
1118           throws Exception {
1119         name_cache.remove(name);
1120         id_cache.remove(fromBytes(uid));
1121         LOG.info("Successfully deleted " + fromBytes(kind) + " UID " + name);
1122         return Deferred.fromResult(null);
1123       }
1124     }
1125 
1126     /** Called after fetching the UID from storage */
1127     class LookupCB implements Callback<Deferred<Object>, byte[]> {
1128       @Override
1129       public Deferred<Object> call(final byte[] stored_uid) throws Exception {
1130         if (stored_uid == null) {
1131           return Deferred.fromError(new NoSuchUniqueName(kind(), name));
1132         }
1133         System.arraycopy(stored_uid, 0, uid, 0, id_width);
1134         final DeleteRequest forward =
1135             new DeleteRequest(table, toBytes(name), ID_FAMILY, kind);
1136         deferreds.add(tsdb.getClient().delete(forward));
1137 
1138         final DeleteRequest reverse =
1139             new DeleteRequest(table, uid, NAME_FAMILY, kind);
1140         deferreds.add(tsdb.getClient().delete(reverse));
1141 
1142         final DeleteRequest meta = new DeleteRequest(table, uid, NAME_FAMILY,
1143             toBytes((type.toString().toLowerCase() + "_meta")));
1144         deferreds.add(tsdb.getClient().delete(meta));
1145         return Deferred.group(deferreds).addCallbackDeferring(new GroupCB());
1146       }
1147     }
1148 
1149     final byte[] cached_uid = name_cache.get(name);
1150     if (cached_uid == null) {
1151       return getIdFromHBase(name).addCallbackDeferring(new LookupCB())
1152           .addErrback(new ErrCB());
1153     }
1154     System.arraycopy(cached_uid, 0, uid, 0, id_width);
1155     final DeleteRequest forward =
1156         new DeleteRequest(table, toBytes(name), ID_FAMILY, kind);
1157     deferreds.add(tsdb.getClient().delete(forward));
1158 
1159     final DeleteRequest reverse =
1160         new DeleteRequest(table, uid, NAME_FAMILY, kind);
1161     deferreds.add(tsdb.getClient().delete(reverse));
1162 
1163     final DeleteRequest meta = new DeleteRequest(table, uid, NAME_FAMILY,
1164         toBytes((type.toString().toLowerCase() + "_meta")));
1165     deferreds.add(tsdb.getClient().delete(meta));
1166     return Deferred.group(deferreds).addCallbackDeferring(new GroupCB())
1167         .addErrback(new ErrCB());
1168   }
1169 
1170   /** The start row to scan on empty search strings.  `!' = first ASCII char. */
1171   private static final byte[] START_ROW = new byte[] { '!' };
1172 
1173   /** The end row to scan on empty search strings.  `~' = last ASCII char. */
1174   private static final byte[] END_ROW = new byte[] { '~' };
1175 
1176   /**
1177    * Creates a scanner that scans the right range of rows for suggestions.
1178    * @param client The HBase client to use.
1179    * @param tsd_uid_table Table where IDs are stored.
1180    * @param search The string to start searching at
1181    * @param kind_or_null The kind of UID to search or null for any kinds.
1182    * @param max_results The max number of results to return
1183    */
1184   private static Scanner getSuggestScanner(final HBaseClient client,
1185       final byte[] tsd_uid_table, final String search,
1186       final byte[] kind_or_null, final int max_results) {
1187     final byte[] start_row;
1188     final byte[] end_row;
1189     if (search.isEmpty()) {
1190       start_row = START_ROW;
1191       end_row = END_ROW;
1192     } else {
1193       start_row = toBytes(search);
1194       end_row = Arrays.copyOf(start_row, start_row.length);
1195       end_row[start_row.length - 1]++;
1196     }
1197     final Scanner scanner = client.newScanner(tsd_uid_table);
1198     scanner.setStartKey(start_row);
1199     scanner.setStopKey(end_row);
1200     scanner.setFamily(ID_FAMILY);
1201     if (kind_or_null != null) {
1202       scanner.setQualifier(kind_or_null);
1203     }
1204     scanner.setMaxNumRows(max_results <= 4096 ? max_results : 4096);
1205     return scanner;
1206   }
1207 
1208   /** Returns the cell of the specified row key, using family:kind. */
1209   private Deferred<byte[]> hbaseGet(final byte[] key, final byte[] family) {
1210     final GetRequest get = new GetRequest(table, key);
1211     get.family(family).qualifier(kind);
1212     class GetCB implements Callback<byte[], ArrayList<KeyValue>> {
1213       public byte[] call(final ArrayList<KeyValue> row) {
1214         if (row == null || row.isEmpty()) {
1215           return null;
1216         }
1217         return row.get(0).value();
1218       }
1219     }
1220     return client.get(get).addCallback(new GetCB());
1221   }
1222 
1223   /**
1224    * Attempts to run the PutRequest given in argument, retrying if needed.
1225    *
1226    * Puts are synchronized.
1227    *
1228    * @param put The PutRequest to execute.
1229    * @param attempts The maximum number of attempts.
1230    * @param wait The initial amount of time in ms to sleep for after a
1231    * failure.  This amount is doubled after each failed attempt.
1232    * @throws HBaseException if all the attempts have failed.  This exception
1233    * will be the exception of the last attempt.
1234    */
1235   private void hbasePutWithRetry(final PutRequest put, short attempts, short wait)
1236     throws HBaseException {
1237     put.setBufferable(false);  // TODO(tsuna): Remove once this code is async.
1238     while (attempts-- > 0) {
1239       try {
1240         client.put(put).joinUninterruptibly();
1241         return;
1242       } catch (HBaseException e) {
1243         if (attempts > 0) {
1244           LOG.error("Put failed, attempts left=" + attempts
1245                     + " (retrying in " + wait + " ms), put=" + put, e);
1246           try {
1247             Thread.sleep(wait);
1248           } catch (InterruptedException ie) {
1249             throw new RuntimeException("interrupted", ie);
1250           }
1251           wait *= 2;
1252         } else {
1253           throw e;
1254         }
1255       } catch (Exception e) {
1256         LOG.error("WTF?  Unexpected exception type, put=" + put, e);
1257       }
1258     }
1259     throw new IllegalStateException("This code should never be reached!");
1260   }
1261 
1262   private static byte[] toBytes(final String s) {
1263     return s.getBytes(CHARSET);
1264   }
1265 
1266   private static String fromBytes(final byte[] b) {
1267     return new String(b, CHARSET);
1268   }
1269 
1270   /** Returns a human readable string representation of the object. */
1271   public String toString() {
1272     return "UniqueId(" + fromBytes(table) + ", " + kind() + ", " + id_width + ")";
1273   }
1274 
1275   /**
1276    * Converts a byte array to a hex encoded, upper case string with padding
1277    * @param uid The ID to convert
1278    * @return the UID as a hex string
1279    * @throws NullPointerException if the ID was null
1280    * @since 2.0
1281    */
1282   public static String uidToString(final byte[] uid) {
1283     return DatatypeConverter.printHexBinary(uid);
1284   }
1285 
1286   /**
1287    * Converts a hex string to a byte array
1288    * If the {@code uid} is less than {@code uid_length * 2} characters wide, it
1289    * will be padded with 0s to conform to the spec. E.g. if the tagk width is 3
1290    * and the given {@code uid} string is "1", the string will be padded to
1291    * "000001" and then converted to a byte array to reach 3 bytes.
1292    * All {@code uid}s are padded to 1 byte. If given "1", and {@code uid_length}
1293    * is 0, the uid will be padded to "01" then converted.
1294    * @param uid The UID to convert
1295    * @return The UID as a byte array
1296    * @throws NullPointerException if the ID was null
1297    * @throws IllegalArgumentException if the string is not valid hex
1298    * @since 2.0
1299    */
1300   public static byte[] stringToUid(final String uid) {
1301     return stringToUid(uid, (short)0);
1302   }
1303 
1304   /**
1305    * Converts a UID to an integer value. The array must be the same length as
1306    * uid_length or an exception will be thrown.
1307    * @param uid The hex encoded UID to convert
1308    * @param uid_length Length the array SHOULD be according to the UID config
1309    * @return The UID converted to an integer
1310    * @throws IllegalArgumentException if the length of the byte array does not
1311    * match the uid_length value
1312    * @since 2.1
1313    */
1314   public static long uidToLong(final String uid, final short uid_length) {
1315     return uidToLong(stringToUid(uid), uid_length);
1316   }
1317 
1318   /**
1319    * Converts a UID to an integer value. The array must be the same length as
1320    * uid_length or an exception will be thrown.
1321    * @param uid The byte array to convert
1322    * @param uid_length Length the array SHOULD be according to the UID config
1323    * @return The UID converted to an integer
1324    * @throws IllegalArgumentException if the length of the byte array does not
1325    * match the uid_length value
1326    * @since 2.1
1327    */
1328   public static long uidToLong(final byte[] uid, final short uid_length) {
1329     if (uid.length != uid_length) {
1330       throw new IllegalArgumentException("UID was " + uid.length
1331           + " bytes long but expected to be " + uid_length);
1332     }
1333 
1334     final byte[] uid_raw = new byte[8];
1335     System.arraycopy(uid, 0, uid_raw, 8 - uid_length, uid_length);
1336     return Bytes.getLong(uid_raw);
1337   }
1338 
1339   /**
1340    * Converts a Long to a byte array with the proper UID width
1341    * @param uid The UID to convert
1342    * @param width The width of the UID in bytes
1343    * @return The UID as a byte array
1344    * @throws IllegalStateException if the UID is larger than the width would
1345    * allow
1346    * @since 2.1
1347    */
1348   public static byte[] longToUID(final long uid, final short width) {
1349     // Verify that we're going to drop bytes that are 0.
1350     final byte[] padded = Bytes.fromLong(uid);
1351     for (int i = 0; i < padded.length - width; i++) {
1352       if (padded[i] != 0) {
1353         final String message = "UID " + Long.toString(uid) +
1354           " was too large for " + width + " bytes";
1355         LOG.error("OMG " + message);
1356         throw new IllegalStateException(message);
1357       }
1358     }
1359     // Shrink the ID on the requested number of bytes.
1360     return Arrays.copyOfRange(padded, padded.length - width, padded.length);
1361   }
1362 
1363   /**
1364    * Appends the given UID to the given string buffer, followed by "\\E".
1365    * @param buf The buffer to append
1366    * @param id The UID to add as a binary regex pattern
1367    * @since 2.1
1368    */
1369   public static void addIdToRegexp(final StringBuilder buf, final byte[] id) {
1370     boolean backslash = false;
1371     for (final byte b : id) {
1372       buf.append((char) (b & 0xFF));
1373       if (b == 'E' && backslash) {  // If we saw a `\' and now we have a `E'.
1374         // So we just terminated the quoted section because we just added \E
1375         // to `buf'.  So let's put a literal \E now and start quoting again.
1376         buf.append("\\\\E\\Q");
1377       } else {
1378         backslash = b == '\\';
1379       }
1380     }
1381     buf.append("\\E");
1382   }
1383 
1384   /**
1385    * Attempts to convert the given string to a type enumerator
1386    * @param type The string to convert
1387    * @return a valid UniqueIdType if matched
1388    * @throws IllegalArgumentException if the string did not match a type
1389    * @since 2.0
1390    */
1391   public static UniqueIdType stringToUniqueIdType(final String type) {
1392     if (type.toLowerCase().equals("metric") ||
1393         type.toLowerCase().equals("metrics")) {
1394       return UniqueIdType.METRIC;
1395     } else if (type.toLowerCase().equals("tagk")) {
1396       return UniqueIdType.TAGK;
1397     } else if (type.toLowerCase().equals("tagv")) {
1398       return UniqueIdType.TAGV;
1399     } else {
1400       throw new IllegalArgumentException("Invalid type requested: " + type);
1401     }
1402   }
1403 
1404   /**
1405    * Converts a hex string to a byte array
1406    * If the {@code uid} is less than {@code uid_length * 2} characters wide, it
1407    * will be padded with 0s to conform to the spec. E.g. if the tagk width is 3
1408    * and the given {@code uid} string is "1", the string will be padded to
1409    * "000001" and then converted to a byte array to reach 3 bytes.
1410    * All {@code uid}s are padded to 1 byte. If given "1", and {@code uid_length}
1411    * is 0, the uid will be padded to "01" then converted.
1412    * @param uid The UID to convert
1413    * @param uid_length An optional length, in bytes, that the UID must conform
1414    * to. Set to 0 if not used.
1415    * @return The UID as a byte array
1416    * @throws NullPointerException if the ID was null
1417    * @throws IllegalArgumentException if the string is not valid hex
1418    * @since 2.0
1419    */
1420   public static byte[] stringToUid(final String uid, final short uid_length) {
1421     if (uid == null || uid.isEmpty()) {
1422       throw new IllegalArgumentException("UID was empty");
1423     }
1424     String id = uid;
1425     if (uid_length > 0) {
1426       while (id.length() < uid_length * 2) {
1427         id = "0" + id;
1428       }
1429     } else {
1430       if (id.length() % 2 > 0) {
1431         id = "0" + id;
1432       }
1433     }
1434     return DatatypeConverter.parseHexBinary(id);
1435   }
1436 
1437   /**
1438    * Extracts the TSUID from a storage row key that includes the timestamp.
1439    * @param row_key The row key to process
1440    * @param metric_width The width of the metric
1441    * @param timestamp_width The width of the timestamp
1442    * @return The TSUID as a byte array
1443    * @throws IllegalArgumentException if the row key is missing tags or it is
1444    * corrupt such as a salted key when salting is disabled or vice versa.
1445    */
1446   public static byte[] getTSUIDFromKey(final byte[] row_key,
1447       final short metric_width, final short timestamp_width) {
1448     int idx = 0;
1449     // validation
1450     final int tag_pair_width = TSDB.tagk_width() + TSDB.tagv_width();
1451     final int tags_length = row_key.length -
1452         (Const.SALT_WIDTH() + metric_width + timestamp_width);
1453     if (tags_length < tag_pair_width || (tags_length % tag_pair_width) != 0) {
1454       throw new IllegalArgumentException(
1455           "Row key is missing tags or it is corrupted " + Arrays.toString(row_key));
1456     }
1457     final byte[] tsuid = new byte[
1458                  row_key.length - timestamp_width - Const.SALT_WIDTH()];
1459     for (int i = Const.SALT_WIDTH(); i < row_key.length; i++) {
1460       if (i < Const.SALT_WIDTH() + metric_width ||
1461           i >= (Const.SALT_WIDTH() + metric_width + timestamp_width)) {
1462         tsuid[idx] = row_key[i];
1463         idx++;
1464       }
1465     }
1466     return tsuid;
1467   }
1468 
1469   /**
1470    * Extracts a list of tagks and tagvs as individual values in a list
1471    * @param tsuid The tsuid to parse
1472    * @return A list of tagk/tagv UIDs alternating with tagk, tagv, tagk, tagv
1473    * @throws IllegalArgumentException if the TSUID is malformed
1474    * @since 2.1
1475    */
1476   public static List<byte[]> getTagsFromTSUID(final String tsuid) {
1477     if (tsuid == null || tsuid.isEmpty()) {
1478       throw new IllegalArgumentException("Missing TSUID");
1479     }
1480     if (tsuid.length() <= TSDB.metrics_width() * 2) {
1481       throw new IllegalArgumentException(
1482           "TSUID is too short, may be missing tags");
1483     }
1484 
1485     final List<byte[]> tags = new ArrayList<byte[]>();
1486     final int pair_width = (TSDB.tagk_width() * 2) + (TSDB.tagv_width() * 2);
1487 
1488     // start after the metric then iterate over each tagk/tagv pair
1489     for (int i = TSDB.metrics_width() * 2; i < tsuid.length(); i+= pair_width) {
1490       if (i + pair_width > tsuid.length()){
1491         throw new IllegalArgumentException(
1492             "The TSUID appears to be malformed, improper tag width");
1493       }
1494       String tag = tsuid.substring(i, i + (TSDB.tagk_width() * 2));
1495       tags.add(UniqueId.stringToUid(tag));
1496       tag = tsuid.substring(i + (TSDB.tagk_width() * 2), i + pair_width);
1497       tags.add(UniqueId.stringToUid(tag));
1498     }
1499     return tags;
1500   }
1501 
1502   /**
1503    * Extracts a list of tagk/tagv pairs from a tsuid
1504    * @param tsuid The tsuid to parse
1505    * @return A list of tagk/tagv UID pairs
1506    * @throws IllegalArgumentException if the TSUID is malformed
1507    * @since 2.0
1508    */
1509   public static List<byte[]> getTagPairsFromTSUID(final String tsuid) {
1510      if (tsuid == null || tsuid.isEmpty()) {
1511        throw new IllegalArgumentException("Missing TSUID");
1512      }
1513      if (tsuid.length() <= TSDB.metrics_width() * 2) {
1514        throw new IllegalArgumentException(
1515            "TSUID is too short, may be missing tags");
1516      }
1517 
1518      final List<byte[]> tags = new ArrayList<byte[]>();
1519      final int pair_width = (TSDB.tagk_width() * 2) + (TSDB.tagv_width() * 2);
1520 
1521      // start after the metric then iterate over each tagk/tagv pair
1522      for (int i = TSDB.metrics_width() * 2; i < tsuid.length(); i+= pair_width) {
1523        if (i + pair_width > tsuid.length()){
1524          throw new IllegalArgumentException(
1525              "The TSUID appears to be malformed, improper tag width");
1526        }
1527        String tag = tsuid.substring(i, i + pair_width);
1528        tags.add(UniqueId.stringToUid(tag));
1529      }
1530      return tags;
1531    }
1532 
1533   /**
1534    * Extracts a list of tagk/tagv pairs from a tsuid
1535    * @param tsuid The tsuid to parse
1536    * @return A list of tagk/tagv UID pairs
1537    * @throws IllegalArgumentException if the TSUID is malformed
1538    * @since 2.0
1539    */
1540   public static List<byte[]> getTagPairsFromTSUID(final byte[] tsuid) {
1541     if (tsuid == null) {
1542       throw new IllegalArgumentException("Missing TSUID");
1543     }
1544     if (tsuid.length <= TSDB.metrics_width()) {
1545       throw new IllegalArgumentException(
1546           "TSUID is too short, may be missing tags");
1547     }
1548 
1549     final List<byte[]> tags = new ArrayList<byte[]>();
1550     final int pair_width = TSDB.tagk_width() + TSDB.tagv_width();
1551 
1552     // start after the metric then iterate over each tagk/tagv pair
1553     for (int i = TSDB.metrics_width(); i < tsuid.length; i+= pair_width) {
1554       if (i + pair_width > tsuid.length){
1555         throw new IllegalArgumentException(
1556             "The TSUID appears to be malformed, improper tag width");
1557       }
1558       tags.add(Arrays.copyOfRange(tsuid, i, i + pair_width));
1559     }
1560     return tags;
1561   }
1562 
1563   /**
1564    * Returns a map of max UIDs from storage for the given list of UID types
1565    * @param tsdb The TSDB to which we belong
1566    * @param kinds A list of qualifiers to fetch
1567    * @return A map with the "kind" as the key and the maximum assigned UID as
1568    * the value
1569    * @since 2.0
1570    */
1571   public static Deferred<Map<String, Long>> getUsedUIDs(final TSDB tsdb,
1572       final byte[][] kinds) {
1573 
1574     /**
1575      * Returns a map with 0 if the max ID row hasn't been initialized yet,
1576      * otherwise the map has actual data
1577      */
1578     final class GetCB implements Callback<Map<String, Long>,
1579       ArrayList<KeyValue>> {
1580 
1581       @Override
1582       public Map<String, Long> call(final ArrayList<KeyValue> row)
1583           throws Exception {
1584 
1585         final Map<String, Long> results = new HashMap<String, Long>(3);
1586         if (row == null || row.isEmpty()) {
1587           // it could be the case that this is the first time the TSD has run
1588           // and the user hasn't put any metrics in, so log and return 0s
1589           LOG.info("Could not find the UID assignment row");
1590           for (final byte[] kind : kinds) {
1591             results.put(new String(kind, CHARSET), 0L);
1592           }
1593           return results;
1594         }
1595 
1596         for (final KeyValue column : row) {
1597           results.put(new String(column.qualifier(), CHARSET),
1598               Bytes.getLong(column.value()));
1599         }
1600 
1601         // if the user is starting with a fresh UID table, we need to account
1602         // for missing columns
1603         for (final byte[] kind : kinds) {
1604           if (results.get(new String(kind, CHARSET)) == null) {
1605             results.put(new String(kind, CHARSET), 0L);
1606           }
1607         }
1608         return results;
1609       }
1610 
1611     }
1612 
1613     final GetRequest get = new GetRequest(tsdb.uidTable(), MAXID_ROW);
1614     get.family(ID_FAMILY);
1615     get.qualifiers(kinds);
1616     return tsdb.getClient().get(get).addCallback(new GetCB());
1617   }
1618 
1619   /**
1620    * Pre-load UID caches, scanning up to "tsd.core.preload_uid_cache.max_entries"
1621    * rows from the UID table.
1622    * @param tsdb The TSDB to use
1623    * @param uid_cache_map A map of {@link UniqueId} objects keyed on the kind.
1624    * @throws HBaseException Passes any HBaseException from HBase scanner.
1625    * @throws RuntimeException Wraps any non HBaseException from HBase scanner.
   * @since 2.1
1627    */
1628   public static void preloadUidCache(final TSDB tsdb,
1629       final ByteMap<UniqueId> uid_cache_map) throws HBaseException {
1630     int max_results = tsdb.getConfig().getInt(
1631         "tsd.core.preload_uid_cache.max_entries");
1632     LOG.info("Preloading uid cache with max_results=" + max_results);
1633     if (max_results <= 0) {
1634       return;
1635     }
1636     Scanner scanner = null;
1637     try {
1638       int num_rows = 0;
1639       scanner = getSuggestScanner(tsdb.getClient(), tsdb.uidTable(), "", null,
1640           max_results);
1641       for (ArrayList<ArrayList<KeyValue>> rows = scanner.nextRows().join();
1642           rows != null;
1643           rows = scanner.nextRows().join()) {
1644         for (final ArrayList<KeyValue> row : rows) {
1645           for (KeyValue kv: row) {
1646             final String name = fromBytes(kv.key());
1647             final byte[] kind = kv.qualifier();
1648             final byte[] id = kv.value();
1649             LOG.debug("id='{}', name='{}', kind='{}'", Arrays.toString(id),
1650                 name, fromBytes(kind));
1651             UniqueId uid_cache = uid_cache_map.get(kind);
1652             if (uid_cache != null) {
1653               uid_cache.cacheMapping(name, id);
1654             }
1655           }
1656           num_rows += row.size();
1657           row.clear();  // free()
1658           if (num_rows >= max_results) {
1659             break;
1660           }
1661         }
1662       }
1663       for (UniqueId unique_id_table : uid_cache_map.values()) {
1664         LOG.info("After preloading, uid cache '{}' has {} ids and {} names.",
1665                  unique_id_table.kind(),
1666                  unique_id_table.id_cache.size(),
1667                  unique_id_table.name_cache.size());
1668       }
1669     } catch (Exception e) {
1670       if (e instanceof HBaseException) {
1671         throw (HBaseException)e;
1672       } else if (e instanceof RuntimeException) {
1673         throw (RuntimeException)e;
1674       } else {
1675         throw new RuntimeException("Error while preloading IDs", e);
1676       }
1677     } finally {
1678       if (scanner != null) {
1679         scanner.close();
1680       }
1681     }
1682   }
1683 }
1684