1 // This file is part of OpenTSDB.
2 // Copyright (C) 2010-2012  The OpenTSDB Authors.
3 //
4 // This program is free software: you can redistribute it and/or modify it
5 // under the terms of the GNU Lesser General Public License as published by
6 // the Free Software Foundation, either version 2.1 of the License, or (at your
7 // option) any later version.  This program is distributed in the hope that it
8 // will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty
9 // of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser
10 // General Public License for more details.  You should have received a copy
11 // of the GNU Lesser General Public License along with this program.  If not,
12 // see <http://www.gnu.org/licenses/>.
13 package net.opentsdb.core;
14 
15 import java.io.IOException;
16 import java.lang.reflect.InvocationTargetException;
17 import java.nio.charset.Charset;
18 import java.util.ArrayList;
19 import java.util.Arrays;
20 import java.util.List;
21 import java.util.Map;
22 import java.util.Set;
23 import java.util.concurrent.atomic.AtomicLong;
24 
25 import com.stumbleupon.async.Callback;
26 import com.stumbleupon.async.Deferred;
27 import com.stumbleupon.async.DeferredGroupException;
28 
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31 import org.hbase.async.AppendRequest;
32 import org.hbase.async.Bytes;
33 import org.hbase.async.Bytes.ByteMap;
34 import org.hbase.async.ClientStats;
35 import org.hbase.async.DeleteRequest;
36 import org.hbase.async.GetRequest;
37 import org.hbase.async.HBaseClient;
38 import org.hbase.async.HBaseException;
39 import org.hbase.async.KeyValue;
40 import org.hbase.async.PutRequest;
41 import org.jboss.netty.util.HashedWheelTimer;
42 import org.jboss.netty.util.Timeout;
43 import org.jboss.netty.util.Timer;
44 
45 import net.opentsdb.tree.TreeBuilder;
46 import net.opentsdb.tsd.RTPublisher;
47 import net.opentsdb.tsd.StorageExceptionHandler;
48 import net.opentsdb.uid.NoSuchUniqueId;
49 import net.opentsdb.uid.NoSuchUniqueName;
50 import net.opentsdb.uid.UniqueId;
51 import net.opentsdb.uid.UniqueIdFilterPlugin;
52 import net.opentsdb.uid.UniqueId.UniqueIdType;
53 import net.opentsdb.utils.Config;
54 import net.opentsdb.utils.DateTime;
55 import net.opentsdb.utils.PluginLoader;
56 import net.opentsdb.utils.Threads;
57 import net.opentsdb.meta.Annotation;
58 import net.opentsdb.meta.MetaDataCache;
59 import net.opentsdb.meta.TSMeta;
60 import net.opentsdb.meta.UIDMeta;
61 import net.opentsdb.query.expression.ExpressionFactory;
62 import net.opentsdb.query.filter.TagVFilter;
63 import net.opentsdb.search.SearchPlugin;
64 import net.opentsdb.search.SearchQuery;
65 import net.opentsdb.tools.StartupPlugin;
66 import net.opentsdb.stats.Histogram;
67 import net.opentsdb.stats.QueryStats;
68 import net.opentsdb.stats.StatsCollector;
69 
70 /**
71  * Thread-safe implementation of the TSDB client.
72  * <p>
73  * This class is the central class of OpenTSDB.  You use it to add new data
74  * points or query the database.
75  */
76 public final class TSDB {
77   private static final Logger LOG = LoggerFactory.getLogger(TSDB.class);
78 
79   static final byte[] FAMILY = { 't' };
80 
81   /** Charset used to convert Strings to byte arrays and back. */
82   private static final Charset CHARSET = Charset.forName("ISO-8859-1");
83   private static final String METRICS_QUAL = "metrics";
84   private static short METRICS_WIDTH = 3;
85   private static final String TAG_NAME_QUAL = "tagk";
86   private static short TAG_NAME_WIDTH = 3;
87   private static final String TAG_VALUE_QUAL = "tagv";
88   private static short TAG_VALUE_WIDTH = 3;
89 
90   /** Client for the HBase cluster to use.  */
91   final HBaseClient client;
92 
93   /** Name of the table in which timeseries are stored.  */
94   final byte[] table;
95   /** Name of the table in which UID information is stored. */
96   final byte[] uidtable;
97   /** Name of the table where tree data is stored. */
98   final byte[] treetable;
99   /** Name of the table where meta data is stored. */
100   final byte[] meta_table;
101 
102   /** Unique IDs for the metric names. */
103   final UniqueId metrics;
104   /** Unique IDs for the tag names. */
105   final UniqueId tag_names;
106   /** Unique IDs for the tag values. */
107   final UniqueId tag_values;
108 
109   /** Configuration object for all TSDB components */
110   final Config config;
111 
112   /** Timer used for various tasks such as idle timeouts or query timeouts */
113   private final HashedWheelTimer timer;
114 
115   /**
116    * Row keys that need to be compacted.
117    * Whenever we write a new data point to a row, we add the row key to this
118    * set.  Every once in a while, the compaction thread will go through old
   * row keys and will re-read and compact them.
120    */
121   private final CompactionQueue compactionq;
122 
  /** Search indexer to use if configured */
124   private SearchPlugin search = null;
125 
126   /** Optional Startup Plugin to use if configured */
127   private StartupPlugin startup = null;
128 
  /** Optional real time publisher plugin to use if configured */
130   private RTPublisher rt_publisher = null;
131 
132   /** Optional plugin for handling meta data caching and updating */
133   private MetaDataCache meta_cache = null;
134 
135   /** Plugin for dealing with data points that can't be stored */
136   private StorageExceptionHandler storage_exception_handler = null;
137 
138   /** A filter plugin for allowing or blocking time series */
139   private WriteableDataPointFilterPlugin ts_filter;
140 
141   /** A filter plugin for allowing or blocking UIDs */
142   private UniqueIdFilterPlugin uid_filter;
143 
144   /** Writes rejected by the filter */
145   private final AtomicLong rejected_dps = new AtomicLong();
146   private final AtomicLong rejected_aggregate_dps = new AtomicLong();
147 
148   /** Datapoints Added */
149   private static final AtomicLong datapoints_added = new AtomicLong();
150 
  /**
   * Constructor
   * <p>
   * Builds the HBase client (unless one is supplied), applies configurable
   * UID widths / salting, resolves table names and constructs the three
   * {@link UniqueId} caches and the compaction queue.
   * @param client An initialized HBase client object, or null to have the
   * TSDB construct its own from the config
   * @param config An initialized configuration object
   * @since 2.1
   */
  public TSDB(final HBaseClient client, final Config config) {
    this.config = config;
    if (client == null) {
      // No client supplied: build one from the asynchbase config file if a
      // location was given, otherwise start from asynchbase defaults.
      final org.hbase.async.Config async_config;
      if (config.configLocation() != null && !config.configLocation().isEmpty()) {
        try {
          async_config = new org.hbase.async.Config(config.configLocation());
        } catch (final IOException e) {
          throw new RuntimeException("Failed to read the config file: " +
              config.configLocation(), e);
        }
      } else {
        async_config = new org.hbase.async.Config();
      }
      // The TSD's ZK settings always override whatever the asynchbase
      // config file carried.
      async_config.overrideConfig("hbase.zookeeper.znode.parent",
          config.getString("tsd.storage.hbase.zk_basedir"));
      async_config.overrideConfig("hbase.zookeeper.quorum",
          config.getString("tsd.storage.hbase.zk_quorum"));
      this.client = new HBaseClient(async_config);
    } else {
      this.client = client;
    }

    // SALT AND UID WIDTHS
    // Users really wanted this to be set via config instead of having to
    // compile. Hopefully they know NOT to change these after writing data.
    // NOTE: these assignments mutate static state and must happen before the
    // UniqueId objects below are constructed, since those capture the widths.
    if (config.hasProperty("tsd.storage.uid.width.metric")) {
      METRICS_WIDTH = config.getShort("tsd.storage.uid.width.metric");
    }
    if (config.hasProperty("tsd.storage.uid.width.tagk")) {
      TAG_NAME_WIDTH = config.getShort("tsd.storage.uid.width.tagk");
    }
    if (config.hasProperty("tsd.storage.uid.width.tagv")) {
      TAG_VALUE_WIDTH = config.getShort("tsd.storage.uid.width.tagv");
    }
    if (config.hasProperty("tsd.storage.max_tags")) {
      Const.setMaxNumTags(config.getShort("tsd.storage.max_tags"));
    }
    if (config.hasProperty("tsd.storage.salt.buckets")) {
      Const.setSaltBuckets(config.getInt("tsd.storage.salt.buckets"));
    }
    if (config.hasProperty("tsd.storage.salt.width")) {
      Const.setSaltWidth(config.getInt("tsd.storage.salt.width"));
    }

    // Resolve the four HBase table names once; stored as ISO-8859-1 bytes.
    table = config.getString("tsd.storage.hbase.data_table").getBytes(CHARSET);
    uidtable = config.getString("tsd.storage.hbase.uid_table").getBytes(CHARSET);
    treetable = config.getString("tsd.storage.hbase.tree_table").getBytes(CHARSET);
    meta_table = config.getString("tsd.storage.hbase.meta_table").getBytes(CHARSET);

    // Metric UIDs may optionally be assigned randomly; tagk/tagv are always
    // assigned sequentially (last flag = false).
    if (config.getBoolean("tsd.core.uid.random_metrics")) {
      metrics = new UniqueId(this, uidtable, METRICS_QUAL, METRICS_WIDTH, true);
    } else {
      metrics = new UniqueId(this, uidtable, METRICS_QUAL, METRICS_WIDTH, false);
    }
    tag_names = new UniqueId(this, uidtable, TAG_NAME_QUAL, TAG_NAME_WIDTH, false);
    tag_values = new UniqueId(this, uidtable, TAG_VALUE_QUAL, TAG_VALUE_WIDTH, false);
    compactionq = new CompactionQueue(this);

    if (config.hasProperty("tsd.core.timezone")) {
      DateTime.setDefaultTimezone(config.getString("tsd.core.timezone"));
    }

    timer = Threads.newTimer("TSDB Timer");

    QueryStats.setEnableDuplicates(
        config.getBoolean("tsd.query.allow_simultaneous_duplicates"));

    if (config.getBoolean("tsd.core.preload_uid_cache")) {
      // Warm the three UID caches up front so the first queries don't pay
      // the lookup cost.
      final ByteMap<UniqueId> uid_cache_map = new ByteMap<UniqueId>();
      uid_cache_map.put(METRICS_QUAL.getBytes(CHARSET), metrics);
      uid_cache_map.put(TAG_NAME_QUAL.getBytes(CHARSET), tag_names);
      uid_cache_map.put(TAG_VALUE_QUAL.getBytes(CHARSET), tag_values);
      UniqueId.preloadUidCache(this, uid_cache_map);
    }

    if (config.getString("tsd.core.tag.allow_specialchars") != null) {
      Tags.setAllowSpecialChars(config.getString("tsd.core.tag.allow_specialchars"));
    }

    // load up the functions that require the TSDB object
    ExpressionFactory.addTSDBFunctions(this);

    // set any extra tags from the config for stats
    StatsCollector.setGlobalTags(config);

    LOG.debug(config.dumpConfiguration());
  }
245 
  /**
   * Constructor
   * <p>
   * Delegates to {@link #TSDB(HBaseClient, Config)} with a null client so
   * the TSDB builds its own HBase client from the config.
   * @param config An initialized configuration object
   * @since 2.0
   */
  public TSDB(final Config config) {
    this(null, config);
  }
254 
  /**
   * @return The data point column family name.
   * Note this returns the internal static array itself, not a copy —
   * callers must treat it as read-only.
   */
  public static byte[] FAMILY() {
    return FAMILY;
  }
259 
260   /**
261    * Called by initializePlugins, also used to load startup plugins.
262    * @since 2.3
263    */
loadPluginPath(final String plugin_path)264   public static void loadPluginPath(final String plugin_path) {
265     if (plugin_path != null && !plugin_path.isEmpty()) {
266       try {
267         PluginLoader.loadJARs(plugin_path);
268       } catch (Exception e) {
269         LOG.error("Error loading plugins from plugin path: " + plugin_path, e);
270         throw new RuntimeException("Error loading plugins from plugin path: " +
271                 plugin_path, e);
272       }
273     }
274   }
275 
276   /**
277    * Should be called immediately after construction to initialize plugins and
278    * objects that rely on such. It also moves most of the potential exception
279    * throwing code out of the constructor so TSDMain can shutdown clients and
280    * such properly.
281    * @param init_rpcs Whether or not to initialize RPC plugins as well
282    * @throws RuntimeException if the plugin path could not be processed
283    * @throws IllegalArgumentException if a plugin could not be initialized
284    * @since 2.0
285    */
initializePlugins(final boolean init_rpcs)286   public void initializePlugins(final boolean init_rpcs) {
287     final String plugin_path = config.getString("tsd.core.plugin_path");
288     loadPluginPath(plugin_path);
289 
290     try {
291       TagVFilter.initializeFilterMap(this);
292       // @#$@%$%#$ing typed exceptions
293     } catch (SecurityException e) {
294       throw new RuntimeException("Failed to instantiate filters", e);
295     } catch (IllegalArgumentException e) {
296       throw new RuntimeException("Failed to instantiate filters", e);
297     } catch (ClassNotFoundException e) {
298       throw new RuntimeException("Failed to instantiate filters", e);
299     } catch (NoSuchMethodException e) {
300       throw new RuntimeException("Failed to instantiate filters", e);
301     } catch (NoSuchFieldException e) {
302       throw new RuntimeException("Failed to instantiate filters", e);
303     } catch (IllegalAccessException e) {
304       throw new RuntimeException("Failed to instantiate filters", e);
305     } catch (InvocationTargetException e) {
306       throw new RuntimeException("Failed to instantiate filters", e);
307     }
308 
309     // load the search plugin if enabled
310     if (config.getBoolean("tsd.search.enable")) {
311       search = PluginLoader.loadSpecificPlugin(
312           config.getString("tsd.search.plugin"), SearchPlugin.class);
313       if (search == null) {
314         throw new IllegalArgumentException("Unable to locate search plugin: " +
315             config.getString("tsd.search.plugin"));
316       }
317       try {
318         search.initialize(this);
319       } catch (Exception e) {
320         throw new RuntimeException("Failed to initialize search plugin", e);
321       }
322       LOG.info("Successfully initialized search plugin [" +
323           search.getClass().getCanonicalName() + "] version: "
324           + search.version());
325     } else {
326       search = null;
327     }
328 
329     // load the real time publisher plugin if enabled
330     if (config.getBoolean("tsd.rtpublisher.enable")) {
331       rt_publisher = PluginLoader.loadSpecificPlugin(
332           config.getString("tsd.rtpublisher.plugin"), RTPublisher.class);
333       if (rt_publisher == null) {
334         throw new IllegalArgumentException(
335             "Unable to locate real time publisher plugin: " +
336             config.getString("tsd.rtpublisher.plugin"));
337       }
338       try {
339         rt_publisher.initialize(this);
340       } catch (Exception e) {
341         throw new RuntimeException(
342             "Failed to initialize real time publisher plugin", e);
343       }
344       LOG.info("Successfully initialized real time publisher plugin [" +
345           rt_publisher.getClass().getCanonicalName() + "] version: "
346           + rt_publisher.version());
347     } else {
348       rt_publisher = null;
349     }
350 
351     // load the meta cache plugin if enabled
352     if (config.getBoolean("tsd.core.meta.cache.enable")) {
353       meta_cache = PluginLoader.loadSpecificPlugin(
354           config.getString("tsd.core.meta.cache.plugin"), MetaDataCache.class);
355       if (meta_cache == null) {
356         throw new IllegalArgumentException(
357             "Unable to locate meta cache plugin: " +
358             config.getString("tsd.core.meta.cache.plugin"));
359       }
360       try {
361         meta_cache.initialize(this);
362       } catch (Exception e) {
363         throw new RuntimeException(
364             "Failed to initialize meta cache plugin", e);
365       }
366       LOG.info("Successfully initialized meta cache plugin [" +
367           meta_cache.getClass().getCanonicalName() + "] version: "
368           + meta_cache.version());
369     }
370 
371     // load the storage exception plugin if enabled
372     if (config.getBoolean("tsd.core.storage_exception_handler.enable")) {
373       storage_exception_handler = PluginLoader.loadSpecificPlugin(
374           config.getString("tsd.core.storage_exception_handler.plugin"),
375           StorageExceptionHandler.class);
376       if (storage_exception_handler == null) {
377         throw new IllegalArgumentException(
378             "Unable to locate storage exception handler plugin: " +
379             config.getString("tsd.core.storage_exception_handler.plugin"));
380       }
381       try {
382         storage_exception_handler.initialize(this);
383       } catch (Exception e) {
384         throw new RuntimeException(
385             "Failed to initialize storage exception handler plugin", e);
386       }
387       LOG.info("Successfully initialized storage exception handler plugin [" +
388           storage_exception_handler.getClass().getCanonicalName() + "] version: "
389           + storage_exception_handler.version());
390     }
391 
392     // Writeable Data Point Filter
393     if (config.getBoolean("tsd.timeseriesfilter.enable")) {
394       ts_filter = PluginLoader.loadSpecificPlugin(
395           config.getString("tsd.timeseriesfilter.plugin"),
396           WriteableDataPointFilterPlugin.class);
397       if (ts_filter == null) {
398         throw new IllegalArgumentException(
399             "Unable to locate time series filter plugin plugin: " +
400             config.getString("tsd.timeseriesfilter.plugin"));
401       }
402       try {
403         ts_filter.initialize(this);
404       } catch (Exception e) {
405         throw new RuntimeException(
406             "Failed to initialize time series filter plugin", e);
407       }
408       LOG.info("Successfully initialized time series filter plugin [" +
409           ts_filter.getClass().getCanonicalName() + "] version: "
410           + ts_filter.version());
411     }
412 
413     // UID Filter
414     if (config.getBoolean("tsd.uidfilter.enable")) {
415       uid_filter = PluginLoader.loadSpecificPlugin(
416           config.getString("tsd.uidfilter.plugin"),
417           UniqueIdFilterPlugin.class);
418       if (uid_filter == null) {
419         throw new IllegalArgumentException(
420             "Unable to locate UID filter plugin plugin: " +
421             config.getString("tsd.uidfilter.plugin"));
422       }
423       try {
424         uid_filter.initialize(this);
425       } catch (Exception e) {
426         throw new RuntimeException(
427             "Failed to initialize UID filter plugin", e);
428       }
429       LOG.info("Successfully initialized UID filter plugin [" +
430           uid_filter.getClass().getCanonicalName() + "] version: "
431           + uid_filter.version());
432     }
433   }
434 
435   /**
436    * Returns the configured HBase client
437    * @return The HBase client
438    * @since 2.0
439    */
getClient()440   public final HBaseClient getClient() {
441     return this.client;
442   }
443 
444   /**
445    * Sets the startup plugin so that it can be shutdown properly.
446    * Note that this method will not initialize or call any other methods
447    * belonging to the plugin's implementation.
448    * @param plugin The startup plugin that was used.
449    * @since 2.3
450    */
setStartupPlugin(final StartupPlugin plugin)451   public final void setStartupPlugin(final StartupPlugin plugin) {
452     startup = plugin;
453   }
454 
455   /**
456    * Getter that returns the startup plugin object.
457    * @return The StartupPlugin object or null if the plugin was not set.
458    * @since 2.3
459    */
getStartupPlugin()460   public final StartupPlugin getStartupPlugin() {
461     return startup;
462   }
463 
464   /**
465    * Getter that returns the configuration object
466    * @return The configuration object
467    * @since 2.0
468    */
getConfig()469   public final Config getConfig() {
470     return this.config;
471   }
472 
473   /**
474    * Returns the storage exception handler. May be null if not enabled
475    * @return The storage exception handler
476    * @since 2.2
477    */
getStorageExceptionHandler()478   public final StorageExceptionHandler getStorageExceptionHandler() {
479     return storage_exception_handler;
480   }
481 
482   /**
483    * @return the TS filter object, may be null
484    * @since 2.3
485    */
getTSfilter()486   public WriteableDataPointFilterPlugin getTSfilter() {
487     return ts_filter;
488   }
489 
490   /**
491    * @return The UID filter object, may be null.
492    * @since 2.3
493    */
getUidFilter()494   public UniqueIdFilterPlugin getUidFilter() {
495     return uid_filter;
496   }
497 
498   /**
499    * Attempts to find the name for a unique identifier given a type
500    * @param type The type of UID
501    * @param uid The UID to search for
502    * @return The name of the UID object if found
503    * @throws IllegalArgumentException if the type is not valid
504    * @throws NoSuchUniqueId if the UID was not found
505    * @since 2.0
506    */
getUidName(final UniqueIdType type, final byte[] uid)507   public Deferred<String> getUidName(final UniqueIdType type, final byte[] uid) {
508     if (uid == null) {
509       throw new IllegalArgumentException("Missing UID");
510     }
511 
512     switch (type) {
513       case METRIC:
514         return this.metrics.getNameAsync(uid);
515       case TAGK:
516         return this.tag_names.getNameAsync(uid);
517       case TAGV:
518         return this.tag_values.getNameAsync(uid);
519       default:
520         throw new IllegalArgumentException("Unrecognized UID type");
521     }
522   }
523 
524   /**
525    * Attempts to find the UID matching a given name
526    * @param type The type of UID
527    * @param name The name to search for
528    * @throws IllegalArgumentException if the type is not valid
529    * @throws NoSuchUniqueName if the name was not found
530    * @since 2.0
531    */
getUID(final UniqueIdType type, final String name)532   public byte[] getUID(final UniqueIdType type, final String name) {
533     try {
534       return getUIDAsync(type, name).join();
535     } catch (NoSuchUniqueName e) {
536       throw e;
537     } catch (IllegalArgumentException e) {
538       throw e;
539     } catch (Exception e) {
540       LOG.error("Unexpected exception", e);
541       throw new RuntimeException(e);
542     }
543   }
544 
545   /**
546    * Attempts to find the UID matching a given name
547    * @param type The type of UID
548    * @param name The name to search for
549    * @throws IllegalArgumentException if the type is not valid
550    * @throws NoSuchUniqueName if the name was not found
551    * @since 2.1
552    */
getUIDAsync(final UniqueIdType type, final String name)553   public Deferred<byte[]> getUIDAsync(final UniqueIdType type, final String name) {
554     if (name == null || name.isEmpty()) {
555       throw new IllegalArgumentException("Missing UID name");
556     }
557     switch (type) {
558       case METRIC:
559         return metrics.getIdAsync(name);
560       case TAGK:
561         return tag_names.getIdAsync(name);
562       case TAGV:
563         return tag_values.getIdAsync(name);
564       default:
565         throw new IllegalArgumentException("Unrecognized UID type");
566     }
567   }
568 
569   /**
570    * Verifies that the data and UID tables exist in HBase and optionally the
571    * tree and meta data tables if the user has enabled meta tracking or tree
572    * building
573    * @return An ArrayList of objects to wait for
574    * @throws TableNotFoundException
575    * @since 2.0
576    */
checkNecessaryTablesExist()577   public Deferred<ArrayList<Object>> checkNecessaryTablesExist() {
578     final ArrayList<Deferred<Object>> checks =
579       new ArrayList<Deferred<Object>>(2);
580     checks.add(client.ensureTableExists(
581         config.getString("tsd.storage.hbase.data_table")));
582     checks.add(client.ensureTableExists(
583         config.getString("tsd.storage.hbase.uid_table")));
584     if (config.enable_tree_processing()) {
585       checks.add(client.ensureTableExists(
586           config.getString("tsd.storage.hbase.tree_table")));
587     }
588     if (config.enable_realtime_ts() || config.enable_realtime_uid() ||
589         config.enable_tsuid_incrementing()) {
590       checks.add(client.ensureTableExists(
591           config.getString("tsd.storage.hbase.meta_table")));
592     }
593     return Deferred.group(checks);
594   }
595 
596   /** Number of cache hits during lookups involving UIDs. */
uidCacheHits()597   public int uidCacheHits() {
598     return (metrics.cacheHits() + tag_names.cacheHits()
599             + tag_values.cacheHits());
600   }
601 
602   /** Number of cache misses during lookups involving UIDs. */
uidCacheMisses()603   public int uidCacheMisses() {
604     return (metrics.cacheMisses() + tag_names.cacheMisses()
605             + tag_values.cacheMisses());
606   }
607 
608   /** Number of cache entries currently in RAM for lookups involving UIDs. */
uidCacheSize()609   public int uidCacheSize() {
610     return (metrics.cacheSize() + tag_names.cacheSize()
611             + tag_values.cacheSize());
612   }
613 
  /**
   * Collects the stats and metrics tracked by this instance.
   * Records UID usage, JVM memory, HBase RPC/client counters and stats from
   * every configured plugin into the given collector.
   * @param collector The collector to use.
   */
  public void collectStats(final StatsCollector collector) {
    // Qualifiers of the three UID kinds, used to query how many IDs have
    // been assigned of each kind.
    final byte[][] kinds = {
        METRICS_QUAL.getBytes(CHARSET),
        TAG_NAME_QUAL.getBytes(CHARSET),
        TAG_VALUE_QUAL.getBytes(CHARSET)
      };
    try {
      final Map<String, Long> used_uids = UniqueId.getUsedUIDs(this, kinds)
        .joinUninterruptibly();

      collectUidStats(metrics, collector);
      if (config.getBoolean("tsd.core.uid.random_metrics")) {
        // With random metric UID assignment the used/available counters are
        // meaningless, so zeros are reported instead.
        collector.record("uid.ids-used", 0, "kind=" + METRICS_QUAL);
        collector.record("uid.ids-available", 0, "kind=" + METRICS_QUAL);
      } else {
        collector.record("uid.ids-used", used_uids.get(METRICS_QUAL),
            "kind=" + METRICS_QUAL);
        collector.record("uid.ids-available",
            (Internal.getMaxUnsignedValueOnBytes(metrics.width()) -
                used_uids.get(METRICS_QUAL)), "kind=" + METRICS_QUAL);
      }

      collectUidStats(tag_names, collector);
      collector.record("uid.ids-used", used_uids.get(TAG_NAME_QUAL),
          "kind=" + TAG_NAME_QUAL);
      collector.record("uid.ids-available",
          (Internal.getMaxUnsignedValueOnBytes(tag_names.width()) -
              used_uids.get(TAG_NAME_QUAL)),
          "kind=" + TAG_NAME_QUAL);

      collectUidStats(tag_values, collector);
      collector.record("uid.ids-used", used_uids.get(TAG_VALUE_QUAL),
          "kind=" + TAG_VALUE_QUAL);
      collector.record("uid.ids-available",
          (Internal.getMaxUnsignedValueOnBytes(tag_values.width()) -
              used_uids.get(TAG_VALUE_QUAL)), "kind=" + TAG_VALUE_QUAL);

    } catch (Exception e) {
      throw new RuntimeException("Shouldn't be here", e);
    }

    // Data points rejected by the write filter plugin, if any.
    collector.record("uid.filter.rejected", rejected_dps.get(), "kind=raw");
    collector.record("uid.filter.rejected", rejected_aggregate_dps.get(),
        "kind=aggregate");

    {
      final Runtime runtime = Runtime.getRuntime();
      collector.record("jvm.ramfree", runtime.freeMemory());
      collector.record("jvm.ramused", runtime.totalMemory());
    }

    // Latency histograms are tagged with the class that owns them; the tag
    // is always cleared so it doesn't leak into later records.
    collector.addExtraTag("class", "IncomingDataPoints");
    try {
      collector.record("hbase.latency", IncomingDataPoints.putlatency, "method=put");
    } finally {
      collector.clearExtraTag("class");
    }

    collector.addExtraTag("class", "TSDB");
    try {
      collector.record("datapoints.added", datapoints_added, "type=all");
    } finally {
      collector.clearExtraTag("class");
    }

    collector.addExtraTag("class", "TsdbQuery");
    try {
      collector.record("hbase.latency", TsdbQuery.scanlatency, "method=scan");
    } finally {
      collector.clearExtraTag("class");
    }
    // Raw RPC/connection counters from the asynchbase client.
    final ClientStats stats = client.stats();
    collector.record("hbase.root_lookups", stats.rootLookups());
    collector.record("hbase.meta_lookups",
                     stats.uncontendedMetaLookups(), "type=uncontended");
    collector.record("hbase.meta_lookups",
                     stats.contendedMetaLookups(), "type=contended");
    collector.record("hbase.rpcs",
                     stats.atomicIncrements(), "type=increment");
    collector.record("hbase.rpcs", stats.deletes(), "type=delete");
    collector.record("hbase.rpcs", stats.gets(), "type=get");
    collector.record("hbase.rpcs", stats.puts(), "type=put");
    collector.record("hbase.rpcs", stats.appends(), "type=append");
    collector.record("hbase.rpcs", stats.rowLocks(), "type=rowLock");
    collector.record("hbase.rpcs", stats.scannersOpened(), "type=openScanner");
    collector.record("hbase.rpcs", stats.scans(), "type=scan");
    collector.record("hbase.rpcs.batched", stats.numBatchedRpcSent());
    collector.record("hbase.flushes", stats.flushes());
    collector.record("hbase.connections.created", stats.connectionsCreated());
    collector.record("hbase.connections.idle_closed", stats.idleConnectionsClosed());
    collector.record("hbase.nsre", stats.noSuchRegionExceptions());
    collector.record("hbase.nsre.rpcs_delayed",
                     stats.numRpcDelayedDueToNSRE());
    collector.record("hbase.region_clients.open",
        stats.regionClients());
    // NOTE(review): this records stats.idleConnectionsClosed() again, the
    // same value already reported as "hbase.connections.idle_closed" above.
    // Looks like a copy/paste — confirm whether a region-client-specific
    // counter was intended here.
    collector.record("hbase.region_clients.idle_closed",
        stats.idleConnectionsClosed());

    compactionq.collectStats(collector);
    // Collect Stats from Plugins, tagging each record with the plugin name
    // and always clearing the tag afterwards.
    if (startup != null) {
      try {
        collector.addExtraTag("plugin", "startup");
        startup.collectStats(collector);
      } finally {
        collector.clearExtraTag("plugin");
      }
    }
    if (rt_publisher != null) {
      try {
        collector.addExtraTag("plugin", "publish");
        rt_publisher.collectStats(collector);
      } finally {
        collector.clearExtraTag("plugin");
      }
    }
    if (search != null) {
      try {
        collector.addExtraTag("plugin", "search");
        search.collectStats(collector);
      } finally {
        collector.clearExtraTag("plugin");
      }
    }
    if (storage_exception_handler != null) {
      try {
        collector.addExtraTag("plugin", "storageExceptionHandler");
        storage_exception_handler.collectStats(collector);
      } finally {
        collector.clearExtraTag("plugin");
      }
    }
    if (ts_filter != null) {
      try {
        collector.addExtraTag("plugin", "timeseriesFilter");
        ts_filter.collectStats(collector);
      } finally {
        collector.clearExtraTag("plugin");
      }
    }
    if (uid_filter != null) {
      try {
        collector.addExtraTag("plugin", "uidFilter");
        uid_filter.collectStats(collector);
      } finally {
        collector.clearExtraTag("plugin");
      }
    }
  }
767 
  /** Returns a latency histogram for Put RPCs used to store data points. */
  public Histogram getPutLatencyHistogram() {
    return IncomingDataPoints.putlatency;
  }
772 
  /** Returns a latency histogram for Scan RPCs used to fetch data points.  */
  public Histogram getScanLatencyHistogram() {
    return TsdbQuery.scanlatency;
  }
777 
778   /**
779    * Collects the stats for a {@link UniqueId}.
780    * @param uid The instance from which to collect stats.
781    * @param collector The collector to use.
782    */
collectUidStats(final UniqueId uid, final StatsCollector collector)783   private static void collectUidStats(final UniqueId uid,
784                                       final StatsCollector collector) {
785     collector.record("uid.cache-hit", uid.cacheHits(), "kind=" + uid.kind());
786     collector.record("uid.cache-miss", uid.cacheMisses(), "kind=" + uid.kind());
787     collector.record("uid.cache-size", uid.cacheSize(), "kind=" + uid.kind());
788     collector.record("uid.random-collisions", uid.randomIdCollisions(),
789         "kind=" + uid.kind());
790     collector.record("uid.rejected-assignments", uid.rejectedAssignments(),
791         "kind=" + uid.kind());
792   }
793 
  /** @return the width, in bytes, of metric UIDs in row keys. */
  public static short metrics_width() {
    return METRICS_WIDTH;
  }
798 
  /** @return the width, in bytes, of tagk (tag name) UIDs in row keys. */
  public static short tagk_width() {
    return TAG_NAME_WIDTH;
  }
803 
  /** @return the width, in bytes, of tagv (tag value) UIDs in row keys. */
  public static short tagv_width() {
    return TAG_VALUE_WIDTH;
  }
808 
809   /**
810    * Returns a new {@link Query} instance suitable for this TSDB.
811    */
newQuery()812   public Query newQuery() {
813     return new TsdbQuery(this);
814   }
815 
816   /**
817    * Returns a new {@link WritableDataPoints} instance suitable for this TSDB.
818    * <p>
819    * If you want to add a single data-point, consider using {@link #addPoint}
820    * instead.
821    */
newDataPoints()822   public WritableDataPoints newDataPoints() {
823     return new IncomingDataPoints(this);
824   }
825 
826   /**
827    * Returns a new {@link BatchedDataPoints} instance suitable for this TSDB.
828    *
829    * @param metric Every data point that gets appended must be associated to this metric.
830    * @param tags The associated tags for all data points being added.
831    * @return data structure which can have data points appended.
832    */
newBatch(String metric, Map<String, String> tags)833   public WritableDataPoints newBatch(String metric, Map<String, String> tags) {
834     return new BatchedDataPoints(this, metric, tags);
835   }
836 
837   /**
838    * Adds a single integer value data point in the TSDB.
839    * @param metric A non-empty string.
840    * @param timestamp The timestamp associated with the value.
841    * @param value The value of the data point.
842    * @param tags The tags on this series.  This map must be non-empty.
843    * @return A deferred object that indicates the completion of the request.
844    * The {@link Object} has not special meaning and can be {@code null} (think
845    * of it as {@code Deferred<Void>}). But you probably want to attach at
846    * least an errback to this {@code Deferred} to handle failures.
847    * @throws IllegalArgumentException if the timestamp is less than or equal
848    * to the previous timestamp added or 0 for the first timestamp, or if the
849    * difference with the previous timestamp is too large.
850    * @throws IllegalArgumentException if the metric name is empty or contains
851    * illegal characters.
852    * @throws IllegalArgumentException if the tags list is empty or one of the
853    * elements contains illegal characters.
854    * @throws HBaseException (deferred) if there was a problem while persisting
855    * data.
856    */
addPoint(final String metric, final long timestamp, final long value, final Map<String, String> tags)857   public Deferred<Object> addPoint(final String metric,
858                                    final long timestamp,
859                                    final long value,
860                                    final Map<String, String> tags) {
861     final byte[] v;
862     if (Byte.MIN_VALUE <= value && value <= Byte.MAX_VALUE) {
863       v = new byte[] { (byte) value };
864     } else if (Short.MIN_VALUE <= value && value <= Short.MAX_VALUE) {
865       v = Bytes.fromShort((short) value);
866     } else if (Integer.MIN_VALUE <= value && value <= Integer.MAX_VALUE) {
867       v = Bytes.fromInt((int) value);
868     } else {
869       v = Bytes.fromLong(value);
870     }
871 
872     final short flags = (short) (v.length - 1);  // Just the length.
873     return addPointInternal(metric, timestamp, v, tags, flags);
874   }
875 
876   /**
877    * Adds a double precision floating-point value data point in the TSDB.
878    * @param metric A non-empty string.
879    * @param timestamp The timestamp associated with the value.
880    * @param value The value of the data point.
881    * @param tags The tags on this series.  This map must be non-empty.
882    * @return A deferred object that indicates the completion of the request.
883    * The {@link Object} has not special meaning and can be {@code null} (think
884    * of it as {@code Deferred<Void>}). But you probably want to attach at
885    * least an errback to this {@code Deferred} to handle failures.
886    * @throws IllegalArgumentException if the timestamp is less than or equal
887    * to the previous timestamp added or 0 for the first timestamp, or if the
888    * difference with the previous timestamp is too large.
889    * @throws IllegalArgumentException if the metric name is empty or contains
890    * illegal characters.
891    * @throws IllegalArgumentException if the value is NaN or infinite.
892    * @throws IllegalArgumentException if the tags list is empty or one of the
893    * elements contains illegal characters.
894    * @throws HBaseException (deferred) if there was a problem while persisting
895    * data.
896    * @since 1.2
897    */
addPoint(final String metric, final long timestamp, final double value, final Map<String, String> tags)898   public Deferred<Object> addPoint(final String metric,
899                                    final long timestamp,
900                                    final double value,
901                                    final Map<String, String> tags) {
902     if (Double.isNaN(value) || Double.isInfinite(value)) {
903       throw new IllegalArgumentException("value is NaN or Infinite: " + value
904                                          + " for metric=" + metric
905                                          + " timestamp=" + timestamp);
906     }
907     final short flags = Const.FLAG_FLOAT | 0x7;  // A float stored on 8 bytes.
908     return addPointInternal(metric, timestamp,
909                             Bytes.fromLong(Double.doubleToRawLongBits(value)),
910                             tags, flags);
911   }
912 
913   /**
914    * Adds a single floating-point value data point in the TSDB.
915    * @param metric A non-empty string.
916    * @param timestamp The timestamp associated with the value.
917    * @param value The value of the data point.
918    * @param tags The tags on this series.  This map must be non-empty.
919    * @return A deferred object that indicates the completion of the request.
920    * The {@link Object} has not special meaning and can be {@code null} (think
921    * of it as {@code Deferred<Void>}). But you probably want to attach at
922    * least an errback to this {@code Deferred} to handle failures.
923    * @throws IllegalArgumentException if the timestamp is less than or equal
924    * to the previous timestamp added or 0 for the first timestamp, or if the
925    * difference with the previous timestamp is too large.
926    * @throws IllegalArgumentException if the metric name is empty or contains
927    * illegal characters.
928    * @throws IllegalArgumentException if the value is NaN or infinite.
929    * @throws IllegalArgumentException if the tags list is empty or one of the
930    * elements contains illegal characters.
931    * @throws HBaseException (deferred) if there was a problem while persisting
932    * data.
933    */
addPoint(final String metric, final long timestamp, final float value, final Map<String, String> tags)934   public Deferred<Object> addPoint(final String metric,
935                                    final long timestamp,
936                                    final float value,
937                                    final Map<String, String> tags) {
938     if (Float.isNaN(value) || Float.isInfinite(value)) {
939       throw new IllegalArgumentException("value is NaN or Infinite: " + value
940                                          + " for metric=" + metric
941                                          + " timestamp=" + timestamp);
942     }
943     final short flags = Const.FLAG_FLOAT | 0x3;  // A float stored on 4 bytes.
944     return addPointInternal(metric, timestamp,
945                             Bytes.fromInt(Float.floatToRawIntBits(value)),
946                             tags, flags);
947   }
948 
addPointInternal(final String metric, final long timestamp, final byte[] value, final Map<String, String> tags, final short flags)949   private Deferred<Object> addPointInternal(final String metric,
950                                             final long timestamp,
951                                             final byte[] value,
952                                             final Map<String, String> tags,
953                                             final short flags) {
954     // we only accept positive unix epoch timestamps in seconds or milliseconds
955     if (timestamp < 0 || ((timestamp & Const.SECOND_MASK) != 0 &&
956         timestamp > 9999999999999L)) {
957       throw new IllegalArgumentException((timestamp < 0 ? "negative " : "bad")
958           + " timestamp=" + timestamp
959           + " when trying to add value=" + Arrays.toString(value) + '/' + flags
960           + " to metric=" + metric + ", tags=" + tags);
961     }
962     IncomingDataPoints.checkMetricAndTags(metric, tags);
963     final byte[] row = IncomingDataPoints.rowKeyTemplate(this, metric, tags);
964     final long base_time;
965     final byte[] qualifier = Internal.buildQualifier(timestamp, flags);
966 
967     if ((timestamp & Const.SECOND_MASK) != 0) {
968       // drop the ms timestamp to seconds to calculate the base timestamp
969       base_time = ((timestamp / 1000) -
970           ((timestamp / 1000) % Const.MAX_TIMESPAN));
971     } else {
972       base_time = (timestamp - (timestamp % Const.MAX_TIMESPAN));
973     }
974 
975     /** Callback executed for chaining filter calls to see if the value
976      * should be written or not. */
977     final class WriteCB implements Callback<Deferred<Object>, Boolean> {
978       @Override
979       public Deferred<Object> call(final Boolean allowed) throws Exception {
980         if (!allowed) {
981           rejected_dps.incrementAndGet();
982           return Deferred.fromResult(null);
983         }
984 
985         Bytes.setInt(row, (int) base_time, metrics.width() + Const.SALT_WIDTH());
986         RowKey.prefixKeyWithSalt(row);
987 
988         Deferred<Object> result = null;
989         if (config.enable_appends()) {
990           final AppendDataPoints kv = new AppendDataPoints(qualifier, value);
991           final AppendRequest point = new AppendRequest(table, row, FAMILY,
992               AppendDataPoints.APPEND_COLUMN_QUALIFIER, kv.getBytes());
993           result = client.append(point);
994         } else {
995           scheduleForCompaction(row, (int) base_time);
996           final PutRequest point = new PutRequest(table, row, FAMILY, qualifier, value);
997           result = client.put(point);
998         }
999 
1000         // Count all added datapoints, not just those that came in through PUT rpc
1001         // Will there be others? Well, something could call addPoint programatically right?
1002         datapoints_added.incrementAndGet();
1003 
1004         // TODO(tsuna): Add a callback to time the latency of HBase and store the
1005         // timing in a moving Histogram (once we have a class for this).
1006 
1007         if (!config.enable_realtime_ts() && !config.enable_tsuid_incrementing() &&
1008             !config.enable_tsuid_tracking() && rt_publisher == null) {
1009           return result;
1010         }
1011 
1012         final byte[] tsuid = UniqueId.getTSUIDFromKey(row, METRICS_WIDTH,
1013             Const.TIMESTAMP_BYTES);
1014 
1015         // if the meta cache plugin is instantiated then tracking goes through it
1016         if (meta_cache != null) {
1017           meta_cache.increment(tsuid);
1018         } else {
1019           if (config.enable_tsuid_tracking()) {
1020             if (config.enable_realtime_ts()) {
1021               if (config.enable_tsuid_incrementing()) {
1022                 TSMeta.incrementAndGetCounter(TSDB.this, tsuid);
1023               } else {
1024                 TSMeta.storeIfNecessary(TSDB.this, tsuid);
1025               }
1026             } else {
1027               final PutRequest tracking = new PutRequest(meta_table, tsuid,
1028                   TSMeta.FAMILY(), TSMeta.COUNTER_QUALIFIER(), Bytes.fromLong(1));
1029               client.put(tracking);
1030             }
1031           }
1032         }
1033 
1034         if (rt_publisher != null) {
1035           rt_publisher.sinkDataPoint(metric, timestamp, value, tags, tsuid, flags);
1036         }
1037         return result;
1038       }
1039       @Override
1040       public String toString() {
1041         return "addPointInternal Write Callback";
1042       }
1043     }
1044 
1045     if (ts_filter != null && ts_filter.filterDataPoints()) {
1046       return ts_filter.allowDataPoint(metric, timestamp, value, tags, flags)
1047           .addCallbackDeferring(new WriteCB());
1048     }
1049     return Deferred.fromResult(true).addCallbackDeferring(new WriteCB());
1050   }
1051 
1052   /**
1053    * Forces a flush of any un-committed in memory data including left over
1054    * compactions.
1055    * <p>
1056    * For instance, any data point not persisted will be sent to HBase.
1057    * @return A {@link Deferred} that will be called once all the un-committed
1058    * data has been successfully and durably stored.  The value of the deferred
1059    * object return is meaningless and unspecified, and can be {@code null}.
1060    * @throws HBaseException (deferred) if there was a problem sending
1061    * un-committed data to HBase.  Please refer to the {@link HBaseException}
1062    * hierarchy to handle the possible failures.  Some of them are easily
1063    * recoverable by retrying, some are not.
1064    */
flush()1065   public Deferred<Object> flush() throws HBaseException {
1066     final class HClientFlush implements Callback<Object, ArrayList<Object>> {
1067       public Object call(final ArrayList<Object> args) {
1068         return client.flush();
1069       }
1070       public String toString() {
1071         return "flush HBase client";
1072       }
1073     }
1074 
1075     return config.enable_compactions() && compactionq != null
1076       ? compactionq.flush().addCallback(new HClientFlush())
1077       : client.flush();
1078   }
1079 
1080   /**
1081    * Gracefully shuts down this TSD instance.
1082    * <p>
1083    * The method must call {@code shutdown()} on all plugins as well as flush the
1084    * compaction queue.
1085    * @return A {@link Deferred} that will be called once all the un-committed
1086    * data has been successfully and durably stored, and all resources used by
1087    * this instance have been released.  The value of the deferred object
1088    * return is meaningless and unspecified, and can be {@code null}.
1089    * @throws HBaseException (deferred) if there was a problem sending
1090    * un-committed data to HBase.  Please refer to the {@link HBaseException}
1091    * hierarchy to handle the possible failures.  Some of them are easily
1092    * recoverable by retrying, some are not.
1093    */
shutdown()1094   public Deferred<Object> shutdown() {
1095     final ArrayList<Deferred<Object>> deferreds =
1096       new ArrayList<Deferred<Object>>();
1097 
1098     final class FinalShutdown implements Callback<Object, Object> {
1099       @Override
1100       public Object call(Object result) throws Exception {
1101         if (result instanceof Exception) {
1102           LOG.error("A previous shutdown failed", (Exception)result);
1103         }
1104         final Set<Timeout> timeouts = timer.stop();
1105         // TODO - at some point we should clean these up.
1106         if (timeouts.size() > 0) {
1107           LOG.warn("There were " + timeouts.size() + " timer tasks queued");
1108         }
1109         LOG.info("Completed shutting down the TSDB");
1110         return Deferred.fromResult(null);
1111       }
1112     }
1113 
1114     final class SEHShutdown implements Callback<Object, Object> {
1115       @Override
1116       public Object call(Object result) throws Exception {
1117         if (result instanceof Exception) {
1118           LOG.error("Shutdown of the HBase client failed", (Exception)result);
1119         }
1120         LOG.info("Shutting down storage exception handler plugin: " +
1121             storage_exception_handler.getClass().getCanonicalName());
1122         return storage_exception_handler.shutdown().addBoth(new FinalShutdown());
1123       }
1124       @Override
1125       public String toString() {
1126         return "SEHShutdown";
1127       }
1128     }
1129 
1130     final class HClientShutdown implements Callback<Deferred<Object>, ArrayList<Object>> {
1131       public Deferred<Object> call(final ArrayList<Object> args) {
1132         if (storage_exception_handler != null) {
1133           return client.shutdown().addBoth(new SEHShutdown());
1134         }
1135         return client.shutdown().addBoth(new FinalShutdown());
1136       }
1137       public String toString() {
1138         return "shutdown HBase client";
1139       }
1140     }
1141 
1142     final class ShutdownErrback implements Callback<Object, Exception> {
1143       public Object call(final Exception e) {
1144         final Logger LOG = LoggerFactory.getLogger(ShutdownErrback.class);
1145         if (e instanceof DeferredGroupException) {
1146           final DeferredGroupException ge = (DeferredGroupException) e;
1147           for (final Object r : ge.results()) {
1148             if (r instanceof Exception) {
1149               LOG.error("Failed to shutdown the TSD", (Exception) r);
1150             }
1151           }
1152         } else {
1153           LOG.error("Failed to shutdown the TSD", e);
1154         }
1155         return new HClientShutdown().call(null);
1156       }
1157       public String toString() {
1158         return "shutdown HBase client after error";
1159       }
1160     }
1161 
1162     final class CompactCB implements Callback<Object, ArrayList<Object>> {
1163       public Object call(ArrayList<Object> compactions) throws Exception {
1164         return null;
1165       }
1166     }
1167 
1168     if (config.enable_compactions()) {
1169       LOG.info("Flushing compaction queue");
1170       deferreds.add(compactionq.flush().addCallback(new CompactCB()));
1171     }
1172     if (startup != null) {
1173       LOG.info("Shutting down startup plugin: " +
1174               startup.getClass().getCanonicalName());
1175       deferreds.add(startup.shutdown());
1176     }
1177     if (search != null) {
1178       LOG.info("Shutting down search plugin: " +
1179           search.getClass().getCanonicalName());
1180       deferreds.add(search.shutdown());
1181     }
1182     if (rt_publisher != null) {
1183       LOG.info("Shutting down RT plugin: " +
1184           rt_publisher.getClass().getCanonicalName());
1185       deferreds.add(rt_publisher.shutdown());
1186     }
1187     if (meta_cache != null) {
1188       LOG.info("Shutting down meta cache plugin: " +
1189           meta_cache.getClass().getCanonicalName());
1190       deferreds.add(meta_cache.shutdown());
1191     }
1192     if (storage_exception_handler != null) {
1193       LOG.info("Shutting down storage exception handler plugin: " +
1194           storage_exception_handler.getClass().getCanonicalName());
1195       deferreds.add(storage_exception_handler.shutdown());
1196     }
1197     if (ts_filter != null) {
1198       LOG.info("Shutting down time series filter plugin: " +
1199           ts_filter.getClass().getCanonicalName());
1200       deferreds.add(ts_filter.shutdown());
1201     }
1202     if (uid_filter != null) {
1203       LOG.info("Shutting down UID filter plugin: " +
1204           uid_filter.getClass().getCanonicalName());
1205       deferreds.add(uid_filter.shutdown());
1206     }
1207 
1208     // wait for plugins to shutdown before we close the client
1209     return deferreds.size() > 0
1210       ? Deferred.group(deferreds).addCallbackDeferring(new HClientShutdown())
1211           .addErrback(new ShutdownErrback())
1212       : new HClientShutdown().call(null);
1213   }
1214 
1215   /**
1216    * Given a prefix search, returns a few matching metric names.
1217    * @param search A prefix to search.
1218    */
suggestMetrics(final String search)1219   public List<String> suggestMetrics(final String search) {
1220     return metrics.suggest(search);
1221   }
1222 
1223   /**
1224    * Given a prefix search, returns matching metric names.
1225    * @param search A prefix to search.
1226    * @param max_results Maximum number of results to return.
1227    * @since 2.0
1228    */
suggestMetrics(final String search, final int max_results)1229   public List<String> suggestMetrics(final String search,
1230       final int max_results) {
1231     return metrics.suggest(search, max_results);
1232   }
1233 
1234   /**
1235    * Given a prefix search, returns a few matching tag names.
1236    * @param search A prefix to search.
1237    */
suggestTagNames(final String search)1238   public List<String> suggestTagNames(final String search) {
1239     return tag_names.suggest(search);
1240   }
1241 
1242   /**
1243    * Given a prefix search, returns matching tagk names.
1244    * @param search A prefix to search.
1245    * @param max_results Maximum number of results to return.
1246    * @since 2.0
1247    */
suggestTagNames(final String search, final int max_results)1248   public List<String> suggestTagNames(final String search,
1249       final int max_results) {
1250     return tag_names.suggest(search, max_results);
1251   }
1252 
1253   /**
1254    * Given a prefix search, returns a few matching tag values.
1255    * @param search A prefix to search.
1256    */
suggestTagValues(final String search)1257   public List<String> suggestTagValues(final String search) {
1258     return tag_values.suggest(search);
1259   }
1260 
1261   /**
1262    * Given a prefix search, returns matching tag values.
1263    * @param search A prefix to search.
1264    * @param max_results Maximum number of results to return.
1265    * @since 2.0
1266    */
suggestTagValues(final String search, final int max_results)1267   public List<String> suggestTagValues(final String search,
1268       final int max_results) {
1269     return tag_values.suggest(search, max_results);
1270   }
1271 
1272   /**
1273    * Discards all in-memory caches.
1274    * @since 1.1
1275    */
dropCaches()1276   public void dropCaches() {
1277     metrics.dropCaches();
1278     tag_names.dropCaches();
1279     tag_values.dropCaches();
1280   }
1281 
1282   /**
1283    * Attempts to assign a UID to a name for the given type
1284    * Used by the UniqueIdRpc call to generate IDs for new metrics, tagks or
1285    * tagvs. The name must pass validation and if it's already assigned a UID,
1286    * this method will throw an error with the proper UID. Otherwise if it can
1287    * create the UID, it will be returned
1288    * @param type The type of uid to assign, metric, tagk or tagv
1289    * @param name The name of the uid object
1290    * @return A byte array with the UID if the assignment was successful
1291    * @throws IllegalArgumentException if the name is invalid or it already
1292    * exists
1293    * @since 2.0
1294    */
assignUid(final String type, final String name)1295   public byte[] assignUid(final String type, final String name) {
1296     Tags.validateString(type, name);
1297     if (type.toLowerCase().equals("metric")) {
1298       try {
1299         final byte[] uid = this.metrics.getId(name);
1300         throw new IllegalArgumentException("Name already exists with UID: " +
1301             UniqueId.uidToString(uid));
1302       } catch (NoSuchUniqueName nsue) {
1303         return this.metrics.getOrCreateId(name);
1304       }
1305     } else if (type.toLowerCase().equals("tagk")) {
1306       try {
1307         final byte[] uid = this.tag_names.getId(name);
1308         throw new IllegalArgumentException("Name already exists with UID: " +
1309             UniqueId.uidToString(uid));
1310       } catch (NoSuchUniqueName nsue) {
1311         return this.tag_names.getOrCreateId(name);
1312       }
1313     } else if (type.toLowerCase().equals("tagv")) {
1314       try {
1315         final byte[] uid = this.tag_values.getId(name);
1316         throw new IllegalArgumentException("Name already exists with UID: " +
1317             UniqueId.uidToString(uid));
1318       } catch (NoSuchUniqueName nsue) {
1319         return this.tag_values.getOrCreateId(name);
1320       }
1321     } else {
1322       LOG.warn("Unknown type name: " + type);
1323       throw new IllegalArgumentException("Unknown type name");
1324     }
1325   }
1326 
1327   /**
1328    * Attempts to delete the given UID name mapping from the storage table as
1329    * well as the local cache.
1330    * @param type The type of UID to delete. Must be "metrics", "tagk" or "tagv"
1331    * @param name The name of the UID to delete
1332    * @return A deferred to wait on for completion, or an exception if thrown
1333    * @throws IllegalArgumentException if the type is invalid
1334    * @since 2.2
1335    */
deleteUidAsync(final String type, final String name)1336   public Deferred<Object> deleteUidAsync(final String type, final String name) {
1337     final UniqueIdType uid_type = UniqueId.stringToUniqueIdType(type);
1338     switch (uid_type) {
1339     case METRIC:
1340       return metrics.deleteAsync(name);
1341     case TAGK:
1342       return tag_names.deleteAsync(name);
1343     case TAGV:
1344       return tag_values.deleteAsync(name);
1345     default:
1346       throw new IllegalArgumentException("Unrecognized UID type: " + uid_type);
1347     }
1348   }
1349 
1350   /**
1351    * Attempts to rename a UID from existing name to the given name
1352    * Used by the UniqueIdRpc call to rename name of existing metrics, tagks or
1353    * tagvs. The name must pass validation. If the UID doesn't exist, the method
1354    * will throw an error. Chained IllegalArgumentException is directly exposed
1355    * to caller. If the rename was successful, this method returns.
1356    * @param type The type of uid to rename, one of metric, tagk and tagv
1357    * @param oldname The existing name of the uid object
1358    * @param newname The new name to be used on the uid object
1359    * @throws IllegalArgumentException if error happened
1360    * @since 2.2
1361    */
renameUid(final String type, final String oldname, final String newname)1362   public void renameUid(final String type, final String oldname,
1363       final String newname) {
1364     Tags.validateString(type, oldname);
1365     Tags.validateString(type, newname);
1366     if (type.toLowerCase().equals("metric")) {
1367       try {
1368         this.metrics.getId(oldname);
1369         this.metrics.rename(oldname, newname);
1370       } catch (NoSuchUniqueName nsue) {
1371         throw new IllegalArgumentException("Name(\"" + oldname +
1372             "\") does not exist");
1373       }
1374     } else if (type.toLowerCase().equals("tagk")) {
1375       try {
1376         this.tag_names.getId(oldname);
1377         this.tag_names.rename(oldname, newname);
1378       } catch (NoSuchUniqueName nsue) {
1379         throw new IllegalArgumentException("Name(\"" + oldname +
1380             "\") does not exist");
1381       }
1382     } else if (type.toLowerCase().equals("tagv")) {
1383       try {
1384         this.tag_values.getId(oldname);
1385         this.tag_values.rename(oldname, newname);
1386       } catch (NoSuchUniqueName nsue) {
1387         throw new IllegalArgumentException("Name(\"" + oldname +
1388             "\") does not exist");
1389       }
1390     } else {
1391       LOG.warn("Unknown type name: " + type);
1392       throw new IllegalArgumentException("Unknown type name");
1393     }
1394   }
1395 
1396   /** @return the name of the UID table as a byte array for client requests */
uidTable()1397   public byte[] uidTable() {
1398     return this.uidtable;
1399   }
1400 
1401   /** @return the name of the data table as a byte array for client requests */
dataTable()1402   public byte[] dataTable() {
1403     return this.table;
1404   }
1405 
1406   /** @return the name of the tree table as a byte array for client requests */
treeTable()1407   public byte[] treeTable() {
1408     return this.treetable;
1409   }
1410 
1411   /** @return the name of the meta table as a byte array for client requests */
metaTable()1412   public byte[] metaTable() {
1413     return this.meta_table;
1414   }
1415 
1416   /**
1417    * Index the given timeseries meta object via the configured search plugin
1418    * @param meta The meta data object to index
1419    * @since 2.0
1420    */
indexTSMeta(final TSMeta meta)1421   public void indexTSMeta(final TSMeta meta) {
1422     if (search != null) {
1423       search.indexTSMeta(meta).addErrback(new PluginError());
1424     }
1425   }
1426 
1427   /**
1428    * Delete the timeseries meta object from the search index
1429    * @param tsuid The TSUID to delete
1430    * @since 2.0
1431    */
deleteTSMeta(final String tsuid)1432   public void deleteTSMeta(final String tsuid) {
1433     if (search != null) {
1434       search.deleteTSMeta(tsuid).addErrback(new PluginError());
1435     }
1436   }
1437 
1438   /**
1439    * Index the given UID meta object via the configured search plugin
1440    * @param meta The meta data object to index
1441    * @since 2.0
1442    */
indexUIDMeta(final UIDMeta meta)1443   public void indexUIDMeta(final UIDMeta meta) {
1444     if (search != null) {
1445       search.indexUIDMeta(meta).addErrback(new PluginError());
1446     }
1447   }
1448 
1449   /**
1450    * Delete the UID meta object from the search index
1451    * @param meta The UID meta object to delete
1452    * @since 2.0
1453    */
deleteUIDMeta(final UIDMeta meta)1454   public void deleteUIDMeta(final UIDMeta meta) {
1455     if (search != null) {
1456       search.deleteUIDMeta(meta).addErrback(new PluginError());
1457     }
1458   }
1459 
1460   /**
1461    * Index the given Annotation object via the configured search plugin
1462    * @param note The annotation object to index
1463    * @since 2.0
1464    */
indexAnnotation(final Annotation note)1465   public void indexAnnotation(final Annotation note) {
1466     if (search != null) {
1467       search.indexAnnotation(note).addErrback(new PluginError());
1468     }
1469     if( rt_publisher != null ) {
1470     	rt_publisher.publishAnnotation(note);
1471     }
1472   }
1473 
1474   /**
1475    * Delete the annotation object from the search index
1476    * @param note The annotation object to delete
1477    * @since 2.0
1478    */
deleteAnnotation(final Annotation note)1479   public void deleteAnnotation(final Annotation note) {
1480     if (search != null) {
1481       search.deleteAnnotation(note).addErrback(new PluginError());
1482     }
1483   }
1484 
1485   /**
1486    * Processes the TSMeta through all of the trees if configured to do so
1487    * @param meta The meta data to process
1488    * @since 2.0
1489    */
processTSMetaThroughTrees(final TSMeta meta)1490   public Deferred<Boolean> processTSMetaThroughTrees(final TSMeta meta) {
1491     if (config.enable_tree_processing()) {
1492       return TreeBuilder.processAllTrees(this, meta);
1493     }
1494     return Deferred.fromResult(false);
1495   }
1496 
1497   /**
1498    * Executes a search query using the search plugin
1499    * @param query The query to execute
1500    * @return A deferred object to wait on for the results to be fetched
1501    * @throws IllegalStateException if the search plugin has not been enabled or
1502    * configured
1503    * @since 2.0
1504    */
executeSearch(final SearchQuery query)1505   public Deferred<SearchQuery> executeSearch(final SearchQuery query) {
1506     if (search == null) {
1507       throw new IllegalStateException(
1508           "Searching has not been enabled on this TSD");
1509     }
1510 
1511     return search.executeQuery(query);
1512   }
1513 
1514   /**
1515    * Simply logs plugin errors when they're thrown by attaching as an errorback.
1516    * Without this, exceptions will just disappear (unless logged by the plugin)
1517    * since we don't wait for a result.
1518    */
1519   final class PluginError implements Callback<Object, Exception> {
1520     @Override
call(final Exception e)1521     public Object call(final Exception e) throws Exception {
1522       LOG.error("Exception from Search plugin indexer", e);
1523       return null;
1524     }
1525   }
1526 
1527   /**
1528    * Blocks while pre-fetching meta data from the data and uid tables
1529    * so that performance improves, particularly with a large number of
1530    * regions and region servers.
1531    * @since 2.2
1532    */
preFetchHBaseMeta()1533   public void preFetchHBaseMeta() {
1534     LOG.info("Pre-fetching meta data for all tables");
1535     final long start = System.currentTimeMillis();
1536     final ArrayList<Deferred<Object>> deferreds = new ArrayList<Deferred<Object>>();
1537     deferreds.add(client.prefetchMeta(table));
1538     deferreds.add(client.prefetchMeta(uidtable));
1539 
1540     // TODO(cl) - meta, tree, etc
1541 
1542     try {
1543       Deferred.group(deferreds).join();
1544       LOG.info("Fetched meta data for tables in " +
1545           (System.currentTimeMillis() - start) + "ms");
1546     } catch (InterruptedException e) {
1547       LOG.error("Interrupted", e);
1548       Thread.currentThread().interrupt();
1549       return;
1550     } catch (Exception e) {
1551       LOG.error("Failed to prefetch meta for our tables", e);
1552     }
1553   }
1554 
1555   /** @return the timer used for various house keeping functions */
getTimer()1556   public Timer getTimer() {
1557     return timer;
1558   }
1559 
1560   // ------------------ //
1561   // Compaction helpers //
1562   // ------------------ //
1563 
compact(final ArrayList<KeyValue> row, List<Annotation> annotations)1564   final KeyValue compact(final ArrayList<KeyValue> row,
1565       List<Annotation> annotations) {
1566     return compactionq.compact(row, annotations);
1567   }
1568 
1569   /**
1570    * Schedules the given row key for later re-compaction.
1571    * Once this row key has become "old enough", we'll read back all the data
1572    * points in that row, write them back to HBase in a more compact fashion,
1573    * and delete the individual data points.
1574    * @param row The row key to re-compact later.  Will not be modified.
1575    * @param base_time The 32-bit unsigned UNIX timestamp.
1576    */
scheduleForCompaction(final byte[] row, final int base_time)1577   final void scheduleForCompaction(final byte[] row, final int base_time) {
1578     if (config.enable_compactions()) {
1579       compactionq.add(row);
1580     }
1581   }
1582 
1583   // ------------------------ //
1584   // HBase operations helpers //
1585   // ------------------------ //
1586 
1587   /** Gets the entire given row from the data table. */
get(final byte[] key)1588   final Deferred<ArrayList<KeyValue>> get(final byte[] key) {
1589     return client.get(new GetRequest(table, key));
1590   }
1591 
1592   /** Puts the given value into the data table. */
put(final byte[] key, final byte[] qualifier, final byte[] value)1593   final Deferred<Object> put(final byte[] key,
1594                              final byte[] qualifier,
1595                              final byte[] value) {
1596     return client.put(new PutRequest(table, key, FAMILY, qualifier, value));
1597   }
1598 
1599   /** Deletes the given cells from the data table. */
delete(final byte[] key, final byte[][] qualifiers)1600   final Deferred<Object> delete(final byte[] key, final byte[][] qualifiers) {
1601     return client.delete(new DeleteRequest(table, key, FAMILY, qualifiers));
1602   }
1603 
1604 }
1605