1 // This file is part of OpenTSDB.
2 // Copyright (C) 2015  The OpenTSDB Authors.
3 //
4 // This program is free software: you can redistribute it and/or modify it
5 // under the terms of the GNU Lesser General Public License as published by
6 // the Free Software Foundation, either version 2.1 of the License, or (at your
7 // option) any later version.  This program is distributed in the hope that it
8 // will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty
9 // of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser
10 // General Public License for more details.  You should have received a copy
11 // of the GNU Lesser General Public License along with this program.  If not,
12 // see <http://www.gnu.org/licenses/>.
13 package net.opentsdb.core;
14 
15 import static org.junit.Assert.assertEquals;
16 import static org.junit.Assert.assertNotNull;
17 import static org.junit.Assert.assertNull;
18 import static org.junit.Assert.assertTrue;
19 import static org.mockito.Mockito.when;
20 import static org.powermock.api.mockito.PowerMockito.mock;
21 
22 import java.util.HashMap;
23 import java.util.Map;
24 import java.util.Set;
25 import java.util.concurrent.TimeUnit;
26 
27 import net.opentsdb.meta.Annotation;
28 import net.opentsdb.storage.MockBase;
29 import net.opentsdb.uid.NoSuchUniqueId;
30 import net.opentsdb.uid.NoSuchUniqueName;
31 import net.opentsdb.uid.UniqueId;
32 import net.opentsdb.utils.Config;
33 
34 import org.hbase.async.HBaseClient;
35 import org.hbase.async.Scanner;
36 import org.jboss.netty.util.HashedWheelTimer;
37 import org.jboss.netty.util.Timeout;
38 import org.jboss.netty.util.TimerTask;
39 import org.junit.Before;
40 import org.junit.runner.RunWith;
41 import org.mockito.invocation.InvocationOnMock;
42 import org.mockito.stubbing.Answer;
43 import org.powermock.api.mockito.PowerMockito;
44 import org.powermock.core.classloader.annotations.PowerMockIgnore;
45 import org.powermock.core.classloader.annotations.PrepareForTest;
46 import org.powermock.modules.junit4.PowerMockRunner;
47 import org.powermock.reflect.Whitebox;
48 
49 import com.stumbleupon.async.Deferred;
50 
51 /**
52  * Sets up a real TSDB with mocked client, compaction queue and timer along
53  * with mocked UID assignment, fetches for common unit tests.
54  */
55 @RunWith(PowerMockRunner.class)
56 @PowerMockIgnore({"javax.management.*", "javax.xml.*",
57   "ch.qos.*", "org.slf4j.*",
58   "com.sum.*", "org.xml.*"})
59 @PrepareForTest({TSDB.class, Config.class, UniqueId.class, HBaseClient.class,
60   HashedWheelTimer.class, Scanner.class, Const.class })
61 public class BaseTsdbTest {
62   /** A list of UIDs from A to Z for unit testing UIDs values */
63   public static final Map<String, byte[]> METRIC_UIDS =
64       new HashMap<String, byte[]>(26);
65   public static final Map<String, byte[]> TAGK_UIDS =
66       new HashMap<String, byte[]>(26);
67   public static final Map<String, byte[]> TAGV_UIDS =
68       new HashMap<String, byte[]>(26);
69   static {
70     char letter = 'A';
71     int uid = 10;
72     for (int i = 0; i < 26; i++) {
Character.toString(letter)73       METRIC_UIDS.put(Character.toString(letter),
74           UniqueId.longToUID(uid, TSDB.metrics_width()));
Character.toString(letter)75       TAGK_UIDS.put(Character.toString(letter),
76           UniqueId.longToUID(uid, TSDB.tagk_width()));
77       TAGV_UIDS.put(Character.toString(letter++),
78           UniqueId.longToUID(uid++, TSDB.tagv_width()));
79     }
80   }
81 
82   public static final String METRIC_STRING = "sys.cpu.user";
83   public static final byte[] METRIC_BYTES = new byte[] { 0, 0, 1 };
84   public static final String METRIC_B_STRING = "sys.cpu.system";
85   public static final byte[] METRIC_B_BYTES = new byte[] { 0, 0, 2 };
86   public static final String NSUN_METRIC = "sys.cpu.nice";
87   public static final byte[] NSUI_METRIC = new byte[] { 0, 0, 3 };
88 
89   public static final String TAGK_STRING = "host";
90   public static final byte[] TAGK_BYTES = new byte[] { 0, 0, 1 };
91   public static final String TAGK_B_STRING = "owner";
92   public static final byte[] TAGK_B_BYTES = new byte[] { 0, 0, 3 };
93   public static final String NSUN_TAGK = "dc";
94   public static final byte[] NSUI_TAGK = new byte[] { 0, 0, 4 };
95 
96   public static final String TAGV_STRING = "web01";
97   public static final byte[] TAGV_BYTES = new byte[] { 0, 0, 1 };
98   public static final String TAGV_B_STRING = "web02";
99   public static final byte[] TAGV_B_BYTES = new byte[] { 0, 0, 2 };
100   public static final String NSUN_TAGV = "web03";
101   public static final byte[] NSUI_TAGV = new byte[] { 0, 0, 3 };
102 
103   static final String NOTE_DESCRIPTION = "Hello DiscWorld!";
104   static final String NOTE_NOTES = "Millenium hand and shrimp";
105 
106   protected HashedWheelTimer timer;
107   protected Config config;
108   protected TSDB tsdb;
109   protected HBaseClient client = mock(HBaseClient.class);
110   protected UniqueId metrics = mock(UniqueId.class);
111   protected UniqueId tag_names = mock(UniqueId.class);
112   protected UniqueId tag_values = mock(UniqueId.class);
113   protected Map<String, String> tags = new HashMap<String, String>(1);
114   protected MockBase storage;
115 
116   @Before
before()117   public void before() throws Exception {
118     timer = mock(HashedWheelTimer.class);
119 
120     PowerMockito.whenNew(HashedWheelTimer.class).withNoArguments()
121       .thenReturn(timer);
122     PowerMockito.whenNew(HBaseClient.class).withAnyArguments()
123       .thenReturn(client);
124 
125     config = new Config(false);
126     config.overrideConfig("tsd.storage.enable_compaction", "false");
127     tsdb = PowerMockito.spy(new TSDB(config));
128 
129     config.setAutoMetric(true);
130 
131     Whitebox.setInternalState(tsdb, "metrics", metrics);
132     Whitebox.setInternalState(tsdb, "tag_names", tag_names);
133     Whitebox.setInternalState(tsdb, "tag_values", tag_values);
134 
135     setupMetricMaps();
136     setupTagkMaps();
137     setupTagvMaps();
138 
139     when(metrics.width()).thenReturn((short)3);
140     when(tag_names.width()).thenReturn((short)3);
141     when(tag_values.width()).thenReturn((short)3);
142 
143     tags.put(TAGK_STRING, TAGV_STRING);
144   }
145 
146   /** Adds the static UIDs to the metrics UID mock object */
setupMetricMaps()147   void setupMetricMaps() {
148     when(metrics.getId(METRIC_STRING)).thenReturn(METRIC_BYTES);
149     when(metrics.getIdAsync(METRIC_STRING))
150       .thenAnswer(new Answer<Deferred<byte[]>>() {
151           @Override
152           public Deferred<byte[]> answer(InvocationOnMock invocation)
153               throws Throwable {
154             return Deferred.fromResult(METRIC_BYTES);
155           }
156       });
157     when(metrics.getOrCreateId(METRIC_STRING))
158       .thenReturn(METRIC_BYTES);
159 
160     when(metrics.getId(METRIC_B_STRING)).thenReturn(METRIC_B_BYTES);
161     when(metrics.getIdAsync(METRIC_B_STRING))
162       .thenAnswer(new Answer<Deferred<byte[]>>() {
163           @Override
164           public Deferred<byte[]> answer(InvocationOnMock invocation)
165               throws Throwable {
166             return Deferred.fromResult(METRIC_B_BYTES);
167           }
168       });
169     when(metrics.getOrCreateId(METRIC_B_STRING))
170       .thenReturn(METRIC_B_BYTES);
171 
172     when(metrics.getNameAsync(METRIC_BYTES))
173       .thenAnswer(new Answer<Deferred<String>>() {
174           @Override
175           public Deferred<String> answer(InvocationOnMock invocation)
176               throws Throwable {
177             return Deferred.fromResult(METRIC_STRING);
178           }
179       });
180     when(metrics.getNameAsync(METRIC_B_BYTES))
181       .thenAnswer(new Answer<Deferred<String>>() {
182             @Override
183             public Deferred<String> answer(InvocationOnMock invocation)
184                 throws Throwable {
185               return Deferred.fromResult(METRIC_B_STRING);
186             }
187         });
188     when(metrics.getNameAsync(NSUI_METRIC))
189       .thenThrow(new NoSuchUniqueId("metrics", NSUI_METRIC));
190 
191     final NoSuchUniqueName nsun = new NoSuchUniqueName(NSUN_METRIC, "metrics");
192 
193     when(metrics.getId(NSUN_METRIC)).thenThrow(nsun);
194     when(metrics.getIdAsync(NSUN_METRIC))
195       .thenReturn(Deferred.<byte[]>fromError(nsun));
196     when(metrics.getOrCreateId(NSUN_METRIC)).thenThrow(nsun);
197 
198     // Iterate over the metric UIDs and handle both forward and reverse
199     for (final Map.Entry<String, byte[]> uid : METRIC_UIDS.entrySet()) {
200       when(metrics.getId(uid.getKey())).thenReturn(uid.getValue());
201       when(metrics.getIdAsync(uid.getKey()))
202         .thenAnswer(new Answer<Deferred<byte[]>>() {
203             @Override
204             public Deferred<byte[]> answer(InvocationOnMock invocation)
205                 throws Throwable {
206               return Deferred.fromResult(uid.getValue());
207             }
208         });
209       when(metrics.getOrCreateId(uid.getKey()))
210         .thenReturn(uid.getValue());
211       when(metrics.getNameAsync(uid.getValue()))
212         .thenAnswer(new Answer<Deferred<String>>() {
213             @Override
214             public Deferred<String> answer(InvocationOnMock invocation)
215                 throws Throwable {
216               return Deferred.fromResult(uid.getKey());
217             }
218         });
219     }
220   }
221 
222   /** Adds the static UIDs to the tag keys UID mock object */
setupTagkMaps()223   void setupTagkMaps() {
224     when(tag_names.getId(TAGK_STRING)).thenReturn(TAGK_BYTES);
225     when(tag_names.getOrCreateId(TAGK_STRING)).thenReturn(TAGK_BYTES);
226     when(tag_names.getIdAsync(TAGK_STRING))
227       .thenAnswer(new Answer<Deferred<byte[]>>() {
228             @Override
229             public Deferred<byte[]> answer(InvocationOnMock invocation)
230                 throws Throwable {
231               return Deferred.fromResult(TAGK_BYTES);
232             }
233         });
234     when(tag_names.getOrCreateIdAsync(TAGK_STRING))
235       .thenReturn(Deferred.fromResult(TAGK_BYTES));
236 
237     when(tag_names.getId(TAGK_B_STRING)).thenReturn(TAGK_B_BYTES);
238     when(tag_names.getOrCreateId(TAGK_B_STRING)).thenReturn(TAGK_B_BYTES);
239     when(tag_names.getIdAsync(TAGK_B_STRING))
240       .thenAnswer(new Answer<Deferred<byte[]>>() {
241             @Override
242             public Deferred<byte[]> answer(InvocationOnMock invocation)
243                 throws Throwable {
244               return Deferred.fromResult(TAGK_B_BYTES);
245             }
246         });
247     when(tag_names.getOrCreateIdAsync(TAGK_B_STRING))
248       .thenReturn(Deferred.fromResult(TAGK_B_BYTES));
249 
250     when(tag_names.getNameAsync(TAGK_BYTES))
251       .thenAnswer(new Answer<Deferred<String>>() {
252             @Override
253             public Deferred<String> answer(InvocationOnMock invocation)
254                 throws Throwable {
255               return Deferred.fromResult(TAGK_STRING);
256             }
257         });
258     when(tag_names.getNameAsync(TAGK_B_BYTES))
259       .thenAnswer(new Answer<Deferred<String>>() {
260             @Override
261             public Deferred<String> answer(InvocationOnMock invocation)
262                 throws Throwable {
263               return Deferred.fromResult(TAGK_B_STRING);
264             }
265         });
266     when(tag_names.getNameAsync(NSUI_TAGK))
267       .thenThrow(new NoSuchUniqueId("tagk", NSUI_TAGK));
268 
269     final NoSuchUniqueName nsun = new NoSuchUniqueName(NSUN_TAGK, "tagk");
270 
271     when(tag_names.getId(NSUN_TAGK))
272       .thenThrow(nsun);
273     when(tag_names.getIdAsync(NSUN_TAGK))
274       .thenReturn(Deferred.<byte[]>fromError(nsun));
275 
276     // Iterate over the tagk UIDs and handle both forward and reverse
277     for (final Map.Entry<String, byte[]> uid : TAGK_UIDS.entrySet()) {
278       when(tag_names.getId(uid.getKey())).thenReturn(uid.getValue());
279       when(tag_names.getIdAsync(uid.getKey()))
280         .thenAnswer(new Answer<Deferred<byte[]>>() {
281             @Override
282             public Deferred<byte[]> answer(InvocationOnMock invocation)
283                 throws Throwable {
284               return Deferred.fromResult(uid.getValue());
285             }
286         });
287       when(tag_names.getOrCreateId(uid.getKey()))
288         .thenReturn(uid.getValue());
289       when(tag_names.getNameAsync(uid.getValue()))
290         .thenAnswer(new Answer<Deferred<String>>() {
291             @Override
292             public Deferred<String> answer(InvocationOnMock invocation)
293                 throws Throwable {
294               return Deferred.fromResult(uid.getKey());
295             }
296         });
297     }
298   }
299 
300   /** Adds the static UIDs to the tag values UID mock object */
setupTagvMaps()301   void setupTagvMaps() {
302     when(tag_values.getId(TAGV_STRING)).thenReturn(TAGV_BYTES);
303     when(tag_values.getOrCreateId(TAGV_STRING)).thenReturn(TAGV_BYTES);
304     when(tag_values.getIdAsync(TAGV_STRING))
305       .thenAnswer(new Answer<Deferred<byte[]>>() {
306           @Override
307           public Deferred<byte[]> answer(InvocationOnMock invocation)
308               throws Throwable {
309             return Deferred.fromResult(TAGV_BYTES);
310           }
311       });
312     when(tag_values.getOrCreateIdAsync(TAGV_STRING))
313       .thenReturn(Deferred.fromResult(TAGV_BYTES));
314 
315     when(tag_values.getId(TAGV_B_STRING)).thenReturn(TAGV_B_BYTES);
316     when(tag_values.getOrCreateId(TAGV_B_STRING)).thenReturn(TAGV_B_BYTES);
317     when(tag_values.getIdAsync(TAGV_B_STRING))
318       .thenAnswer(new Answer<Deferred<byte[]>>() {
319             @Override
320             public Deferred<byte[]> answer(InvocationOnMock invocation)
321                 throws Throwable {
322               return Deferred.fromResult(TAGV_B_BYTES);
323             }
324         });
325     when(tag_values.getOrCreateIdAsync(TAGV_B_STRING))
326       .thenReturn(Deferred.fromResult(TAGV_B_BYTES));
327 
328     when(tag_values.getNameAsync(TAGV_BYTES))
329       .thenReturn(Deferred.fromResult(TAGV_STRING));
330     when(tag_values.getNameAsync(TAGV_B_BYTES))
331       .thenReturn(Deferred.fromResult(TAGV_B_STRING));
332     when(tag_values.getNameAsync(NSUI_TAGV))
333       .thenThrow(new NoSuchUniqueId("tagv", NSUI_TAGV));
334 
335     final NoSuchUniqueName nsun = new NoSuchUniqueName(NSUN_TAGV, "tagv");
336 
337     when(tag_values.getId(NSUN_TAGV)).thenThrow(nsun);
338     when(tag_values.getIdAsync(NSUN_TAGV))
339       .thenReturn(Deferred.<byte[]>fromError(nsun));
340 
341     // Iterate over the tagv UIDs and handle both forward and reverse
342     for (final Map.Entry<String, byte[]> uid : TAGV_UIDS.entrySet()) {
343       when(tag_values.getId(uid.getKey())).thenReturn(uid.getValue());
344       when(tag_values.getIdAsync(uid.getKey()))
345         .thenAnswer(new Answer<Deferred<byte[]>>() {
346             @Override
347             public Deferred<byte[]> answer(InvocationOnMock invocation)
348                 throws Throwable {
349               return Deferred.fromResult(uid.getValue());
350             }
351         });
352       when(tag_values.getOrCreateId(uid.getKey()))
353         .thenReturn(uid.getValue());
354       when(tag_values.getNameAsync(uid.getValue()))
355         .thenAnswer(new Answer<Deferred<String>>() {
356             @Override
357             public Deferred<String> answer(InvocationOnMock invocation)
358                 throws Throwable {
359               return Deferred.fromResult(uid.getKey());
360             }
361         });
362     }
363   }
364 
365   // ----------------- //
366   // Helper functions. //
367   // ----------------- //
368 
369   /** @return a row key template with the default metric and tags */
getRowKeyTemplate()370   protected byte[] getRowKeyTemplate() {
371     return IncomingDataPoints.rowKeyTemplate(tsdb, METRIC_STRING, tags);
372   }
373 
setDataPointStorage()374   protected void setDataPointStorage() throws Exception {
375     storage = new MockBase(tsdb, client, true, true, true, true);
376     storage.setFamily("t".getBytes(MockBase.ASCII()));
377   }
378 
storeLongTimeSeriesSeconds(final boolean two_metrics, final boolean offset)379   protected void storeLongTimeSeriesSeconds(final boolean two_metrics,
380       final boolean offset) throws Exception {
381     setDataPointStorage();
382 
383     // dump a bunch of rows of two metrics so that we can test filtering out
384     // on the metric
385     HashMap<String, String> tags_local = new HashMap<String, String>(tags);
386     long timestamp = 1356998400;
387     for (int i = 1; i <= 300; i++) {
388       tsdb.addPoint(METRIC_STRING, timestamp += 30, i, tags_local)
389         .joinUninterruptibly();
390       if (two_metrics) {
391         tsdb.addPoint(METRIC_B_STRING, timestamp, i, tags_local)
392           .joinUninterruptibly();
393       }
394     }
395 
396     // dump a parallel set but invert the values
397     tags_local.clear();
398     tags_local.put(TAGK_STRING, TAGV_B_STRING);
399     timestamp = offset ? 1356998415 : 1356998400;
400     for (int i = 300; i > 0; i--) {
401       tsdb.addPoint(METRIC_STRING, timestamp += 30, i, tags_local)
402         .joinUninterruptibly();
403       if (two_metrics) {
404         tsdb.addPoint(METRIC_B_STRING, timestamp, i, tags_local)
405           .joinUninterruptibly();
406       }
407     }
408   }
409 
storeLongTimeSeriesMs()410   protected void storeLongTimeSeriesMs() throws Exception {
411     setDataPointStorage();
412     // dump a bunch of rows of two metrics so that we can test filtering out
413     // on the metric
414     HashMap<String, String> tags_local = new HashMap<String, String>(tags);
415     long timestamp = 1356998400000L;
416     for (int i = 1; i <= 300; i++) {
417       tsdb.addPoint(METRIC_STRING, timestamp += 500, i, tags_local)
418         .joinUninterruptibly();
419       tsdb.addPoint(METRIC_B_STRING, timestamp, i, tags_local)
420         .joinUninterruptibly();
421     }
422 
423     // dump a parallel set but invert the values
424     tags_local.clear();
425     tags_local.put(TAGK_STRING, TAGV_B_STRING);
426     timestamp = 1356998400000L;
427     for (int i = 300; i > 0; i--) {
428       tsdb.addPoint(METRIC_STRING, timestamp += 500, i, tags_local)
429         .joinUninterruptibly();
430       tsdb.addPoint(METRIC_B_STRING, timestamp, i, tags_local)
431         .joinUninterruptibly();
432     }
433   }
434 
435   /**
436    * Create two metrics with same name, skipping every third point in host=web01
437    * and every other point in host=web02. To wit:
438    *
439    *       METRIC    TAG  t0   t1   t2   t3   t4   t5   ...
440    * sys.cpu.user  web01   X    2    3    X    5    6   ...
441    * sys.cpu.user  web02   X  299    X  297    X  295   ...
442    */
storeLongTimeSeriesWithMissingData()443   protected void storeLongTimeSeriesWithMissingData() throws Exception {
444     setDataPointStorage();
445 
446     // host=web01
447     HashMap<String, String> tags_local = new HashMap<String, String>(tags);
448     long timestamp = 1356998400L;
449     for (int i = 0; i < 300; ++i) {
450       // Skip every third point.
451       if (0 != (i % 3)) {
452         tsdb.addPoint(METRIC_STRING, timestamp, i + 1, tags_local)
453           .joinUninterruptibly();
454       }
455       timestamp += 10L;
456     }
457 
458     // host=web02
459     tags_local.clear();
460     tags_local.put(TAGK_STRING, TAGV_B_STRING);
461     timestamp = 1356998400L;
462     for (int i = 300; i > 0; --i) {
463       // Skip every other point.
464       if (0 != (i % 2)) {
465         tsdb.addPoint(METRIC_STRING, timestamp, i, tags_local)
466           .joinUninterruptibly();
467       }
468       timestamp += 10L;
469     }
470   }
471 
storeFloatTimeSeriesSeconds(final boolean two_metrics, final boolean offset)472   protected void storeFloatTimeSeriesSeconds(final boolean two_metrics,
473       final boolean offset) throws Exception {
474     setDataPointStorage();
475     // dump a bunch of rows of two metrics so that we can test filtering out
476     // on the metric
477     HashMap<String, String> tags_local = new HashMap<String, String>(tags);
478     long timestamp = 1356998400;
479     for (float i = 1.25F; i <= 76; i += 0.25F) {
480       tsdb.addPoint(METRIC_STRING, timestamp += 30, i, tags_local)
481         .joinUninterruptibly();
482       if (two_metrics) {
483         tsdb.addPoint(METRIC_B_STRING, timestamp, i, tags_local)
484           .joinUninterruptibly();
485       }
486     }
487 
488     // dump a parallel set but invert the values
489     tags_local.clear();
490     tags_local.put(TAGK_STRING, TAGV_B_STRING);
491     timestamp = offset ? 1356998415 : 1356998400;
492     for (float i = 75F; i > 0; i -= 0.25F) {
493       tsdb.addPoint(METRIC_STRING, timestamp += 30, i, tags_local)
494         .joinUninterruptibly();
495       if (two_metrics) {
496         tsdb.addPoint(METRIC_B_STRING, timestamp, i, tags_local)
497           .joinUninterruptibly();
498       }
499     }
500   }
501 
storeFloatTimeSeriesMs()502   protected void storeFloatTimeSeriesMs() throws Exception {
503     setDataPointStorage();
504     // dump a bunch of rows of two metrics so that we can test filtering out
505     // on the metric
506     HashMap<String, String> tags_local = new HashMap<String, String>(tags);
507     long timestamp = 1356998400000L;
508     for (float i = 1.25F; i <= 76; i += 0.25F) {
509       tsdb.addPoint(METRIC_STRING, timestamp += 500, i, tags_local)
510         .joinUninterruptibly();
511       tsdb.addPoint(METRIC_B_STRING, timestamp, i, tags_local)
512         .joinUninterruptibly();
513     }
514 
515     // dump a parallel set but invert the values
516     tags_local.clear();
517     tags_local.put(TAGK_STRING, TAGV_B_STRING);
518     timestamp = 1356998400000L;
519     for (float i = 75F; i > 0; i -= 0.25F) {
520       tsdb.addPoint(METRIC_STRING, timestamp += 500, i, tags_local)
521         .joinUninterruptibly();
522       tsdb.addPoint(METRIC_B_STRING, timestamp, i, tags_local)
523         .joinUninterruptibly();
524     }
525   }
526 
storeMixedTimeSeriesSeconds()527   protected void storeMixedTimeSeriesSeconds() throws Exception {
528     setDataPointStorage();
529     HashMap<String, String> tags_local = new HashMap<String, String>(tags);
530     long timestamp = 1356998400;
531     for (float i = 1.25F; i <= 76; i += 0.25F) {
532       if (i % 2 == 0) {
533         tsdb.addPoint(METRIC_STRING, timestamp += 30, (long)i, tags_local)
534           .joinUninterruptibly();
535       } else {
536         tsdb.addPoint(METRIC_STRING, timestamp += 30, i, tags_local)
537           .joinUninterruptibly();
538       }
539     }
540   }
541 
542   // dumps ints, floats, seconds and ms
storeMixedTimeSeriesMsAndS()543   protected void storeMixedTimeSeriesMsAndS() throws Exception {
544     setDataPointStorage();
545     HashMap<String, String> tags_local = new HashMap<String, String>(tags);
546     long timestamp = 1356998400000L;
547     for (float i = 1.25F; i <= 76; i += 0.25F) {
548       long ts = timestamp += 500;
549       if (ts % 1000 == 0) {
550         ts /= 1000;
551       }
552       if (i % 2 == 0) {
553         tsdb.addPoint(METRIC_STRING, ts, (long)i, tags_local).joinUninterruptibly();
554       } else {
555         tsdb.addPoint(METRIC_STRING, ts, i, tags_local).joinUninterruptibly();
556       }
557     }
558   }
559 
560   /**
561    * Validates the metric name, tags and annotations
562    * @param dps The datapoints array returned from the query
563    * @param index The index to peek into the array
564    * @param agged_tags Whether or not the tags were aggregated out
565    */
assertMeta(final DataPoints[] dps, final int index, final boolean agged_tags)566   protected void assertMeta(final DataPoints[] dps, final int index,
567       final boolean agged_tags) {
568     assertMeta(dps, index, agged_tags, false);
569   }
570 
571   /**
572    * Validates the metric name, tags and annotations
573    * @param dps The datapoints array returned from the query
574    * @param index The index to peek into the array
575    * @param agged_tags Whether or not the tags were aggregated out
576    * @param annotation Whether we're expecting a note or not
577    */
assertMeta(final DataPoints[] dps, final int index, final boolean agged_tags, final boolean annotation)578   protected void assertMeta(final DataPoints[] dps, final int index,
579       final boolean agged_tags, final boolean annotation) {
580     assertNotNull(dps);
581     assertEquals(METRIC_STRING, dps[index].metricName());
582 
583     if (agged_tags) {
584       assertTrue(dps[index].getTags().isEmpty());
585       assertEquals(TAGK_STRING, dps[index].getAggregatedTags().get(0));
586     } else {
587       if (index == 0) {
588         assertTrue(dps[index].getAggregatedTags().isEmpty());
589         assertEquals(TAGV_STRING, dps[index].getTags().get(TAGK_STRING));
590       } else {
591         assertEquals(TAGV_B_STRING, dps[index].getTags().get(TAGK_STRING));
592       }
593     }
594 
595     if (annotation) {
596       assertEquals(1, dps[index].getAnnotations().size());
597       assertEquals(NOTE_DESCRIPTION, dps[index].getAnnotations().get(0)
598           .getDescription());
599       assertEquals(NOTE_NOTES, dps[index].getAnnotations().get(0).getNotes());
600     } else {
601       assertNull(dps[index].getAnnotations());
602     }
603   }
604 
605   /**
606    * Stores a single annotation in the given row
607    * @param timestamp The time to store the data point at
608    * @throws Exception
609    */
storeAnnotation(final long timestamp)610   protected void storeAnnotation(final long timestamp) throws Exception {
611     final Annotation note = new Annotation();
612     note.setTSUID("000001000001000001");
613     note.setStartTime(timestamp);
614     note.setDescription(NOTE_DESCRIPTION);
615     note.setNotes(NOTE_NOTES);
616     note.syncToStorage(tsdb, false).joinUninterruptibly();
617   }
618 
619   /**
620    * A fake {@link org.jboss.netty.util.Timer} implementation.
621    * Instead of executing the task it will store that task in a internal state
622    * and provides a function to start the execution of the stored task.
623    * This implementation thus allows the flexibility of simulating the
624    * things that will be going on during the time out period of a TimerTask.
625    * This was mainly return to simulate the timeout period for
626    * alreadyNSREdRegion test, where the region will be in the NSREd mode only
627    * during this timeout period, which was difficult to simulate using the
628    * above {@link FakeTimer} implementation, as we don't get back the control
629    * during the timeout period
630    *
631    * Here it will hold at most two Tasks. We have two tasks here because when
632    * one is being executed, it may call for newTimeOut for another task.
633    */
634   public static final class FakeTaskTimer extends HashedWheelTimer {
635 
636     public TimerTask newPausedTask = null;
637     public TimerTask pausedTask = null;
638     public Timeout timeout = null;
639 
640     @Override
newTimeout(final TimerTask task, final long delay, final TimeUnit unit)641     public synchronized Timeout newTimeout(final TimerTask task,
642                                            final long delay,
643                                            final TimeUnit unit) {
644       if (pausedTask == null) {
645         pausedTask = task;
646       }  else if (newPausedTask == null) {
647         newPausedTask = task;
648       } else {
649         throw new IllegalStateException("Cannot Pause Two Timer Tasks");
650       }
651       timeout = mock(Timeout.class);
652       return timeout;
653     }
654 
655     @Override
stop()656     public Set<Timeout> stop() {
657       return null;
658     }
659 
continuePausedTask()660     public boolean continuePausedTask() {
661       if (pausedTask == null) {
662         return false;
663       }
664       try {
665         if (newPausedTask != null) {
666           throw new IllegalStateException("Cannot be in this state");
667         }
668         pausedTask.run(null);  // Argument never used in this code base
669         pausedTask = newPausedTask;
670         newPausedTask = null;
671         return true;
672       } catch (Exception e) {
673         throw new RuntimeException("Timer task failed: " + pausedTask, e);
674       }
675     }
676   }
677 
678   /**
679    * A little class used to throw a very specific type of exception for matching
680    * in Unit Tests.
681    */
682   public static class UnitTestException extends RuntimeException {
UnitTestException()683     public UnitTestException() { }
UnitTestException(final String msg)684     public UnitTestException(final String msg) {
685       super(msg);
686     }
687     private static final long serialVersionUID = -4404095849459619922L;
688   }
689 }