1 // This file is part of OpenTSDB. 2 // Copyright (C) 2015 The OpenTSDB Authors. 3 // 4 // This program is free software: you can redistribute it and/or modify it 5 // under the terms of the GNU Lesser General Public License as published by 6 // the Free Software Foundation, either version 2.1 of the License, or (at your 7 // option) any later version. This program is distributed in the hope that it 8 // will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty 9 // of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser 10 // General Public License for more details. You should have received a copy 11 // of the GNU Lesser General Public License along with this program. If not, 12 // see <http://www.gnu.org/licenses/>. 13 package net.opentsdb.core; 14 15 import static org.junit.Assert.assertEquals; 16 import static org.junit.Assert.assertNotNull; 17 import static org.junit.Assert.assertNull; 18 import static org.junit.Assert.assertTrue; 19 import static org.mockito.Mockito.when; 20 import static org.powermock.api.mockito.PowerMockito.mock; 21 22 import java.util.HashMap; 23 import java.util.Map; 24 import java.util.Set; 25 import java.util.concurrent.TimeUnit; 26 27 import net.opentsdb.meta.Annotation; 28 import net.opentsdb.storage.MockBase; 29 import net.opentsdb.uid.NoSuchUniqueId; 30 import net.opentsdb.uid.NoSuchUniqueName; 31 import net.opentsdb.uid.UniqueId; 32 import net.opentsdb.utils.Config; 33 34 import org.hbase.async.HBaseClient; 35 import org.hbase.async.Scanner; 36 import org.jboss.netty.util.HashedWheelTimer; 37 import org.jboss.netty.util.Timeout; 38 import org.jboss.netty.util.TimerTask; 39 import org.junit.Before; 40 import org.junit.runner.RunWith; 41 import org.mockito.invocation.InvocationOnMock; 42 import org.mockito.stubbing.Answer; 43 import org.powermock.api.mockito.PowerMockito; 44 import org.powermock.core.classloader.annotations.PowerMockIgnore; 45 import org.powermock.core.classloader.annotations.PrepareForTest; 46 import org.powermock.modules.junit4.PowerMockRunner; 47 import org.powermock.reflect.Whitebox; 48 49 import com.stumbleupon.async.Deferred; 50 51 /** 52 * Sets up a real TSDB with mocked client, compaction queue and timer along 53 * with mocked UID assignment, fetches for common unit tests. 54 */ 55 @RunWith(PowerMockRunner.class) 56 @PowerMockIgnore({"javax.management.*", "javax.xml.*", 57 "ch.qos.*", "org.slf4j.*", 58 "com.sum.*", "org.xml.*"}) 59 @PrepareForTest({TSDB.class, Config.class, UniqueId.class, HBaseClient.class, 60 HashedWheelTimer.class, Scanner.class, Const.class }) 61 public class BaseTsdbTest { 62 /** A list of UIDs from A to Z for unit testing UIDs values */ 63 public static final Map<String, byte[]> METRIC_UIDS = 64 new HashMap<String, byte[]>(26); 65 public static final Map<String, byte[]> TAGK_UIDS = 66 new HashMap<String, byte[]>(26); 67 public static final Map<String, byte[]> TAGV_UIDS = 68 new HashMap<String, byte[]>(26); 69 static { 70 char letter = 'A'; 71 int uid = 10; 72 for (int i = 0; i < 26; i++) { Character.toString(letter)73 METRIC_UIDS.put(Character.toString(letter), 74 UniqueId.longToUID(uid, TSDB.metrics_width())); Character.toString(letter)75 TAGK_UIDS.put(Character.toString(letter), 76 UniqueId.longToUID(uid, TSDB.tagk_width())); 77 TAGV_UIDS.put(Character.toString(letter++), 78 UniqueId.longToUID(uid++, TSDB.tagv_width())); 79 } 80 } 81 82 public static final String METRIC_STRING = "sys.cpu.user"; 83 public static final byte[] METRIC_BYTES = new byte[] { 0, 0, 1 }; 84 public static final String METRIC_B_STRING = "sys.cpu.system"; 85 public static final byte[] METRIC_B_BYTES = new byte[] { 0, 0, 2 }; 86 public static final String NSUN_METRIC = "sys.cpu.nice"; 87 public static final byte[] NSUI_METRIC = new byte[] { 0, 0, 3 }; 88 89 public static final String TAGK_STRING = "host"; 90 public static final byte[] TAGK_BYTES = new byte[] { 0, 0, 1 }; 91 public static final String TAGK_B_STRING = "owner"; 92 public static final byte[] TAGK_B_BYTES = new byte[] { 0, 0, 3 }; 93 public static final String NSUN_TAGK = "dc"; 94 public static final byte[] NSUI_TAGK = new byte[] { 0, 0, 4 }; 95 96 public static final String TAGV_STRING = "web01"; 97 public static final byte[] TAGV_BYTES = new byte[] { 0, 0, 1 }; 98 public static final String TAGV_B_STRING = "web02"; 99 public static final byte[] TAGV_B_BYTES = new byte[] { 0, 0, 2 }; 100 public static final String NSUN_TAGV = "web03"; 101 public static final byte[] NSUI_TAGV = new byte[] { 0, 0, 3 }; 102 103 static final String NOTE_DESCRIPTION = "Hello DiscWorld!"; 104 static final String NOTE_NOTES = "Millenium hand and shrimp"; 105 106 protected HashedWheelTimer timer; 107 protected Config config; 108 protected TSDB tsdb; 109 protected HBaseClient client = mock(HBaseClient.class); 110 protected UniqueId metrics = mock(UniqueId.class); 111 protected UniqueId tag_names = mock(UniqueId.class); 112 protected UniqueId tag_values = mock(UniqueId.class); 113 protected Map<String, String> tags = new HashMap<String, String>(1); 114 protected MockBase storage; 115 116 @Before before()117 public void before() throws Exception { 118 timer = mock(HashedWheelTimer.class); 119 120 PowerMockito.whenNew(HashedWheelTimer.class).withNoArguments() 121 .thenReturn(timer); 122 PowerMockito.whenNew(HBaseClient.class).withAnyArguments() 123 .thenReturn(client); 124 125 config = new Config(false); 126 config.overrideConfig("tsd.storage.enable_compaction", "false"); 127 tsdb = PowerMockito.spy(new TSDB(config)); 128 129 config.setAutoMetric(true); 130 131 Whitebox.setInternalState(tsdb, "metrics", metrics); 132 Whitebox.setInternalState(tsdb, "tag_names", tag_names); 133 Whitebox.setInternalState(tsdb, "tag_values", tag_values); 134 135 setupMetricMaps(); 136 setupTagkMaps(); 137 setupTagvMaps(); 138 139 when(metrics.width()).thenReturn((short)3); 140 when(tag_names.width()).thenReturn((short)3); 141 when(tag_values.width()).thenReturn((short)3); 142 143 tags.put(TAGK_STRING, TAGV_STRING); 144 } 145 146 /** Adds the static UIDs to the metrics UID mock object */ setupMetricMaps()147 void setupMetricMaps() { 148 when(metrics.getId(METRIC_STRING)).thenReturn(METRIC_BYTES); 149 when(metrics.getIdAsync(METRIC_STRING)) 150 .thenAnswer(new Answer<Deferred<byte[]>>() { 151 @Override 152 public Deferred<byte[]> answer(InvocationOnMock invocation) 153 throws Throwable { 154 return Deferred.fromResult(METRIC_BYTES); 155 } 156 }); 157 when(metrics.getOrCreateId(METRIC_STRING)) 158 .thenReturn(METRIC_BYTES); 159 160 when(metrics.getId(METRIC_B_STRING)).thenReturn(METRIC_B_BYTES); 161 when(metrics.getIdAsync(METRIC_B_STRING)) 162 .thenAnswer(new Answer<Deferred<byte[]>>() { 163 @Override 164 public Deferred<byte[]> answer(InvocationOnMock invocation) 165 throws Throwable { 166 return Deferred.fromResult(METRIC_B_BYTES); 167 } 168 }); 169 when(metrics.getOrCreateId(METRIC_B_STRING)) 170 .thenReturn(METRIC_B_BYTES); 171 172 when(metrics.getNameAsync(METRIC_BYTES)) 173 .thenAnswer(new Answer<Deferred<String>>() { 174 @Override 175 public Deferred<String> answer(InvocationOnMock invocation) 176 throws Throwable { 177 return Deferred.fromResult(METRIC_STRING); 178 } 179 }); 180 when(metrics.getNameAsync(METRIC_B_BYTES)) 181 .thenAnswer(new Answer<Deferred<String>>() { 182 @Override 183 public Deferred<String> answer(InvocationOnMock invocation) 184 throws Throwable { 185 return Deferred.fromResult(METRIC_B_STRING); 186 } 187 }); 188 when(metrics.getNameAsync(NSUI_METRIC)) 189 .thenThrow(new NoSuchUniqueId("metrics", NSUI_METRIC)); 190 191 final NoSuchUniqueName nsun = new NoSuchUniqueName(NSUN_METRIC, "metrics"); 192 193 when(metrics.getId(NSUN_METRIC)).thenThrow(nsun); 194 when(metrics.getIdAsync(NSUN_METRIC)) 195 .thenReturn(Deferred.<byte[]>fromError(nsun)); 196 when(metrics.getOrCreateId(NSUN_METRIC)).thenThrow(nsun); 197 198 // Iterate over the metric UIDs and handle both forward and reverse 199 for (final Map.Entry<String, byte[]> uid : METRIC_UIDS.entrySet()) { 200 when(metrics.getId(uid.getKey())).thenReturn(uid.getValue()); 201 when(metrics.getIdAsync(uid.getKey())) 202 .thenAnswer(new Answer<Deferred<byte[]>>() { 203 @Override 204 public Deferred<byte[]> answer(InvocationOnMock invocation) 205 throws Throwable { 206 return Deferred.fromResult(uid.getValue()); 207 } 208 }); 209 when(metrics.getOrCreateId(uid.getKey())) 210 .thenReturn(uid.getValue()); 211 when(metrics.getNameAsync(uid.getValue())) 212 .thenAnswer(new Answer<Deferred<String>>() { 213 @Override 214 public Deferred<String> answer(InvocationOnMock invocation) 215 throws Throwable { 216 return Deferred.fromResult(uid.getKey()); 217 } 218 }); 219 } 220 } 221 222 /** Adds the static UIDs to the tag keys UID mock object */ setupTagkMaps()223 void setupTagkMaps() { 224 when(tag_names.getId(TAGK_STRING)).thenReturn(TAGK_BYTES); 225 when(tag_names.getOrCreateId(TAGK_STRING)).thenReturn(TAGK_BYTES); 226 when(tag_names.getIdAsync(TAGK_STRING)) 227 .thenAnswer(new Answer<Deferred<byte[]>>() { 228 @Override 229 public Deferred<byte[]> answer(InvocationOnMock invocation) 230 throws Throwable { 231 return Deferred.fromResult(TAGK_BYTES); 232 } 233 }); 234 when(tag_names.getOrCreateIdAsync(TAGK_STRING)) 235 .thenReturn(Deferred.fromResult(TAGK_BYTES)); 236 237 when(tag_names.getId(TAGK_B_STRING)).thenReturn(TAGK_B_BYTES); 238 when(tag_names.getOrCreateId(TAGK_B_STRING)).thenReturn(TAGK_B_BYTES); 239 when(tag_names.getIdAsync(TAGK_B_STRING)) 240 .thenAnswer(new Answer<Deferred<byte[]>>() { 241 @Override 242 public Deferred<byte[]> answer(InvocationOnMock invocation) 243 throws Throwable { 244 return Deferred.fromResult(TAGK_B_BYTES); 245 } 246 }); 247 when(tag_names.getOrCreateIdAsync(TAGK_B_STRING)) 248 .thenReturn(Deferred.fromResult(TAGK_B_BYTES)); 249 250 when(tag_names.getNameAsync(TAGK_BYTES)) 251 .thenAnswer(new Answer<Deferred<String>>() { 252 @Override 253 public Deferred<String> answer(InvocationOnMock invocation) 254 throws Throwable { 255 return Deferred.fromResult(TAGK_STRING); 256 } 257 }); 258 when(tag_names.getNameAsync(TAGK_B_BYTES)) 259 .thenAnswer(new Answer<Deferred<String>>() { 260 @Override 261 public Deferred<String> answer(InvocationOnMock invocation) 262 throws Throwable { 263 return Deferred.fromResult(TAGK_B_STRING); 264 } 265 }); 266 when(tag_names.getNameAsync(NSUI_TAGK)) 267 .thenThrow(new NoSuchUniqueId("tagk", NSUI_TAGK)); 268 269 final NoSuchUniqueName nsun = new NoSuchUniqueName(NSUN_TAGK, "tagk"); 270 271 when(tag_names.getId(NSUN_TAGK)) 272 .thenThrow(nsun); 273 when(tag_names.getIdAsync(NSUN_TAGK)) 274 .thenReturn(Deferred.<byte[]>fromError(nsun)); 275 276 // Iterate over the tagk UIDs and handle both forward and reverse 277 for (final Map.Entry<String, byte[]> uid : TAGK_UIDS.entrySet()) { 278 when(tag_names.getId(uid.getKey())).thenReturn(uid.getValue()); 279 when(tag_names.getIdAsync(uid.getKey())) 280 .thenAnswer(new Answer<Deferred<byte[]>>() { 281 @Override 282 public Deferred<byte[]> answer(InvocationOnMock invocation) 283 throws Throwable { 284 return Deferred.fromResult(uid.getValue()); 285 } 286 }); 287 when(tag_names.getOrCreateId(uid.getKey())) 288 .thenReturn(uid.getValue()); 289 when(tag_names.getNameAsync(uid.getValue())) 290 .thenAnswer(new Answer<Deferred<String>>() { 291 @Override 292 public Deferred<String> answer(InvocationOnMock invocation) 293 throws Throwable { 294 return Deferred.fromResult(uid.getKey()); 295 } 296 }); 297 } 298 } 299 300 /** Adds the static UIDs to the tag values UID mock object */ setupTagvMaps()301 void setupTagvMaps() { 302 when(tag_values.getId(TAGV_STRING)).thenReturn(TAGV_BYTES); 303 when(tag_values.getOrCreateId(TAGV_STRING)).thenReturn(TAGV_BYTES); 304 when(tag_values.getIdAsync(TAGV_STRING)) 305 .thenAnswer(new Answer<Deferred<byte[]>>() { 306 @Override 307 public Deferred<byte[]> answer(InvocationOnMock invocation) 308 throws Throwable { 309 return Deferred.fromResult(TAGV_BYTES); 310 } 311 }); 312 when(tag_values.getOrCreateIdAsync(TAGV_STRING)) 313 .thenReturn(Deferred.fromResult(TAGV_BYTES)); 314 315 when(tag_values.getId(TAGV_B_STRING)).thenReturn(TAGV_B_BYTES); 316 when(tag_values.getOrCreateId(TAGV_B_STRING)).thenReturn(TAGV_B_BYTES); 317 when(tag_values.getIdAsync(TAGV_B_STRING)) 318 .thenAnswer(new Answer<Deferred<byte[]>>() { 319 @Override 320 public Deferred<byte[]> answer(InvocationOnMock invocation) 321 throws Throwable { 322 return Deferred.fromResult(TAGV_B_BYTES); 323 } 324 }); 325 when(tag_values.getOrCreateIdAsync(TAGV_B_STRING)) 326 .thenReturn(Deferred.fromResult(TAGV_B_BYTES)); 327 328 when(tag_values.getNameAsync(TAGV_BYTES)) 329 .thenReturn(Deferred.fromResult(TAGV_STRING)); 330 when(tag_values.getNameAsync(TAGV_B_BYTES)) 331 .thenReturn(Deferred.fromResult(TAGV_B_STRING)); 332 when(tag_values.getNameAsync(NSUI_TAGV)) 333 .thenThrow(new NoSuchUniqueId("tagv", NSUI_TAGV)); 334 335 final NoSuchUniqueName nsun = new NoSuchUniqueName(NSUN_TAGV, "tagv"); 336 337 when(tag_values.getId(NSUN_TAGV)).thenThrow(nsun); 338 when(tag_values.getIdAsync(NSUN_TAGV)) 339 .thenReturn(Deferred.<byte[]>fromError(nsun)); 340 341 // Iterate over the tagv UIDs and handle both forward and reverse 342 for (final Map.Entry<String, byte[]> uid : TAGV_UIDS.entrySet()) { 343 when(tag_values.getId(uid.getKey())).thenReturn(uid.getValue()); 344 when(tag_values.getIdAsync(uid.getKey())) 345 .thenAnswer(new Answer<Deferred<byte[]>>() { 346 @Override 347 public Deferred<byte[]> answer(InvocationOnMock invocation) 348 throws Throwable { 349 return Deferred.fromResult(uid.getValue()); 350 } 351 }); 352 when(tag_values.getOrCreateId(uid.getKey())) 353 .thenReturn(uid.getValue()); 354 when(tag_values.getNameAsync(uid.getValue())) 355 .thenAnswer(new Answer<Deferred<String>>() { 356 @Override 357 public Deferred<String> answer(InvocationOnMock invocation) 358 throws Throwable { 359 return Deferred.fromResult(uid.getKey()); 360 } 361 }); 362 } 363 } 364 365 // ----------------- // 366 // Helper functions. // 367 // ----------------- // 368 369 /** @return a row key template with the default metric and tags */ getRowKeyTemplate()370 protected byte[] getRowKeyTemplate() { 371 return IncomingDataPoints.rowKeyTemplate(tsdb, METRIC_STRING, tags); 372 } 373 setDataPointStorage()374 protected void setDataPointStorage() throws Exception { 375 storage = new MockBase(tsdb, client, true, true, true, true); 376 storage.setFamily("t".getBytes(MockBase.ASCII())); 377 } 378 storeLongTimeSeriesSeconds(final boolean two_metrics, final boolean offset)379 protected void storeLongTimeSeriesSeconds(final boolean two_metrics, 380 final boolean offset) throws Exception { 381 setDataPointStorage(); 382 383 // dump a bunch of rows of two metrics so that we can test filtering out 384 // on the metric 385 HashMap<String, String> tags_local = new HashMap<String, String>(tags); 386 long timestamp = 1356998400; 387 for (int i = 1; i <= 300; i++) { 388 tsdb.addPoint(METRIC_STRING, timestamp += 30, i, tags_local) 389 .joinUninterruptibly(); 390 if (two_metrics) { 391 tsdb.addPoint(METRIC_B_STRING, timestamp, i, tags_local) 392 .joinUninterruptibly(); 393 } 394 } 395 396 // dump a parallel set but invert the values 397 tags_local.clear(); 398 tags_local.put(TAGK_STRING, TAGV_B_STRING); 399 timestamp = offset ? 1356998415 : 1356998400; 400 for (int i = 300; i > 0; i--) { 401 tsdb.addPoint(METRIC_STRING, timestamp += 30, i, tags_local) 402 .joinUninterruptibly(); 403 if (two_metrics) { 404 tsdb.addPoint(METRIC_B_STRING, timestamp, i, tags_local) 405 .joinUninterruptibly(); 406 } 407 } 408 } 409 storeLongTimeSeriesMs()410 protected void storeLongTimeSeriesMs() throws Exception { 411 setDataPointStorage(); 412 // dump a bunch of rows of two metrics so that we can test filtering out 413 // on the metric 414 HashMap<String, String> tags_local = new HashMap<String, String>(tags); 415 long timestamp = 1356998400000L; 416 for (int i = 1; i <= 300; i++) { 417 tsdb.addPoint(METRIC_STRING, timestamp += 500, i, tags_local) 418 .joinUninterruptibly(); 419 tsdb.addPoint(METRIC_B_STRING, timestamp, i, tags_local) 420 .joinUninterruptibly(); 421 } 422 423 // dump a parallel set but invert the values 424 tags_local.clear(); 425 tags_local.put(TAGK_STRING, TAGV_B_STRING); 426 timestamp = 1356998400000L; 427 for (int i = 300; i > 0; i--) { 428 tsdb.addPoint(METRIC_STRING, timestamp += 500, i, tags_local) 429 .joinUninterruptibly(); 430 tsdb.addPoint(METRIC_B_STRING, timestamp, i, tags_local) 431 .joinUninterruptibly(); 432 } 433 } 434 435 /** 436 * Create two metrics with same name, skipping every third point in host=web01 437 * and every other point in host=web02. To wit: 438 * 439 * METRIC TAG t0 t1 t2 t3 t4 t5 ... 440 * sys.cpu.user web01 X 2 3 X 5 6 ... 441 * sys.cpu.user web02 X 299 X 297 X 295 ... 442 */ storeLongTimeSeriesWithMissingData()443 protected void storeLongTimeSeriesWithMissingData() throws Exception { 444 setDataPointStorage(); 445 446 // host=web01 447 HashMap<String, String> tags_local = new HashMap<String, String>(tags); 448 long timestamp = 1356998400L; 449 for (int i = 0; i < 300; ++i) { 450 // Skip every third point. 451 if (0 != (i % 3)) { 452 tsdb.addPoint(METRIC_STRING, timestamp, i + 1, tags_local) 453 .joinUninterruptibly(); 454 } 455 timestamp += 10L; 456 } 457 458 // host=web02 459 tags_local.clear(); 460 tags_local.put(TAGK_STRING, TAGV_B_STRING); 461 timestamp = 1356998400L; 462 for (int i = 300; i > 0; --i) { 463 // Skip every other point. 464 if (0 != (i % 2)) { 465 tsdb.addPoint(METRIC_STRING, timestamp, i, tags_local) 466 .joinUninterruptibly(); 467 } 468 timestamp += 10L; 469 } 470 } 471 storeFloatTimeSeriesSeconds(final boolean two_metrics, final boolean offset)472 protected void storeFloatTimeSeriesSeconds(final boolean two_metrics, 473 final boolean offset) throws Exception { 474 setDataPointStorage(); 475 // dump a bunch of rows of two metrics so that we can test filtering out 476 // on the metric 477 HashMap<String, String> tags_local = new HashMap<String, String>(tags); 478 long timestamp = 1356998400; 479 for (float i = 1.25F; i <= 76; i += 0.25F) { 480 tsdb.addPoint(METRIC_STRING, timestamp += 30, i, tags_local) 481 .joinUninterruptibly(); 482 if (two_metrics) { 483 tsdb.addPoint(METRIC_B_STRING, timestamp, i, tags_local) 484 .joinUninterruptibly(); 485 } 486 } 487 488 // dump a parallel set but invert the values 489 tags_local.clear(); 490 tags_local.put(TAGK_STRING, TAGV_B_STRING); 491 timestamp = offset ? 1356998415 : 1356998400; 492 for (float i = 75F; i > 0; i -= 0.25F) { 493 tsdb.addPoint(METRIC_STRING, timestamp += 30, i, tags_local) 494 .joinUninterruptibly(); 495 if (two_metrics) { 496 tsdb.addPoint(METRIC_B_STRING, timestamp, i, tags_local) 497 .joinUninterruptibly(); 498 } 499 } 500 } 501 storeFloatTimeSeriesMs()502 protected void storeFloatTimeSeriesMs() throws Exception { 503 setDataPointStorage(); 504 // dump a bunch of rows of two metrics so that we can test filtering out 505 // on the metric 506 HashMap<String, String> tags_local = new HashMap<String, String>(tags); 507 long timestamp = 1356998400000L; 508 for (float i = 1.25F; i <= 76; i += 0.25F) { 509 tsdb.addPoint(METRIC_STRING, timestamp += 500, i, tags_local) 510 .joinUninterruptibly(); 511 tsdb.addPoint(METRIC_B_STRING, timestamp, i, tags_local) 512 .joinUninterruptibly(); 513 } 514 515 // dump a parallel set but invert the values 516 tags_local.clear(); 517 tags_local.put(TAGK_STRING, TAGV_B_STRING); 518 timestamp = 1356998400000L; 519 for (float i = 75F; i > 0; i -= 0.25F) { 520 tsdb.addPoint(METRIC_STRING, timestamp += 500, i, tags_local) 521 .joinUninterruptibly(); 522 tsdb.addPoint(METRIC_B_STRING, timestamp, i, tags_local) 523 .joinUninterruptibly(); 524 } 525 } 526 storeMixedTimeSeriesSeconds()527 protected void storeMixedTimeSeriesSeconds() throws Exception { 528 setDataPointStorage(); 529 HashMap<String, String> tags_local = new HashMap<String, String>(tags); 530 long timestamp = 1356998400; 531 for (float i = 1.25F; i <= 76; i += 0.25F) { 532 if (i % 2 == 0) { 533 tsdb.addPoint(METRIC_STRING, timestamp += 30, (long)i, tags_local) 534 .joinUninterruptibly(); 535 } else { 536 tsdb.addPoint(METRIC_STRING, timestamp += 30, i, tags_local) 537 .joinUninterruptibly(); 538 } 539 } 540 } 541 542 // dumps ints, floats, seconds and ms storeMixedTimeSeriesMsAndS()543 protected void storeMixedTimeSeriesMsAndS() throws Exception { 544 setDataPointStorage(); 545 HashMap<String, String> tags_local = new HashMap<String, String>(tags); 546 long timestamp = 1356998400000L; 547 for (float i = 1.25F; i <= 76; i += 0.25F) { 548 long ts = timestamp += 500; 549 if (ts % 1000 == 0) { 550 ts /= 1000; 551 } 552 if (i % 2 == 0) { 553 tsdb.addPoint(METRIC_STRING, ts, (long)i, tags_local).joinUninterruptibly(); 554 } else { 555 tsdb.addPoint(METRIC_STRING, ts, i, tags_local).joinUninterruptibly(); 556 } 557 } 558 } 559 560 /** 561 * Validates the metric name, tags and annotations 562 * @param dps The datapoints array returned from the query 563 * @param index The index to peek into the array 564 * @param agged_tags Whether or not the tags were aggregated out 565 */ assertMeta(final DataPoints[] dps, final int index, final boolean agged_tags)566 protected void assertMeta(final DataPoints[] dps, final int index, 567 final boolean agged_tags) { 568 assertMeta(dps, index, agged_tags, false); 569 } 570 571 /** 572 * Validates the metric name, tags and annotations 573 * @param dps The datapoints array returned from the query 574 * @param index The index to peek into the array 575 * @param agged_tags Whether or not the tags were aggregated out 576 * @param annotation Whether we're expecting a note or not 577 */ assertMeta(final DataPoints[] dps, final int index, final boolean agged_tags, final boolean annotation)578 protected void assertMeta(final DataPoints[] dps, final int index, 579 final boolean agged_tags, final boolean annotation) { 580 assertNotNull(dps); 581 assertEquals(METRIC_STRING, dps[index].metricName()); 582 583 if (agged_tags) { 584 assertTrue(dps[index].getTags().isEmpty()); 585 assertEquals(TAGK_STRING, dps[index].getAggregatedTags().get(0)); 586 } else { 587 if (index == 0) { 588 assertTrue(dps[index].getAggregatedTags().isEmpty()); 589 assertEquals(TAGV_STRING, dps[index].getTags().get(TAGK_STRING)); 590 } else { 591 assertEquals(TAGV_B_STRING, dps[index].getTags().get(TAGK_STRING)); 592 } 593 } 594 595 if (annotation) { 596 assertEquals(1, dps[index].getAnnotations().size()); 597 assertEquals(NOTE_DESCRIPTION, dps[index].getAnnotations().get(0) 598 .getDescription()); 599 assertEquals(NOTE_NOTES, dps[index].getAnnotations().get(0).getNotes()); 600 } else { 601 assertNull(dps[index].getAnnotations()); 602 } 603 } 604 605 /** 606 * Stores a single annotation in the given row 607 * @param timestamp The time to store the data point at 608 * @throws Exception 609 */ storeAnnotation(final long timestamp)610 protected void storeAnnotation(final long timestamp) throws Exception { 611 final Annotation note = new Annotation(); 612 note.setTSUID("000001000001000001"); 613 note.setStartTime(timestamp); 614 note.setDescription(NOTE_DESCRIPTION); 615 note.setNotes(NOTE_NOTES); 616 note.syncToStorage(tsdb, false).joinUninterruptibly(); 617 } 618 619 /** 620 * A fake {@link org.jboss.netty.util.Timer} implementation. 621 * Instead of executing the task it will store that task in a internal state 622 * and provides a function to start the execution of the stored task. 623 * This implementation thus allows the flexibility of simulating the 624 * things that will be going on during the time out period of a TimerTask. 625 * This was mainly return to simulate the timeout period for 626 * alreadyNSREdRegion test, where the region will be in the NSREd mode only 627 * during this timeout period, which was difficult to simulate using the 628 * above {@link FakeTimer} implementation, as we don't get back the control 629 * during the timeout period 630 * 631 * Here it will hold at most two Tasks. We have two tasks here because when 632 * one is being executed, it may call for newTimeOut for another task. 633 */ 634 public static final class FakeTaskTimer extends HashedWheelTimer { 635 636 public TimerTask newPausedTask = null; 637 public TimerTask pausedTask = null; 638 public Timeout timeout = null; 639 640 @Override newTimeout(final TimerTask task, final long delay, final TimeUnit unit)641 public synchronized Timeout newTimeout(final TimerTask task, 642 final long delay, 643 final TimeUnit unit) { 644 if (pausedTask == null) { 645 pausedTask = task; 646 } else if (newPausedTask == null) { 647 newPausedTask = task; 648 } else { 649 throw new IllegalStateException("Cannot Pause Two Timer Tasks"); 650 } 651 timeout = mock(Timeout.class); 652 return timeout; 653 } 654 655 @Override stop()656 public Set<Timeout> stop() { 657 return null; 658 } 659 continuePausedTask()660 public boolean continuePausedTask() { 661 if (pausedTask == null) { 662 return false; 663 } 664 try { 665 if (newPausedTask != null) { 666 throw new IllegalStateException("Cannot be in this state"); 667 } 668 pausedTask.run(null); // Argument never used in this code base 669 pausedTask = newPausedTask; 670 newPausedTask = null; 671 return true; 672 } catch (Exception e) { 673 throw new RuntimeException("Timer task failed: " + pausedTask, e); 674 } 675 } 676 } 677 678 /** 679 * A little class used to throw a very specific type of exception for matching 680 * in Unit Tests. 681 */ 682 public static class UnitTestException extends RuntimeException { UnitTestException()683 public UnitTestException() { } UnitTestException(final String msg)684 public UnitTestException(final String msg) { 685 super(msg); 686 } 687 private static final long serialVersionUID = -4404095849459619922L; 688 } 689 }