1 // This file is part of OpenTSDB. 2 // Copyright (C) 2010-2012 The OpenTSDB Authors. 3 // 4 // This program is free software: you can redistribute it and/or modify it 5 // under the terms of the GNU Lesser General Public License as published by 6 // the Free Software Foundation, either version 2.1 of the License, or (at your 7 // option) any later version. This program is distributed in the hope that it 8 // will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty 9 // of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser 10 // General Public License for more details. You should have received a copy 11 // of the GNU Lesser General Public License along with this program. If not, 12 // see <http://www.gnu.org/licenses/>. 13 package net.opentsdb.core; 14 15 import java.io.IOException; 16 import java.lang.reflect.InvocationTargetException; 17 import java.nio.charset.Charset; 18 import java.util.ArrayList; 19 import java.util.Arrays; 20 import java.util.List; 21 import java.util.Map; 22 import java.util.Set; 23 import java.util.concurrent.atomic.AtomicLong; 24 25 import com.stumbleupon.async.Callback; 26 import com.stumbleupon.async.Deferred; 27 import com.stumbleupon.async.DeferredGroupException; 28 29 import org.slf4j.Logger; 30 import org.slf4j.LoggerFactory; 31 import org.hbase.async.AppendRequest; 32 import org.hbase.async.Bytes; 33 import org.hbase.async.Bytes.ByteMap; 34 import org.hbase.async.ClientStats; 35 import org.hbase.async.DeleteRequest; 36 import org.hbase.async.GetRequest; 37 import org.hbase.async.HBaseClient; 38 import org.hbase.async.HBaseException; 39 import org.hbase.async.KeyValue; 40 import org.hbase.async.PutRequest; 41 import org.jboss.netty.util.HashedWheelTimer; 42 import org.jboss.netty.util.Timeout; 43 import org.jboss.netty.util.Timer; 44 45 import net.opentsdb.tree.TreeBuilder; 46 import net.opentsdb.tsd.RTPublisher; 47 import net.opentsdb.tsd.StorageExceptionHandler; 48 import net.opentsdb.uid.NoSuchUniqueId; 49 import net.opentsdb.uid.NoSuchUniqueName; 50 import net.opentsdb.uid.UniqueId; 51 import net.opentsdb.uid.UniqueIdFilterPlugin; 52 import net.opentsdb.uid.UniqueId.UniqueIdType; 53 import net.opentsdb.utils.Config; 54 import net.opentsdb.utils.DateTime; 55 import net.opentsdb.utils.PluginLoader; 56 import net.opentsdb.utils.Threads; 57 import net.opentsdb.meta.Annotation; 58 import net.opentsdb.meta.MetaDataCache; 59 import net.opentsdb.meta.TSMeta; 60 import net.opentsdb.meta.UIDMeta; 61 import net.opentsdb.query.expression.ExpressionFactory; 62 import net.opentsdb.query.filter.TagVFilter; 63 import net.opentsdb.search.SearchPlugin; 64 import net.opentsdb.search.SearchQuery; 65 import net.opentsdb.tools.StartupPlugin; 66 import net.opentsdb.stats.Histogram; 67 import net.opentsdb.stats.QueryStats; 68 import net.opentsdb.stats.StatsCollector; 69 70 /** 71 * Thread-safe implementation of the TSDB client. 72 * <p> 73 * This class is the central class of OpenTSDB. You use it to add new data 74 * points or query the database. 75 */ 76 public final class TSDB { 77 private static final Logger LOG = LoggerFactory.getLogger(TSDB.class); 78 79 static final byte[] FAMILY = { 't' }; 80 81 /** Charset used to convert Strings to byte arrays and back. */ 82 private static final Charset CHARSET = Charset.forName("ISO-8859-1"); 83 private static final String METRICS_QUAL = "metrics"; 84 private static short METRICS_WIDTH = 3; 85 private static final String TAG_NAME_QUAL = "tagk"; 86 private static short TAG_NAME_WIDTH = 3; 87 private static final String TAG_VALUE_QUAL = "tagv"; 88 private static short TAG_VALUE_WIDTH = 3; 89 90 /** Client for the HBase cluster to use. */ 91 final HBaseClient client; 92 93 /** Name of the table in which timeseries are stored. */ 94 final byte[] table; 95 /** Name of the table in which UID information is stored. */ 96 final byte[] uidtable; 97 /** Name of the table where tree data is stored. */ 98 final byte[] treetable; 99 /** Name of the table where meta data is stored. */ 100 final byte[] meta_table; 101 102 /** Unique IDs for the metric names. */ 103 final UniqueId metrics; 104 /** Unique IDs for the tag names. */ 105 final UniqueId tag_names; 106 /** Unique IDs for the tag values. */ 107 final UniqueId tag_values; 108 109 /** Configuration object for all TSDB components */ 110 final Config config; 111 112 /** Timer used for various tasks such as idle timeouts or query timeouts */ 113 private final HashedWheelTimer timer; 114 115 /** 116 * Row keys that need to be compacted. 117 * Whenever we write a new data point to a row, we add the row key to this 118 * set. Every once in a while, the compaction thread will go through old 119 * row keys and will read re-compact them. 120 */ 121 private final CompactionQueue compactionq; 122 123 /** Search indexer to use if configure */ 124 private SearchPlugin search = null; 125 126 /** Optional Startup Plugin to use if configured */ 127 private StartupPlugin startup = null; 128 129 /** Optional real time pulblisher plugin to use if configured */ 130 private RTPublisher rt_publisher = null; 131 132 /** Optional plugin for handling meta data caching and updating */ 133 private MetaDataCache meta_cache = null; 134 135 /** Plugin for dealing with data points that can't be stored */ 136 private StorageExceptionHandler storage_exception_handler = null; 137 138 /** A filter plugin for allowing or blocking time series */ 139 private WriteableDataPointFilterPlugin ts_filter; 140 141 /** A filter plugin for allowing or blocking UIDs */ 142 private UniqueIdFilterPlugin uid_filter; 143 144 /** Writes rejected by the filter */ 145 private final AtomicLong rejected_dps = new AtomicLong(); 146 private final AtomicLong rejected_aggregate_dps = new AtomicLong(); 147 148 /** Datapoints Added */ 149 private static final AtomicLong datapoints_added = new AtomicLong(); 150 151 /** 152 * Constructor 153 * @param client An initialized HBase client object 154 * @param config An initialized configuration object 155 * @since 2.1 156 */ TSDB(final HBaseClient client, final Config config)157 public TSDB(final HBaseClient client, final Config config) { 158 this.config = config; 159 if (client == null) { 160 final org.hbase.async.Config async_config; 161 if (config.configLocation() != null && !config.configLocation().isEmpty()) { 162 try { 163 async_config = new org.hbase.async.Config(config.configLocation()); 164 } catch (final IOException e) { 165 throw new RuntimeException("Failed to read the config file: " + 166 config.configLocation(), e); 167 } 168 } else { 169 async_config = new org.hbase.async.Config(); 170 } 171 async_config.overrideConfig("hbase.zookeeper.znode.parent", 172 config.getString("tsd.storage.hbase.zk_basedir")); 173 async_config.overrideConfig("hbase.zookeeper.quorum", 174 config.getString("tsd.storage.hbase.zk_quorum")); 175 this.client = new HBaseClient(async_config); 176 } else { 177 this.client = client; 178 } 179 180 // SALT AND UID WIDTHS 181 // Users really wanted this to be set via config instead of having to 182 // compile. Hopefully they know NOT to change these after writing data. 183 if (config.hasProperty("tsd.storage.uid.width.metric")) { 184 METRICS_WIDTH = config.getShort("tsd.storage.uid.width.metric"); 185 } 186 if (config.hasProperty("tsd.storage.uid.width.tagk")) { 187 TAG_NAME_WIDTH = config.getShort("tsd.storage.uid.width.tagk"); 188 } 189 if (config.hasProperty("tsd.storage.uid.width.tagv")) { 190 TAG_VALUE_WIDTH = config.getShort("tsd.storage.uid.width.tagv"); 191 } 192 if (config.hasProperty("tsd.storage.max_tags")) { 193 Const.setMaxNumTags(config.getShort("tsd.storage.max_tags")); 194 } 195 if (config.hasProperty("tsd.storage.salt.buckets")) { 196 Const.setSaltBuckets(config.getInt("tsd.storage.salt.buckets")); 197 } 198 if (config.hasProperty("tsd.storage.salt.width")) { 199 Const.setSaltWidth(config.getInt("tsd.storage.salt.width")); 200 } 201 202 table = config.getString("tsd.storage.hbase.data_table").getBytes(CHARSET); 203 uidtable = config.getString("tsd.storage.hbase.uid_table").getBytes(CHARSET); 204 treetable = config.getString("tsd.storage.hbase.tree_table").getBytes(CHARSET); 205 meta_table = config.getString("tsd.storage.hbase.meta_table").getBytes(CHARSET); 206 207 if (config.getBoolean("tsd.core.uid.random_metrics")) { 208 metrics = new UniqueId(this, uidtable, METRICS_QUAL, METRICS_WIDTH, true); 209 } else { 210 metrics = new UniqueId(this, uidtable, METRICS_QUAL, METRICS_WIDTH, false); 211 } 212 tag_names = new UniqueId(this, uidtable, TAG_NAME_QUAL, TAG_NAME_WIDTH, false); 213 tag_values = new UniqueId(this, uidtable, TAG_VALUE_QUAL, TAG_VALUE_WIDTH, false); 214 compactionq = new CompactionQueue(this); 215 216 if (config.hasProperty("tsd.core.timezone")) { 217 DateTime.setDefaultTimezone(config.getString("tsd.core.timezone")); 218 } 219 220 timer = Threads.newTimer("TSDB Timer"); 221 222 QueryStats.setEnableDuplicates( 223 config.getBoolean("tsd.query.allow_simultaneous_duplicates")); 224 225 if (config.getBoolean("tsd.core.preload_uid_cache")) { 226 final ByteMap<UniqueId> uid_cache_map = new ByteMap<UniqueId>(); 227 uid_cache_map.put(METRICS_QUAL.getBytes(CHARSET), metrics); 228 uid_cache_map.put(TAG_NAME_QUAL.getBytes(CHARSET), tag_names); 229 uid_cache_map.put(TAG_VALUE_QUAL.getBytes(CHARSET), tag_values); 230 UniqueId.preloadUidCache(this, uid_cache_map); 231 } 232 233 if (config.getString("tsd.core.tag.allow_specialchars") != null) { 234 Tags.setAllowSpecialChars(config.getString("tsd.core.tag.allow_specialchars")); 235 } 236 237 // load up the functions that require the TSDB object 238 ExpressionFactory.addTSDBFunctions(this); 239 240 // set any extra tags from the config for stats 241 StatsCollector.setGlobalTags(config); 242 243 LOG.debug(config.dumpConfiguration()); 244 } 245 246 /** 247 * Constructor 248 * @param config An initialized configuration object 249 * @since 2.0 250 */ TSDB(final Config config)251 public TSDB(final Config config) { 252 this(null, config); 253 } 254 255 /** @return The data point column family name */ FAMILY()256 public static byte[] FAMILY() { 257 return FAMILY; 258 } 259 260 /** 261 * Called by initializePlugins, also used to load startup plugins. 262 * @since 2.3 263 */ loadPluginPath(final String plugin_path)264 public static void loadPluginPath(final String plugin_path) { 265 if (plugin_path != null && !plugin_path.isEmpty()) { 266 try { 267 PluginLoader.loadJARs(plugin_path); 268 } catch (Exception e) { 269 LOG.error("Error loading plugins from plugin path: " + plugin_path, e); 270 throw new RuntimeException("Error loading plugins from plugin path: " + 271 plugin_path, e); 272 } 273 } 274 } 275 276 /** 277 * Should be called immediately after construction to initialize plugins and 278 * objects that rely on such. It also moves most of the potential exception 279 * throwing code out of the constructor so TSDMain can shutdown clients and 280 * such properly. 281 * @param init_rpcs Whether or not to initialize RPC plugins as well 282 * @throws RuntimeException if the plugin path could not be processed 283 * @throws IllegalArgumentException if a plugin could not be initialized 284 * @since 2.0 285 */ initializePlugins(final boolean init_rpcs)286 public void initializePlugins(final boolean init_rpcs) { 287 final String plugin_path = config.getString("tsd.core.plugin_path"); 288 loadPluginPath(plugin_path); 289 290 try { 291 TagVFilter.initializeFilterMap(this); 292 // @#$@%$%#$ing typed exceptions 293 } catch (SecurityException e) { 294 throw new RuntimeException("Failed to instantiate filters", e); 295 } catch (IllegalArgumentException e) { 296 throw new RuntimeException("Failed to instantiate filters", e); 297 } catch (ClassNotFoundException e) { 298 throw new RuntimeException("Failed to instantiate filters", e); 299 } catch (NoSuchMethodException e) { 300 throw new RuntimeException("Failed to instantiate filters", e); 301 } catch (NoSuchFieldException e) { 302 throw new RuntimeException("Failed to instantiate filters", e); 303 } catch (IllegalAccessException e) { 304 throw new RuntimeException("Failed to instantiate filters", e); 305 } catch (InvocationTargetException e) { 306 throw new RuntimeException("Failed to instantiate filters", e); 307 } 308 309 // load the search plugin if enabled 310 if (config.getBoolean("tsd.search.enable")) { 311 search = PluginLoader.loadSpecificPlugin( 312 config.getString("tsd.search.plugin"), SearchPlugin.class); 313 if (search == null) { 314 throw new IllegalArgumentException("Unable to locate search plugin: " + 315 config.getString("tsd.search.plugin")); 316 } 317 try { 318 search.initialize(this); 319 } catch (Exception e) { 320 throw new RuntimeException("Failed to initialize search plugin", e); 321 } 322 LOG.info("Successfully initialized search plugin [" + 323 search.getClass().getCanonicalName() + "] version: " 324 + search.version()); 325 } else { 326 search = null; 327 } 328 329 // load the real time publisher plugin if enabled 330 if (config.getBoolean("tsd.rtpublisher.enable")) { 331 rt_publisher = PluginLoader.loadSpecificPlugin( 332 config.getString("tsd.rtpublisher.plugin"), RTPublisher.class); 333 if (rt_publisher == null) { 334 throw new IllegalArgumentException( 335 "Unable to locate real time publisher plugin: " + 336 config.getString("tsd.rtpublisher.plugin")); 337 } 338 try { 339 rt_publisher.initialize(this); 340 } catch (Exception e) { 341 throw new RuntimeException( 342 "Failed to initialize real time publisher plugin", e); 343 } 344 LOG.info("Successfully initialized real time publisher plugin [" + 345 rt_publisher.getClass().getCanonicalName() + "] version: " 346 + rt_publisher.version()); 347 } else { 348 rt_publisher = null; 349 } 350 351 // load the meta cache plugin if enabled 352 if (config.getBoolean("tsd.core.meta.cache.enable")) { 353 meta_cache = PluginLoader.loadSpecificPlugin( 354 config.getString("tsd.core.meta.cache.plugin"), MetaDataCache.class); 355 if (meta_cache == null) { 356 throw new IllegalArgumentException( 357 "Unable to locate meta cache plugin: " + 358 config.getString("tsd.core.meta.cache.plugin")); 359 } 360 try { 361 meta_cache.initialize(this); 362 } catch (Exception e) { 363 throw new RuntimeException( 364 "Failed to initialize meta cache plugin", e); 365 } 366 LOG.info("Successfully initialized meta cache plugin [" + 367 meta_cache.getClass().getCanonicalName() + "] version: " 368 + meta_cache.version()); 369 } 370 371 // load the storage exception plugin if enabled 372 if (config.getBoolean("tsd.core.storage_exception_handler.enable")) { 373 storage_exception_handler = PluginLoader.loadSpecificPlugin( 374 config.getString("tsd.core.storage_exception_handler.plugin"), 375 StorageExceptionHandler.class); 376 if (storage_exception_handler == null) { 377 throw new IllegalArgumentException( 378 "Unable to locate storage exception handler plugin: " + 379 config.getString("tsd.core.storage_exception_handler.plugin")); 380 } 381 try { 382 storage_exception_handler.initialize(this); 383 } catch (Exception e) { 384 throw new RuntimeException( 385 "Failed to initialize storage exception handler plugin", e); 386 } 387 LOG.info("Successfully initialized storage exception handler plugin [" + 388 storage_exception_handler.getClass().getCanonicalName() + "] version: " 389 + storage_exception_handler.version()); 390 } 391 392 // Writeable Data Point Filter 393 if (config.getBoolean("tsd.timeseriesfilter.enable")) { 394 ts_filter = PluginLoader.loadSpecificPlugin( 395 config.getString("tsd.timeseriesfilter.plugin"), 396 WriteableDataPointFilterPlugin.class); 397 if (ts_filter == null) { 398 throw new IllegalArgumentException( 399 "Unable to locate time series filter plugin plugin: " + 400 config.getString("tsd.timeseriesfilter.plugin")); 401 } 402 try { 403 ts_filter.initialize(this); 404 } catch (Exception e) { 405 throw new RuntimeException( 406 "Failed to initialize time series filter plugin", e); 407 } 408 LOG.info("Successfully initialized time series filter plugin [" + 409 ts_filter.getClass().getCanonicalName() + "] version: " 410 + ts_filter.version()); 411 } 412 413 // UID Filter 414 if (config.getBoolean("tsd.uidfilter.enable")) { 415 uid_filter = PluginLoader.loadSpecificPlugin( 416 config.getString("tsd.uidfilter.plugin"), 417 UniqueIdFilterPlugin.class); 418 if (uid_filter == null) { 419 throw new IllegalArgumentException( 420 "Unable to locate UID filter plugin plugin: " + 421 config.getString("tsd.uidfilter.plugin")); 422 } 423 try { 424 uid_filter.initialize(this); 425 } catch (Exception e) { 426 throw new RuntimeException( 427 "Failed to initialize UID filter plugin", e); 428 } 429 LOG.info("Successfully initialized UID filter plugin [" + 430 uid_filter.getClass().getCanonicalName() + "] version: " 431 + uid_filter.version()); 432 } 433 } 434 435 /** 436 * Returns the configured HBase client 437 * @return The HBase client 438 * @since 2.0 439 */ getClient()440 public final HBaseClient getClient() { 441 return this.client; 442 } 443 444 /** 445 * Sets the startup plugin so that it can be shutdown properly. 446 * Note that this method will not initialize or call any other methods 447 * belonging to the plugin's implementation. 448 * @param plugin The startup plugin that was used. 449 * @since 2.3 450 */ setStartupPlugin(final StartupPlugin plugin)451 public final void setStartupPlugin(final StartupPlugin plugin) { 452 startup = plugin; 453 } 454 455 /** 456 * Getter that returns the startup plugin object. 457 * @return The StartupPlugin object or null if the plugin was not set. 458 * @since 2.3 459 */ getStartupPlugin()460 public final StartupPlugin getStartupPlugin() { 461 return startup; 462 } 463 464 /** 465 * Getter that returns the configuration object 466 * @return The configuration object 467 * @since 2.0 468 */ getConfig()469 public final Config getConfig() { 470 return this.config; 471 } 472 473 /** 474 * Returns the storage exception handler. May be null if not enabled 475 * @return The storage exception handler 476 * @since 2.2 477 */ getStorageExceptionHandler()478 public final StorageExceptionHandler getStorageExceptionHandler() { 479 return storage_exception_handler; 480 } 481 482 /** 483 * @return the TS filter object, may be null 484 * @since 2.3 485 */ getTSfilter()486 public WriteableDataPointFilterPlugin getTSfilter() { 487 return ts_filter; 488 } 489 490 /** 491 * @return The UID filter object, may be null. 492 * @since 2.3 493 */ getUidFilter()494 public UniqueIdFilterPlugin getUidFilter() { 495 return uid_filter; 496 } 497 498 /** 499 * Attempts to find the name for a unique identifier given a type 500 * @param type The type of UID 501 * @param uid The UID to search for 502 * @return The name of the UID object if found 503 * @throws IllegalArgumentException if the type is not valid 504 * @throws NoSuchUniqueId if the UID was not found 505 * @since 2.0 506 */ getUidName(final UniqueIdType type, final byte[] uid)507 public Deferred<String> getUidName(final UniqueIdType type, final byte[] uid) { 508 if (uid == null) { 509 throw new IllegalArgumentException("Missing UID"); 510 } 511 512 switch (type) { 513 case METRIC: 514 return this.metrics.getNameAsync(uid); 515 case TAGK: 516 return this.tag_names.getNameAsync(uid); 517 case TAGV: 518 return this.tag_values.getNameAsync(uid); 519 default: 520 throw new IllegalArgumentException("Unrecognized UID type"); 521 } 522 } 523 524 /** 525 * Attempts to find the UID matching a given name 526 * @param type The type of UID 527 * @param name The name to search for 528 * @throws IllegalArgumentException if the type is not valid 529 * @throws NoSuchUniqueName if the name was not found 530 * @since 2.0 531 */ getUID(final UniqueIdType type, final String name)532 public byte[] getUID(final UniqueIdType type, final String name) { 533 try { 534 return getUIDAsync(type, name).join(); 535 } catch (NoSuchUniqueName e) { 536 throw e; 537 } catch (IllegalArgumentException e) { 538 throw e; 539 } catch (Exception e) { 540 LOG.error("Unexpected exception", e); 541 throw new RuntimeException(e); 542 } 543 } 544 545 /** 546 * Attempts to find the UID matching a given name 547 * @param type The type of UID 548 * @param name The name to search for 549 * @throws IllegalArgumentException if the type is not valid 550 * @throws NoSuchUniqueName if the name was not found 551 * @since 2.1 552 */ getUIDAsync(final UniqueIdType type, final String name)553 public Deferred<byte[]> getUIDAsync(final UniqueIdType type, final String name) { 554 if (name == null || name.isEmpty()) { 555 throw new IllegalArgumentException("Missing UID name"); 556 } 557 switch (type) { 558 case METRIC: 559 return metrics.getIdAsync(name); 560 case TAGK: 561 return tag_names.getIdAsync(name); 562 case TAGV: 563 return tag_values.getIdAsync(name); 564 default: 565 throw new IllegalArgumentException("Unrecognized UID type"); 566 } 567 } 568 569 /** 570 * Verifies that the data and UID tables exist in HBase and optionally the 571 * tree and meta data tables if the user has enabled meta tracking or tree 572 * building 573 * @return An ArrayList of objects to wait for 574 * @throws TableNotFoundException 575 * @since 2.0 576 */ checkNecessaryTablesExist()577 public Deferred<ArrayList<Object>> checkNecessaryTablesExist() { 578 final ArrayList<Deferred<Object>> checks = 579 new ArrayList<Deferred<Object>>(2); 580 checks.add(client.ensureTableExists( 581 config.getString("tsd.storage.hbase.data_table"))); 582 checks.add(client.ensureTableExists( 583 config.getString("tsd.storage.hbase.uid_table"))); 584 if (config.enable_tree_processing()) { 585 checks.add(client.ensureTableExists( 586 config.getString("tsd.storage.hbase.tree_table"))); 587 } 588 if (config.enable_realtime_ts() || config.enable_realtime_uid() || 589 config.enable_tsuid_incrementing()) { 590 checks.add(client.ensureTableExists( 591 config.getString("tsd.storage.hbase.meta_table"))); 592 } 593 return Deferred.group(checks); 594 } 595 596 /** Number of cache hits during lookups involving UIDs. */ uidCacheHits()597 public int uidCacheHits() { 598 return (metrics.cacheHits() + tag_names.cacheHits() 599 + tag_values.cacheHits()); 600 } 601 602 /** Number of cache misses during lookups involving UIDs. */ uidCacheMisses()603 public int uidCacheMisses() { 604 return (metrics.cacheMisses() + tag_names.cacheMisses() 605 + tag_values.cacheMisses()); 606 } 607 608 /** Number of cache entries currently in RAM for lookups involving UIDs. */ uidCacheSize()609 public int uidCacheSize() { 610 return (metrics.cacheSize() + tag_names.cacheSize() 611 + tag_values.cacheSize()); 612 } 613 614 /** 615 * Collects the stats and metrics tracked by this instance. 616 * @param collector The collector to use. 617 */ collectStats(final StatsCollector collector)618 public void collectStats(final StatsCollector collector) { 619 final byte[][] kinds = { 620 METRICS_QUAL.getBytes(CHARSET), 621 TAG_NAME_QUAL.getBytes(CHARSET), 622 TAG_VALUE_QUAL.getBytes(CHARSET) 623 }; 624 try { 625 final Map<String, Long> used_uids = UniqueId.getUsedUIDs(this, kinds) 626 .joinUninterruptibly(); 627 628 collectUidStats(metrics, collector); 629 if (config.getBoolean("tsd.core.uid.random_metrics")) { 630 collector.record("uid.ids-used", 0, "kind=" + METRICS_QUAL); 631 collector.record("uid.ids-available", 0, "kind=" + METRICS_QUAL); 632 } else { 633 collector.record("uid.ids-used", used_uids.get(METRICS_QUAL), 634 "kind=" + METRICS_QUAL); 635 collector.record("uid.ids-available", 636 (Internal.getMaxUnsignedValueOnBytes(metrics.width()) - 637 used_uids.get(METRICS_QUAL)), "kind=" + METRICS_QUAL); 638 } 639 640 collectUidStats(tag_names, collector); 641 collector.record("uid.ids-used", used_uids.get(TAG_NAME_QUAL), 642 "kind=" + TAG_NAME_QUAL); 643 collector.record("uid.ids-available", 644 (Internal.getMaxUnsignedValueOnBytes(tag_names.width()) - 645 used_uids.get(TAG_NAME_QUAL)), 646 "kind=" + TAG_NAME_QUAL); 647 648 collectUidStats(tag_values, collector); 649 collector.record("uid.ids-used", used_uids.get(TAG_VALUE_QUAL), 650 "kind=" + TAG_VALUE_QUAL); 651 collector.record("uid.ids-available", 652 (Internal.getMaxUnsignedValueOnBytes(tag_values.width()) - 653 used_uids.get(TAG_VALUE_QUAL)), "kind=" + TAG_VALUE_QUAL); 654 655 } catch (Exception e) { 656 throw new RuntimeException("Shouldn't be here", e); 657 } 658 659 collector.record("uid.filter.rejected", rejected_dps.get(), "kind=raw"); 660 collector.record("uid.filter.rejected", rejected_aggregate_dps.get(), 661 "kind=aggregate"); 662 663 { 664 final Runtime runtime = Runtime.getRuntime(); 665 collector.record("jvm.ramfree", runtime.freeMemory()); 666 collector.record("jvm.ramused", runtime.totalMemory()); 667 } 668 669 collector.addExtraTag("class", "IncomingDataPoints"); 670 try { 671 collector.record("hbase.latency", IncomingDataPoints.putlatency, "method=put"); 672 } finally { 673 collector.clearExtraTag("class"); 674 } 675 676 collector.addExtraTag("class", "TSDB"); 677 try { 678 collector.record("datapoints.added", datapoints_added, "type=all"); 679 } finally { 680 collector.clearExtraTag("class"); 681 } 682 683 collector.addExtraTag("class", "TsdbQuery"); 684 try { 685 collector.record("hbase.latency", TsdbQuery.scanlatency, "method=scan"); 686 } finally { 687 collector.clearExtraTag("class"); 688 } 689 final ClientStats stats = client.stats(); 690 collector.record("hbase.root_lookups", stats.rootLookups()); 691 collector.record("hbase.meta_lookups", 692 stats.uncontendedMetaLookups(), "type=uncontended"); 693 collector.record("hbase.meta_lookups", 694 stats.contendedMetaLookups(), "type=contended"); 695 collector.record("hbase.rpcs", 696 stats.atomicIncrements(), "type=increment"); 697 collector.record("hbase.rpcs", stats.deletes(), "type=delete"); 698 collector.record("hbase.rpcs", stats.gets(), "type=get"); 699 collector.record("hbase.rpcs", stats.puts(), "type=put"); 700 collector.record("hbase.rpcs", stats.appends(), "type=append"); 701 collector.record("hbase.rpcs", stats.rowLocks(), "type=rowLock"); 702 collector.record("hbase.rpcs", stats.scannersOpened(), "type=openScanner"); 703 collector.record("hbase.rpcs", stats.scans(), "type=scan"); 704 collector.record("hbase.rpcs.batched", stats.numBatchedRpcSent()); 705 collector.record("hbase.flushes", stats.flushes()); 706 collector.record("hbase.connections.created", stats.connectionsCreated()); 707 collector.record("hbase.connections.idle_closed", stats.idleConnectionsClosed()); 708 collector.record("hbase.nsre", stats.noSuchRegionExceptions()); 709 collector.record("hbase.nsre.rpcs_delayed", 710 stats.numRpcDelayedDueToNSRE()); 711 collector.record("hbase.region_clients.open", 712 stats.regionClients()); 713 collector.record("hbase.region_clients.idle_closed", 714 stats.idleConnectionsClosed()); 715 716 compactionq.collectStats(collector); 717 // Collect Stats from Plugins 718 if (startup != null) { 719 try { 720 collector.addExtraTag("plugin", "startup"); 721 startup.collectStats(collector); 722 } finally { 723 collector.clearExtraTag("plugin"); 724 } 725 } 726 if (rt_publisher != null) { 727 try { 728 collector.addExtraTag("plugin", "publish"); 729 rt_publisher.collectStats(collector); 730 } finally { 731 collector.clearExtraTag("plugin"); 732 } 733 } 734 if (search != null) { 735 try { 736 collector.addExtraTag("plugin", "search"); 737 search.collectStats(collector); 738 } finally { 739 collector.clearExtraTag("plugin"); 740 } 741 } 742 if (storage_exception_handler != null) { 743 try { 744 collector.addExtraTag("plugin", "storageExceptionHandler"); 745 storage_exception_handler.collectStats(collector); 746 } finally { 747 collector.clearExtraTag("plugin"); 748 } 749 } 750 if (ts_filter != null) { 751 try { 752 collector.addExtraTag("plugin", "timeseriesFilter"); 753 ts_filter.collectStats(collector); 754 } finally { 755 collector.clearExtraTag("plugin"); 756 } 757 } 758 if (uid_filter != null) { 759 try { 760 collector.addExtraTag("plugin", "uidFilter"); 761 uid_filter.collectStats(collector); 762 } finally { 763 collector.clearExtraTag("plugin"); 764 } 765 } 766 } 767 768 /** Returns a latency histogram for Put RPCs used to store data points. */ getPutLatencyHistogram()769 public Histogram getPutLatencyHistogram() { 770 return IncomingDataPoints.putlatency; 771 } 772 773 /** Returns a latency histogram for Scan RPCs used to fetch data points. */ getScanLatencyHistogram()774 public Histogram getScanLatencyHistogram() { 775 return TsdbQuery.scanlatency; 776 } 777 778 /** 779 * Collects the stats for a {@link UniqueId}. 780 * @param uid The instance from which to collect stats. 781 * @param collector The collector to use. 782 */ collectUidStats(final UniqueId uid, final StatsCollector collector)783 private static void collectUidStats(final UniqueId uid, 784 final StatsCollector collector) { 785 collector.record("uid.cache-hit", uid.cacheHits(), "kind=" + uid.kind()); 786 collector.record("uid.cache-miss", uid.cacheMisses(), "kind=" + uid.kind()); 787 collector.record("uid.cache-size", uid.cacheSize(), "kind=" + uid.kind()); 788 collector.record("uid.random-collisions", uid.randomIdCollisions(), 789 "kind=" + uid.kind()); 790 collector.record("uid.rejected-assignments", uid.rejectedAssignments(), 791 "kind=" + uid.kind()); 792 } 793 794 /** @return the width, in bytes, of metric UIDs */ metrics_width()795 public static short metrics_width() { 796 return METRICS_WIDTH; 797 } 798 799 /** @return the width, in bytes, of tagk UIDs */ tagk_width()800 public static short tagk_width() { 801 return TAG_NAME_WIDTH; 802 } 803 804 /** @return the width, in bytes, of tagv UIDs */ tagv_width()805 public static short tagv_width() { 806 return TAG_VALUE_WIDTH; 807 } 808 809 /** 810 * Returns a new {@link Query} instance suitable for this TSDB. 811 */ newQuery()812 public Query newQuery() { 813 return new TsdbQuery(this); 814 } 815 816 /** 817 * Returns a new {@link WritableDataPoints} instance suitable for this TSDB. 818 * <p> 819 * If you want to add a single data-point, consider using {@link #addPoint} 820 * instead. 821 */ newDataPoints()822 public WritableDataPoints newDataPoints() { 823 return new IncomingDataPoints(this); 824 } 825 826 /** 827 * Returns a new {@link BatchedDataPoints} instance suitable for this TSDB. 828 * 829 * @param metric Every data point that gets appended must be associated to this metric. 830 * @param tags The associated tags for all data points being added. 831 * @return data structure which can have data points appended. 832 */ newBatch(String metric, Map<String, String> tags)833 public WritableDataPoints newBatch(String metric, Map<String, String> tags) { 834 return new BatchedDataPoints(this, metric, tags); 835 } 836 837 /** 838 * Adds a single integer value data point in the TSDB. 839 * @param metric A non-empty string. 840 * @param timestamp The timestamp associated with the value. 841 * @param value The value of the data point. 842 * @param tags The tags on this series. This map must be non-empty. 843 * @return A deferred object that indicates the completion of the request. 844 * The {@link Object} has not special meaning and can be {@code null} (think 845 * of it as {@code Deferred<Void>}). But you probably want to attach at 846 * least an errback to this {@code Deferred} to handle failures. 847 * @throws IllegalArgumentException if the timestamp is less than or equal 848 * to the previous timestamp added or 0 for the first timestamp, or if the 849 * difference with the previous timestamp is too large. 850 * @throws IllegalArgumentException if the metric name is empty or contains 851 * illegal characters. 852 * @throws IllegalArgumentException if the tags list is empty or one of the 853 * elements contains illegal characters. 854 * @throws HBaseException (deferred) if there was a problem while persisting 855 * data. 856 */ addPoint(final String metric, final long timestamp, final long value, final Map<String, String> tags)857 public Deferred<Object> addPoint(final String metric, 858 final long timestamp, 859 final long value, 860 final Map<String, String> tags) { 861 final byte[] v; 862 if (Byte.MIN_VALUE <= value && value <= Byte.MAX_VALUE) { 863 v = new byte[] { (byte) value }; 864 } else if (Short.MIN_VALUE <= value && value <= Short.MAX_VALUE) { 865 v = Bytes.fromShort((short) value); 866 } else if (Integer.MIN_VALUE <= value && value <= Integer.MAX_VALUE) { 867 v = Bytes.fromInt((int) value); 868 } else { 869 v = Bytes.fromLong(value); 870 } 871 872 final short flags = (short) (v.length - 1); // Just the length. 873 return addPointInternal(metric, timestamp, v, tags, flags); 874 } 875 876 /** 877 * Adds a double precision floating-point value data point in the TSDB. 878 * @param metric A non-empty string. 879 * @param timestamp The timestamp associated with the value. 880 * @param value The value of the data point. 881 * @param tags The tags on this series. This map must be non-empty. 882 * @return A deferred object that indicates the completion of the request. 883 * The {@link Object} has not special meaning and can be {@code null} (think 884 * of it as {@code Deferred<Void>}). But you probably want to attach at 885 * least an errback to this {@code Deferred} to handle failures. 886 * @throws IllegalArgumentException if the timestamp is less than or equal 887 * to the previous timestamp added or 0 for the first timestamp, or if the 888 * difference with the previous timestamp is too large. 889 * @throws IllegalArgumentException if the metric name is empty or contains 890 * illegal characters. 891 * @throws IllegalArgumentException if the value is NaN or infinite. 892 * @throws IllegalArgumentException if the tags list is empty or one of the 893 * elements contains illegal characters. 894 * @throws HBaseException (deferred) if there was a problem while persisting 895 * data. 896 * @since 1.2 897 */ addPoint(final String metric, final long timestamp, final double value, final Map<String, String> tags)898 public Deferred<Object> addPoint(final String metric, 899 final long timestamp, 900 final double value, 901 final Map<String, String> tags) { 902 if (Double.isNaN(value) || Double.isInfinite(value)) { 903 throw new IllegalArgumentException("value is NaN or Infinite: " + value 904 + " for metric=" + metric 905 + " timestamp=" + timestamp); 906 } 907 final short flags = Const.FLAG_FLOAT | 0x7; // A float stored on 8 bytes. 908 return addPointInternal(metric, timestamp, 909 Bytes.fromLong(Double.doubleToRawLongBits(value)), 910 tags, flags); 911 } 912 913 /** 914 * Adds a single floating-point value data point in the TSDB. 915 * @param metric A non-empty string. 916 * @param timestamp The timestamp associated with the value. 917 * @param value The value of the data point. 918 * @param tags The tags on this series. This map must be non-empty. 919 * @return A deferred object that indicates the completion of the request. 920 * The {@link Object} has not special meaning and can be {@code null} (think 921 * of it as {@code Deferred<Void>}). But you probably want to attach at 922 * least an errback to this {@code Deferred} to handle failures. 923 * @throws IllegalArgumentException if the timestamp is less than or equal 924 * to the previous timestamp added or 0 for the first timestamp, or if the 925 * difference with the previous timestamp is too large. 926 * @throws IllegalArgumentException if the metric name is empty or contains 927 * illegal characters. 928 * @throws IllegalArgumentException if the value is NaN or infinite. 929 * @throws IllegalArgumentException if the tags list is empty or one of the 930 * elements contains illegal characters. 931 * @throws HBaseException (deferred) if there was a problem while persisting 932 * data. 933 */ addPoint(final String metric, final long timestamp, final float value, final Map<String, String> tags)934 public Deferred<Object> addPoint(final String metric, 935 final long timestamp, 936 final float value, 937 final Map<String, String> tags) { 938 if (Float.isNaN(value) || Float.isInfinite(value)) { 939 throw new IllegalArgumentException("value is NaN or Infinite: " + value 940 + " for metric=" + metric 941 + " timestamp=" + timestamp); 942 } 943 final short flags = Const.FLAG_FLOAT | 0x3; // A float stored on 4 bytes. 944 return addPointInternal(metric, timestamp, 945 Bytes.fromInt(Float.floatToRawIntBits(value)), 946 tags, flags); 947 } 948 addPointInternal(final String metric, final long timestamp, final byte[] value, final Map<String, String> tags, final short flags)949 private Deferred<Object> addPointInternal(final String metric, 950 final long timestamp, 951 final byte[] value, 952 final Map<String, String> tags, 953 final short flags) { 954 // we only accept positive unix epoch timestamps in seconds or milliseconds 955 if (timestamp < 0 || ((timestamp & Const.SECOND_MASK) != 0 && 956 timestamp > 9999999999999L)) { 957 throw new IllegalArgumentException((timestamp < 0 ? "negative " : "bad") 958 + " timestamp=" + timestamp 959 + " when trying to add value=" + Arrays.toString(value) + '/' + flags 960 + " to metric=" + metric + ", tags=" + tags); 961 } 962 IncomingDataPoints.checkMetricAndTags(metric, tags); 963 final byte[] row = IncomingDataPoints.rowKeyTemplate(this, metric, tags); 964 final long base_time; 965 final byte[] qualifier = Internal.buildQualifier(timestamp, flags); 966 967 if ((timestamp & Const.SECOND_MASK) != 0) { 968 // drop the ms timestamp to seconds to calculate the base timestamp 969 base_time = ((timestamp / 1000) - 970 ((timestamp / 1000) % Const.MAX_TIMESPAN)); 971 } else { 972 base_time = (timestamp - (timestamp % Const.MAX_TIMESPAN)); 973 } 974 975 /** Callback executed for chaining filter calls to see if the value 976 * should be written or not. */ 977 final class WriteCB implements Callback<Deferred<Object>, Boolean> { 978 @Override 979 public Deferred<Object> call(final Boolean allowed) throws Exception { 980 if (!allowed) { 981 rejected_dps.incrementAndGet(); 982 return Deferred.fromResult(null); 983 } 984 985 Bytes.setInt(row, (int) base_time, metrics.width() + Const.SALT_WIDTH()); 986 RowKey.prefixKeyWithSalt(row); 987 988 Deferred<Object> result = null; 989 if (config.enable_appends()) { 990 final AppendDataPoints kv = new AppendDataPoints(qualifier, value); 991 final AppendRequest point = new AppendRequest(table, row, FAMILY, 992 AppendDataPoints.APPEND_COLUMN_QUALIFIER, kv.getBytes()); 993 result = client.append(point); 994 } else { 995 scheduleForCompaction(row, (int) base_time); 996 final PutRequest point = new PutRequest(table, row, FAMILY, qualifier, value); 997 result = client.put(point); 998 } 999 1000 // Count all added datapoints, not just those that came in through PUT rpc 1001 // Will there be others? Well, something could call addPoint programatically right? 1002 datapoints_added.incrementAndGet(); 1003 1004 // TODO(tsuna): Add a callback to time the latency of HBase and store the 1005 // timing in a moving Histogram (once we have a class for this). 1006 1007 if (!config.enable_realtime_ts() && !config.enable_tsuid_incrementing() && 1008 !config.enable_tsuid_tracking() && rt_publisher == null) { 1009 return result; 1010 } 1011 1012 final byte[] tsuid = UniqueId.getTSUIDFromKey(row, METRICS_WIDTH, 1013 Const.TIMESTAMP_BYTES); 1014 1015 // if the meta cache plugin is instantiated then tracking goes through it 1016 if (meta_cache != null) { 1017 meta_cache.increment(tsuid); 1018 } else { 1019 if (config.enable_tsuid_tracking()) { 1020 if (config.enable_realtime_ts()) { 1021 if (config.enable_tsuid_incrementing()) { 1022 TSMeta.incrementAndGetCounter(TSDB.this, tsuid); 1023 } else { 1024 TSMeta.storeIfNecessary(TSDB.this, tsuid); 1025 } 1026 } else { 1027 final PutRequest tracking = new PutRequest(meta_table, tsuid, 1028 TSMeta.FAMILY(), TSMeta.COUNTER_QUALIFIER(), Bytes.fromLong(1)); 1029 client.put(tracking); 1030 } 1031 } 1032 } 1033 1034 if (rt_publisher != null) { 1035 rt_publisher.sinkDataPoint(metric, timestamp, value, tags, tsuid, flags); 1036 } 1037 return result; 1038 } 1039 @Override 1040 public String toString() { 1041 return "addPointInternal Write Callback"; 1042 } 1043 } 1044 1045 if (ts_filter != null && ts_filter.filterDataPoints()) { 1046 return ts_filter.allowDataPoint(metric, timestamp, value, tags, flags) 1047 .addCallbackDeferring(new WriteCB()); 1048 } 1049 return Deferred.fromResult(true).addCallbackDeferring(new WriteCB()); 1050 } 1051 1052 /** 1053 * Forces a flush of any un-committed in memory data including left over 1054 * compactions. 1055 * <p> 1056 * For instance, any data point not persisted will be sent to HBase. 1057 * @return A {@link Deferred} that will be called once all the un-committed 1058 * data has been successfully and durably stored. The value of the deferred 1059 * object return is meaningless and unspecified, and can be {@code null}. 1060 * @throws HBaseException (deferred) if there was a problem sending 1061 * un-committed data to HBase. Please refer to the {@link HBaseException} 1062 * hierarchy to handle the possible failures. Some of them are easily 1063 * recoverable by retrying, some are not. 1064 */ flush()1065 public Deferred<Object> flush() throws HBaseException { 1066 final class HClientFlush implements Callback<Object, ArrayList<Object>> { 1067 public Object call(final ArrayList<Object> args) { 1068 return client.flush(); 1069 } 1070 public String toString() { 1071 return "flush HBase client"; 1072 } 1073 } 1074 1075 return config.enable_compactions() && compactionq != null 1076 ? compactionq.flush().addCallback(new HClientFlush()) 1077 : client.flush(); 1078 } 1079 1080 /** 1081 * Gracefully shuts down this TSD instance. 1082 * <p> 1083 * The method must call {@code shutdown()} on all plugins as well as flush the 1084 * compaction queue. 1085 * @return A {@link Deferred} that will be called once all the un-committed 1086 * data has been successfully and durably stored, and all resources used by 1087 * this instance have been released. The value of the deferred object 1088 * return is meaningless and unspecified, and can be {@code null}. 1089 * @throws HBaseException (deferred) if there was a problem sending 1090 * un-committed data to HBase. Please refer to the {@link HBaseException} 1091 * hierarchy to handle the possible failures. Some of them are easily 1092 * recoverable by retrying, some are not. 1093 */ shutdown()1094 public Deferred<Object> shutdown() { 1095 final ArrayList<Deferred<Object>> deferreds = 1096 new ArrayList<Deferred<Object>>(); 1097 1098 final class FinalShutdown implements Callback<Object, Object> { 1099 @Override 1100 public Object call(Object result) throws Exception { 1101 if (result instanceof Exception) { 1102 LOG.error("A previous shutdown failed", (Exception)result); 1103 } 1104 final Set<Timeout> timeouts = timer.stop(); 1105 // TODO - at some point we should clean these up. 1106 if (timeouts.size() > 0) { 1107 LOG.warn("There were " + timeouts.size() + " timer tasks queued"); 1108 } 1109 LOG.info("Completed shutting down the TSDB"); 1110 return Deferred.fromResult(null); 1111 } 1112 } 1113 1114 final class SEHShutdown implements Callback<Object, Object> { 1115 @Override 1116 public Object call(Object result) throws Exception { 1117 if (result instanceof Exception) { 1118 LOG.error("Shutdown of the HBase client failed", (Exception)result); 1119 } 1120 LOG.info("Shutting down storage exception handler plugin: " + 1121 storage_exception_handler.getClass().getCanonicalName()); 1122 return storage_exception_handler.shutdown().addBoth(new FinalShutdown()); 1123 } 1124 @Override 1125 public String toString() { 1126 return "SEHShutdown"; 1127 } 1128 } 1129 1130 final class HClientShutdown implements Callback<Deferred<Object>, ArrayList<Object>> { 1131 public Deferred<Object> call(final ArrayList<Object> args) { 1132 if (storage_exception_handler != null) { 1133 return client.shutdown().addBoth(new SEHShutdown()); 1134 } 1135 return client.shutdown().addBoth(new FinalShutdown()); 1136 } 1137 public String toString() { 1138 return "shutdown HBase client"; 1139 } 1140 } 1141 1142 final class ShutdownErrback implements Callback<Object, Exception> { 1143 public Object call(final Exception e) { 1144 final Logger LOG = LoggerFactory.getLogger(ShutdownErrback.class); 1145 if (e instanceof DeferredGroupException) { 1146 final DeferredGroupException ge = (DeferredGroupException) e; 1147 for (final Object r : ge.results()) { 1148 if (r instanceof Exception) { 1149 LOG.error("Failed to shutdown the TSD", (Exception) r); 1150 } 1151 } 1152 } else { 1153 LOG.error("Failed to shutdown the TSD", e); 1154 } 1155 return new HClientShutdown().call(null); 1156 } 1157 public String toString() { 1158 return "shutdown HBase client after error"; 1159 } 1160 } 1161 1162 final class CompactCB implements Callback<Object, ArrayList<Object>> { 1163 public Object call(ArrayList<Object> compactions) throws Exception { 1164 return null; 1165 } 1166 } 1167 1168 if (config.enable_compactions()) { 1169 LOG.info("Flushing compaction queue"); 1170 deferreds.add(compactionq.flush().addCallback(new CompactCB())); 1171 } 1172 if (startup != null) { 1173 LOG.info("Shutting down startup plugin: " + 1174 startup.getClass().getCanonicalName()); 1175 deferreds.add(startup.shutdown()); 1176 } 1177 if (search != null) { 1178 LOG.info("Shutting down search plugin: " + 1179 search.getClass().getCanonicalName()); 1180 deferreds.add(search.shutdown()); 1181 } 1182 if (rt_publisher != null) { 1183 LOG.info("Shutting down RT plugin: " + 1184 rt_publisher.getClass().getCanonicalName()); 1185 deferreds.add(rt_publisher.shutdown()); 1186 } 1187 if (meta_cache != null) { 1188 LOG.info("Shutting down meta cache plugin: " + 1189 meta_cache.getClass().getCanonicalName()); 1190 deferreds.add(meta_cache.shutdown()); 1191 } 1192 if (storage_exception_handler != null) { 1193 LOG.info("Shutting down storage exception handler plugin: " + 1194 storage_exception_handler.getClass().getCanonicalName()); 1195 deferreds.add(storage_exception_handler.shutdown()); 1196 } 1197 if (ts_filter != null) { 1198 LOG.info("Shutting down time series filter plugin: " + 1199 ts_filter.getClass().getCanonicalName()); 1200 deferreds.add(ts_filter.shutdown()); 1201 } 1202 if (uid_filter != null) { 1203 LOG.info("Shutting down UID filter plugin: " + 1204 uid_filter.getClass().getCanonicalName()); 1205 deferreds.add(uid_filter.shutdown()); 1206 } 1207 1208 // wait for plugins to shutdown before we close the client 1209 return deferreds.size() > 0 1210 ? Deferred.group(deferreds).addCallbackDeferring(new HClientShutdown()) 1211 .addErrback(new ShutdownErrback()) 1212 : new HClientShutdown().call(null); 1213 } 1214 1215 /** 1216 * Given a prefix search, returns a few matching metric names. 1217 * @param search A prefix to search. 1218 */ suggestMetrics(final String search)1219 public List<String> suggestMetrics(final String search) { 1220 return metrics.suggest(search); 1221 } 1222 1223 /** 1224 * Given a prefix search, returns matching metric names. 1225 * @param search A prefix to search. 1226 * @param max_results Maximum number of results to return. 1227 * @since 2.0 1228 */ suggestMetrics(final String search, final int max_results)1229 public List<String> suggestMetrics(final String search, 1230 final int max_results) { 1231 return metrics.suggest(search, max_results); 1232 } 1233 1234 /** 1235 * Given a prefix search, returns a few matching tag names. 1236 * @param search A prefix to search. 1237 */ suggestTagNames(final String search)1238 public List<String> suggestTagNames(final String search) { 1239 return tag_names.suggest(search); 1240 } 1241 1242 /** 1243 * Given a prefix search, returns matching tagk names. 1244 * @param search A prefix to search. 1245 * @param max_results Maximum number of results to return. 1246 * @since 2.0 1247 */ suggestTagNames(final String search, final int max_results)1248 public List<String> suggestTagNames(final String search, 1249 final int max_results) { 1250 return tag_names.suggest(search, max_results); 1251 } 1252 1253 /** 1254 * Given a prefix search, returns a few matching tag values. 1255 * @param search A prefix to search. 1256 */ suggestTagValues(final String search)1257 public List<String> suggestTagValues(final String search) { 1258 return tag_values.suggest(search); 1259 } 1260 1261 /** 1262 * Given a prefix search, returns matching tag values. 1263 * @param search A prefix to search. 1264 * @param max_results Maximum number of results to return. 1265 * @since 2.0 1266 */ suggestTagValues(final String search, final int max_results)1267 public List<String> suggestTagValues(final String search, 1268 final int max_results) { 1269 return tag_values.suggest(search, max_results); 1270 } 1271 1272 /** 1273 * Discards all in-memory caches. 1274 * @since 1.1 1275 */ dropCaches()1276 public void dropCaches() { 1277 metrics.dropCaches(); 1278 tag_names.dropCaches(); 1279 tag_values.dropCaches(); 1280 } 1281 1282 /** 1283 * Attempts to assign a UID to a name for the given type 1284 * Used by the UniqueIdRpc call to generate IDs for new metrics, tagks or 1285 * tagvs. The name must pass validation and if it's already assigned a UID, 1286 * this method will throw an error with the proper UID. Otherwise if it can 1287 * create the UID, it will be returned 1288 * @param type The type of uid to assign, metric, tagk or tagv 1289 * @param name The name of the uid object 1290 * @return A byte array with the UID if the assignment was successful 1291 * @throws IllegalArgumentException if the name is invalid or it already 1292 * exists 1293 * @since 2.0 1294 */ assignUid(final String type, final String name)1295 public byte[] assignUid(final String type, final String name) { 1296 Tags.validateString(type, name); 1297 if (type.toLowerCase().equals("metric")) { 1298 try { 1299 final byte[] uid = this.metrics.getId(name); 1300 throw new IllegalArgumentException("Name already exists with UID: " + 1301 UniqueId.uidToString(uid)); 1302 } catch (NoSuchUniqueName nsue) { 1303 return this.metrics.getOrCreateId(name); 1304 } 1305 } else if (type.toLowerCase().equals("tagk")) { 1306 try { 1307 final byte[] uid = this.tag_names.getId(name); 1308 throw new IllegalArgumentException("Name already exists with UID: " + 1309 UniqueId.uidToString(uid)); 1310 } catch (NoSuchUniqueName nsue) { 1311 return this.tag_names.getOrCreateId(name); 1312 } 1313 } else if (type.toLowerCase().equals("tagv")) { 1314 try { 1315 final byte[] uid = this.tag_values.getId(name); 1316 throw new IllegalArgumentException("Name already exists with UID: " + 1317 UniqueId.uidToString(uid)); 1318 } catch (NoSuchUniqueName nsue) { 1319 return this.tag_values.getOrCreateId(name); 1320 } 1321 } else { 1322 LOG.warn("Unknown type name: " + type); 1323 throw new IllegalArgumentException("Unknown type name"); 1324 } 1325 } 1326 1327 /** 1328 * Attempts to delete the given UID name mapping from the storage table as 1329 * well as the local cache. 1330 * @param type The type of UID to delete. Must be "metrics", "tagk" or "tagv" 1331 * @param name The name of the UID to delete 1332 * @return A deferred to wait on for completion, or an exception if thrown 1333 * @throws IllegalArgumentException if the type is invalid 1334 * @since 2.2 1335 */ deleteUidAsync(final String type, final String name)1336 public Deferred<Object> deleteUidAsync(final String type, final String name) { 1337 final UniqueIdType uid_type = UniqueId.stringToUniqueIdType(type); 1338 switch (uid_type) { 1339 case METRIC: 1340 return metrics.deleteAsync(name); 1341 case TAGK: 1342 return tag_names.deleteAsync(name); 1343 case TAGV: 1344 return tag_values.deleteAsync(name); 1345 default: 1346 throw new IllegalArgumentException("Unrecognized UID type: " + uid_type); 1347 } 1348 } 1349 1350 /** 1351 * Attempts to rename a UID from existing name to the given name 1352 * Used by the UniqueIdRpc call to rename name of existing metrics, tagks or 1353 * tagvs. The name must pass validation. If the UID doesn't exist, the method 1354 * will throw an error. Chained IllegalArgumentException is directly exposed 1355 * to caller. If the rename was successful, this method returns. 1356 * @param type The type of uid to rename, one of metric, tagk and tagv 1357 * @param oldname The existing name of the uid object 1358 * @param newname The new name to be used on the uid object 1359 * @throws IllegalArgumentException if error happened 1360 * @since 2.2 1361 */ renameUid(final String type, final String oldname, final String newname)1362 public void renameUid(final String type, final String oldname, 1363 final String newname) { 1364 Tags.validateString(type, oldname); 1365 Tags.validateString(type, newname); 1366 if (type.toLowerCase().equals("metric")) { 1367 try { 1368 this.metrics.getId(oldname); 1369 this.metrics.rename(oldname, newname); 1370 } catch (NoSuchUniqueName nsue) { 1371 throw new IllegalArgumentException("Name(\"" + oldname + 1372 "\") does not exist"); 1373 } 1374 } else if (type.toLowerCase().equals("tagk")) { 1375 try { 1376 this.tag_names.getId(oldname); 1377 this.tag_names.rename(oldname, newname); 1378 } catch (NoSuchUniqueName nsue) { 1379 throw new IllegalArgumentException("Name(\"" + oldname + 1380 "\") does not exist"); 1381 } 1382 } else if (type.toLowerCase().equals("tagv")) { 1383 try { 1384 this.tag_values.getId(oldname); 1385 this.tag_values.rename(oldname, newname); 1386 } catch (NoSuchUniqueName nsue) { 1387 throw new IllegalArgumentException("Name(\"" + oldname + 1388 "\") does not exist"); 1389 } 1390 } else { 1391 LOG.warn("Unknown type name: " + type); 1392 throw new IllegalArgumentException("Unknown type name"); 1393 } 1394 } 1395 1396 /** @return the name of the UID table as a byte array for client requests */ uidTable()1397 public byte[] uidTable() { 1398 return this.uidtable; 1399 } 1400 1401 /** @return the name of the data table as a byte array for client requests */ dataTable()1402 public byte[] dataTable() { 1403 return this.table; 1404 } 1405 1406 /** @return the name of the tree table as a byte array for client requests */ treeTable()1407 public byte[] treeTable() { 1408 return this.treetable; 1409 } 1410 1411 /** @return the name of the meta table as a byte array for client requests */ metaTable()1412 public byte[] metaTable() { 1413 return this.meta_table; 1414 } 1415 1416 /** 1417 * Index the given timeseries meta object via the configured search plugin 1418 * @param meta The meta data object to index 1419 * @since 2.0 1420 */ indexTSMeta(final TSMeta meta)1421 public void indexTSMeta(final TSMeta meta) { 1422 if (search != null) { 1423 search.indexTSMeta(meta).addErrback(new PluginError()); 1424 } 1425 } 1426 1427 /** 1428 * Delete the timeseries meta object from the search index 1429 * @param tsuid The TSUID to delete 1430 * @since 2.0 1431 */ deleteTSMeta(final String tsuid)1432 public void deleteTSMeta(final String tsuid) { 1433 if (search != null) { 1434 search.deleteTSMeta(tsuid).addErrback(new PluginError()); 1435 } 1436 } 1437 1438 /** 1439 * Index the given UID meta object via the configured search plugin 1440 * @param meta The meta data object to index 1441 * @since 2.0 1442 */ indexUIDMeta(final UIDMeta meta)1443 public void indexUIDMeta(final UIDMeta meta) { 1444 if (search != null) { 1445 search.indexUIDMeta(meta).addErrback(new PluginError()); 1446 } 1447 } 1448 1449 /** 1450 * Delete the UID meta object from the search index 1451 * @param meta The UID meta object to delete 1452 * @since 2.0 1453 */ deleteUIDMeta(final UIDMeta meta)1454 public void deleteUIDMeta(final UIDMeta meta) { 1455 if (search != null) { 1456 search.deleteUIDMeta(meta).addErrback(new PluginError()); 1457 } 1458 } 1459 1460 /** 1461 * Index the given Annotation object via the configured search plugin 1462 * @param note The annotation object to index 1463 * @since 2.0 1464 */ indexAnnotation(final Annotation note)1465 public void indexAnnotation(final Annotation note) { 1466 if (search != null) { 1467 search.indexAnnotation(note).addErrback(new PluginError()); 1468 } 1469 if( rt_publisher != null ) { 1470 rt_publisher.publishAnnotation(note); 1471 } 1472 } 1473 1474 /** 1475 * Delete the annotation object from the search index 1476 * @param note The annotation object to delete 1477 * @since 2.0 1478 */ deleteAnnotation(final Annotation note)1479 public void deleteAnnotation(final Annotation note) { 1480 if (search != null) { 1481 search.deleteAnnotation(note).addErrback(new PluginError()); 1482 } 1483 } 1484 1485 /** 1486 * Processes the TSMeta through all of the trees if configured to do so 1487 * @param meta The meta data to process 1488 * @since 2.0 1489 */ processTSMetaThroughTrees(final TSMeta meta)1490 public Deferred<Boolean> processTSMetaThroughTrees(final TSMeta meta) { 1491 if (config.enable_tree_processing()) { 1492 return TreeBuilder.processAllTrees(this, meta); 1493 } 1494 return Deferred.fromResult(false); 1495 } 1496 1497 /** 1498 * Executes a search query using the search plugin 1499 * @param query The query to execute 1500 * @return A deferred object to wait on for the results to be fetched 1501 * @throws IllegalStateException if the search plugin has not been enabled or 1502 * configured 1503 * @since 2.0 1504 */ executeSearch(final SearchQuery query)1505 public Deferred<SearchQuery> executeSearch(final SearchQuery query) { 1506 if (search == null) { 1507 throw new IllegalStateException( 1508 "Searching has not been enabled on this TSD"); 1509 } 1510 1511 return search.executeQuery(query); 1512 } 1513 1514 /** 1515 * Simply logs plugin errors when they're thrown by attaching as an errorback. 1516 * Without this, exceptions will just disappear (unless logged by the plugin) 1517 * since we don't wait for a result. 1518 */ 1519 final class PluginError implements Callback<Object, Exception> { 1520 @Override call(final Exception e)1521 public Object call(final Exception e) throws Exception { 1522 LOG.error("Exception from Search plugin indexer", e); 1523 return null; 1524 } 1525 } 1526 1527 /** 1528 * Blocks while pre-fetching meta data from the data and uid tables 1529 * so that performance improves, particularly with a large number of 1530 * regions and region servers. 1531 * @since 2.2 1532 */ preFetchHBaseMeta()1533 public void preFetchHBaseMeta() { 1534 LOG.info("Pre-fetching meta data for all tables"); 1535 final long start = System.currentTimeMillis(); 1536 final ArrayList<Deferred<Object>> deferreds = new ArrayList<Deferred<Object>>(); 1537 deferreds.add(client.prefetchMeta(table)); 1538 deferreds.add(client.prefetchMeta(uidtable)); 1539 1540 // TODO(cl) - meta, tree, etc 1541 1542 try { 1543 Deferred.group(deferreds).join(); 1544 LOG.info("Fetched meta data for tables in " + 1545 (System.currentTimeMillis() - start) + "ms"); 1546 } catch (InterruptedException e) { 1547 LOG.error("Interrupted", e); 1548 Thread.currentThread().interrupt(); 1549 return; 1550 } catch (Exception e) { 1551 LOG.error("Failed to prefetch meta for our tables", e); 1552 } 1553 } 1554 1555 /** @return the timer used for various house keeping functions */ getTimer()1556 public Timer getTimer() { 1557 return timer; 1558 } 1559 1560 // ------------------ // 1561 // Compaction helpers // 1562 // ------------------ // 1563 compact(final ArrayList<KeyValue> row, List<Annotation> annotations)1564 final KeyValue compact(final ArrayList<KeyValue> row, 1565 List<Annotation> annotations) { 1566 return compactionq.compact(row, annotations); 1567 } 1568 1569 /** 1570 * Schedules the given row key for later re-compaction. 1571 * Once this row key has become "old enough", we'll read back all the data 1572 * points in that row, write them back to HBase in a more compact fashion, 1573 * and delete the individual data points. 1574 * @param row The row key to re-compact later. Will not be modified. 1575 * @param base_time The 32-bit unsigned UNIX timestamp. 1576 */ scheduleForCompaction(final byte[] row, final int base_time)1577 final void scheduleForCompaction(final byte[] row, final int base_time) { 1578 if (config.enable_compactions()) { 1579 compactionq.add(row); 1580 } 1581 } 1582 1583 // ------------------------ // 1584 // HBase operations helpers // 1585 // ------------------------ // 1586 1587 /** Gets the entire given row from the data table. */ get(final byte[] key)1588 final Deferred<ArrayList<KeyValue>> get(final byte[] key) { 1589 return client.get(new GetRequest(table, key)); 1590 } 1591 1592 /** Puts the given value into the data table. */ put(final byte[] key, final byte[] qualifier, final byte[] value)1593 final Deferred<Object> put(final byte[] key, 1594 final byte[] qualifier, 1595 final byte[] value) { 1596 return client.put(new PutRequest(table, key, FAMILY, qualifier, value)); 1597 } 1598 1599 /** Deletes the given cells from the data table. */ delete(final byte[] key, final byte[][] qualifiers)1600 final Deferred<Object> delete(final byte[] key, final byte[][] qualifiers) { 1601 return client.delete(new DeleteRequest(table, key, FAMILY, qualifiers)); 1602 } 1603 1604 } 1605