1 // This file is part of OpenTSDB.
2 // Copyright (C) 2015  The OpenTSDB Authors.
3 //
4 // This program is free software: you can redistribute it and/or modify it
5 // under the terms of the GNU Lesser General Public License as published by
6 // the Free Software Foundation, either version 2.1 of the License, or (at your
7 // option) any later version.  This program is distributed in the hope that it
8 // will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty
9 // of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser
10 // General Public License for more details.  You should have received a copy
11 // of the GNU Lesser General Public License along with this program.  If not,
12 // see <http://www.gnu.org/licenses/>.
13 package net.opentsdb.core;
14 
15 import static org.junit.Assert.assertEquals;
16 import static org.junit.Assert.assertNotNull;
17 import static org.junit.Assert.assertNull;
18 import static org.junit.Assert.assertTrue;
19 import static org.junit.Assert.fail;
20 import static org.mockito.Mockito.atLeast;
21 import static org.mockito.Mockito.never;
22 import static org.mockito.Mockito.times;
23 import static org.mockito.Mockito.verify;
24 import static org.mockito.Mockito.when;
25 
26 import java.lang.reflect.Field;
27 import java.util.ArrayList;
28 import java.util.HashMap;
29 import java.util.List;
30 import java.util.Map;
31 
32 import net.opentsdb.storage.MockBase;
33 import net.opentsdb.storage.MockBase.MockScanner;
34 import net.opentsdb.uid.NoSuchUniqueId;
35 import net.opentsdb.utils.Config;
36 
37 import org.hbase.async.Bytes;
38 import org.hbase.async.FilterList;
39 import org.hbase.async.Scanner;
40 import org.junit.Before;
41 import org.junit.Test;
42 import org.junit.runner.RunWith;
43 import org.powermock.core.classloader.annotations.PrepareForTest;
44 import org.powermock.modules.junit4.PowerMockRunner;
45 
46 import com.stumbleupon.async.Deferred;
47 
48 /**
49  * An integration test class that makes sure our query path is up to snuff.
50  * This class should have tests for different data point types, rates,
51  * compactions, etc. Other files can cover salting, aggregation and downsampling.
52  */
53 @RunWith(PowerMockRunner.class)
54 @PrepareForTest({ Scanner.class })
55 public class TestTsdbQueryQueries extends BaseTsdbTest {
56   protected TsdbQuery query = null;
57 
58   @Before
beforeLocal()59   public void beforeLocal() throws Exception {
60     query = new TsdbQuery(tsdb);
61   }
62 
63   @Test
runLongSingleTS()64   public void runLongSingleTS() throws Exception {
65     storeLongTimeSeriesSeconds(true, false);
66 
67     query.setStartTime(1356998400);
68     query.setEndTime(1357041600);
69     query.setTimeSeries(METRIC_STRING, tags, Aggregators.SUM, false);
70 
71     final DataPoints[] dps = query.run();
72     assertMeta(dps, 0, false);
73 
74     int value = 1;
75     long timestamp = 1356998430000L;
76     verify(tag_values, times(1)).getNameAsync(TAGV_BYTES);
77     verify(tag_values, never()).getNameAsync(TAGV_B_BYTES);
78     for (DataPoint dp : dps[0]) {
79       assertEquals(value, dp.longValue());
80       assertEquals(timestamp, dp.timestamp());
81       value++;
82       timestamp += 30000;
83     }
84     assertEquals(300, dps[0].aggregatedSize());
85   }
86 
87   @Test
runLongSingleTSMs()88   public void runLongSingleTSMs() throws Exception {
89     storeLongTimeSeriesMs();
90 
91     query.setStartTime(1356998400);
92     query.setEndTime(1357041600);
93     query.setTimeSeries(METRIC_STRING, tags, Aggregators.SUM, false);
94 
95     final DataPoints[] dps = query.run();
96     assertMeta(dps, 0, false);
97 
98     int value = 1;
99     long timestamp = 1356998400500L;
100     for (DataPoint dp : dps[0]) {
101       assertEquals(value, dp.longValue());
102       assertEquals(timestamp, dp.timestamp());
103       value++;
104       timestamp += 500;
105     }
106     assertEquals(300, dps[0].aggregatedSize());
107   }
108 
109   @Test
runLongSingleTSNoData()110   public void runLongSingleTSNoData() throws Exception {
111     setDataPointStorage();
112 
113     query.setStartTime(1356998400);
114     query.setEndTime(1357041600);
115     query.setTimeSeries(METRIC_STRING, tags, Aggregators.SUM, false);
116 
117     final DataPoints[] dps = query.run();
118     assertNotNull(dps);
119     assertEquals(0, dps.length);
120   }
121 
122   @Test
runLongTwoAggSum()123   public void runLongTwoAggSum() throws Exception {
124     storeLongTimeSeriesSeconds(true, false);
125 
126     tags.clear();
127     query.setStartTime(1356998400L);
128     query.setEndTime(1357041600L);
129     query.setTimeSeries(METRIC_STRING, tags, Aggregators.SUM, false);
130 
131     final DataPoints[] dps = query.run();
132     assertMeta(dps, 0, true);
133 
134     long timestamp = 1356998430000L;
135     for (DataPoint dp : dps[0]) {
136       assertEquals(301, dp.longValue());
137       assertEquals(timestamp, dp.timestamp());
138       timestamp += 30000;
139     }
140     assertEquals(300, dps[0].size());
141   }
142 
143   @Test
runLongTwoAggSumMs()144   public void runLongTwoAggSumMs() throws Exception {
145     storeLongTimeSeriesMs();
146 
147     tags.clear();
148     query.setStartTime(1356998400L);
149     query.setEndTime(1357041600L);
150     query.setTimeSeries(METRIC_STRING, tags, Aggregators.SUM, false);
151 
152     final DataPoints[] dps = query.run();
153     assertMeta(dps, 0, true);
154 
155     long timestamp = 1356998400500L;
156     for (DataPoint dp : dps[0]) {
157       assertEquals(301, dp.longValue());
158       assertEquals(timestamp, dp.timestamp());
159       timestamp += 500;
160     }
161     assertEquals(300, dps[0].size());
162   }
163 
164   @Test
runLongTwoGroup()165   public void runLongTwoGroup() throws Exception {
166     storeLongTimeSeriesSeconds(true, false);
167 
168     tags.clear();
169     tags.put(TAGK_STRING , "*");
170     query.setStartTime(1356998400);
171     query.setEndTime(1357041600);
172     query.setTimeSeries(METRIC_STRING, tags, Aggregators.SUM, false);
173 
174     final DataPoints[] dps = query.run();
175     assertMeta(dps, 0, false);
176     assertMeta(dps, 1, false);
177     assertEquals(2, dps.length);
178 
179     int value = 1;
180     long timestamp = 1356998430000L;
181     for (DataPoint dp : dps[0]) {
182       assertEquals(value, dp.longValue());
183       assertEquals(timestamp, dp.timestamp());
184       value++;
185       timestamp += 30000;
186     }
187     assertEquals(300, dps[0].size());
188 
189     value = 300;
190     timestamp = 1356998430000L;
191     for (DataPoint dp : dps[1]) {
192       assertEquals(value, dp.longValue());
193       assertEquals(timestamp, dp.timestamp());
194       value--;
195       timestamp += 30000;
196     }
197     assertEquals(300, dps[1].size());
198   }
199 
200   @Test
runLongSingleTSRate()201   public void runLongSingleTSRate() throws Exception {
202     storeLongTimeSeriesSeconds(true, false);
203 
204     query.setStartTime(1356998400);
205     query.setEndTime(1357041600);
206     query.setTimeSeries(METRIC_STRING, tags, Aggregators.SUM, true);
207 
208     final DataPoints[] dps = query.run();
209     assertMeta(dps, 0, false);
210 
211     long timestamp = 1356998460000L;
212     for (DataPoint dp : dps[0]) {
213       assertEquals(0.033F, dp.doubleValue(), 0.001);
214       assertEquals(timestamp, dp.timestamp());
215       timestamp += 30000;
216     }
217     assertEquals(299, dps[0].size());
218   }
219 
220   @Test
runLongSingleTSRateMs()221   public void runLongSingleTSRateMs() throws Exception {
222     storeLongTimeSeriesMs();
223 
224     query.setStartTime(1356998400);
225     query.setEndTime(1357041600);
226     query.setTimeSeries(METRIC_STRING, tags, Aggregators.SUM, true);
227 
228     final DataPoints[] dps = query.run();
229     assertMeta(dps, 0, false);
230 
231     long timestamp = 1356998401000L;
232     for (DataPoint dp : dps[0]) {
233       assertEquals(2.0F, dp.doubleValue(), 0.001);
234       assertEquals(timestamp, dp.timestamp());
235       timestamp += 500;
236     }
237     assertEquals(299, dps[0].size());
238   }
239 
240   @Test
runFloatSingleTS()241   public void runFloatSingleTS() throws Exception {
242     storeFloatTimeSeriesSeconds(true, false);
243 
244     query.setStartTime(1356998400);
245     query.setEndTime(1357041600);
246     query.setTimeSeries(METRIC_STRING, tags, Aggregators.SUM, false);
247 
248     final DataPoints[] dps = query.run();
249     assertMeta(dps, 0, false);
250 
251     double value = 1.25D;
252     long timestamp = 1356998430000L;
253     for (DataPoint dp : dps[0]) {
254       assertEquals(value, dp.doubleValue(), 0.001);
255       assertEquals(timestamp, dp.timestamp());
256       value += 0.25D;
257       timestamp += 30000;
258     }
259     assertEquals(300, dps[0].size());
260   }
261 
262   @Test
runFloatSingleTSMs()263   public void runFloatSingleTSMs() throws Exception {
264     storeFloatTimeSeriesMs();
265 
266     query.setStartTime(1356998400);
267     query.setEndTime(1357041600);
268     query.setTimeSeries(METRIC_STRING, tags, Aggregators.SUM, false);
269 
270     final DataPoints[] dps = query.run();
271     assertMeta(dps, 0, false);
272 
273     double value = 1.25D;
274     long timestamp = 1356998400500L;
275     for (DataPoint dp : dps[0]) {
276       assertEquals(value, dp.doubleValue(), 0.001);
277       assertEquals(timestamp, dp.timestamp());
278       value += 0.25D;
279       timestamp += 500;
280     }
281     assertEquals(300, dps[0].size());
282   }
283 
284   @Test
runFloatTwoAggSum()285   public void runFloatTwoAggSum() throws Exception {
286     storeFloatTimeSeriesSeconds(true, false);
287 
288     tags.clear();
289     query.setStartTime(1356998400);
290     query.setEndTime(1357041600);
291     query.setTimeSeries(METRIC_STRING, tags, Aggregators.SUM, false);
292 
293     final DataPoints[] dps = query.run();
294     assertMeta(dps, 0, true);
295 
296     long timestamp = 1356998430000L;
297     for (DataPoint dp : dps[0]) {
298       assertEquals(76.25, dp.doubleValue(), 0.00001);
299       assertEquals(timestamp, dp.timestamp());
300       timestamp += 30000;
301     }
302     assertEquals(300, dps[0].size());
303   }
304 
305   @Test
runFloatTwoAggNoneAgg()306   public void runFloatTwoAggNoneAgg() throws Exception {
307     storeFloatTimeSeriesSeconds(true, false);
308 
309     tags.clear();
310     query.setStartTime(1356998400);
311     query.setEndTime(1357041600);
312     query.setTimeSeries(METRIC_STRING, tags, Aggregators.NONE, false);
313 
314     final DataPoints[] dps = query.run();
315     assertMeta(dps, 0, false);
316     assertMeta(dps, 1, false);
317     assertEquals(2, dps.length);
318 
319     double value = 1.25D;
320     long timestamp = 1356998430000L;
321     for (DataPoint dp : dps[0]) {
322       assertEquals(value, dp.doubleValue(), 0.0001);
323       assertEquals(timestamp, dp.timestamp());
324       value += 0.25D;
325       timestamp += 30000;
326     }
327     assertEquals(300, dps[0].size());
328 
329     value = 75D;
330     timestamp = 1356998430000L;
331     for (DataPoint dp : dps[1]) {
332       assertEquals(value, dp.doubleValue(), 0.0001);
333       assertEquals(timestamp, dp.timestamp());
334       value -= 0.25d;
335       timestamp += 30000;
336     }
337     assertEquals(300, dps[1].size());
338   }
339 
340   @Test
runFloatTwoAggSumMs()341   public void runFloatTwoAggSumMs() throws Exception {
342     storeFloatTimeSeriesMs();
343 
344     tags.clear();
345     query.setStartTime(1356998400);
346     query.setEndTime(1357041600);
347     query.setTimeSeries(METRIC_STRING, tags, Aggregators.SUM, false);
348 
349     final DataPoints[] dps = query.run();
350     assertMeta(dps, 0, true);
351 
352     long timestamp = 1356998400500L;
353     for (DataPoint dp : dps[0]) {
354       assertEquals(76.25, dp.doubleValue(), 0.00001);
355       assertEquals(timestamp, dp.timestamp());
356       timestamp += 500;
357     }
358     assertEquals(300, dps[0].size());
359   }
360 
361   @Test
runFloatTwoGroup()362   public void runFloatTwoGroup() throws Exception {
363     storeFloatTimeSeriesSeconds(true, false);
364     final HashMap<String, String> tags = new HashMap<String, String>(1);
365     tags.put(TAGK_STRING , "*");
366     query.setStartTime(1356998400);
367     query.setEndTime(1357041600);
368     query.setTimeSeries(METRIC_STRING, tags, Aggregators.SUM, false);
369 
370     final DataPoints[] dps = query.run();
371     assertMeta(dps, 0, false);
372     assertMeta(dps, 1, false);
373     assertEquals(2, dps.length);
374 
375     double value = 1.25D;
376     long timestamp = 1356998430000L;
377     for (DataPoint dp : dps[0]) {
378       assertEquals(value, dp.doubleValue(), 0.0001);
379       assertEquals(timestamp, dp.timestamp());
380       value += 0.25D;
381       timestamp += 30000;
382     }
383     assertEquals(300, dps[0].size());
384 
385     value = 75D;
386     timestamp = 1356998430000L;
387     for (DataPoint dp : dps[1]) {
388       assertEquals(value, dp.doubleValue(), 0.0001);
389       assertEquals(timestamp, dp.timestamp());
390       value -= 0.25d;
391       timestamp += 30000;
392     }
393     assertEquals(300, dps[1].size());
394   }
395 
396   @Test
runFloatSingleTSRate()397   public void runFloatSingleTSRate() throws Exception {
398     storeFloatTimeSeriesSeconds(true, false);
399 
400     query.setStartTime(1356998400);
401     query.setEndTime(1357041600);
402     query.setTimeSeries(METRIC_STRING, tags, Aggregators.SUM, true);
403 
404     final DataPoints[] dps = query.run();
405     assertMeta(dps, 0, false);
406 
407     long timestamp = 1356998460000L;
408     for (DataPoint dp : dps[0]) {
409       assertEquals(0.00833F, dp.doubleValue(), 0.00001);
410       assertEquals(timestamp, dp.timestamp());
411       timestamp += 30000;
412     }
413     assertEquals(299, dps[0].size());
414   }
415 
416   @Test
runFloatSingleTSRateMs()417   public void runFloatSingleTSRateMs() throws Exception {
418     storeFloatTimeSeriesMs();
419 
420     query.setStartTime(1356998400);
421     query.setEndTime(1357041600);
422     query.setTimeSeries(METRIC_STRING, tags, Aggregators.SUM, true);
423 
424     final DataPoints[] dps = query.run();
425     assertMeta(dps, 0, false);
426 
427     long timestamp = 1356998401000L;
428     for (DataPoint dp : dps[0]) {
429       assertEquals(0.5F, dp.doubleValue(), 0.00001);
430       assertEquals(timestamp, dp.timestamp());
431       timestamp += 500;
432     }
433     assertEquals(299, dps[0].size());
434   }
435 
436   @Test
runFloatSingleTSCompacted()437   public void runFloatSingleTSCompacted() throws Exception {
438     storeFloatTimeSeriesSeconds(true, false);
439     storage.tsdbCompactAllRows();
440 
441     query.setStartTime(1356998400);
442     query.setEndTime(1357041600);
443     query.setTimeSeries(METRIC_STRING, tags, Aggregators.SUM, false);
444 
445     final DataPoints[] dps = query.run();
446     assertMeta(dps, 0, false);
447 
448     long timestamp = 1356998430000L;
449     double value = 1.25D;
450     for (DataPoint dp : dps[0]) {
451       assertEquals(value, dp.doubleValue(), 0.001);
452       assertEquals(timestamp, dp.timestamp());
453       value += 0.25D;
454       timestamp += 30000;
455     }
456     assertEquals(300, dps[0].size());
457   }
458 
459   @Test
runMixedSingleTS()460   public void runMixedSingleTS() throws Exception {
461     storeMixedTimeSeriesSeconds();
462 
463     query.setStartTime(1356998400);
464     query.setEndTime(1357041600);
465     query.setTimeSeries(METRIC_STRING, tags, Aggregators.AVG, false);
466 
467     final DataPoints[] dps = query.run();
468     assertMeta(dps, 0, false);
469 
470     long timestamp = 1356998430000L;
471     double float_value = 1.25D;
472     int int_value = 76;
473     // due to aggregation, the only int that will be returned will be the very
474     // last value of 76 since the agg will convert every point in between to a
475     // double
476     for (DataPoint dp : dps[0]) {
477       if (dp.isInteger()) {
478         assertEquals(int_value, dp.longValue());
479         int_value++;
480         float_value = int_value;
481       } else {
482         assertEquals(float_value, dp.doubleValue(), 0.001);
483         float_value += 0.25D;
484       }
485       assertEquals(timestamp, dp.timestamp());
486       timestamp += 30000;
487     }
488     assertEquals(300, dps[0].size());
489   }
490 
491   @Test
runMixedSingleTSMsAndS()492   public void runMixedSingleTSMsAndS() throws Exception {
493     storeMixedTimeSeriesMsAndS();
494 
495     query.setStartTime(1356998400);
496     query.setEndTime(1357041600);
497     query.setTimeSeries(METRIC_STRING, tags, Aggregators.AVG, false);
498 
499     final DataPoints[] dps = query.run();
500     assertMeta(dps, 0, false);
501 
502     long timestamp = 1356998400500L;
503     double float_value = 1.25D;
504     int int_value = 76;
505     // due to aggregation, the only int that will be returned will be the very
506     // last value of 76 since the agg will convert every point in between to a
507     // double
508     for (DataPoint dp : dps[0]) {
509       if (dp.isInteger()) {
510         assertEquals(int_value, dp.longValue());
511         int_value++;
512         float_value = int_value;
513       } else {
514         assertEquals(float_value, dp.doubleValue(), 0.001);
515         float_value += 0.25D;
516       }
517       assertEquals(timestamp, dp.timestamp());
518       timestamp += 500;
519     }
520     assertEquals(300, dps[0].size());
521   }
522 
523   @Test
runMixedSingleTSPostCompaction()524   public void runMixedSingleTSPostCompaction() throws Exception {
525     storeMixedTimeSeriesSeconds();
526 
527     final Field compact = Config.class.getDeclaredField("enable_compactions");
528     compact.setAccessible(true);
529     compact.set(config, true);
530     query.setStartTime(1356998400);
531     query.setEndTime(1357041600);
532     query.setTimeSeries(METRIC_STRING, tags, Aggregators.AVG, false);
533     assertNotNull(query.run());
534 
535     // this should only compact the rows for the time series that we fetched and
536     // leave the others alone
537 
538     final byte[] key =
539         IncomingDataPoints.rowKeyTemplate(tsdb, METRIC_STRING, tags);
540     RowKey.prefixKeyWithSalt(key);
541     System.arraycopy(Bytes.fromInt(1356998400), 0, key,
542         Const.SALT_WIDTH() + TSDB.metrics_width(), Const.TIMESTAMP_BYTES);
543     assertEquals(1, storage.numColumns(key));
544     System.arraycopy(Bytes.fromInt(1357002000), 0, key,
545         Const.SALT_WIDTH() + TSDB.metrics_width(), Const.TIMESTAMP_BYTES);
546     assertEquals(1, storage.numColumns(key));
547     System.arraycopy(Bytes.fromInt(1357005600), 0, key,
548         Const.SALT_WIDTH() + TSDB.metrics_width(), Const.TIMESTAMP_BYTES);
549     assertEquals(1, storage.numColumns(key));
550 
551     // run it again to verify the compacted data uncompacts properly
552     final DataPoints[] dps = query.run();
553     assertMeta(dps, 0, false);
554 
555     long timestamp = 1356998430000L;
556     double float_value = 1.25D;
557     int int_value = 76;
558     // due to aggregation, the only int that will be returned will be the very
559     // last value of 76 since the agg will convert every point in between to a
560     // double
561     for (DataPoint dp : dps[0]) {
562       if (dp.isInteger()) {
563         assertEquals(int_value, dp.longValue());
564         int_value++;
565         float_value = int_value;
566       } else {
567         assertEquals(float_value, dp.doubleValue(), 0.001);
568         float_value += 0.25D;
569       }
570       assertEquals(timestamp, dp.timestamp());
571       timestamp += 30000;
572     }
573     assertEquals(300, dps[0].size());
574   }
575 
576   @Test
runEndTime()577   public void runEndTime() throws Exception {
578     storeLongTimeSeriesSeconds(true, false);
579 
580     query.setStartTime(1356998400);
581     query.setEndTime(1357001900);
582     query.setTimeSeries(METRIC_STRING, tags, Aggregators.SUM, false);
583     final DataPoints[] dps = query.run();
584     assertMeta(dps, 0, false);
585 
586     int value = 1;
587     long timestamp = 1356998430000L;
588     for (DataPoint dp : dps[0]) {
589       assertEquals(value, dp.longValue());
590       assertEquals(timestamp, dp.timestamp());
591       value++;
592       timestamp += 30000;
593     }
594     assertEquals(119, dps[0].size());
595   }
596 
597   @Test
runCompactPostQuery()598   public void runCompactPostQuery() throws Exception {
599     storeLongTimeSeriesSeconds(true, false);
600 
601     final Field compact = Config.class.getDeclaredField("enable_compactions");
602     compact.setAccessible(true);
603     compact.set(config, true);
604 
605     query.setStartTime(1356998400);
606     query.setEndTime(1357041600);
607     query.setTimeSeries(METRIC_STRING, tags, Aggregators.SUM, false);
608     DataPoints[] dps = query.run();
609     assertMeta(dps, 0, false);
610 
611     // this should only compact the rows for the time series that we fetched and
612     // leave the others alone
613     final byte[] key_a =
614         IncomingDataPoints.rowKeyTemplate(tsdb, METRIC_STRING, tags);
615     RowKey.prefixKeyWithSalt(key_a);
616     final Map<String, String> tags_copy = new HashMap<String, String>(tags);
617     tags_copy.put(TAGK_STRING, TAGV_B_STRING);
618     final byte[] key_b =
619         IncomingDataPoints.rowKeyTemplate(tsdb, METRIC_STRING, tags_copy);
620     RowKey.prefixKeyWithSalt(key_b);
621 
622     System.arraycopy(Bytes.fromInt(1356998400), 0, key_a,
623         Const.SALT_WIDTH() + TSDB.metrics_width(), Const.TIMESTAMP_BYTES);
624     assertEquals(1, storage.numColumns(key_a));
625 
626     System.arraycopy(Bytes.fromInt(1356998400), 0, key_b,
627         Const.SALT_WIDTH() + TSDB.metrics_width(), Const.TIMESTAMP_BYTES);
628     if (config.enable_appends()) {
629       assertEquals(1, storage.numColumns(key_b));
630     } else {
631       assertEquals(119, storage.numColumns(key_b));
632     }
633 
634     System.arraycopy(Bytes.fromInt(1357002000), 0, key_a,
635         Const.SALT_WIDTH() + TSDB.metrics_width(), Const.TIMESTAMP_BYTES);
636     assertEquals(1, storage.numColumns(key_a));
637 
638     System.arraycopy(Bytes.fromInt(1357002000), 0, key_b,
639         Const.SALT_WIDTH() + TSDB.metrics_width(), Const.TIMESTAMP_BYTES);
640     if (config.enable_appends()) {
641       assertEquals(1, storage.numColumns(key_b));
642     } else {
643       assertEquals(120, storage.numColumns(key_b));
644     }
645 
646     System.arraycopy(Bytes.fromInt(1357005600), 0, key_a,
647         Const.SALT_WIDTH() + TSDB.metrics_width(), Const.TIMESTAMP_BYTES);
648     assertEquals(1, storage.numColumns(key_a));
649 
650     System.arraycopy(Bytes.fromInt(1357005600), 0, key_b,
651         Const.SALT_WIDTH() + TSDB.metrics_width(), Const.TIMESTAMP_BYTES);
652     if (config.enable_appends()) {
653       assertEquals(1, storage.numColumns(key_b));
654     } else {
655       assertEquals(61, storage.numColumns(key_b));
656     }
657 
658     // run it again to verify the compacted data uncompacts properly
659     dps = query.run();
660     assertMeta(dps, 0, false);
661 
662     int value = 1;
663     long timestamp = 1356998430000L;
664     for (DataPoint dp : dps[0]) {
665       assertEquals(value, dp.longValue());
666       assertEquals(timestamp, dp.timestamp());
667       value++;
668       timestamp += 30000;
669     }
670     assertEquals(300, dps[0].size());
671   }
672 
673   @Test (expected = IllegalStateException.class)
runStartNotSet()674   public void runStartNotSet() throws Exception {
675     query.setTimeSeries(METRIC_STRING, tags, Aggregators.SUM, false);
676     query.run();
677   }
678 
679   @Test
runFloatAndIntSameTSNoFix()680   public void runFloatAndIntSameTSNoFix() throws Exception {
681     // if a row has an integer and a float for the same timestamp, there will be
682     // two different qualifiers that will resolve to the same offset. This no
683     // will throw the IllegalDataException as querytime fixes are disabled by
684     // default
685     storeLongTimeSeriesSeconds(true, false);
686 
687     tsdb.addPoint(METRIC_STRING, 1356998430, 42.5F, tags).joinUninterruptibly();
688     query.setStartTime(1356998400);
689     query.setEndTime(1357041600);
690     query.setTimeSeries(METRIC_STRING, tags, Aggregators.SUM, false);
691 
692     if (config.enable_appends()) {
693       DataPoints[] dps = query.run();
694       assertMeta(dps, 0, false, false);
695 
696       int value = 1;
697       long timestamp = 1356998430000L;
698       for (DataPoint dp : dps[0]) {
699         if (value == 1) {
700           // first value was replaced in the append
701           assertEquals(42.5, dp.doubleValue(), 0.0001);
702         } else {
703           assertEquals(value, dp.longValue());
704         }
705         assertEquals(timestamp, dp.timestamp());
706         value++;
707         timestamp += 30000;
708       }
709       assertEquals(300, dps[0].size());
710     } else {
711       try {
712         query.run();
713         fail("Expected an IllegalDataException");
714       } catch (IllegalDataException ide) { }
715     }
716   }
717 
718   @Test
runFloatAndIntSameTSFix()719   public void runFloatAndIntSameTSFix() throws Exception {
720     config.setFixDuplicates(true);
721     // if a row has an integer and a float for the same timestamp, there will be
722     // two different qualifiers that will resolve to the same offset. This no
723     // longer tosses an exception, and keeps the last value
724     storeLongTimeSeriesSeconds(true, false);
725 
726     tsdb.addPoint(METRIC_STRING, 1356998430, 42.5F, tags).joinUninterruptibly();
727     query.setStartTime(1356998400);
728     query.setEndTime(1357041600);
729     query.setTimeSeries(METRIC_STRING, tags, Aggregators.SUM, false);
730     final DataPoints[] dps = query.run();
731     assertMeta(dps, 0, false);
732 
733     int value = 1;
734     long timestamp = 1356998430000L;
735     for (DataPoint dp : dps[0]) {
736       if (value == 1) {
737         assertEquals(42.5, dp.doubleValue(), 0.001);
738       } else {
739         assertEquals(value, dp.longValue());
740       }
741       assertEquals(timestamp, dp.timestamp());
742       value++;
743       timestamp += 30000;
744     }
745     assertEquals(300, dps[0].aggregatedSize());
746   }
747 
748   @Test
runWithAnnotation()749   public void runWithAnnotation() throws Exception {
750     storeLongTimeSeriesSeconds(true, false);
751     storeAnnotation(1356998490);
752 
753     query.setStartTime(1356998400);
754     query.setEndTime(1357041600);
755     query.setTimeSeries(METRIC_STRING, tags, Aggregators.SUM, false);
756 
757     final DataPoints[] dps = query.run();
758     assertMeta(dps, 0, false, true);
759 
760     int value = 1;
761     long timestamp = 1356998430000L;
762     for (DataPoint dp : dps[0]) {
763       assertEquals(value, dp.longValue());
764       assertEquals(timestamp, dp.timestamp());
765       value++;
766       timestamp += 30000;
767     }
768     assertEquals(300, dps[0].size());
769   }
770 
771   @Test
runWithAnnotationPostCompact()772   public void runWithAnnotationPostCompact() throws Exception {
773     storeLongTimeSeriesSeconds(true, false);
774     storeAnnotation(1356998490);
775 
776     final Field compact = Config.class.getDeclaredField("enable_compactions");
777     compact.setAccessible(true);
778     compact.set(config, true);
779 
780     query.setStartTime(1356998400);
781     query.setEndTime(1357041600);
782     query.setTimeSeries(METRIC_STRING, tags, Aggregators.SUM, false);
783     DataPoints[] dps = query.run();
784     assertMeta(dps, 0, false, true);
785 
786     // this should only compact the rows for the time series that we fetched and
787     // leave the others alone
788     final byte[] key_a =
789         IncomingDataPoints.rowKeyTemplate(tsdb, METRIC_STRING, tags);
790     RowKey.prefixKeyWithSalt(key_a);
791     final Map<String, String> tags_copy = new HashMap<String, String>(tags);
792     tags_copy.put(TAGK_STRING, TAGV_B_STRING);
793     final byte[] key_b =
794         IncomingDataPoints.rowKeyTemplate(tsdb, METRIC_STRING, tags_copy);
795     RowKey.prefixKeyWithSalt(key_b);
796 
797     System.arraycopy(Bytes.fromInt(1356998400), 0, key_a,
798         Const.SALT_WIDTH() + TSDB.metrics_width(), Const.TIMESTAMP_BYTES);
799     assertEquals(2, storage.numColumns(key_a));
800 
801     System.arraycopy(Bytes.fromInt(1356998400), 0, key_b,
802         Const.SALT_WIDTH() + TSDB.metrics_width(), Const.TIMESTAMP_BYTES);
803     if (config.enable_appends()) {
804       assertEquals(1, storage.numColumns(key_b));
805     } else {
806       assertEquals(119, storage.numColumns(key_b));
807     }
808 
809     System.arraycopy(Bytes.fromInt(1357002000), 0, key_a,
810         Const.SALT_WIDTH() + TSDB.metrics_width(), Const.TIMESTAMP_BYTES);
811     assertEquals(1, storage.numColumns(key_a));
812 
813     System.arraycopy(Bytes.fromInt(1357002000), 0, key_b,
814         Const.SALT_WIDTH() + TSDB.metrics_width(), Const.TIMESTAMP_BYTES);
815     if (config.enable_appends()) {
816       assertEquals(1, storage.numColumns(key_b));
817     } else {
818       assertEquals(120, storage.numColumns(key_b));
819     }
820 
821     System.arraycopy(Bytes.fromInt(1357005600), 0, key_a,
822         Const.SALT_WIDTH() + TSDB.metrics_width(), Const.TIMESTAMP_BYTES);
823     assertEquals(1, storage.numColumns(key_a));
824 
825     System.arraycopy(Bytes.fromInt(1357005600), 0, key_b,
826         Const.SALT_WIDTH() + TSDB.metrics_width(), Const.TIMESTAMP_BYTES);
827     if (config.enable_appends()) {
828       assertEquals(1, storage.numColumns(key_b));
829     } else {
830       assertEquals(61, storage.numColumns(key_b));
831     }
832 
833     dps = query.run();
834     assertMeta(dps, 0, false, true);
835 
836     int value = 1;
837     long timestamp = 1356998430000L;
838     for (DataPoint dp : dps[0]) {
839       assertEquals(value, dp.longValue());
840       assertEquals(timestamp, dp.timestamp());
841       value++;
842       timestamp += 30000;
843     }
844     assertEquals(300, dps[0].size());
845   }
846 
847   @Test
runWithOnlyAnnotation()848   public void runWithOnlyAnnotation() throws Exception {
849     storeLongTimeSeriesSeconds(true, false);
850 
851     // verifies that we can pickup an annotation stored all by it's lonesome
852     // in a row without any data
853     final byte[] key =
854         IncomingDataPoints.rowKeyTemplate(tsdb, METRIC_STRING, tags);
855     RowKey.prefixKeyWithSalt(key);
856     System.arraycopy(Bytes.fromInt(1357002000), 0, key,
857         Const.SALT_WIDTH() + TSDB.metrics_width(), Const.TIMESTAMP_BYTES);
858     storage.flushRow(key);
859 
860     storeAnnotation(1357002090);
861 
862     query.setStartTime(1356998400);
863     query.setEndTime(1357041600);
864     query.setTimeSeries(METRIC_STRING, tags, Aggregators.SUM, false);
865 
866     final DataPoints[] dps = query.run();
867     assertMeta(dps, 0, false, true);
868 
869     int value = 1;
870     long timestamp = 1356998430000L;
871     for (DataPoint dp : dps[0]) {
872       assertEquals(value, dp.longValue());
873       assertEquals(timestamp, dp.timestamp());
874       if (timestamp == 1357001970000L) {
875         timestamp = 1357005600000L;
876       } else {
877         timestamp += 30000;
878       }
879       value++;
880       // account for the jump
881       if (value == 120) {
882         value = 240;
883       }
884     }
885     assertEquals(180, dps[0].size());
886   }
887 
888   @Test
runWithSingleAnnotation()889   public void runWithSingleAnnotation() throws Exception {
890     setDataPointStorage();
891 
892     // verifies that we can pickup an annotation stored all by it's lonesome
893     // in a row without any data
894     final byte[] key =
895         IncomingDataPoints.rowKeyTemplate(tsdb, METRIC_STRING, tags);
896     RowKey.prefixKeyWithSalt(key);
897     System.arraycopy(Bytes.fromInt(1357002000), 0, key,
898         Const.SALT_WIDTH() + TSDB.metrics_width(), Const.TIMESTAMP_BYTES);
899     storage.flushRow(key);
900 
901     storeAnnotation(1357002090);
902 
903     query.setStartTime(1356998400);
904     query.setEndTime(1357041600);
905     query.setTimeSeries(METRIC_STRING, tags, Aggregators.SUM, false);
906 
907     final DataPoints[] dps = query.run();
908     // TODO - apparently if you only fetch annotations, the metric and tags
909     // may not be set. Check this
910     //assertMeta(dps, 0, false, true);
911     assertEquals(1, dps[0].getAnnotations().size());
912     assertEquals(NOTE_DESCRIPTION, dps[0].getAnnotations().get(0)
913         .getDescription());
914     assertEquals(NOTE_NOTES, dps[0].getAnnotations().get(0).getNotes());
915     assertEquals(0, dps[0].size());
916   }
917 
918   @Test
runSingleDataPoint()919   public void runSingleDataPoint() throws Exception {
920     setDataPointStorage();
921     long timestamp = 1356998410;
922     tsdb.addPoint(METRIC_STRING, timestamp, 42, tags).joinUninterruptibly();
923 
924     query.setStartTime(1356998400);
925     query.setEndTime(1357041600);
926     query.setTimeSeries(METRIC_STRING, tags, Aggregators.SUM, false);
927     storage.dumpToSystemOut();
928     final DataPoints[] dps = query.run();
929     assertMeta(dps, 0, false);
930 
931     assertEquals(1, dps[0].size());
932     assertEquals(42, dps[0].longValue(0));
933     assertEquals(1356998410000L, dps[0].timestamp(0));
934   }
935 
936   @Test
runSingleDataPointWithAnnotation()937   public void runSingleDataPointWithAnnotation() throws Exception {
938     setDataPointStorage();
939     long timestamp = 1356998410;
940     tsdb.addPoint(METRIC_STRING, timestamp, 42, tags).joinUninterruptibly();
941 
942     final byte[] key =
943         IncomingDataPoints.rowKeyTemplate(tsdb, METRIC_STRING, tags);
944     RowKey.prefixKeyWithSalt(key);
945     System.arraycopy(Bytes.fromInt(1357002000), 0, key,
946         Const.SALT_WIDTH() + TSDB.metrics_width(), Const.TIMESTAMP_BYTES);
947     storage.flushRow(key);
948 
949     storeAnnotation(1357002090);
950 
951     query.setStartTime(1356998400);
952     query.setEndTime(1357041600);
953     query.setTimeSeries(METRIC_STRING, tags, Aggregators.SUM, false);
954 
955     final DataPoints[] dps = query.run();
956     assertMeta(dps, 0, false, true);
957 
958     assertEquals(1, dps[0].size());
959     assertEquals(42, dps[0].longValue(0));
960     assertEquals(1356998410000L, dps[0].timestamp(0));
961   }
962 
963   @Test
runTSUIDQuery()964   public void runTSUIDQuery() throws Exception {
965     storeLongTimeSeriesSeconds(true, false);
966 
967     query.setStartTime(1356998400);
968     query.setEndTime(1357041600);
969     final List<String> tsuids = new ArrayList<String>(1);
970     tsuids.add("000001000001000001");
971     query.setTimeSeries(tsuids, Aggregators.SUM, false);
972 
973     final DataPoints[] dps = query.run();
974     assertMeta(dps, 0, false);
975 
976     int value = 1;
977     long timestamp = 1356998430000L;
978     for (DataPoint dp : dps[0]) {
979       assertEquals(value, dp.longValue());
980       assertEquals(timestamp, dp.timestamp());
981       value++;
982       timestamp += 30000;
983     }
984     assertEquals(300, dps[0].aggregatedSize());
985   }
986 
987   @Test
runTSUIDsAggSum()988   public void runTSUIDsAggSum() throws Exception {
989     storeLongTimeSeriesSeconds(true, false);
990 
991     query.setStartTime(1356998400);
992     query.setEndTime(1357041600);
993     final List<String> tsuids = new ArrayList<String>(1);
994     tsuids.add("000001000001000001");
995     tsuids.add("000001000001000002");
996     query.setTimeSeries(tsuids, Aggregators.SUM, false);
997 
998     final DataPoints[] dps = query.run();
999     assertMeta(dps, 0, true);
1000 
1001     long timestamp = 1356998430000L;
1002     for (DataPoint dp : dps[0]) {
1003       assertEquals(301, dp.longValue());
1004       assertEquals(timestamp, dp.timestamp());
1005       timestamp += 30000;
1006     }
1007     assertEquals(300, dps[0].size());
1008   }
1009 
1010   @Test
runTSUIDQueryNoData()1011   public void runTSUIDQueryNoData() throws Exception {
1012     setDataPointStorage();
1013 
1014     query.setStartTime(1356998400);
1015     query.setEndTime(1357041600);
1016 
1017     final List<String> tsuids = new ArrayList<String>(1);
1018     tsuids.add("000001000001000001");
1019     query.setTimeSeries(tsuids, Aggregators.SUM, false);
1020 
1021     final DataPoints[] dps = query.run();
1022     assertNotNull(dps);
1023     assertEquals(0, dps.length);
1024   }
1025 
1026   @Test
runTSUIDQueryNoDataForTSUID()1027   public void runTSUIDQueryNoDataForTSUID() throws Exception {
1028     // this doesn't throw an exception since the UIDs are only looked for when
1029     // the query completes.
1030     setDataPointStorage();
1031 
1032     query.setStartTime(1356998400);
1033     query.setEndTime(1357041600);
1034     final List<String> tsuids = new ArrayList<String>(1);
1035     tsuids.add("000001000001000005");
1036     query.setTimeSeries(tsuids, Aggregators.SUM, false);
1037 
1038     final DataPoints[] dps = query.run();
1039     assertNotNull(dps);
1040     assertEquals(0, dps.length);
1041   }
1042 
1043   @Test (expected = NoSuchUniqueId.class)
runTSUIDQueryNSU()1044   public void runTSUIDQueryNSU() throws Exception {
1045     when(metrics.getNameAsync(new byte[] { 0, 0, 1 }))
1046       .thenThrow(new NoSuchUniqueId("metrics", new byte[] { 0, 0, 1 }));
1047     storeLongTimeSeriesSeconds(true, false);
1048 
1049     query.setStartTime(1356998400);
1050     query.setEndTime(1357041600);
1051     final List<String> tsuids = new ArrayList<String>(1);
1052     tsuids.add("000001000001000001");
1053     query.setTimeSeries(tsuids, Aggregators.SUM, false);
1054 
1055     final DataPoints[] dps = query.run();
1056     assertNotNull(dps);
1057     dps[0].metricName();
1058   }
1059 
1060   @Test
runRateCounterDefault()1061   public void runRateCounterDefault() throws Exception {
1062     setDataPointStorage();
1063     long timestamp = 1356998400;
1064     tsdb.addPoint(METRIC_STRING, timestamp += 30, Long.MAX_VALUE - 55, tags)
1065       .joinUninterruptibly();
1066     tsdb.addPoint(METRIC_STRING, timestamp += 30, Long.MAX_VALUE - 25, tags)
1067       .joinUninterruptibly();
1068     tsdb.addPoint(METRIC_STRING, timestamp += 30, 5, tags).joinUninterruptibly();
1069 
1070     final RateOptions ro = new RateOptions(true, Long.MAX_VALUE, 0);
1071     query.setStartTime(1356998400);
1072     query.setEndTime(1357041600);
1073     query.setTimeSeries(METRIC_STRING, tags, Aggregators.SUM, true, ro);
1074     final DataPoints[] dps = query.run();
1075     assertMeta(dps, 0, false);
1076 
1077     timestamp = 1356998460000L;
1078     for (DataPoint dp : dps[0]) {
1079       assertEquals(1.0, dp.doubleValue(), 0.001);
1080       assertEquals(timestamp, dp.timestamp());
1081       timestamp += 30000;
1082     }
1083     assertEquals(2, dps[0].size());
1084   }
1085 
1086   @Test
runRateCounterDefaultNoOp()1087   public void runRateCounterDefaultNoOp() throws Exception {
1088     setDataPointStorage();
1089     long timestamp = 1356998400;
1090     tsdb.addPoint(METRIC_STRING, timestamp += 30, 30, tags).joinUninterruptibly();
1091     tsdb.addPoint(METRIC_STRING, timestamp += 30, 60, tags).joinUninterruptibly();
1092     tsdb.addPoint(METRIC_STRING, timestamp += 30, 90, tags).joinUninterruptibly();
1093 
1094     final RateOptions ro = new RateOptions(true, Long.MAX_VALUE, 0);
1095     query.setStartTime(1356998400);
1096     query.setEndTime(1357041600);
1097     query.setTimeSeries(METRIC_STRING, tags, Aggregators.SUM, true, ro);
1098     final DataPoints[] dps = query.run();
1099     assertMeta(dps, 0, false);
1100 
1101     timestamp = 1356998460000L;
1102     for (DataPoint dp : dps[0]) {
1103       assertEquals(1.0, dp.doubleValue(), 0.001);
1104       assertEquals(timestamp, dp.timestamp());
1105       timestamp += 30000;
1106     }
1107     assertEquals(2, dps[0].size());
1108   }
1109 
1110   @Test
runRateCounterMaxSet()1111   public void runRateCounterMaxSet() throws Exception {
1112     setDataPointStorage();
1113     long timestamp = 1356998400;
1114     tsdb.addPoint(METRIC_STRING, timestamp += 30, 45, tags).joinUninterruptibly();
1115     tsdb.addPoint(METRIC_STRING, timestamp += 30, 75, tags).joinUninterruptibly();
1116     tsdb.addPoint(METRIC_STRING, timestamp += 30, 5, tags).joinUninterruptibly();
1117 
1118     final RateOptions ro = new RateOptions(true, 100, 0);
1119     query.setStartTime(1356998400);
1120     query.setEndTime(1357041600);
1121     query.setTimeSeries(METRIC_STRING, tags, Aggregators.SUM, true, ro);
1122     final DataPoints[] dps = query.run();
1123     assertMeta(dps, 0, false);
1124 
1125     timestamp = 1356998460000L;
1126     for (DataPoint dp : dps[0]) {
1127       assertEquals(1.0, dp.doubleValue(), 0.001);
1128       assertEquals(timestamp, dp.timestamp());
1129       timestamp += 30000;
1130     }
1131     assertEquals(2, dps[0].size());
1132   }
1133 
1134   @Test
runRateCounterAnomally()1135   public void runRateCounterAnomally() throws Exception {
1136     setDataPointStorage();
1137     long timestamp = 1356998400;
1138     tsdb.addPoint(METRIC_STRING, timestamp += 30, 45, tags).joinUninterruptibly();
1139     tsdb.addPoint(METRIC_STRING, timestamp += 30, 75, tags).joinUninterruptibly();
1140     tsdb.addPoint(METRIC_STRING, timestamp += 30, 25, tags).joinUninterruptibly();
1141 
1142     final RateOptions ro = new RateOptions(true, 10000, 35);
1143     query.setStartTime(1356998400);
1144     query.setEndTime(1357041600);
1145     query.setTimeSeries(METRIC_STRING, tags, Aggregators.SUM, true, ro);
1146     final DataPoints[] dps = query.run();
1147     assertMeta(dps, 0, false);
1148 
1149     assertEquals(1.0, dps[0].doubleValue(0), 0.001);
1150     assertEquals(1356998460000L, dps[0].timestamp(0));
1151     assertEquals(0, dps[0].doubleValue(1), 0.001);
1152     assertEquals(1356998490000L, dps[0].timestamp(1));
1153     assertEquals(2, dps[0].size());
1154   }
1155 
1156   @Test
runRateCounterAnomallyDrop()1157   public void runRateCounterAnomallyDrop() throws Exception {
1158     setDataPointStorage();
1159     long timestamp = 1356998400;
1160     tsdb.addPoint(METRIC_STRING, timestamp += 30, 45, tags).joinUninterruptibly();
1161     tsdb.addPoint(METRIC_STRING, timestamp += 30, 75, tags).joinUninterruptibly();
1162     tsdb.addPoint(METRIC_STRING, timestamp += 30, 25, tags).joinUninterruptibly();
1163     tsdb.addPoint(METRIC_STRING, timestamp += 30, 55, tags).joinUninterruptibly();
1164 
1165     final RateOptions ro = new RateOptions(true, 10000, 35, true);
1166     query.setStartTime(1356998400);
1167     query.setEndTime(1357041600);
1168     query.setTimeSeries(METRIC_STRING, tags, Aggregators.SUM, true, ro);
1169     final DataPoints[] dps = query.run();
1170     assertMeta(dps, 0, false);
1171 
1172     assertEquals(1.0, dps[0].doubleValue(0), 0.001);
1173     assertEquals(1356998460000L, dps[0].timestamp(0));
1174     assertEquals(1, dps[0].doubleValue(1), 0.001);
1175     assertEquals(1356998520000L, dps[0].timestamp(1));
1176     assertEquals(2, dps[0].size());
1177   }
1178 
1179   @Test
runMultiCompact()1180   public void runMultiCompact() throws Exception {
1181     final byte[] qual1 = { 0x00, 0x17 };
1182     final byte[] val1 = Bytes.fromLong(1L);
1183     final byte[] qual2 = { 0x00, 0x27 };
1184     final byte[] val2 = Bytes.fromLong(2L);
1185 
1186     // 2nd compaction
1187     final byte[] qual3 = { 0x00, 0x37 };
1188     final byte[] val3 = Bytes.fromLong(3L);
1189     final byte[] qual4 = { 0x00, 0x47 };
1190     final byte[] val4 = Bytes.fromLong(4L);
1191 
1192     // 3rd compaction
1193     final byte[] qual5 = { 0x00, 0x57 };
1194     final byte[] val5 = Bytes.fromLong(5L);
1195     final byte[] qual6 = { 0x00, 0x67 };
1196     final byte[] val6 = Bytes.fromLong(6L);
1197 
1198     final byte[] key = IncomingDataPoints.rowKeyTemplate(tsdb, METRIC_STRING, tags);
1199     RowKey.prefixKeyWithSalt(key);
1200     System.arraycopy(Bytes.fromInt(1356998400), 0, key,
1201         Const.SALT_WIDTH() + TSDB.metrics_width(), Const.TIMESTAMP_BYTES);
1202 
1203     setDataPointStorage();
1204     storage.addColumn(key,
1205         MockBase.concatByteArrays(qual1, qual2),
1206         MockBase.concatByteArrays(val1, val2, new byte[] { 0 }));
1207     storage.addColumn(key,
1208         MockBase.concatByteArrays(qual3, qual4),
1209         MockBase.concatByteArrays(val3, val4, new byte[] { 0 }));
1210     storage.addColumn(key,
1211         MockBase.concatByteArrays(qual5, qual6),
1212         MockBase.concatByteArrays(val5, val6, new byte[] { 0 }));
1213 
1214     HashMap<String, String> tags = new HashMap<String, String>(1);
1215     tags.put(TAGK_STRING , TAGV_STRING );
1216     query.setStartTime(1356998400);
1217     query.setEndTime(1357041600);
1218     query.setTimeSeries(METRIC_STRING, tags, Aggregators.SUM, false);
1219 
1220     final DataPoints[] dps = query.run();
1221     assertMeta(dps, 0, false);
1222 
1223     int value = 1;
1224     long timestamp = 1356998401000L;
1225     for (DataPoint dp : dps[0]) {
1226       assertEquals(value, dp.longValue());
1227       assertEquals(timestamp, dp.timestamp());
1228       value++;
1229       timestamp += 1000;
1230     }
1231     assertEquals(6, dps[0].aggregatedSize());
1232   }
1233 
1234   @Test
runMultiCompactAndSingles()1235   public void runMultiCompactAndSingles() throws Exception {
1236     final byte[] qual1 = { 0x00, 0x17 };
1237     final byte[] val1 = Bytes.fromLong(1L);
1238     final byte[] qual2 = { 0x00, 0x27 };
1239     final byte[] val2 = Bytes.fromLong(2L);
1240 
1241     // 2nd compaction
1242     final byte[] qual3 = { 0x00, 0x37 };
1243     final byte[] val3 = Bytes.fromLong(3L);
1244     final byte[] qual4 = { 0x00, 0x47 };
1245     final byte[] val4 = Bytes.fromLong(4L);
1246 
1247     // 3rd compaction
1248     final byte[] qual5 = { 0x00, 0x57 };
1249     final byte[] val5 = Bytes.fromLong(5L);
1250     final byte[] qual6 = { 0x00, 0x67 };
1251     final byte[] val6 = Bytes.fromLong(6L);
1252 
1253     final byte[] key = IncomingDataPoints.rowKeyTemplate(tsdb, METRIC_STRING, tags);
1254     RowKey.prefixKeyWithSalt(key);
1255     System.arraycopy(Bytes.fromInt(1356998400), 0, key,
1256         Const.SALT_WIDTH() + TSDB.metrics_width(), Const.TIMESTAMP_BYTES);
1257 
1258     setDataPointStorage();
1259     storage.addColumn(key,
1260         MockBase.concatByteArrays(qual1, qual2),
1261         MockBase.concatByteArrays(val1, val2, new byte[] { 0 }));
1262     storage.addColumn(key, qual3, val3);
1263     storage.addColumn(key, qual4, val4);
1264     storage.addColumn(key,
1265         MockBase.concatByteArrays(qual5, qual6),
1266         MockBase.concatByteArrays(val5, val6, new byte[] { 0 }));
1267 
1268     query.setStartTime(1356998400);
1269     query.setEndTime(1357041600);
1270     query.setTimeSeries(METRIC_STRING, tags, Aggregators.SUM, false);
1271 
1272     final DataPoints[] dps = query.run();
1273     assertMeta(dps, 0, false);
1274 
1275     int value = 1;
1276     long timestamp = 1356998401000L;
1277     for (DataPoint dp : dps[0]) {
1278       assertEquals(value, dp.longValue());
1279       assertEquals(timestamp, dp.timestamp());
1280       value++;
1281       timestamp += 1000;
1282     }
1283     assertEquals(6, dps[0].aggregatedSize());
1284   }
1285 
1286   @Test
runInterpolationSeconds()1287   public void runInterpolationSeconds() throws Exception {
1288     setDataPointStorage();
1289     long timestamp = 1356998400;
1290     for (int i = 1; i <= 300; i++) {
1291       tsdb.addPoint(METRIC_STRING, timestamp += 30, i, tags)
1292         .joinUninterruptibly();
1293     }
1294     tags.clear();
1295     tags.put(TAGK_STRING , TAGV_B_STRING);
1296     timestamp = 1356998415;
1297     for (int i = 300; i > 0; i--) {
1298       tsdb.addPoint(METRIC_STRING, timestamp += 30, i, tags)
1299         .joinUninterruptibly();
1300     }
1301 
1302     tags.clear();
1303     query.setStartTime(1356998400);
1304     query.setEndTime(1357041600);
1305     query.setTimeSeries(METRIC_STRING, tags, Aggregators.SUM, false);
1306     final DataPoints[] dps = query.run();
1307     assertMeta(dps, 0, true);
1308 
1309     long v = 1;
1310     long ts = 1356998430000L;
1311     for (DataPoint dp : dps[0]) {
1312       assertEquals(ts, dp.timestamp());
1313       ts += 15000;
1314       assertEquals(v, dp.longValue());
1315 
1316       if (dp.timestamp() == 1357007400000L) {
1317         v = 1;
1318       } else if (v == 1 || v == 302) {
1319         v = 301;
1320       } else {
1321         v = 302;
1322       }
1323     }
1324     assertEquals(600, dps[0].size());
1325   }
1326 
1327   @Test
runInterpolationMs()1328   public void runInterpolationMs() throws Exception {
1329     setDataPointStorage();
1330     long timestamp = 1356998400000L;
1331     for (int i = 1; i <= 300; i++) {
1332       tsdb.addPoint(METRIC_STRING, timestamp += 500, i, tags)
1333         .joinUninterruptibly();
1334     }
1335     tags.clear();
1336     tags.put(TAGK_STRING , TAGV_B_STRING );
1337     timestamp = 1356998400250L;
1338     for (int i = 300; i > 0; i--) {
1339       tsdb.addPoint(METRIC_STRING, timestamp += 500, i, tags)
1340         .joinUninterruptibly();
1341     }
1342 
1343     tags.clear();
1344     query.setStartTime(1356998400);
1345     query.setEndTime(1357041600);
1346     query.setTimeSeries(METRIC_STRING, tags, Aggregators.SUM, false);
1347     final DataPoints[] dps = query.run();
1348     assertMeta(dps, 0, true);
1349 
1350     long v = 1;
1351     long ts = 1356998400500L;
1352     for (DataPoint dp : dps[0]) {
1353       assertEquals(ts, dp.timestamp());
1354       ts += 250;
1355       assertEquals(v, dp.longValue());
1356 
1357       if (dp.timestamp() == 1356998550000L) {
1358         v = 1;
1359       } else if (v == 1 || v == 302) {
1360         v = 301;
1361       } else {
1362         v = 302;
1363       }
1364     }
1365     assertEquals(600, dps[0].size());
1366   }
1367 
1368   @Test
runInterpolationMsDownsampled()1369   public void runInterpolationMsDownsampled() throws Exception {
1370     setDataPointStorage();
1371     // ts = 1356998400500, v = 1
1372     // ts = 1356998401000, v = 2
1373     // ts = 1356998401500, v = 3
1374     // ts = 1356998402000, v = 4
1375     // ts = 1356998402500, v = 5
1376     // ...
1377     // ts = 1356998449000, v = 98
1378     // ts = 1356998449500, v = 99
1379     // ts = 1356998450000, v = 100
1380     // ts = 1356998455000, v = 101
1381     // ts = 1356998460000, v = 102
1382     // ...
1383     // ts = 1356998550000, v = 120
1384     long timestamp = 1356998400000L;
1385     for (int i = 1; i <= 120; i++) {
1386       timestamp += i <= 100 ? 500 : 5000;
1387       tsdb.addPoint(METRIC_STRING, timestamp, i, tags)
1388         .joinUninterruptibly();
1389     }
1390 
1391     // ts = 1356998400750, v = 300
1392     // ts = 1356998401250, v = 299
1393     // ts = 1356998401750, v = 298
1394     // ts = 1356998402250, v = 297
1395     // ts = 1356998402750, v = 296
1396     // ...
1397     // ts = 1356998549250, v = 3
1398     // ts = 1356998549750, v = 2
1399     // ts = 1356998550250, v = 1
1400     tags.clear();
1401     tags.put(TAGK_STRING , TAGV_B_STRING);
1402     timestamp = 1356998400250L;
1403     for (int i = 300; i > 0; i--) {
1404       tsdb.addPoint(METRIC_STRING, timestamp += 500, i, tags)
1405         .joinUninterruptibly();
1406     }
1407 
1408     tags.clear();
1409     query.setStartTime(1356998400);
1410     query.setEndTime(1357041600);
1411     query.setTimeSeries(METRIC_STRING, tags, Aggregators.SUM, false);
1412     query.downsample(1000, Aggregators.SUM);
1413 
1414     final DataPoints[] dps = query.run();
1415     assertMeta(dps, 0, true);
1416 
1417     // TS1 in intervals = (1), (2,3), (4,5) ... (98,99), 100, (), (), (), (),
1418     //                    (101), ... (120)
1419     // TS2 in intervals = (300), (299,298), (297,296), ... (203, 202) ...
1420     //                    (3,2), (1)
1421     // TS1 downsample = 1, 5, 9, ... 197, 100, _, _, _, _, 101, ... 120
1422     // TS1 interpolation = 1, 5, ... 197, 100, 100.2, 100.4, 100.6, 100.8, 101,
1423     //                     ... 119.6, 119.8, 120
1424     // TS2 downsample = 300, 597, 593, ... 405, 401, ... 5, 1
1425     // TS1 + TS2 = 301, 602, 602, ... 501, 497.2, ... 124.8, 121
1426     int i = 0;
1427     long ts = 1356998400000L;
1428     for (DataPoint dp : dps[0]) {
1429       assertEquals(ts, dp.timestamp());
1430       ts += 1000;
1431       if (i == 0) {
1432         assertEquals(301, dp.doubleValue(), 0.0000001);
1433       } else if (i < 50) {
1434         // TS1 = i * 2 + i * 2 + 1
1435         // TS2 = (300 - i * 2 + 1) + (300 - i * 2)
1436         // TS1 + TS2 = 602
1437         assertEquals(602, dp.doubleValue(), 0.0000001);
1438       } else {
1439         // TS1 = 100 + (i - 50) * 0.2
1440         // TS2 = (300 - i * 2 + 1) + (300 - i * 2)
1441         // TS1 + TS2 = 701 + (i - 50) * 0.2 - i * 4
1442         double value = 701 + (i - 50) * 0.2 - i * 4;
1443         assertEquals(value, dp.doubleValue(), 0.0000001);
1444       }
1445       ++i;
1446     }
1447     assertEquals(151, dps[0].size());
1448   }
1449 
1450   @Test
runRegexp()1451   public void runRegexp() throws Exception {
1452     storeLongTimeSeriesSeconds(true, false);
1453 
1454     query.setStartTime(1356998400);
1455     query.setEndTime(1357041600);
1456     tags.clear();
1457     tags.put("host", "regexp(web01)");
1458     query.setTimeSeries(METRIC_STRING, tags, Aggregators.SUM, false);
1459 
1460     final DataPoints[] dps = query.run();
1461     assertMeta(dps, 0, false);
1462     verify(tag_values, atLeast(1)).getNameAsync(TAGV_BYTES);
1463     verify(tag_values, atLeast(1)).getNameAsync(TAGV_B_BYTES);
1464     int value = 1;
1465     long timestamp = 1356998430000L;
1466     for (DataPoint dp : dps[0]) {
1467       assertEquals(value, dp.longValue());
1468       assertEquals(timestamp, dp.timestamp());
1469       value++;
1470       timestamp += 30000;
1471     }
1472     assertEquals(300, dps[0].aggregatedSize());
1473   }
1474 
1475   @Test
runRegexpNoMatch()1476   public void runRegexpNoMatch() throws Exception {
1477     storeLongTimeSeriesSeconds(true, false);
1478 
1479     query.setStartTime(1356998400);
1480     query.setEndTime(1357041600);
1481     tags.clear();
1482     tags.put("host", "regexp(dbsvr.*)");
1483     query.setTimeSeries(METRIC_STRING, tags, Aggregators.SUM, false);
1484 
1485     final DataPoints[] dps = query.run();
1486     verify(tag_values, atLeast(1)).getNameAsync(TAGV_BYTES);
1487     verify(tag_values, atLeast(1)).getNameAsync(TAGV_B_BYTES);
1488     assertEquals(0, dps.length);
1489   }
1490 
1491   @Test
filterExplicitTagsOK()1492   public void filterExplicitTagsOK() throws Exception {
1493     tsdb.getConfig().overrideConfig("tsd.query.enable_fuzzy", "true");
1494     storeLongTimeSeriesSeconds(true, false);
1495     HashMap<String, String> tags = new HashMap<String, String>(1);
1496     tags.put("host", "web01");
1497     query.setStartTime(1356998400);
1498     query.setEndTime(1357041600);
1499     query.setExplicitTags(true);
1500     query.setTimeSeries("sys.cpu.user", tags, Aggregators.SUM, false);
1501 
1502     final DataPoints[] dps = query.run();
1503 
1504     assertNotNull(dps);
1505     assertEquals("sys.cpu.user", dps[0].metricName());
1506     assertTrue(dps[0].getAggregatedTags().isEmpty());
1507     assertNull(dps[0].getAnnotations());
1508     assertEquals("web01", dps[0].getTags().get("host"));
1509 
1510     int value = 1;
1511     for (DataPoint dp : dps[0]) {
1512       assertEquals(value, dp.longValue());
1513       value++;
1514     }
1515     assertEquals(300, dps[0].aggregatedSize());
1516     // assert fuzzy
1517     for (final MockScanner scanner : storage.getScanners()) {
1518       assertTrue(scanner.getFilter() instanceof FilterList);
1519     }
1520   }
1521 
1522   @Test
filterExplicitTagsGroupByOK()1523   public void filterExplicitTagsGroupByOK() throws Exception {
1524     tsdb.getConfig().overrideConfig("tsd.query.enable_fuzzy", "true");
1525     storeLongTimeSeriesSeconds(true, false);
1526     HashMap<String, String> tags = new HashMap<String, String>(1);
1527     tags.put("host", "*");
1528     query.setStartTime(1356998400);
1529     query.setEndTime(1357041600);
1530     query.setExplicitTags(true);
1531     query.setTimeSeries("sys.cpu.user", tags, Aggregators.SUM, false);
1532 
1533     final DataPoints[] dps = query.run();
1534 
1535     assertNotNull(dps);
1536     assertEquals("sys.cpu.user", dps[0].metricName());
1537     assertTrue(dps[0].getAggregatedTags().isEmpty());
1538     assertNull(dps[0].getAnnotations());
1539     assertEquals("web01", dps[0].getTags().get("host"));
1540 
1541     int value = 1;
1542     for (DataPoint dp : dps[0]) {
1543       assertEquals(value, dp.longValue());
1544       value++;
1545     }
1546     assertEquals(300, dps[0].aggregatedSize());
1547     // assert fuzzy
1548     for (final MockScanner scanner : storage.getScanners()) {
1549       assertTrue(scanner.getFilter() instanceof FilterList);
1550     }
1551   }
1552 
1553   @Test
filterExplicitTagsMissing()1554   public void filterExplicitTagsMissing() throws Exception {
1555     tsdb.getConfig().overrideConfig("tsd.query.enable_fuzzy", "true");
1556     when(tag_names.getIdAsync("colo"))
1557       .thenReturn(Deferred.fromResult(new byte[] { 0, 0, 0, 4 }));
1558     when(tag_values.getIdAsync("lga"))
1559     .thenReturn(Deferred.fromResult(new byte[] { 0, 0, 0, 4 }));
1560     storeLongTimeSeriesSeconds(true, false);
1561     HashMap<String, String> tags = new HashMap<String, String>(1);
1562     tags.put("host", "web01");
1563     tags.put("colo", "lga");
1564     query.setStartTime(1356998400);
1565     query.setEndTime(1357041600);
1566     query.setExplicitTags(true);
1567     query.setTimeSeries("sys.cpu.user", tags, Aggregators.SUM, false);
1568 
1569     final DataPoints[] dps = query.run();
1570 
1571     assertNotNull(dps);
1572     assertEquals(0, dps.length);
1573     // assert fuzzy
1574     for (final MockScanner scanner : storage.getScanners()) {
1575       assertTrue(scanner.getFilter() instanceof FilterList);
1576     }
1577   }
1578 
1579 }
1580