1 // This file is part of OpenTSDB.
2 // Copyright (C) 2010-2014  The OpenTSDB Authors.
3 //
4 // This program is free software: you can redistribute it and/or modify it
5 // under the terms of the GNU Lesser General Public License as published by
6 // the Free Software Foundation, either version 2.1 of the License, or (at your
7 // option) any later version.  This program is distributed in the hope that it
8 // will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty
9 // of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser
10 // General Public License for more details.  You should have received a copy
11 // of the GNU Lesser General Public License along with this program.  If not,
12 // see <http://www.gnu.org/licenses/>.
13 package net.opentsdb.search;
14 
15 import java.nio.charset.Charset;
16 import java.util.ArrayList;
17 import java.util.Arrays;
18 import java.util.Collections;
19 import java.util.List;
20 import java.util.Map;
21 import java.util.regex.Pattern;
22 
23 import net.opentsdb.core.Const;
24 import net.opentsdb.core.Internal;
25 import net.opentsdb.core.RowKey;
26 import net.opentsdb.core.TSDB;
27 import net.opentsdb.core.Tags;
28 import net.opentsdb.meta.TSMeta;
29 import net.opentsdb.query.QueryUtil;
30 import net.opentsdb.uid.NoSuchUniqueId;
31 import net.opentsdb.uid.NoSuchUniqueName;
32 import net.opentsdb.uid.UniqueId;
33 import net.opentsdb.uid.UniqueId.UniqueIdType;
34 import net.opentsdb.utils.ByteArrayPair;
35 import net.opentsdb.utils.Exceptions;
36 import net.opentsdb.utils.Pair;
37 
38 import org.hbase.async.Bytes;
39 import org.hbase.async.KeyValue;
40 import org.hbase.async.Scanner;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
43 
44 import com.stumbleupon.async.Callback;
45 import com.stumbleupon.async.Deferred;
46 import com.stumbleupon.async.DeferredGroupException;
47 
48 /**
49  * Lookup series related to a metric, tagk, tagv or any combination thereof.
50  * This class doesn't handle wild-card searching yet.
51  *
52  * When dealing with tags, we can lookup on tagks, tagvs or pairs. Thus:
53  * tagk, null  <- lookup all series with a tagk
54  * tagk, tagv  <- lookup all series with a tag pair
55  * null, tagv  <- lookup all series with a tag value somewhere
56  *
57  * The user can supply multiple tags in a query so the logic is a little goofy
58  * but here it is:
59  * - Different tagks are AND'd, e.g. given "host=web01 dc=lga" we will lookup
60  *   series that contain both of those tag pairs. Also when given "host= dc="
61  *   then we lookup series with both tag keys regardless of their values.
62  * - Tagks without a tagv will override tag pairs. E.g. "host=web01 host=" will
63  *   return all series with the "host" tagk.
64  * - Tagvs without a tagk are OR'd. Given "=lga =phx" the lookup will fetch
65  *   anything with either "lga" or "phx" as the value for a pair. When combined
66  *   with a tagk, e.g. "host=web01 =lga" then it will return any series with the
67  *   tag pair AND any tag with the "lga" value.
68  *
69  * To avoid running performance degrading regexes in HBase regions, we'll double
70  * filter when necessary. If tagks are present, those are used in the rowkey
71  * filter and a secondary filter is applied in the TSD with remaining tagvs.
72  * E.g. the query "host=web01 =lga" will issue a rowkey filter with "host=web01"
73  * then within the TSD scanner, we'll filter out only the rows that contain an
74  * "lga" tag value. We don't know where in a row key the tagv may fall, so we
75  * would have to first match on the pair, then backtrack to find the value and
 * make sure the pair is skipped. Thus it's easier on the region server to execute
77  * a simpler rowkey regex, pass all the results to the TSD, then let us filter on
78  * tag values only when necessary. (if a query only has tag values, then this is
79  * moot and we can pass them in a rowkey filter since they're OR'd).
80  *
81  * @since 2.1
82  */
83 public class TimeSeriesLookup {
84   private static final Logger LOG =
85       LoggerFactory.getLogger(TimeSeriesLookup.class);
86 
87   /** Charset used to convert Strings to byte arrays and back. */
88   private static final Charset CHARSET = Charset.forName("ISO-8859-1");
89 
90   /** The query with metrics and/or tags to use */
91   private final SearchQuery query;
92 
93   /** Whether or not to dump the output to standard out for CLI commands */
94   private boolean to_stdout;
95 
96   /** The TSD to use for lookups */
97   private final TSDB tsdb;
98 
99   /** The metric UID if given by the query, post resolution */
100   private byte[] metric_uid;
101 
102   /** Tag UID pairs if given in the query. Key or value may be null. */
103   private List<ByteArrayPair> pairs;
104 
105   /** The compiled row key regex for HBase filtering */
106   private String rowkey_regex;
107 
108   /** Post scan filtering if we have a lot of values to look at */
109   private String tagv_filter;
110 
111   /** The results to send to the caller */
112   private final List<byte[]> tsuids;
113 
114   /**
115    * Default ctor
116    * @param tsdb The TSD to which we belong
117    * @param metric A metric to match on, may be null
118    * @param tags One or more tags to match on, may be null
119    */
TimeSeriesLookup(final TSDB tsdb, final SearchQuery query)120   public TimeSeriesLookup(final TSDB tsdb, final SearchQuery query) {
121     this.tsdb = tsdb;
122     this.query = query;
123     tsuids = Collections.synchronizedList(new ArrayList<byte[]>());
124   }
125 
126   /**
127    * Lookup time series associated with the given metric, tagk, tagv or tag
128    * pairs. Either the meta table or the data table will be scanned. If no
129    * metric is given, a full table scan must be performed and this call may take
130    * a long time to complete.
131    * When dumping to stdout, if an ID can't be looked up, it will be logged and
132    * skipped.
133    * @return A list of TSUIDs matching the given lookup query.
134    * @throws NoSuchUniqueName if any of the given names fail to resolve to a
135    * UID.
136    */
lookup()137   public List<byte[]> lookup() {
138     try {
139       return lookupAsync().join();
140     } catch (InterruptedException e) {
141       LOG.error("Interrupted performing lookup", e);
142       Thread.currentThread().interrupt();
143       return null;
144     } catch (DeferredGroupException e) {
145       final Throwable ex = Exceptions.getCause(e);
146       if (ex instanceof NoSuchUniqueName) {
147         throw (NoSuchUniqueName)ex;
148       }
149       throw new RuntimeException("Unexpected exception", ex);
150     } catch (NoSuchUniqueName e) {
151       throw e;
152     } catch (Exception e) {
153       throw new RuntimeException("Unexpected exception", e);
154     }
155   }
156 
157   /**
158    * Lookup time series associated with the given metric, tagk, tagv or tag
159    * pairs. Either the meta table or the data table will be scanned. If no
160    * metric is given, a full table scan must be performed and this call may take
161    * a long time to complete.
162    * When dumping to stdout, if an ID can't be looked up, it will be logged and
163    * skipped.
164    * @return A list of TSUIDs matching the given lookup query.
165    * @throws NoSuchUniqueName if any of the given names fail to resolve to a
166    * UID.
167    * @since 2.2
168    */
lookupAsync()169   public Deferred<List<byte[]>> lookupAsync() {
170     final Pattern tagv_regex = tagv_filter != null ?
171         Pattern.compile(tagv_filter) : null;
172 
173     // we don't really know what size the UIDs will resolve to so just grab
174     // a decent amount.
175     final StringBuffer buf = to_stdout ? new StringBuffer(2048) : null;
176     final long start = System.currentTimeMillis();
177     final int limit;
178     if (query.getLimit() > 0) {
179       if (query.useMeta() || Const.SALT_WIDTH() < 1) {
180         limit = query.getLimit();
181       } else if (query.getLimit() < Const.SALT_BUCKETS()) {
182         limit = 1;
183       } else {
184         limit = query.getLimit() / Const.SALT_BUCKETS();
185       }
186     } else {
187       limit = 0;
188     }
189 
190     class ScannerCB implements Callback<Deferred<List<byte[]>>,
191       ArrayList<ArrayList<KeyValue>>> {
192       private final Scanner scanner;
193       // used to avoid dupes when scanning the data table
194       private byte[] last_tsuid = null;
195       private int rows_read;
196 
197       ScannerCB(final Scanner scanner) {
198         this.scanner = scanner;
199       }
200 
201       Deferred<List<byte[]>> scan() {
202         return scanner.nextRows().addCallbackDeferring(this);
203       }
204 
205       @Override
206       public Deferred<List<byte[]>> call(final ArrayList<ArrayList<KeyValue>> rows)
207           throws Exception {
208         if (rows == null) {
209           scanner.close();
210           if (query.useMeta() || Const.SALT_WIDTH() < 1) {
211             LOG.debug("Lookup query matched " + tsuids.size() + " time series in " +
212                 (System.currentTimeMillis() - start) + " ms");
213           }
214           return Deferred.fromResult(tsuids);
215         }
216 
217         for (final ArrayList<KeyValue> row : rows) {
218           if (limit > 0 && rows_read >= limit) {
219             // little recursion to close the scanner and log above.
220             return call(null);
221           }
222           final byte[] tsuid = query.useMeta() ? row.get(0).key() :
223             UniqueId.getTSUIDFromKey(row.get(0).key(), TSDB.metrics_width(),
224                 Const.TIMESTAMP_BYTES);
225 
226           // TODO - there MUST be a better way than creating a ton of temp
227           // string objects.
228           if (tagv_regex != null &&
229               !tagv_regex.matcher(new String(tsuid, CHARSET)).find()) {
230             continue;
231           }
232 
233           if (to_stdout) {
234             if (last_tsuid != null && Bytes.memcmp(last_tsuid, tsuid) == 0) {
235               continue;
236             }
237             last_tsuid = tsuid;
238 
239             try {
240               buf.append(UniqueId.uidToString(tsuid)).append(" ");
241               buf.append(RowKey.metricNameAsync(tsdb, tsuid)
242                   .joinUninterruptibly());
243               buf.append(" ");
244 
245               final List<byte[]> tag_ids = UniqueId.getTagPairsFromTSUID(tsuid);
246               final Map<String, String> resolved_tags =
247                   Tags.resolveIdsAsync(tsdb, tag_ids).joinUninterruptibly();
248               for (final Map.Entry<String, String> tag_pair :
249                   resolved_tags.entrySet()) {
250                 buf.append(tag_pair.getKey()).append("=")
251                    .append(tag_pair.getValue()).append(" ");
252               }
253             } catch (NoSuchUniqueId nsui) {
254               LOG.error("Unable to resolve UID in TSUID (" +
255                   UniqueId.uidToString(tsuid) + ") " + nsui.getMessage());
256             }
257             buf.setLength(0); // reset the buffer so we can re-use it
258           } else {
259             tsuids.add(tsuid);
260           }
261           ++rows_read;
262         }
263 
264         return scan();
265       }
266 
267       @Override
268       public String toString() {
269         return "Scanner callback";
270       }
271     }
272 
273     class CompleteCB implements Callback<List<byte[]>, ArrayList<List<byte[]>>> {
274       @Override
275       public List<byte[]> call(final ArrayList<List<byte[]>> unused) throws Exception {
276         LOG.debug("Lookup query matched " + tsuids.size() + " time series in " +
277             (System.currentTimeMillis() - start) + " ms");
278         return tsuids;
279       }
280       @Override
281       public String toString() {
282         return "Final async lookup callback";
283       }
284     }
285 
286     class UIDCB implements Callback<Deferred<List<byte[]>>, Object> {
287       @Override
288       public Deferred<List<byte[]>> call(Object arg0) throws Exception {
289         if (!query.useMeta() && Const.SALT_WIDTH() > 0 && metric_uid != null) {
290           final ArrayList<Deferred<List<byte[]>>> deferreds =
291               new ArrayList<Deferred<List<byte[]>>>(Const.SALT_BUCKETS());
292           for (int i = 0; i < Const.SALT_BUCKETS(); i++) {
293             deferreds.add(new ScannerCB(getScanner(i)).scan());
294           }
295           return Deferred.group(deferreds).addCallback(new CompleteCB());
296         } else {
297           return new ScannerCB(getScanner(0)).scan();
298         }
299       }
300       @Override
301       public String toString() {
302         return "UID resolution callback";
303       }
304     }
305 
306     return resolveUIDs().addCallbackDeferring(new UIDCB());
307   }
308 
309   /**
310    * Resolves the metric and tag strings to their UIDs
311    * @return A deferred to wait on for resolution to complete.
312    */
resolveUIDs()313   private Deferred<Object> resolveUIDs() {
314 
315     class TagsCB implements Callback<Object, ArrayList<Object>> {
316       @Override
317       public Object call(final ArrayList<Object> ignored) throws Exception {
318         rowkey_regex = getRowKeyRegex();
319         return null;
320       }
321     }
322 
323     class PairResolution implements Callback<Object, ArrayList<byte[]>> {
324       @Override
325       public Object call(final ArrayList<byte[]> tags) throws Exception {
326         if (tags.size() < 2) {
327           throw new IllegalArgumentException("Somehow we received an array "
328               + "that wasn't two bytes in size! " + tags);
329         }
330         pairs.add(new ByteArrayPair(tags.get(0), tags.get(1)));
331         return Deferred.fromResult(null);
332       }
333     }
334 
335     class TagResolution implements Callback<Deferred<Object>, Object> {
336       @Override
337       public Deferred<Object> call(final Object unused) throws Exception {
338         if (query.getTags() == null || query.getTags().isEmpty()) {
339           return Deferred.fromResult(null);
340         }
341 
342         pairs = Collections.synchronizedList(
343             new ArrayList<ByteArrayPair>(query.getTags().size()));
344         final ArrayList<Deferred<Object>> deferreds =
345             new ArrayList<Deferred<Object>>(pairs.size());
346 
347         for (final Pair<String, String> tags : query.getTags()) {
348           final ArrayList<Deferred<byte[]>> deferred_tags =
349               new ArrayList<Deferred<byte[]>>(2);
350           if (tags.getKey() != null && !tags.getKey().equals("*")) {
351             deferred_tags.add(tsdb.getUIDAsync(UniqueIdType.TAGK, tags.getKey()));
352           } else {
353             deferred_tags.add(Deferred.<byte[]>fromResult(null));
354           }
355           if (tags.getValue() != null && !tags.getValue().equals("*")) {
356             deferred_tags.add(tsdb.getUIDAsync(UniqueIdType.TAGV, tags.getValue()));
357           } else {
358             deferred_tags.add(Deferred.<byte[]>fromResult(null));
359           }
360           deferreds.add(Deferred.groupInOrder(deferred_tags)
361               .addCallback(new PairResolution()));
362         }
363         return Deferred.group(deferreds).addCallback(new TagsCB());
364       }
365     }
366 
367     class MetricCB implements Callback<Deferred<Object>, byte[]> {
368       @Override
369       public Deferred<Object> call(final byte[] uid) throws Exception {
370         metric_uid = uid;
371         LOG.debug("Found UID (" + UniqueId.uidToString(metric_uid) +
372             ") for metric (" + query.getMetric() + ")");
373         return new TagResolution().call(null);
374       }
375     }
376 
377     if (query.getMetric() != null && !query.getMetric().isEmpty() &&
378         !query.getMetric().equals("*")) {
379       return tsdb.getUIDAsync(UniqueIdType.METRIC, query.getMetric())
380           .addCallbackDeferring(new MetricCB());
381     } else {
382       try {
383         return new TagResolution().call(null);
384       } catch (Exception e) {
385         return Deferred.fromError(e);
386       }
387     }
388   }
389 
390   /** Compiles a scanner with the given salt ID if salting is enabled AND we're
391    * not scanning the meta table.
392    * @param salt An ID for the salt bucket
393    * @return A scanner to send to HBase.
394    */
getScanner(final int salt)395   private Scanner getScanner(final int salt) {
396     final Scanner scanner = tsdb.getClient().newScanner(
397         query.useMeta() ? tsdb.metaTable() : tsdb.dataTable());
398     scanner.setFamily(query.useMeta() ? TSMeta.FAMILY : TSDB.FAMILY());
399 
400     if (metric_uid != null) {
401       byte[] key;
402       if (query.useMeta() || Const.SALT_WIDTH() < 1) {
403         key = metric_uid;
404       } else {
405         key = new byte[Const.SALT_WIDTH() + TSDB.metrics_width()];
406         System.arraycopy(RowKey.getSaltBytes(salt), 0, key, 0, Const.SALT_WIDTH());
407         System.arraycopy(metric_uid, 0, key, Const.SALT_WIDTH(), metric_uid.length);
408       }
409       scanner.setStartKey(key);
410       long uid = UniqueId.uidToLong(metric_uid, TSDB.metrics_width());
411       uid++;
412       if (uid < Internal.getMaxUnsignedValueOnBytes(TSDB.metrics_width())) {
413         // if random metrics are enabled we could see a metric with the max UID
414         // value. If so, we need to leave the stop key as null
415         if (query.useMeta() || Const.SALT_WIDTH() < 1) {
416           key = UniqueId.longToUID(uid, TSDB.metrics_width());
417         } else {
418           key = new byte[Const.SALT_WIDTH() + TSDB.metrics_width()];
419           System.arraycopy(RowKey.getSaltBytes(salt), 0, key, 0, Const.SALT_WIDTH());
420           System.arraycopy(UniqueId.longToUID(uid, TSDB.metrics_width()), 0,
421               key, Const.SALT_WIDTH(), metric_uid.length);
422         }
423         scanner.setStopKey(key);
424       }
425     }
426 
427     if (rowkey_regex != null) {
428       scanner.setKeyRegexp(rowkey_regex, CHARSET);
429       if (LOG.isDebugEnabled()) {
430         LOG.debug("Scanner regex: " + QueryUtil.byteRegexToString(rowkey_regex));
431       }
432     }
433 
434     return scanner;
435   }
436 
437   /**
438    * Constructs a row key regular expression to pass to HBase if the user gave
439    * some tags in the query
440    * @return The regular expression to use.
441    */
getRowKeyRegex()442   private String getRowKeyRegex() {
443     final StringBuilder tagv_buffer = new StringBuilder();
444     // remember, tagks are sorted in the row key so we need to supply a sorted
445     // regex or matching will fail.
446     Collections.sort(pairs);
447 
448     final short name_width = TSDB.tagk_width();
449     final short value_width = TSDB.tagv_width();
450     final short tagsize = (short) (name_width + value_width);
451 
452     int index = 0;
453     final StringBuilder buf = new StringBuilder(
454         22  // "^.{N}" + "(?:.{M})*" + "$" + wiggle
455         + ((13 + tagsize) // "(?:.{M})*\\Q" + tagsize bytes + "\\E"
456            * (pairs.size())));
457     buf.append("(?s)^.{").append(query.useMeta() ? TSDB.metrics_width() :
458       TSDB.metrics_width() + Const.SALT_WIDTH())
459       .append("}");
460     if (!query.useMeta()) {
461       buf.append("(?:.{").append(Const.TIMESTAMP_BYTES).append("})*");
462     }
463     buf.append("(?:.{").append(tagsize).append("})*");
464 
465     // at the top of the list will be the null=tagv pairs. We want to compile
466     // a separate regex for them.
467     for (; index < pairs.size(); index++) {
468       if (pairs.get(index).getKey() != null) {
469         break;
470       }
471 
472       if (index > 0) {
473         buf.append("|");
474       }
475       buf.append("(?:.{").append(name_width).append("})");
476       buf.append("\\Q");
477       QueryUtil.addId(buf, pairs.get(index).getValue(), true);
478     }
479     buf.append("(?:.{").append(tagsize).append("})*")
480        .append("$");
481 
482     if (index > 0 && index < pairs.size()) {
483       // we had one or more tagvs to lookup AND we have tagk or tag pairs to
484       // filter on, so we dump the previous regex into the tagv_filter and
485       // continue on with a row key
486       tagv_buffer.append(buf.toString());
487       LOG.debug("Setting tagv filter: " + QueryUtil.byteRegexToString(buf.toString()));
488     } else if (index >= pairs.size()) {
489       // in this case we don't have any tagks to deal with so we can just
490       // pass the previously compiled regex to the rowkey filter of the
491       // scanner
492       LOG.debug("Setting scanner row key filter with tagvs only: " +
493           QueryUtil.byteRegexToString(buf.toString()));
494       if (tagv_buffer.length() > 0) {
495         tagv_filter = tagv_buffer.toString();
496       }
497       return buf.toString();
498     }
499 
500     // catch any left over tagk/tag pairs
501     if (index < pairs.size()){ // This condition is true whenever the first tagk in the pairs has a null value.
502       buf.setLength(0);
503       buf.append("(?s)^.{").append(query.useMeta() ? TSDB.metrics_width() :
504         TSDB.metrics_width() + Const.SALT_WIDTH())
505          .append("}");
506       if (!query.useMeta()) {
507 	buf.append("(?:.{").append(Const.TIMESTAMP_BYTES).append("})");
508       }
509 
510       ByteArrayPair last_pair = null;
511       for (; index < pairs.size(); index++) {
512         if (last_pair != null && last_pair.getValue() == null &&
513             Bytes.memcmp(last_pair.getKey(), pairs.get(index).getKey()) == 0) {
514           // tagk=null is a wildcard so we don't need to bother adding
515           // tagk=tagv pairs with the same tagk.
516           LOG.debug("Skipping pair due to wildcard: " + pairs.get(index));
517         } else if (last_pair != null &&
518             Bytes.memcmp(last_pair.getKey(), pairs.get(index).getKey()) == 0) {
519           // in this case we're ORing e.g. "host=web01|host=web02"
520           buf.append("|\\Q");
521           QueryUtil.addId(buf, pairs.get(index).getKey(), false);
522           QueryUtil.addId(buf, pairs.get(index).getValue(), true);
523         } else {
524           if (last_pair != null) {
525             buf.append(")");
526           }
527           // moving on to the next tagk set
528 	  buf.append("(?:.{").append(tagsize).append("})*"); // catch tag pairs in between
529           buf.append("(?:");
530           if (pairs.get(index).getKey() != null &&
531               pairs.get(index).getValue() != null) {
532             buf.append("\\Q");
533             QueryUtil.addId(buf, pairs.get(index).getKey(), false);
534             QueryUtil.addId(buf, pairs.get(index).getValue(), true);
535           } else {
536             buf.append("\\Q");
537             QueryUtil.addId(buf, pairs.get(index).getKey(), true);
538 	    buf.append("(?:.{").append(value_width).append("})");
539           }
540         }
541         last_pair = pairs.get(index);
542       }
543       buf.append(")(?:.{").append(tagsize).append("})*").append("$");
544     }
545     if (tagv_buffer.length() > 0) {
546       tagv_filter = tagv_buffer.toString();
547     }
548     return buf.toString();
549   }
550 
551   /** @param to_stdout Whether or not to dump to standard out as we scan */
setToStdout(final boolean to_stdout)552   public void setToStdout(final boolean to_stdout) {
553     this.to_stdout = to_stdout;
554   }
555 
556   @Override
toString()557   public String toString() {
558     final StringBuilder buf = new StringBuilder();
559     buf.append("query={")
560        .append(query)
561        .append("}, to_stdout=")
562        .append(to_stdout)
563        .append(", metric_uid=")
564        .append(metric_uid == null ? "null" : Arrays.toString(metric_uid))
565        .append(", pairs=")
566        .append(pairs)
567        .append(", rowkey_regex=")
568        .append(rowkey_regex)
569        .append(", tagv_filter=")
570        .append(tagv_filter);
571     return buf.toString();
572 
573   }
574 }
575