// This file is part of OpenTSDB.
// Copyright (C) 2015  The OpenTSDB Authors.
//
// This program is free software: you can redistribute it and/or modify it
// under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 2.1 of the License, or (at your
// option) any later version.  This program is distributed in the hope that it
// will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty
// of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser
// General Public License for more details.  You should have received a copy
// of the GNU Lesser General Public License along with this program.  If not,
// see <http://www.gnu.org/licenses/>.
package net.opentsdb.query.expression;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;

import net.opentsdb.core.FillPolicy;
import net.opentsdb.core.IllegalDataException;
import net.opentsdb.core.TSDB;
import net.opentsdb.utils.ByteSet;

import org.hbase.async.HBaseClient;
import org.hbase.async.Bytes.ByteMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import sun.reflect.generics.reflectiveObjects.NotImplementedException;

/**
 * An iterator that computes the union of all series in the result sets. This
 * means we match every series with its corresponding series in the other sets.
 * If one or more sets lack the matching series, a shared fill data point is
 * stored in that slot so that callers iterating over the results see the
 * configured fill value instead of a {@code null}.
 * @since 2.3
 */
public class UnionIterator implements ITimeSyncedIterator, VariableIterator {
  private static final Logger LOG = LoggerFactory.getLogger(UnionIterator.class);

  /** The queries compiled and fetched from storage */
  private final Map<String, ITimeSyncedIterator> queries;

  /** A list of the current values for each series post union */
  private final Map<String, ExpressionDataPoint[]> current_values;

  /** A map used for single series iteration. For each sub query the array maps
   * a union series index to the series index within that sub query, or -1 if
   * the sub query has no matching series. */
  private final Map<String, int[]> single_series_matrix;

  /** A map of the sub query index to their names for union computation */
  private final String[] index_to_names;

  /** Whether or not to join on the query tagks instead of the result set
   * tagks */
  private final boolean union_on_query_tagks;

  /** Whether or not to include the aggregated tags in the result set */
  private final boolean include_agg_tags;

  /** The start/current timestamp for the iterator in ms */
  private long timestamp;

  /** Post union number of time series */
  private int series_size;

  /** The ID of this iterator */
  private final String id;

  /** The index of this iterator in a list of iterators */
  private int index;

  /** The fill policy to use when a series is missing from one of the sets.
   * Default is zero. */
  private NumericFillPolicy fill_policy;

  /** A data point used for filling missing time series */
  private final ExpressionDataPoint fill_dp;

  /**
   * Default ctor
   * @param id The variable ID for this iterator
   * @param results Upstream iterators
   * @param union_on_query_tagks Whether or not to flatten and join on only
   * the tags from the query or those returned in the results.
   * @param include_agg_tags Whether or not to include the flattened aggregated
   * tag keys in the join.
   */
  public UnionIterator(final String id,
      final Map<String, ITimeSyncedIterator> results,
      final boolean union_on_query_tagks, final boolean include_agg_tags) {
    this.id = id;
    this.union_on_query_tagks = union_on_query_tagks;
    this.include_agg_tags = include_agg_tags;
    queries = new HashMap<String, ITimeSyncedIterator>(results.size());
    current_values = new HashMap<String, ExpressionDataPoint[]>(results.size());
    single_series_matrix = new HashMap<String, int[]>(results.size());
    index_to_names = new String[results.size()];
    fill_policy = new NumericFillPolicy(FillPolicy.ZERO);
    fill_dp = new ExpressionDataPoint();

    initialize(results);

    LOG.debug("Computed union: {}", this);
  }

  /**
   * Private copy constructor that copies references and sets up new collections
   * without copying results. The sub iterators are shared with the source
   * iterator (not copied) and have their index re-assigned here.
   * @param iterator The iterator to copy from.
   */
  private UnionIterator(final UnionIterator iterator) {
    id = iterator.id;
    union_on_query_tagks = iterator.union_on_query_tagks;
    include_agg_tags = iterator.include_agg_tags;
    // Size everything from the SOURCE iterator: our own "queries" map is still
    // empty at this point, so sizing from it would yield a zero length
    // index_to_names array and an out of bounds write during initialization.
    final int num_queries = iterator.queries.size();
    queries = new HashMap<String, ITimeSyncedIterator>(num_queries);
    current_values = new HashMap<String, ExpressionDataPoint[]>(num_queries);
    single_series_matrix = new HashMap<String, int[]>(num_queries);
    index_to_names = new String[num_queries];
    fill_policy = iterator.fill_policy;
    // each copy needs its own fill point; it is reset on every call to next()
    fill_dp = new ExpressionDataPoint();

    initialize(iterator.queries);
  }

  /**
   * Registers the given sub iterators, assigns each one its index, computes
   * the union and then positions {@link #timestamp} at the earliest timestamp
   * reported by any sub iterator. Shared by both constructors so they cannot
   * drift apart.
   * @param sources The sub iterators keyed on their variable name.
   */
  private void initialize(final Map<String, ITimeSyncedIterator> sources) {
    int i = 0;
    for (final Map.Entry<String, ITimeSyncedIterator> entry : sources.entrySet()) {
      LOG.debug("Adding iterator {}", entry.getValue());
      queries.put(entry.getKey(), entry.getValue());
      entry.getValue().setIndex(i);
      index_to_names[i] = entry.getKey();
      ++i;
    }

    computeUnion();

    // calculate the starting timestamp from the various iterators
    timestamp = Long.MAX_VALUE;
    for (final ITimeSyncedIterator it : queries.values()) {
      final long ts = it.nextTimestamp();
      if (ts < timestamp) {
        timestamp = ts;
      }
    }
  }

  /**
   * Computes the union of all sets, matching on tags and optionally the
   * aggregated tags across each variable. If no sub query has any series then
   * {@link #current_values} and {@link #single_series_matrix} are left empty.
   */
  private void computeUnion() {
    // key = flattened tags, array of queries.size()
    final ByteMap<ExpressionDataPoint[]> ordered_union =
        new ByteMap<ExpressionDataPoint[]>();

    final Iterator<ITimeSyncedIterator> it = queries.values().iterator();
    while (it.hasNext()) {
      final ITimeSyncedIterator sub = it.next();
      final ExpressionDataPoint[] dps = sub.values();

      for (int i = 0; i < sub.size(); i++) {
        final byte[] key = flattenTags(union_on_query_tagks, include_agg_tags,
            dps[i], sub);
        ExpressionDataPoint[] udps = ordered_union.get(key);
        if (udps == null) {
          udps = new ExpressionDataPoint[queries.size()];
          ordered_union.put(key, udps);
        }
        // slot the series under the sub query's own index
        udps[sub.getIndex()] = dps[i];
      }
    }

    if (ordered_union.isEmpty()) {
      // if no data, just stop here
      return;
    }

    setCurrentAndMeta(ordered_union);
  }

  /**
   * Takes the resulting union and builds the {@link #current_values}
   * and {@link #single_series_matrix} maps, then records the series count.
   * @param ordered_union The union to build from.
   */
  private void setCurrentAndMeta(final ByteMap<ExpressionDataPoint[]>
      ordered_union) {
    for (final String name : queries.keySet()) {
      current_values.put(name, new ExpressionDataPoint[ordered_union.size()]);
      // TODO - blech. Fill with a sentinel value to reflect "no data here!"
      final int[] m = new int[ordered_union.size()];
      for (int i = 0; i < m.length; i++) {
        m[i] = -1;
      }
      single_series_matrix.put(name, m);
    }

    int i = 0;
    for (final Entry<byte[], ExpressionDataPoint[]> entry : ordered_union.entrySet()) {
      final ExpressionDataPoint[] idps = entry.getValue();
      for (int x = 0; x < idps.length; x++) {
        final ExpressionDataPoint[] current_dps =
            current_values.get(index_to_names[x]);
        current_dps[i] = idps[x];
        final int[] m = single_series_matrix.get(index_to_names[x]);
        if (idps[x] != null) {
          m[i] = idps[x].getIndex();
        }
      }
      ++i;
    }

    // set fills on nulls
    for (final ExpressionDataPoint[] idps : current_values.values()) {
      for (i = 0; i < idps.length; i++) {
        if (idps[i] == null) {
          idps[i] = fill_dp;
        }
      }
    }
    series_size = ordered_union.size();
  }

  /**
   * Creates a key based on the concatenation of the tag pairs then the agg
   * tag keys.
   * <p>
   * NOTE(review): when the tag set is null or empty the method returns early,
   * so aggregated tags are not part of the key even if
   * {@code include_agg_tags} is true. Confirm this is intended.
   * @param use_query_tags Whether or not to include tags returned with the
   * results or just use those group by'd in the query
   * @param include_agg_tags Whether or not to include the aggregated tags in
   * the identifier
   * @param dp The current expression data point
   * @param sub The sub query iterator
   * @return A byte array with the flattened tag keys and values. Note that
   * if the tags set is empty, this may return an empty array (but not a null
   * array)
   */
  static byte[] flattenTags(final boolean use_query_tags,
      final boolean include_agg_tags, final ExpressionDataPoint dp,
      final ITimeSyncedIterator sub) {
    if (dp.tags() == null || dp.tags().isEmpty()) {
      return HBaseClient.EMPTY_ARRAY;
    }
    final int tagk_width = TSDB.tagk_width();
    final int tagv_width = TSDB.tagv_width();

    final ByteSet query_tagks;
    // NOTE: We MAY need the agg tags but I'm not sure yet
    final int tag_size;
    if (use_query_tags) {
      final ByteSet requested = sub.getQueryTagKs();
      if (requested != null && !requested.isEmpty()) {
        query_tagks = requested;
        // only the tag pairs named in the query contribute to the key
        int matched = 0;
        for (final Map.Entry<byte[], byte[]> pair : dp.tags().entrySet()) {
          if (query_tagks.contains(pair.getKey())) {
            matched++;
          }
        }
        tag_size = matched;
      } else {
        query_tagks = new ByteSet();
        tag_size = 0;
      }
    } else {
      query_tagks = new ByteSet();
      tag_size = dp.tags().size();
    }

    final int length = (tag_size * (tagk_width + tagv_width))
        + (include_agg_tags ? (dp.aggregatedTags().size() * tagk_width) : 0);
    final byte[] key = new byte[length];
    int idx = 0;
    for (final Entry<byte[], byte[]> pair : dp.tags().entrySet()) {
      if (use_query_tags && !query_tagks.contains(pair.getKey())) {
        continue;
      }
      System.arraycopy(pair.getKey(), 0, key, idx, tagk_width);
      idx += tagk_width;
      System.arraycopy(pair.getValue(), 0, key, idx, tagv_width);
      idx += tagv_width;
    }
    if (include_agg_tags) {
      for (final byte[] tagk : dp.aggregatedTags()) {
        System.arraycopy(tagk, 0, key, idx, tagk_width);
        idx += tagk_width;
      }
    }
    return key;
  }

  @Override
  public String toString() {
    final StringBuilder buf = new StringBuilder();
    buf.append("UnionIterator(id=")
       .append(id)
       .append(", useQueryTags=")
       .append(union_on_query_tagks)
       .append(", includeAggTags=")
       .append(include_agg_tags)
       .append(", index=")
       .append(index)
       .append(", queries=")
       .append(queries)
       .append(")");
    return buf.toString();
  }

  // Iterator implementations

  /** @return true if at least one of the sub iterators has more data. */
  @Override
  public boolean hasNext() {
    for (final ITimeSyncedIterator sub : queries.values()) {
      if (sub.hasNext()) {
        return true;
      }
    }
    return false;
  }

  /** Not implemented for the union iterator; always throws. */
  @Override
  public ExpressionDataPoint[] next(long timestamp) {
    throw new NotImplementedException();
  }

  /**
   * @return The smallest next timestamp reported by the sub iterators, or
   * {@link Long#MAX_VALUE} if there are none.
   */
  @Override
  public long nextTimestamp() {
    long ts = Long.MAX_VALUE;
    for (final ITimeSyncedIterator sub : queries.values()) {
      if (sub != null) {
        final long t = sub.nextTimestamp();
        if (t < ts) {
          ts = t;
        }
      }
    }
    return ts;
  }

  /** Not implemented for the union iterator; always throws. */
  @Override
  public int size() {
    throw new NotImplementedException();
  }

  /** Not implemented for the union iterator; always throws. */
  @Override
  public ExpressionDataPoint[] values() {
    throw new NotImplementedException();
  }

  /** Not implemented for the union iterator; always throws. */
  @Override
  public void nullIterator(int index) {
    throw new NotImplementedException();
  }

  @Override
  public int getIndex() {
    return index;
  }

  @Override
  public void setIndex(int index) {
    this.index = index;
  }

  @Override
  public String getId() {
    return id;
  }

  /** Not implemented for the union iterator; always throws. */
  @Override
  public ByteSet getQueryTagKs() {
    throw new NotImplementedException();
  }

  @Override
  public void setFillPolicy(NumericFillPolicy policy) {
    this.fill_policy = policy;
  }

  @Override
  public NumericFillPolicy getFillPolicy() {
    return fill_policy;
  }

  /** @return A new union iterator built over the same sub iterators. */
  @Override
  public ITimeSyncedIterator getCopy() {
    return new UnionIterator(this);
  }

  /**
   * Advances every sub iterator to the current timestamp, resets the shared
   * fill data point to that timestamp and then moves the current timestamp to
   * the next one available.
   * @throws IllegalDataException if none of the sub iterators has more data
   */
  @Override
  public void next() {
    if (!hasNext()) {
      throw new IllegalDataException("No more data");
    }
    for (final ITimeSyncedIterator sub : queries.values()) {
      sub.next(timestamp);
    }
    // reset the fill data point
    fill_dp.reset(timestamp, fill_policy.getValue());
    timestamp = nextTimestamp();
  }

  @Override
  public Map<String, ExpressionDataPoint[]> getResults() {
    return current_values;
  }

  @Override
  public int getSeriesSize() {
    return series_size;
  }

  /**
   * @param index The index of the series within the union
   * @return true if any sub query that contributes a series at the given
   * union index has more data for that series.
   */
  @Override
  public boolean hasNext(int index) {
    for (final Entry<String, int[]> entry : single_series_matrix.entrySet()) {
      // a negative entry means the sub query has no series at this position
      final int idx = entry.getValue()[index];
      if (idx >= 0 && queries.get(entry.getKey()).hasNext(idx)) {
        return true;
      }
    }
    return false;
  }

  /**
   * Advances the series at the given union index in every sub query that
   * contributes to it.
   * @param index The index of the series within the union
   * @throws IllegalDataException if the series at the index has no more data
   */
  @Override
  public void next(int index) {
    // check the requested series, not the iterator as a whole
    if (!hasNext(index)) {
      throw new IllegalDataException("No more data");
    }
    for (final Entry<String, int[]> entry : single_series_matrix.entrySet()) {
      final int idx = entry.getValue()[index];
      if (idx >= 0) {
        queries.get(entry.getKey()).next(idx);
      }
    }
  }

}