1 // This file is part of OpenTSDB.
2 // Copyright (C) 2015  The OpenTSDB Authors.
3 //
4 // This program is free software: you can redistribute it and/or modify it
5 // under the terms of the GNU Lesser General Public License as published by
6 // the Free Software Foundation, either version 2.1 of the License, or (at your
7 // option) any later version.  This program is distributed in the hope that it
8 // will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty
9 // of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser
10 // General Public License for more details.  You should have received a copy
11 // of the GNU Lesser General Public License along with this program.  If not,
12 // see <http://www.gnu.org/licenses/>.
13 package net.opentsdb.query.expression;
14 
15 import java.util.HashMap;
16 import java.util.Iterator;
17 import java.util.Map;
18 import java.util.Map.Entry;
19 
20 import net.opentsdb.core.FillPolicy;
21 import net.opentsdb.core.IllegalDataException;
22 import net.opentsdb.core.TSDB;
23 import net.opentsdb.utils.ByteSet;
24 
25 import org.hbase.async.HBaseClient;
26 import org.hbase.async.Bytes.ByteMap;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
29 
30 import sun.reflect.generics.reflectiveObjects.NotImplementedException;
31 
32 /**
33  * An iterator that computes the union of all series in the result sets. This
34  * means we match every series with it's corresponding series in the other sets.
35  * If one or more set lacks the matching series, then a {@code null} is stored
36  * and when the caller iterates over the results, the need to detect the null
37  * and substitute a fill value.
38  * @since 2.3
39  */
40 public class UnionIterator implements ITimeSyncedIterator, VariableIterator {
41   private static final Logger LOG = LoggerFactory.getLogger(UnionIterator.class);
42 
43   /** The queries compiled and fetched from storage */
44   private final Map<String, ITimeSyncedIterator> queries;
45 
46   /** A list of the current values for each series post intersection */
47   private final Map<String, ExpressionDataPoint[]> current_values;
48 
49   /** A map used for single series iteration where the array is the index */
50   private final Map<String, int[]> single_series_matrix;
51 
52   /** A map of the sub query index to their names for intersection computation */
53   private final String[] index_to_names;
54 
55   /** Whether or not to intersect on the query tagks instead of the result set
56    * tagks */
57   private final boolean union_on_query_tagks;
58 
59   /** Whether or not to include the aggregated tags in the result set */
60   private final boolean include_agg_tags;
61 
62   /** The start/current timestamp for the iterator in ms */
63   private long timestamp;
64 
65   /** Post intersection number of time series */
66   private int series_size;
67 
68   /** The ID of this iterator */
69   private final String id;
70 
71   /** The index of this iterator in a list of iterators */
72   private int index;
73 
74   /** The fill policy to use when a series is missing from one of the sets.
75    * Default is zero. */
76   private NumericFillPolicy fill_policy;
77 
78   /** A data point used for filling missing time series */
79   private ExpressionDataPoint fill_dp;
80 
81   /**
82    * Default ctor
83    * @param id The variable ID for this iterator
84    * @param results Upstream iterators
85    * @param union_on_query_tagks Whether or not to flatten and join on only
86    * the tags from the query or those returned in the results.
87    * @param include_agg_tags Whether or not to include the flattened aggregated
88    * tag keys in the join.
89    */
UnionIterator(final String id, final Map<String, ITimeSyncedIterator> results, final boolean union_on_query_tagks, final boolean include_agg_tags)90   public UnionIterator(final String id, final Map<String, ITimeSyncedIterator> results,
91       final boolean union_on_query_tagks, final boolean include_agg_tags) {
92     this.id = id;
93     this.union_on_query_tagks = union_on_query_tagks;
94     this.include_agg_tags = include_agg_tags;
95     timestamp = Long.MAX_VALUE;
96     queries = new HashMap<String, ITimeSyncedIterator>(results.size());
97     current_values = new HashMap<String, ExpressionDataPoint[]>(results.size());
98     single_series_matrix = new HashMap<String, int[]>(results.size());
99     index_to_names = new String[results.size()];
100     fill_policy = new NumericFillPolicy(FillPolicy.ZERO);
101     fill_dp = new ExpressionDataPoint();
102 
103     int i = 0;
104     for (final Map.Entry<String, ITimeSyncedIterator> entry : results.entrySet()) {
105       if (LOG.isDebugEnabled()) {
106         LOG.debug("Adding iterator " + entry.getValue());
107       }
108       queries.put(entry.getKey(), entry.getValue());
109       entry.getValue().setIndex(i);
110       index_to_names[i] = entry.getKey();
111       ++i;
112     }
113 
114     computeUnion();
115 
116     // calculate the starting timestamp from the various iterators
117     for (final ITimeSyncedIterator it : queries.values()) {
118       final long ts = it.nextTimestamp();
119       if (ts < timestamp) {
120         timestamp = ts;
121       }
122     }
123 
124     if (LOG.isDebugEnabled()) {
125       LOG.debug("Computed union: " + this);
126     }
127   }
128 
129   /**
130    * Private copy constructor that copies references and sets up new collections
131    * without copying results.
132    * @param iterator The iterator to copy from.
133    */
UnionIterator(final UnionIterator iterator)134   private UnionIterator(final UnionIterator iterator) {
135     id = iterator.id;
136     union_on_query_tagks = iterator.union_on_query_tagks;
137     include_agg_tags = iterator.include_agg_tags;
138     timestamp = Long.MAX_VALUE;
139     queries = new HashMap<String, ITimeSyncedIterator>(iterator.queries.size());
140     current_values = new HashMap<String, ExpressionDataPoint[]>(queries.size());
141     single_series_matrix = new HashMap<String, int[]>(queries.size());
142     index_to_names = new String[queries.size()];
143     fill_policy = iterator.fill_policy;
144 
145     int i = 0;
146     for (final Map.Entry<String, ITimeSyncedIterator> entry : iterator.queries.entrySet()) {
147       if (LOG.isDebugEnabled()) {
148         LOG.debug("Adding iterator " + entry.getValue());
149       }
150       queries.put(entry.getKey(), entry.getValue());
151       entry.getValue().setIndex(i);
152       index_to_names[i] = entry.getKey();
153       ++i;
154     }
155 
156     computeUnion();
157 
158     // calculate the starting timestamp from the various iterators
159     for (final ITimeSyncedIterator it : queries.values()) {
160       final long ts = it.nextTimestamp();
161       if (ts < timestamp) {
162         timestamp = ts;
163       }
164     }
165   }
166 
167   /**
168    * Computes the union of all sets, matching on tags and optionally the
169    * aggregated tags across each variable.
170    */
computeUnion()171   private void computeUnion() {
172     // key = flattened tags, array of queries.size()
173     final ByteMap<ExpressionDataPoint[]> ordered_union =
174         new ByteMap<ExpressionDataPoint[]>();
175 
176     final Iterator<ITimeSyncedIterator> it = queries.values().iterator();
177     while (it.hasNext()) {
178       final ITimeSyncedIterator sub = it.next();
179       final ExpressionDataPoint[] dps = sub.values();
180       final ByteMap<Integer> local_tags = new ByteMap<Integer>();
181 
182       for (int i = 0; i < sub.size(); i++) {
183         final byte[] key = flattenTags(union_on_query_tagks, include_agg_tags,
184             dps[i], sub);
185         local_tags.put(key, i);
186         ExpressionDataPoint[] udps = ordered_union.get(key);
187         if (udps == null) {
188           udps = new ExpressionDataPoint[queries.size()];
189           ordered_union.put(key, udps);
190         }
191         udps[sub.getIndex()] = dps[i];
192       }
193     }
194 
195     if (ordered_union.size() < 1) {
196       // if no data, just stop here
197       return;
198     }
199 
200     setCurrentAndMeta(ordered_union);
201   }
202 
203   /**
204    * Takes the resulting union and builds the {@link #current_values}
205    * and {@link #meta} maps.
206    * @param ordered_union The union to build from.
207    */
setCurrentAndMeta(final ByteMap<ExpressionDataPoint[]> ordered_union)208   private void setCurrentAndMeta(final ByteMap<ExpressionDataPoint[]>
209       ordered_union) {
210     for (final String id : queries.keySet()) {
211       current_values.put(id, new ExpressionDataPoint[ordered_union.size()]);
212       // TODO - blech. Fill with a sentinel value to reflect "no data here!"
213       final int[] m = new int[ordered_union.size()];
214       for (int i = 0; i < m.length; i++) {
215         m[i] = -1;
216       }
217       single_series_matrix.put(id, m);
218     }
219 
220     int i = 0;
221     for (final Entry<byte[], ExpressionDataPoint[]> entry : ordered_union.entrySet()) {
222       final ExpressionDataPoint[] idps = entry.getValue();
223       for (int x = 0; x < idps.length; x++) {
224         final ExpressionDataPoint[] current_dps =
225             current_values.get(index_to_names[x]);
226         current_dps[i] = idps[x];
227         final int[] m = single_series_matrix.get(index_to_names[x]);
228         if (idps[x] != null) {
229           m[i] = idps[x].getIndex();
230         }
231       }
232       ++i;
233     }
234 
235     // set fills on nulls
236     for (final ExpressionDataPoint[] idps : current_values.values()) {
237       for (i = 0; i < idps.length; i++) {
238         if (idps[i] == null) {
239           idps[i] = fill_dp;
240         }
241       }
242     }
243     series_size = ordered_union.size();
244   }
245 
246   /**
247    * Creates a key based on the concatenation of the tag pairs then the agg
248    * tag keys.
249    * @param use_query_tags Whether or not to include tags returned with the
250    * results or just use those group by'd in the query
251    * @param include_agg_tags Whether or not to include the aggregated tags in
252    * the identifier
253    * @param dp The current expression data point
254    * @param sub The sub query iterator
255    * @return A byte array with the flattened tag keys and values. Note that
256    * if the tags set is empty, this may return an empty array (but not a null
257    * array)
258    */
flattenTags(final boolean use_query_tags, final boolean include_agg_tags, final ExpressionDataPoint dp, final ITimeSyncedIterator sub)259   static byte[] flattenTags(final boolean use_query_tags,
260       final boolean include_agg_tags, final ExpressionDataPoint dp,
261       final ITimeSyncedIterator sub) {
262     if (dp.tags() == null || dp.tags().isEmpty()) {
263       return HBaseClient.EMPTY_ARRAY;
264     }
265     final int tagk_width = TSDB.tagk_width();
266     final int tagv_width = TSDB.tagv_width();
267 
268     final ByteSet query_tagks;
269     // NOTE: We MAY need the agg tags but I'm not sure yet
270     final int tag_size;
271     if (use_query_tags) {
272       int i = 0;
273       if (sub.getQueryTagKs() != null && !sub.getQueryTagKs().isEmpty()) {
274         query_tagks = sub.getQueryTagKs();
275         for (final Map.Entry<byte[], byte[]> pair : dp.tags().entrySet()) {
276           if (query_tagks.contains(pair.getKey())) {
277             i++;
278           }
279         }
280       } else {
281         query_tagks = new ByteSet();
282       }
283       tag_size = i;
284     } else {
285       query_tagks = new ByteSet();
286       tag_size = dp.tags().size();
287     }
288 
289     final int length = (tag_size * (tagk_width + tagv_width))
290         + (include_agg_tags ? (dp.aggregatedTags().size() * tagk_width) : 0);
291     final byte[] key = new byte[length];
292     int idx = 0;
293     for (final Entry<byte[], byte[]> pair : dp.tags().entrySet()) {
294       if (use_query_tags && !query_tagks.contains(pair.getKey())) {
295         continue;
296       }
297       System.arraycopy(pair.getKey(), 0, key, idx, tagk_width);
298       idx += tagk_width;
299       System.arraycopy(pair.getValue(), 0, key, idx, tagv_width);
300       idx += tagv_width;
301     }
302     if (include_agg_tags) {
303       for (final byte[] tagk : dp.aggregatedTags()) {
304         System.arraycopy(tagk, 0, key, idx, tagk_width);
305         idx += tagk_width;
306       }
307     }
308     return key;
309   }
310 
311   @Override
toString()312   public String toString() {
313     final StringBuilder buf = new StringBuilder();
314     buf.append("UnionIterator(id=")
315        .append(id)
316        .append(", useQueryTags=")
317        .append(union_on_query_tagks)
318        .append(", includeAggTags=")
319        .append(include_agg_tags)
320        .append(", index=")
321        .append(index)
322        .append(", queries=")
323        .append(queries);
324     return buf.toString();
325   }
326 
327   // Iterator implementations
328 
329   @Override
hasNext()330   public boolean hasNext() {
331     for (final ITimeSyncedIterator sub : queries.values()) {
332       if (sub.hasNext()) {
333         return true;
334       }
335     }
336     return false;
337   }
338 
339   @Override
next(long timestamp)340   public ExpressionDataPoint[] next(long timestamp) {
341     throw new NotImplementedException();
342   }
343 
344   @Override
nextTimestamp()345   public long nextTimestamp() {
346     long ts = Long.MAX_VALUE;
347     for (final ITimeSyncedIterator sub : queries.values()) {
348       if (sub != null) {
349         final long t = sub.nextTimestamp();
350         if (t < ts) {
351           ts = t;
352         }
353       }
354     }
355     return ts;
356   }
357 
358   @Override
size()359   public int size() {
360     throw new NotImplementedException();
361   }
362 
363   @Override
values()364   public ExpressionDataPoint[] values() {
365     throw new NotImplementedException();
366   }
367 
368   @Override
nullIterator(int index)369   public void nullIterator(int index) {
370     throw new NotImplementedException();
371   }
372 
373   @Override
getIndex()374   public int getIndex() {
375     return index;
376   }
377 
378   @Override
setIndex(int index)379   public void setIndex(int index) {
380     this.index = index;
381   }
382 
383   @Override
getId()384   public String getId() {
385     return id;
386   }
387 
388   @Override
getQueryTagKs()389   public ByteSet getQueryTagKs() {
390     throw new NotImplementedException();
391   }
392 
393   @Override
setFillPolicy(NumericFillPolicy policy)394   public void setFillPolicy(NumericFillPolicy policy) {
395     this.fill_policy = policy;
396   }
397 
398   @Override
getFillPolicy()399   public NumericFillPolicy getFillPolicy() {
400     return fill_policy;
401   }
402 
403   @Override
getCopy()404   public ITimeSyncedIterator getCopy() {
405     return new UnionIterator(this);
406   }
407 
408   @Override
next()409   public void next() {
410     if (!hasNext()) {
411       throw new IllegalDataException("No more data");
412     }
413     for (final ITimeSyncedIterator sub : queries.values()) {
414       sub.next(timestamp);
415     }
416     // reset the fill data point
417     fill_dp.reset(timestamp, fill_policy.getValue());
418     timestamp = nextTimestamp();
419   }
420 
421   @Override
getResults()422   public Map<String, ExpressionDataPoint[]> getResults() {
423     return current_values;
424   }
425 
426   @Override
getSeriesSize()427   public int getSeriesSize() {
428     return series_size;
429   }
430 
431   @Override
hasNext(int index)432   public boolean hasNext(int index) {
433     for (final Entry<String, int[]> entry : single_series_matrix.entrySet()) {
434       final int idx = entry.getValue()[index];
435       if (idx >= 0 && queries.get(entry.getKey()).hasNext(idx)) {
436         return true;
437       }
438     }
439     return false;
440   }
441 
442   @Override
next(int index)443   public void next(int index) {
444     if (!hasNext()) {
445       throw new IllegalDataException("No more data");
446     }
447     for (final Entry<String, int[]> entry : single_series_matrix.entrySet()) {
448       final int idx = entry.getValue()[index];
449       if (idx >= 0) {
450         queries.get(entry.getKey()).next(idx);
451       }
452     }
453   }
454 
455 }
456