1 // This file is part of OpenTSDB. 2 // Copyright (C) 2010-2014 The OpenTSDB Authors. 3 // 4 // This program is free software: you can redistribute it and/or modify it 5 // under the terms of the GNU Lesser General Public License as published by 6 // the Free Software Foundation, either version 2.1 of the License, or (at your 7 // option) any later version. This program is distributed in the hope that it 8 // will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty 9 // of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser 10 // General Public License for more details. You should have received a copy 11 // of the GNU Lesser General Public License along with this program. If not, 12 // see <http://www.gnu.org/licenses/>. 13 package net.opentsdb.search; 14 15 import java.nio.charset.Charset; 16 import java.util.ArrayList; 17 import java.util.Arrays; 18 import java.util.Collections; 19 import java.util.List; 20 import java.util.Map; 21 import java.util.regex.Pattern; 22 23 import net.opentsdb.core.Const; 24 import net.opentsdb.core.Internal; 25 import net.opentsdb.core.RowKey; 26 import net.opentsdb.core.TSDB; 27 import net.opentsdb.core.Tags; 28 import net.opentsdb.meta.TSMeta; 29 import net.opentsdb.query.QueryUtil; 30 import net.opentsdb.uid.NoSuchUniqueId; 31 import net.opentsdb.uid.NoSuchUniqueName; 32 import net.opentsdb.uid.UniqueId; 33 import net.opentsdb.uid.UniqueId.UniqueIdType; 34 import net.opentsdb.utils.ByteArrayPair; 35 import net.opentsdb.utils.Exceptions; 36 import net.opentsdb.utils.Pair; 37 38 import org.hbase.async.Bytes; 39 import org.hbase.async.KeyValue; 40 import org.hbase.async.Scanner; 41 import org.slf4j.Logger; 42 import org.slf4j.LoggerFactory; 43 44 import com.stumbleupon.async.Callback; 45 import com.stumbleupon.async.Deferred; 46 import com.stumbleupon.async.DeferredGroupException; 47 48 /** 49 * Lookup series related to a metric, tagk, tagv or any combination thereof. 50 * This class doesn't handle wild-card searching yet. 51 * 52 * When dealing with tags, we can lookup on tagks, tagvs or pairs. Thus: 53 * tagk, null <- lookup all series with a tagk 54 * tagk, tagv <- lookup all series with a tag pair 55 * null, tagv <- lookup all series with a tag value somewhere 56 * 57 * The user can supply multiple tags in a query so the logic is a little goofy 58 * but here it is: 59 * - Different tagks are AND'd, e.g. given "host=web01 dc=lga" we will lookup 60 * series that contain both of those tag pairs. Also when given "host= dc=" 61 * then we lookup series with both tag keys regardless of their values. 62 * - Tagks without a tagv will override tag pairs. E.g. "host=web01 host=" will 63 * return all series with the "host" tagk. 64 * - Tagvs without a tagk are OR'd. Given "=lga =phx" the lookup will fetch 65 * anything with either "lga" or "phx" as the value for a pair. When combined 66 * with a tagk, e.g. "host=web01 =lga" then it will return any series with the 67 * tag pair AND any tag with the "lga" value. 68 * 69 * To avoid running performance degrading regexes in HBase regions, we'll double 70 * filter when necessary. If tagks are present, those are used in the rowkey 71 * filter and a secondary filter is applied in the TSD with remaining tagvs. 72 * E.g. the query "host=web01 =lga" will issue a rowkey filter with "host=web01" 73 * then within the TSD scanner, we'll filter out only the rows that contain an 74 * "lga" tag value. We don't know where in a row key the tagv may fall, so we 75 * would have to first match on the pair, then backtrack to find the value and 76 * make sure the pair is skipped. Thus its easier on the region server to execute 77 * a simpler rowkey regex, pass all the results to the TSD, then let us filter on 78 * tag values only when necessary. (if a query only has tag values, then this is 79 * moot and we can pass them in a rowkey filter since they're OR'd). 80 * 81 * @since 2.1 82 */ 83 public class TimeSeriesLookup { 84 private static final Logger LOG = 85 LoggerFactory.getLogger(TimeSeriesLookup.class); 86 87 /** Charset used to convert Strings to byte arrays and back. */ 88 private static final Charset CHARSET = Charset.forName("ISO-8859-1"); 89 90 /** The query with metrics and/or tags to use */ 91 private final SearchQuery query; 92 93 /** Whether or not to dump the output to standard out for CLI commands */ 94 private boolean to_stdout; 95 96 /** The TSD to use for lookups */ 97 private final TSDB tsdb; 98 99 /** The metric UID if given by the query, post resolution */ 100 private byte[] metric_uid; 101 102 /** Tag UID pairs if given in the query. Key or value may be null. */ 103 private List<ByteArrayPair> pairs; 104 105 /** The compiled row key regex for HBase filtering */ 106 private String rowkey_regex; 107 108 /** Post scan filtering if we have a lot of values to look at */ 109 private String tagv_filter; 110 111 /** The results to send to the caller */ 112 private final List<byte[]> tsuids; 113 114 /** 115 * Default ctor 116 * @param tsdb The TSD to which we belong 117 * @param metric A metric to match on, may be null 118 * @param tags One or more tags to match on, may be null 119 */ TimeSeriesLookup(final TSDB tsdb, final SearchQuery query)120 public TimeSeriesLookup(final TSDB tsdb, final SearchQuery query) { 121 this.tsdb = tsdb; 122 this.query = query; 123 tsuids = Collections.synchronizedList(new ArrayList<byte[]>()); 124 } 125 126 /** 127 * Lookup time series associated with the given metric, tagk, tagv or tag 128 * pairs. Either the meta table or the data table will be scanned. If no 129 * metric is given, a full table scan must be performed and this call may take 130 * a long time to complete. 131 * When dumping to stdout, if an ID can't be looked up, it will be logged and 132 * skipped. 133 * @return A list of TSUIDs matching the given lookup query. 134 * @throws NoSuchUniqueName if any of the given names fail to resolve to a 135 * UID. 136 */ lookup()137 public List<byte[]> lookup() { 138 try { 139 return lookupAsync().join(); 140 } catch (InterruptedException e) { 141 LOG.error("Interrupted performing lookup", e); 142 Thread.currentThread().interrupt(); 143 return null; 144 } catch (DeferredGroupException e) { 145 final Throwable ex = Exceptions.getCause(e); 146 if (ex instanceof NoSuchUniqueName) { 147 throw (NoSuchUniqueName)ex; 148 } 149 throw new RuntimeException("Unexpected exception", ex); 150 } catch (NoSuchUniqueName e) { 151 throw e; 152 } catch (Exception e) { 153 throw new RuntimeException("Unexpected exception", e); 154 } 155 } 156 157 /** 158 * Lookup time series associated with the given metric, tagk, tagv or tag 159 * pairs. Either the meta table or the data table will be scanned. If no 160 * metric is given, a full table scan must be performed and this call may take 161 * a long time to complete. 162 * When dumping to stdout, if an ID can't be looked up, it will be logged and 163 * skipped. 164 * @return A list of TSUIDs matching the given lookup query. 165 * @throws NoSuchUniqueName if any of the given names fail to resolve to a 166 * UID. 167 * @since 2.2 168 */ lookupAsync()169 public Deferred<List<byte[]>> lookupAsync() { 170 final Pattern tagv_regex = tagv_filter != null ? 171 Pattern.compile(tagv_filter) : null; 172 173 // we don't really know what size the UIDs will resolve to so just grab 174 // a decent amount. 175 final StringBuffer buf = to_stdout ? new StringBuffer(2048) : null; 176 final long start = System.currentTimeMillis(); 177 final int limit; 178 if (query.getLimit() > 0) { 179 if (query.useMeta() || Const.SALT_WIDTH() < 1) { 180 limit = query.getLimit(); 181 } else if (query.getLimit() < Const.SALT_BUCKETS()) { 182 limit = 1; 183 } else { 184 limit = query.getLimit() / Const.SALT_BUCKETS(); 185 } 186 } else { 187 limit = 0; 188 } 189 190 class ScannerCB implements Callback<Deferred<List<byte[]>>, 191 ArrayList<ArrayList<KeyValue>>> { 192 private final Scanner scanner; 193 // used to avoid dupes when scanning the data table 194 private byte[] last_tsuid = null; 195 private int rows_read; 196 197 ScannerCB(final Scanner scanner) { 198 this.scanner = scanner; 199 } 200 201 Deferred<List<byte[]>> scan() { 202 return scanner.nextRows().addCallbackDeferring(this); 203 } 204 205 @Override 206 public Deferred<List<byte[]>> call(final ArrayList<ArrayList<KeyValue>> rows) 207 throws Exception { 208 if (rows == null) { 209 scanner.close(); 210 if (query.useMeta() || Const.SALT_WIDTH() < 1) { 211 LOG.debug("Lookup query matched " + tsuids.size() + " time series in " + 212 (System.currentTimeMillis() - start) + " ms"); 213 } 214 return Deferred.fromResult(tsuids); 215 } 216 217 for (final ArrayList<KeyValue> row : rows) { 218 if (limit > 0 && rows_read >= limit) { 219 // little recursion to close the scanner and log above. 220 return call(null); 221 } 222 final byte[] tsuid = query.useMeta() ? row.get(0).key() : 223 UniqueId.getTSUIDFromKey(row.get(0).key(), TSDB.metrics_width(), 224 Const.TIMESTAMP_BYTES); 225 226 // TODO - there MUST be a better way than creating a ton of temp 227 // string objects. 228 if (tagv_regex != null && 229 !tagv_regex.matcher(new String(tsuid, CHARSET)).find()) { 230 continue; 231 } 232 233 if (to_stdout) { 234 if (last_tsuid != null && Bytes.memcmp(last_tsuid, tsuid) == 0) { 235 continue; 236 } 237 last_tsuid = tsuid; 238 239 try { 240 buf.append(UniqueId.uidToString(tsuid)).append(" "); 241 buf.append(RowKey.metricNameAsync(tsdb, tsuid) 242 .joinUninterruptibly()); 243 buf.append(" "); 244 245 final List<byte[]> tag_ids = UniqueId.getTagPairsFromTSUID(tsuid); 246 final Map<String, String> resolved_tags = 247 Tags.resolveIdsAsync(tsdb, tag_ids).joinUninterruptibly(); 248 for (final Map.Entry<String, String> tag_pair : 249 resolved_tags.entrySet()) { 250 buf.append(tag_pair.getKey()).append("=") 251 .append(tag_pair.getValue()).append(" "); 252 } 253 } catch (NoSuchUniqueId nsui) { 254 LOG.error("Unable to resolve UID in TSUID (" + 255 UniqueId.uidToString(tsuid) + ") " + nsui.getMessage()); 256 } 257 buf.setLength(0); // reset the buffer so we can re-use it 258 } else { 259 tsuids.add(tsuid); 260 } 261 ++rows_read; 262 } 263 264 return scan(); 265 } 266 267 @Override 268 public String toString() { 269 return "Scanner callback"; 270 } 271 } 272 273 class CompleteCB implements Callback<List<byte[]>, ArrayList<List<byte[]>>> { 274 @Override 275 public List<byte[]> call(final ArrayList<List<byte[]>> unused) throws Exception { 276 LOG.debug("Lookup query matched " + tsuids.size() + " time series in " + 277 (System.currentTimeMillis() - start) + " ms"); 278 return tsuids; 279 } 280 @Override 281 public String toString() { 282 return "Final async lookup callback"; 283 } 284 } 285 286 class UIDCB implements Callback<Deferred<List<byte[]>>, Object> { 287 @Override 288 public Deferred<List<byte[]>> call(Object arg0) throws Exception { 289 if (!query.useMeta() && Const.SALT_WIDTH() > 0 && metric_uid != null) { 290 final ArrayList<Deferred<List<byte[]>>> deferreds = 291 new ArrayList<Deferred<List<byte[]>>>(Const.SALT_BUCKETS()); 292 for (int i = 0; i < Const.SALT_BUCKETS(); i++) { 293 deferreds.add(new ScannerCB(getScanner(i)).scan()); 294 } 295 return Deferred.group(deferreds).addCallback(new CompleteCB()); 296 } else { 297 return new ScannerCB(getScanner(0)).scan(); 298 } 299 } 300 @Override 301 public String toString() { 302 return "UID resolution callback"; 303 } 304 } 305 306 return resolveUIDs().addCallbackDeferring(new UIDCB()); 307 } 308 309 /** 310 * Resolves the metric and tag strings to their UIDs 311 * @return A deferred to wait on for resolution to complete. 312 */ resolveUIDs()313 private Deferred<Object> resolveUIDs() { 314 315 class TagsCB implements Callback<Object, ArrayList<Object>> { 316 @Override 317 public Object call(final ArrayList<Object> ignored) throws Exception { 318 rowkey_regex = getRowKeyRegex(); 319 return null; 320 } 321 } 322 323 class PairResolution implements Callback<Object, ArrayList<byte[]>> { 324 @Override 325 public Object call(final ArrayList<byte[]> tags) throws Exception { 326 if (tags.size() < 2) { 327 throw new IllegalArgumentException("Somehow we received an array " 328 + "that wasn't two bytes in size! " + tags); 329 } 330 pairs.add(new ByteArrayPair(tags.get(0), tags.get(1))); 331 return Deferred.fromResult(null); 332 } 333 } 334 335 class TagResolution implements Callback<Deferred<Object>, Object> { 336 @Override 337 public Deferred<Object> call(final Object unused) throws Exception { 338 if (query.getTags() == null || query.getTags().isEmpty()) { 339 return Deferred.fromResult(null); 340 } 341 342 pairs = Collections.synchronizedList( 343 new ArrayList<ByteArrayPair>(query.getTags().size())); 344 final ArrayList<Deferred<Object>> deferreds = 345 new ArrayList<Deferred<Object>>(pairs.size()); 346 347 for (final Pair<String, String> tags : query.getTags()) { 348 final ArrayList<Deferred<byte[]>> deferred_tags = 349 new ArrayList<Deferred<byte[]>>(2); 350 if (tags.getKey() != null && !tags.getKey().equals("*")) { 351 deferred_tags.add(tsdb.getUIDAsync(UniqueIdType.TAGK, tags.getKey())); 352 } else { 353 deferred_tags.add(Deferred.<byte[]>fromResult(null)); 354 } 355 if (tags.getValue() != null && !tags.getValue().equals("*")) { 356 deferred_tags.add(tsdb.getUIDAsync(UniqueIdType.TAGV, tags.getValue())); 357 } else { 358 deferred_tags.add(Deferred.<byte[]>fromResult(null)); 359 } 360 deferreds.add(Deferred.groupInOrder(deferred_tags) 361 .addCallback(new PairResolution())); 362 } 363 return Deferred.group(deferreds).addCallback(new TagsCB()); 364 } 365 } 366 367 class MetricCB implements Callback<Deferred<Object>, byte[]> { 368 @Override 369 public Deferred<Object> call(final byte[] uid) throws Exception { 370 metric_uid = uid; 371 LOG.debug("Found UID (" + UniqueId.uidToString(metric_uid) + 372 ") for metric (" + query.getMetric() + ")"); 373 return new TagResolution().call(null); 374 } 375 } 376 377 if (query.getMetric() != null && !query.getMetric().isEmpty() && 378 !query.getMetric().equals("*")) { 379 return tsdb.getUIDAsync(UniqueIdType.METRIC, query.getMetric()) 380 .addCallbackDeferring(new MetricCB()); 381 } else { 382 try { 383 return new TagResolution().call(null); 384 } catch (Exception e) { 385 return Deferred.fromError(e); 386 } 387 } 388 } 389 390 /** Compiles a scanner with the given salt ID if salting is enabled AND we're 391 * not scanning the meta table. 392 * @param salt An ID for the salt bucket 393 * @return A scanner to send to HBase. 394 */ getScanner(final int salt)395 private Scanner getScanner(final int salt) { 396 final Scanner scanner = tsdb.getClient().newScanner( 397 query.useMeta() ? tsdb.metaTable() : tsdb.dataTable()); 398 scanner.setFamily(query.useMeta() ? TSMeta.FAMILY : TSDB.FAMILY()); 399 400 if (metric_uid != null) { 401 byte[] key; 402 if (query.useMeta() || Const.SALT_WIDTH() < 1) { 403 key = metric_uid; 404 } else { 405 key = new byte[Const.SALT_WIDTH() + TSDB.metrics_width()]; 406 System.arraycopy(RowKey.getSaltBytes(salt), 0, key, 0, Const.SALT_WIDTH()); 407 System.arraycopy(metric_uid, 0, key, Const.SALT_WIDTH(), metric_uid.length); 408 } 409 scanner.setStartKey(key); 410 long uid = UniqueId.uidToLong(metric_uid, TSDB.metrics_width()); 411 uid++; 412 if (uid < Internal.getMaxUnsignedValueOnBytes(TSDB.metrics_width())) { 413 // if random metrics are enabled we could see a metric with the max UID 414 // value. If so, we need to leave the stop key as null 415 if (query.useMeta() || Const.SALT_WIDTH() < 1) { 416 key = UniqueId.longToUID(uid, TSDB.metrics_width()); 417 } else { 418 key = new byte[Const.SALT_WIDTH() + TSDB.metrics_width()]; 419 System.arraycopy(RowKey.getSaltBytes(salt), 0, key, 0, Const.SALT_WIDTH()); 420 System.arraycopy(UniqueId.longToUID(uid, TSDB.metrics_width()), 0, 421 key, Const.SALT_WIDTH(), metric_uid.length); 422 } 423 scanner.setStopKey(key); 424 } 425 } 426 427 if (rowkey_regex != null) { 428 scanner.setKeyRegexp(rowkey_regex, CHARSET); 429 if (LOG.isDebugEnabled()) { 430 LOG.debug("Scanner regex: " + QueryUtil.byteRegexToString(rowkey_regex)); 431 } 432 } 433 434 return scanner; 435 } 436 437 /** 438 * Constructs a row key regular expression to pass to HBase if the user gave 439 * some tags in the query 440 * @return The regular expression to use. 441 */ getRowKeyRegex()442 private String getRowKeyRegex() { 443 final StringBuilder tagv_buffer = new StringBuilder(); 444 // remember, tagks are sorted in the row key so we need to supply a sorted 445 // regex or matching will fail. 446 Collections.sort(pairs); 447 448 final short name_width = TSDB.tagk_width(); 449 final short value_width = TSDB.tagv_width(); 450 final short tagsize = (short) (name_width + value_width); 451 452 int index = 0; 453 final StringBuilder buf = new StringBuilder( 454 22 // "^.{N}" + "(?:.{M})*" + "$" + wiggle 455 + ((13 + tagsize) // "(?:.{M})*\\Q" + tagsize bytes + "\\E" 456 * (pairs.size()))); 457 buf.append("(?s)^.{").append(query.useMeta() ? TSDB.metrics_width() : 458 TSDB.metrics_width() + Const.SALT_WIDTH()) 459 .append("}"); 460 if (!query.useMeta()) { 461 buf.append("(?:.{").append(Const.TIMESTAMP_BYTES).append("})*"); 462 } 463 buf.append("(?:.{").append(tagsize).append("})*"); 464 465 // at the top of the list will be the null=tagv pairs. We want to compile 466 // a separate regex for them. 467 for (; index < pairs.size(); index++) { 468 if (pairs.get(index).getKey() != null) { 469 break; 470 } 471 472 if (index > 0) { 473 buf.append("|"); 474 } 475 buf.append("(?:.{").append(name_width).append("})"); 476 buf.append("\\Q"); 477 QueryUtil.addId(buf, pairs.get(index).getValue(), true); 478 } 479 buf.append("(?:.{").append(tagsize).append("})*") 480 .append("$"); 481 482 if (index > 0 && index < pairs.size()) { 483 // we had one or more tagvs to lookup AND we have tagk or tag pairs to 484 // filter on, so we dump the previous regex into the tagv_filter and 485 // continue on with a row key 486 tagv_buffer.append(buf.toString()); 487 LOG.debug("Setting tagv filter: " + QueryUtil.byteRegexToString(buf.toString())); 488 } else if (index >= pairs.size()) { 489 // in this case we don't have any tagks to deal with so we can just 490 // pass the previously compiled regex to the rowkey filter of the 491 // scanner 492 LOG.debug("Setting scanner row key filter with tagvs only: " + 493 QueryUtil.byteRegexToString(buf.toString())); 494 if (tagv_buffer.length() > 0) { 495 tagv_filter = tagv_buffer.toString(); 496 } 497 return buf.toString(); 498 } 499 500 // catch any left over tagk/tag pairs 501 if (index < pairs.size()){ // This condition is true whenever the first tagk in the pairs has a null value. 502 buf.setLength(0); 503 buf.append("(?s)^.{").append(query.useMeta() ? TSDB.metrics_width() : 504 TSDB.metrics_width() + Const.SALT_WIDTH()) 505 .append("}"); 506 if (!query.useMeta()) { 507 buf.append("(?:.{").append(Const.TIMESTAMP_BYTES).append("})"); 508 } 509 510 ByteArrayPair last_pair = null; 511 for (; index < pairs.size(); index++) { 512 if (last_pair != null && last_pair.getValue() == null && 513 Bytes.memcmp(last_pair.getKey(), pairs.get(index).getKey()) == 0) { 514 // tagk=null is a wildcard so we don't need to bother adding 515 // tagk=tagv pairs with the same tagk. 516 LOG.debug("Skipping pair due to wildcard: " + pairs.get(index)); 517 } else if (last_pair != null && 518 Bytes.memcmp(last_pair.getKey(), pairs.get(index).getKey()) == 0) { 519 // in this case we're ORing e.g. "host=web01|host=web02" 520 buf.append("|\\Q"); 521 QueryUtil.addId(buf, pairs.get(index).getKey(), false); 522 QueryUtil.addId(buf, pairs.get(index).getValue(), true); 523 } else { 524 if (last_pair != null) { 525 buf.append(")"); 526 } 527 // moving on to the next tagk set 528 buf.append("(?:.{").append(tagsize).append("})*"); // catch tag pairs in between 529 buf.append("(?:"); 530 if (pairs.get(index).getKey() != null && 531 pairs.get(index).getValue() != null) { 532 buf.append("\\Q"); 533 QueryUtil.addId(buf, pairs.get(index).getKey(), false); 534 QueryUtil.addId(buf, pairs.get(index).getValue(), true); 535 } else { 536 buf.append("\\Q"); 537 QueryUtil.addId(buf, pairs.get(index).getKey(), true); 538 buf.append("(?:.{").append(value_width).append("})"); 539 } 540 } 541 last_pair = pairs.get(index); 542 } 543 buf.append(")(?:.{").append(tagsize).append("})*").append("$"); 544 } 545 if (tagv_buffer.length() > 0) { 546 tagv_filter = tagv_buffer.toString(); 547 } 548 return buf.toString(); 549 } 550 551 /** @param to_stdout Whether or not to dump to standard out as we scan */ setToStdout(final boolean to_stdout)552 public void setToStdout(final boolean to_stdout) { 553 this.to_stdout = to_stdout; 554 } 555 556 @Override toString()557 public String toString() { 558 final StringBuilder buf = new StringBuilder(); 559 buf.append("query={") 560 .append(query) 561 .append("}, to_stdout=") 562 .append(to_stdout) 563 .append(", metric_uid=") 564 .append(metric_uid == null ? "null" : Arrays.toString(metric_uid)) 565 .append(", pairs=") 566 .append(pairs) 567 .append(", rowkey_regex=") 568 .append(rowkey_regex) 569 .append(", tagv_filter=") 570 .append(tagv_filter); 571 return buf.toString(); 572 573 } 574 } 575