1 // This file is part of OpenTSDB. 2 // Copyright (C) 2013 The OpenTSDB Authors. 3 // 4 // This program is free software: you can redistribute it and/or modify it 5 // under the terms of the GNU Lesser General Public License as published by 6 // the Free Software Foundation, either version 2.1 of the License, or (at your 7 // option) any later version. This program is distributed in the hope that it 8 // will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty 9 // of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser 10 // General Public License for more details. You should have received a copy 11 // of the GNU Lesser General Public License along with this program. If not, 12 // see <http://www.gnu.org/licenses/>. 13 package net.opentsdb.storage; 14 15 import static org.mockito.Matchers.any; 16 import static org.mockito.Matchers.anyString; 17 import static org.mockito.Mockito.doAnswer; 18 import static org.mockito.Mockito.when; 19 import static org.powermock.api.mockito.PowerMockito.mock; 20 21 import java.nio.charset.Charset; 22 import java.util.ArrayList; 23 import java.util.Collections; 24 import java.util.HashSet; 25 import java.util.Iterator; 26 import java.util.List; 27 import java.util.Map; 28 import java.util.Map.Entry; 29 import java.util.Set; 30 import java.util.TreeMap; 31 import java.util.regex.Pattern; 32 import java.util.regex.PatternSyntaxException; 33 34 import javax.xml.bind.DatatypeConverter; 35 36 import net.opentsdb.core.Const; 37 import net.opentsdb.core.TSDB; 38 import net.opentsdb.utils.Pair; 39 40 import org.hbase.async.AtomicIncrementRequest; 41 import org.hbase.async.Bytes; 42 import org.hbase.async.Bytes.ByteMap; 43 import org.hbase.async.AppendRequest; 44 import org.hbase.async.DeleteRequest; 45 import org.hbase.async.FilterList; 46 import org.hbase.async.GetRequest; 47 import org.hbase.async.HBaseClient; 48 import org.hbase.async.KeyRegexpFilter; 49 import org.hbase.async.KeyValue; 50 import org.hbase.async.PutRequest; 51 import org.hbase.async.ScanFilter; 52 import org.hbase.async.Scanner; 53 import org.junit.Ignore; 54 import org.mockito.invocation.InvocationOnMock; 55 import org.mockito.stubbing.Answer; 56 import org.powermock.reflect.Whitebox; 57 58 import com.stumbleupon.async.Deferred; 59 60 /** 61 * Mock HBase implementation useful in testing calls to and from storage with 62 * actual pretend data. The underlying data store is an incredibly ugly nesting 63 * of ByteMaps from AsyncHbase so it stores and orders byte arrays similar to 64 * HBase. It supports tables and column families along with timestamps but 65 * doesn't deal with TTLs or other features. 66 * <p> 67 * By default we configure the "'tsdb', {NAME => 't'}" and 68 * "'tsdb-uid', {NAME => 'id'}, {NAME => 'name'}" tables. If you need more, just 69 * add em. 70 * 71 * <p> 72 * It's not a perfect mock but is useful for the majority of unit tests. Gets, 73 * puts, cas, deletes and scans are currently supported. See notes for each 74 * inner class below about what does and doesn't work. 75 * <p> 76 * Regarding timestamps, whenever you execute an RPC request, the 77 * {@code current_timestamp} will be incremented by one millisecond. By default 78 * the timestamp starts at 1/1/2014 00:00:00 but you can set it to any value 79 * at any time. If a PutRequest comes in with a specific time, that time will 80 * be stored and the timestamp will not be incremented. 81 * <p> 82 * <b>Warning:</b> To use this class, you need to prepare the classes for testing 83 * with the @PrepareForTest annotation. The classes you need to prepare are: 84 * <ul><li>TSDB</li> 85 * <li>HBaseClient</li> 86 * <li>GetRequest</li> 87 * <li>PutRequest</li> 88 * <li>AppendRequest</li> 89 * <li>KeyValue</li> 90 * <li>Scanner</li> 91 * <li>DeleteRequest</li> 92 * <li>AtomicIncrementRequest</li></ul> 93 * @since 2.0 94 */ 95 @Ignore 96 public final class MockBase { 97 private static final Charset ASCII = Charset.forName("ISO-8859-1"); 98 private TSDB tsdb; 99 100 /** Gross huh? <table, <cf, <row, <qual, <ts, value>>>>> 101 * Why is CF before row? Because we want to throw exceptions if a CF hasn't 102 * been "configured" 103 */ 104 private ByteMap<ByteMap<ByteMap<ByteMap<TreeMap<Long, byte[]>>>>> 105 storage = new ByteMap<ByteMap<ByteMap<ByteMap<TreeMap<Long, byte[]>>>>>(); 106 private HashSet<MockScanner> scanners = new HashSet<MockScanner>(2); 107 108 /** The default family for shortcuts */ 109 private byte[] default_family; 110 111 /** The default table for shortcuts */ 112 private byte[] default_table; 113 114 /** Incremented every time a new value is stored (without a timestamp) */ 115 private long current_timestamp = 1388534400000L; 116 117 /** A list of exceptions that can be thrown when working with a row key */ 118 private ByteMap<Pair<RuntimeException, Boolean>> exceptions; 119 120 /** 121 * Setups up mock intercepts for all of the calls. Depending on the given 122 * flags, some mocks may not be enabled, allowing local unit tests to setup 123 * their own mocks. 124 * @param tsdb A real TSDB (not mocked) that should have it's client set with 125 * the given mock 126 * @param client A mock client that may have been instantiated and should be 127 * captured for use with MockBase 128 * @param default_get Enable the default .get() mock 129 * @param default_put Enable the default .put() and .compareAndSet() mocks 130 * @param default_delete Enable the default .delete() mock 131 * @param default_scan Enable the Scanner mock implementation 132 */ MockBase( final TSDB tsdb, final HBaseClient client, final boolean default_get, final boolean default_put, final boolean default_delete, final boolean default_scan)133 public MockBase( 134 final TSDB tsdb, final HBaseClient client, 135 final boolean default_get, 136 final boolean default_put, 137 final boolean default_delete, 138 final boolean default_scan) { 139 this.tsdb = tsdb; 140 141 default_family = "t".getBytes(ASCII); 142 default_table = "tsdb".getBytes(ASCII); 143 setupDefaultTables(); 144 145 // replace the "real" field objects with mocks 146 Whitebox.setInternalState(tsdb, "client", client); 147 148 // Default get answer will return one or more columns from the requested row 149 if (default_get) { 150 when(client.get((GetRequest)any())).thenAnswer(new MockGet()); 151 } 152 153 // Default put answer will store the given values in the proper location. 154 if (default_put) { 155 when(client.put((PutRequest)any())).thenAnswer(new MockPut()); 156 when(client.compareAndSet((PutRequest)any(), (byte[])any())) 157 .thenAnswer(new MockCAS()); 158 } 159 160 if (default_delete) { 161 when(client.delete((DeleteRequest)any())).thenAnswer(new MockDelete()); 162 } 163 164 if (default_scan) { 165 // to facilitate unit tests where more than one scanner is used (i.e. in a 166 // callback chain) we have to provide a new mock scanner for each new 167 // scanner request. That's the way the mock scanner method knows when a 168 // second call has been issued and it should return a null. 169 when(client.newScanner((byte[]) any())).thenAnswer(new Answer<Scanner>() { 170 171 @Override 172 public Scanner answer(InvocationOnMock arg0) throws Throwable { 173 final Scanner scanner = mock(Scanner.class); 174 final byte[] table = (byte[])arg0.getArguments()[0]; 175 scanners.add(new MockScanner(scanner, table)); 176 return scanner; 177 } 178 179 }); 180 181 } 182 183 when(client.atomicIncrement((AtomicIncrementRequest)any())) 184 .then(new MockAtomicIncrement()); 185 when(client.bufferAtomicIncrement((AtomicIncrementRequest)any())) 186 .then(new MockAtomicIncrement()); 187 when(client.append((AppendRequest)any())).thenAnswer(new MockAppend()); 188 } 189 190 /** 191 * Add a table with families to the data store. If the table or family 192 * exists, it's a no-op. Give real values as we don't check `em. 193 * @param table The table to add 194 * @param families A list of one or more famlies to add to the table 195 */ addTable(final byte[] table, final List<byte[]> families)196 public void addTable(final byte[] table, final List<byte[]> families) { 197 ByteMap<ByteMap<ByteMap<TreeMap<Long, byte[]>>>> map = storage.get(table); 198 if (map == null) { 199 map = new ByteMap<ByteMap<ByteMap<TreeMap<Long, byte[]>>>>(); 200 storage.put(table, map); 201 } 202 for (final byte[] family : families) { 203 if (!map.containsKey(family)) { 204 map.put(family, new ByteMap<ByteMap<TreeMap<Long, byte[]>>>()); 205 } 206 } 207 } 208 209 /** 210 * Pops the table out of the map 211 * @param table The table to pop 212 * @return True if the table was there, false if it wasn't. 213 */ deleteTable(final byte[] table)214 public boolean deleteTable(final byte[] table) { 215 return storage.remove(table) != null; 216 } 217 218 /** @param family Sets the default family for calls that need it */ setFamily(final byte[] family)219 public void setFamily(final byte[] family) { 220 default_family = family; 221 } 222 223 /** @param table Sets the default table for calls that need it */ setDefaultTable(final byte[] table)224 public void setDefaultTable(final byte[] table) { 225 default_table = table; 226 } 227 228 /** @param timestamp The timestamp to use for further storage increments */ setCurrentTimestamp(final long timestamp)229 public void setCurrentTimestamp(final long timestamp) { 230 this.current_timestamp = timestamp; 231 } 232 233 /** @return the incrementing timestamp */ getCurrentTimestamp()234 public long getCurrentTimestamp() { 235 return current_timestamp; 236 } 237 238 /** 239 * Add a column to the hash table using the default column family. 240 * The proper row will be created if it doesn't exist. If the column already 241 * exists, the original value will be overwritten with the new data. 242 * Uses the default table and family 243 * @param key The row key 244 * @param qualifier The qualifier 245 * @param value The value to store 246 */ addColumn(final byte[] key, final byte[] qualifier, final byte[] value)247 public void addColumn(final byte[] key, final byte[] qualifier, 248 final byte[] value) { 249 addColumn(default_table, key, default_family, qualifier, value, 250 current_timestamp++); 251 } 252 253 /** 254 * Add a column to the hash table 255 * The proper row will be created if it doesn't exist. If the column already 256 * exists, the original value will be overwritten with the new data. 257 * Uses the default table. 258 * @param key The row key 259 * @param family The column family to store the value in 260 * @param qualifier The qualifier 261 * @param value The value to store 262 */ addColumn(final byte[] key, final byte[] family, final byte[] qualifier, final byte[] value)263 public void addColumn(final byte[] key, final byte[] family, 264 final byte[] qualifier, final byte[] value) { 265 addColumn(default_table, key, family, qualifier, value, current_timestamp++); 266 } 267 268 /** 269 * Add a column to the hash table 270 * The proper row will be created if it doesn't exist. If the column already 271 * exists, the original value will be overwritten with the new data 272 * @param table The table 273 * @param key The row key 274 * @param family The column family to store the value in 275 * @param qualifier The qualifier 276 * @param value The value to store 277 */ addColumn(final byte[] table, final byte[] key, final byte[] family, final byte[] qualifier, final byte[] value)278 public void addColumn(final byte[] table, final byte[] key, final byte[] family, 279 final byte[] qualifier, final byte[] value) { 280 addColumn(table, key, family, qualifier, value, current_timestamp++); 281 } 282 283 /** 284 * Add a column to the hash table 285 * The proper row will be created if it doesn't exist. If the column already 286 * exists, the original value will be overwritten with the new data 287 * @param table The table 288 * @param key The row key 289 * @param family The column family to store the value in 290 * @param qualifier The qualifier 291 * @param value The value to store 292 * @param timestamp The timestamp to store 293 */ addColumn(final byte[] table, final byte[] key, final byte[] family, final byte[] qualifier, final byte[] value, final long timestamp)294 public void addColumn(final byte[] table, final byte[] key, final byte[] family, 295 final byte[] qualifier, final byte[] value, final long timestamp) { 296 // AsyncHBase will throw an NPE if the user tries to write a NULL value 297 // so we better do the same. An empty value is ok though, i.e. new byte[] {} 298 if (value == null) { 299 throw new NullPointerException(); 300 } 301 final ByteMap<ByteMap<ByteMap<TreeMap<Long, byte[]>>>> map = storage.get(table); 302 if (map == null) { 303 throw new RuntimeException( 304 "No such table " + Bytes.pretty(table)); 305 } 306 final ByteMap<ByteMap<TreeMap<Long, byte[]>>> cf = map.get(family); 307 if (cf == null) { 308 throw new RuntimeException( 309 "No such CF " + Bytes.pretty(family)); 310 } 311 312 ByteMap<TreeMap<Long, byte[]>> row = cf.get(key); 313 if (row == null) { 314 row = new ByteMap<TreeMap<Long, byte[]>>(); 315 cf.put(key, row); 316 } 317 318 TreeMap<Long, byte[]> column = row.get(qualifier); 319 if (column == null) { 320 // remember, most recent at the top! 321 column = new TreeMap<Long, byte[]>(Collections.reverseOrder()); 322 row.put(qualifier, column); 323 } 324 column.put(timestamp, value); 325 } 326 327 /** 328 * Stores an exception so that any operation on the given key will cause it 329 * to be thrown. 330 * @param key The key to go pear shaped on 331 * @param exception The exception to throw 332 */ throwException(final byte[] key, final RuntimeException exception)333 public void throwException(final byte[] key, final RuntimeException exception) { 334 throwException(key, exception, true); 335 } 336 337 /** 338 * Stores an exception so that any operation on the given key will cause it 339 * to be thrown. 340 * @param key The key to go pear shaped on 341 * @param exception The exception to throw 342 * @param as_result Whether or not to return the exception in the deferred 343 * result or throw it outright. 344 */ throwException(final byte[] key, final RuntimeException exception, final boolean as_result)345 public void throwException(final byte[] key, final RuntimeException exception, 346 final boolean as_result) { 347 if (exceptions == null) { 348 exceptions = new ByteMap<Pair<RuntimeException, Boolean>>(); 349 } 350 exceptions.put(key, new Pair<RuntimeException, Boolean>(exception, as_result)); 351 } 352 353 /** Removes all exceptions from the exception list */ clearExceptions()354 public void clearExceptions() { 355 exceptions.clear(); 356 } 357 358 /** @return Total number of unique rows in the default table. Returns 0 if the 359 * default table does not exist */ numRows()360 public int numRows() { 361 return numRows(default_table); 362 } 363 364 /** 365 * Total number of rows in the given table. Returns 0 if the table does not exit. 366 * @param table The table to scan 367 * @return The number of rows 368 */ numRows(final byte[] table)369 public int numRows(final byte[] table) { 370 final ByteMap<ByteMap<ByteMap<TreeMap<Long, byte[]>>>> map = 371 storage.get(table); 372 if (map == null) { 373 return 0; 374 } 375 final ByteMap<Void> unique_rows = new ByteMap<Void>(); 376 for (final ByteMap<ByteMap<TreeMap<Long, byte[]>>> cf : map.values()) { 377 for (final byte[] key : cf.keySet()) { 378 unique_rows.put(key, null); 379 } 380 } 381 return unique_rows.size(); 382 } 383 384 /** 385 * Return the total number of column families for the row in the default table 386 * @param key The row to search for 387 * @return -1 if the table or row did not exist, otherwise the number of 388 * column families. 389 */ numColumnFamilies(final byte[] key)390 public int numColumnFamilies(final byte[] key) { 391 return numColumnFamilies(default_table, key); 392 } 393 394 /** 395 * Return the number of column families for the given row key in the given table. 396 * @param table The table to iterate over 397 * @param key The row to search for 398 * @return -1 if the table or row did not exist, otherwise the number of 399 * column families. 400 */ numColumnFamilies(final byte[] table, final byte[] key)401 public int numColumnFamilies(final byte[] table, final byte[] key) { 402 final ByteMap<ByteMap<ByteMap<TreeMap<Long, byte[]>>>> map = 403 storage.get(table); 404 if (map == null) { 405 return -1; 406 } 407 int sum = 0; 408 for (final ByteMap<ByteMap<TreeMap<Long, byte[]>>> cf : map.values()) { 409 if (cf.containsKey(key)) { 410 ++sum; 411 } 412 } 413 return sum == 0 ? -1 : sum; 414 } 415 416 /** 417 * Total number of columns in the given row across all column families in the 418 * default table 419 * @param key The row to search for 420 * @return -1 if the row did not exist, otherwise the number of columns. 421 */ numColumns(final byte[] key)422 public long numColumns(final byte[] key) { 423 return numColumns(default_table, key); 424 } 425 426 /** 427 * Total number of columns in the given row across all column families in the 428 * default table 429 * @param table The table to iterate over 430 * @param key The row to search for 431 * @return -1 if the row did not exist, otherwise the number of columns. 432 */ numColumns(final byte[] table, final byte[] key)433 public long numColumns(final byte[] table, final byte[] key) { 434 final ByteMap<ByteMap<ByteMap<TreeMap<Long, byte[]>>>> map = 435 storage.get(table); 436 if (map == null) { 437 return -1; 438 } 439 long size = 0; 440 for (final ByteMap<ByteMap<TreeMap<Long, byte[]>>> cf : map.values()) { 441 final ByteMap<TreeMap<Long, byte[]>> row = cf.get(key); 442 if (row != null) { 443 size += row.size(); 444 } 445 } 446 return size == 0 ? -1 : size; 447 } 448 449 /** 450 * Return the total number of columns for a specific row and family in the 451 * default table 452 * @param key The row to search for 453 * @param family The column family to search for 454 * @return -1 if the row did not exist, otherwise the number of columns. 455 */ numColumnsInFamily(final byte[] key, final byte[] family)456 public int numColumnsInFamily(final byte[] key, final byte[] family) { 457 return numColumnsInFamily(default_table, key, family); 458 } 459 460 /** 461 * Return the total number of columns for a specific row and family 462 * @param key The row to search for 463 * @param family The column family to search for 464 * @return -1 if the row did not exist, otherwise the number of columns. 465 */ numColumnsInFamily(final byte[] table, final byte[] key, final byte[] family)466 public int numColumnsInFamily(final byte[] table, final byte[] key, 467 final byte[] family) { 468 final ByteMap<ByteMap<ByteMap<TreeMap<Long, byte[]>>>> map = 469 storage.get(table); 470 if (map == null) { 471 return -1; 472 } 473 final ByteMap<ByteMap<TreeMap<Long, byte[]>>> cf = map.get(family); 474 if (cf == null) { 475 return -1; 476 } 477 return cf.size(); 478 } 479 480 /** 481 * Retrieve the most recent contents of a single column with the default family 482 * and in the default table 483 * @param key The row key of the column 484 * @param qualifier The column qualifier 485 * @return The byte array of data or null if not found 486 */ getColumn(final byte[] key, final byte[] qualifier)487 public byte[] getColumn(final byte[] key, final byte[] qualifier) { 488 return getColumn(default_table, key, default_family, qualifier); 489 } 490 491 /** 492 * Retrieve the most recent contents of a single column with the default table 493 * @param key The row key of the column 494 * @param family The column family 495 * @param qualifier The column qualifier 496 * @return The byte array of data or null if not found 497 */ getColumn(final byte[] key, final byte[] family, final byte[] qualifier)498 public byte[] getColumn(final byte[] key, final byte[] family, 499 final byte[] qualifier) { 500 return getColumn(default_table, key, family, qualifier); 501 } 502 503 /** 504 * Retrieve the most recent contents of a single column 505 * @param table The table to fetch from 506 * @param key The row key of the column 507 * @param family The column family 508 * @param qualifier The column qualifier 509 * @return The byte array of data or null if not found 510 */ getColumn(final byte[] table, final byte[] key, final byte[] family, final byte[] qualifier)511 public byte[] getColumn(final byte[] table, final byte[] key, 512 final byte[] family, final byte[] qualifier) { 513 final ByteMap<ByteMap<ByteMap<TreeMap<Long, byte[]>>>> map = 514 storage.get(table); 515 if (map == null) { 516 return null; 517 } 518 final ByteMap<ByteMap<TreeMap<Long, byte[]>>> cf = map.get(family); 519 if (cf == null) { 520 return null; 521 } 522 final ByteMap<TreeMap<Long, byte[]>> row = cf.get(key); 523 if (row == null) { 524 return null; 525 } 526 final TreeMap<Long, byte[]> column = row.get(qualifier); 527 if (column == null) { 528 return null; 529 } 530 return column.firstEntry().getValue(); 531 } 532 533 /** 534 * Retrieve the full map of timestamps and values of a single column with 535 * the default family and default table 536 * @param key The row key of the column 537 * @param qualifier The column qualifier 538 * @return The byte array of data or null if not found 539 */ getFullColumn(final byte[] key, final byte[] qualifier)540 public TreeMap<Long, byte[]> getFullColumn(final byte[] key, 541 final byte[] qualifier) { 542 return getFullColumn(default_table, key, default_family, qualifier); 543 } 544 545 /** 546 * Retrieve the full map of timestamps and values of a single column 547 * @param table The table to fetch from 548 * @param key The row key of the column 549 * @param family The column family 550 * @param qualifier The column qualifier 551 * @return The tree map of timestamps and values or null if not found 552 */ getFullColumn(final byte[] table, final byte[] key, final byte[] family, final byte[] qualifier)553 public TreeMap<Long, byte[]> getFullColumn(final byte[] table, final byte[] key, 554 final byte[] family, final byte[] qualifier) { 555 final ByteMap<ByteMap<ByteMap<TreeMap<Long, byte[]>>>> map = 556 storage.get(table); 557 if (map == null) { 558 return null; 559 } 560 final ByteMap<ByteMap<TreeMap<Long, byte[]>>> cf = map.get(family); 561 if (cf == null) { 562 return null; 563 } 564 final ByteMap<TreeMap<Long, byte[]>> row = cf.get(key); 565 if (row == null) { 566 return null; 567 } 568 return row.get(qualifier); 569 } 570 571 /** 572 * Returns the most recent value from all columns for a given column family 573 * in the default table 574 * @param key The row key 575 * @param family The column family ID 576 * @return A map of columns if the CF was found, null if no such CF 577 */ getColumnFamily(final byte[] key, final byte[] family)578 public ByteMap<byte[]> getColumnFamily(final byte[] key, 579 final byte[] family) { 580 return getColumnFamily(default_table, key , family); 581 } 582 583 /** 584 * Returns the most recent value from all columns for a given column family 585 * @param table The table to fetch from 586 * @param key The row key 587 * @param family The column family ID 588 * @return A map of columns if the CF was found, null if no such CF 589 */ getColumnFamily(final byte[] table, final byte[] key, final byte[] family)590 public ByteMap<byte[]> getColumnFamily(final byte[] table, final byte[] key, 591 final byte[] family) { 592 final ByteMap<ByteMap<ByteMap<TreeMap<Long, byte[]>>>> map = 593 storage.get(table); 594 if (map == null) { 595 return null; 596 } 597 final ByteMap<ByteMap<TreeMap<Long, byte[]>>> cf = map.get(family); 598 if (cf == null) { 599 return null; 600 } 601 final ByteMap<TreeMap<Long, byte[]>> row = cf.get(key); 602 if (row == null) { 603 return null; 604 } 605 // convert to a <qualifier, value> byte map 606 final ByteMap<byte[]> columns = new ByteMap<byte[]>(); 607 for (Entry<byte[], TreeMap<Long, byte[]>> entry : row.entrySet()) { 608 // the <timestamp, value> map should never be null 609 columns.put(entry.getKey(), entry.getValue().firstEntry().getValue()); 610 } 611 return columns; 612 } 613 614 /** @return the list of keys stored in the default table for all CFs */ getKeys()615 public Set<byte[]> getKeys() { 616 return getKeys(default_table); 617 } 618 619 /** 620 * Return the list of unique keys in the given table for all CFs 621 * @param table The table to pull from 622 * @return A list of keys. May be null if the table doesn't exist 623 */ getKeys(final byte[] table)624 public Set<byte[]> getKeys(final byte[] table) { 625 final ByteMap<ByteMap<ByteMap<TreeMap<Long, byte[]>>>> map = 626 storage.get(table); 627 if (map == null) { 628 return null; 629 } 630 final ByteMap<Void> unique_rows = new ByteMap<Void>(); 631 for (final ByteMap<ByteMap<TreeMap<Long, byte[]>>> cf : map.values()) { 632 for (final byte[] key : cf.keySet()) { 633 unique_rows.put(key, null); 634 } 635 } 636 return unique_rows.keySet(); 637 } 638 639 /** @return The set of scanners configured by the caller */ getScanners()640 public HashSet<MockScanner> getScanners() { 641 return scanners; 642 } 643 644 /** 645 * Return the mocked TSDB object to use for HBaseClient access 646 * @return 647 */ getTSDB()648 public TSDB getTSDB() { 649 return tsdb; 650 } 651 652 /** 653 * Runs through all rows in the "tsdb" table and compacts them by making a 654 * call to the {@link TSDB.compact} method. It will delete any columns 655 * that were compacted and leave others untouched, just as the normal 656 * method does. 657 * And only iterates over the 't' family. 658 * @throws Exception if Whitebox couldn't access the compact method 659 */ tsdbCompactAllRows()660 public void tsdbCompactAllRows() throws Exception { 661 final ByteMap<ByteMap<ByteMap<TreeMap<Long, byte[]>>>> map = 662 storage.get("tsdb".getBytes(ASCII)); 663 if (map == null) { 664 return; 665 } 666 final ByteMap<ByteMap<TreeMap<Long, byte[]>>> cf = map.get("t".getBytes(ASCII)); 667 if (cf == null) { 668 return; 669 } 670 671 for (Entry<byte[], ByteMap<TreeMap<Long, byte[]>>> entry : cf.entrySet()) { 672 final byte[] key = entry.getKey(); 673 674 final ByteMap<TreeMap<Long, byte[]>> row = entry.getValue(); 675 ArrayList<KeyValue> kvs = new ArrayList<KeyValue>(row.size()); 676 final Set<byte[]> deletes = new HashSet<byte[]>(); 677 for (Map.Entry<byte[], TreeMap<Long, byte[]>> column : row.entrySet()) { 678 if (column.getKey().length % 2 == 0) { 679 kvs.add(new KeyValue(key, default_family, column.getKey(), 680 column.getValue().firstKey(), 681 column.getValue().firstEntry().getValue())); 682 deletes.add(column.getKey()); 683 } 684 } 685 if (kvs.size() > 0) { 686 for (final byte[] k : deletes) { 687 row.remove(k); 688 } 689 final KeyValue compacted = 690 Whitebox.invokeMethod(tsdb, "compact", kvs, Collections.EMPTY_LIST); 691 final TreeMap<Long, byte[]> compacted_value = new TreeMap<Long, byte[]>(); 692 compacted_value.put(current_timestamp++, compacted.value()); 693 row.put(compacted.qualifier(), compacted_value); 694 } 695 } 696 } 697 698 /** 699 * Clears out all rows from storage but doesn't delete the tables or families. 700 */ flushStorage()701 public void flushStorage() { 702 for (final ByteMap<ByteMap<ByteMap<TreeMap<Long, byte[]>>>> table : 703 storage.values()) { 704 for (final ByteMap<ByteMap<TreeMap<Long, byte[]>>> cf : table.values()) { 705 cf.clear(); 706 } 707 } 708 } 709 710 /** 711 * Clears out all rows for a given table 712 * @param table The table to empty out 713 */ flushStorage(final byte[] table)714 public void flushStorage(final byte[] table) { 715 final ByteMap<ByteMap<ByteMap<TreeMap<Long, byte[]>>>> map = storage.get(table); 716 if (map == null) { 717 return; 718 } 719 for (final ByteMap<ByteMap<TreeMap<Long, byte[]>>> cf : map.values()) { 720 cf.clear(); 721 } 722 } 723 724 /** 725 * Removes the entire row from the default table for all column families 726 * @param key The row to remove 727 */ flushRow(final byte[] key)728 public void flushRow(final byte[] key) { 729 flushRow(default_table, key); 730 } 731 732 /** 733 * Removes the entire row from the table for all column families 734 * @param table The table to purge 735 * @param key The row to remove 736 */ flushRow(final byte[] table, final byte[] key)737 public void flushRow(final byte[] table, final byte[] key) { 738 final ByteMap<ByteMap<ByteMap<TreeMap<Long, byte[]>>>> map = storage.get(table); 739 if (map == null) { 740 return; 741 } 742 for (final ByteMap<ByteMap<TreeMap<Long, byte[]>>> cf : map.values()) { 743 cf.remove(key); 744 } 745 } 746 747 /** 748 * Removes all rows from the default table for the given column family 749 * @param family The family to remove 750 */ flushFamily(final byte[] family)751 public void flushFamily(final byte[] family) { 752 flushFamily(default_table, family); 753 } 754 755 /** 756 * Removes all rows from the default table for the given column family 757 * @param table The table to purge from 758 * @param family The family to remove 759 */ flushFamily(final byte[] table, final byte[] family)760 public void flushFamily(final byte[] table, final byte[] family) { 761 final ByteMap<ByteMap<ByteMap<TreeMap<Long, byte[]>>>> map = storage.get(table); 762 if (map == null) { 763 return; 764 } 765 final ByteMap<ByteMap<TreeMap<Long, byte[]>>> cf = map.get(family); 766 if (cf != null) { 767 cf.clear(); 768 } 769 } 770 771 /** 772 * Removes the given column from the default table 773 * @param key Row key 774 * @param family Column family 775 * @param qualifier Column qualifier 776 */ flushColumn(final byte[] key, final byte[] family, final byte[] qualifier)777 public void flushColumn(final byte[] key, final byte[] family, 778 final byte[] qualifier) { 779 flushColumn(default_table, key, family, qualifier); 780 } 781 782 /** 783 * Removes the given column from the table 784 * @param table The table to purge from 785 * @param key Row key 786 * @param family Column family 787 * @param qualifier Column qualifier 788 */ flushColumn(final byte[] table, final byte[] key, final byte[] family, final byte[] qualifier)789 public void flushColumn(final byte[] table, final byte[] key, 790 final byte[] family, final byte[] qualifier) { 791 final ByteMap<ByteMap<ByteMap<TreeMap<Long, byte[]>>>> map = storage.get(table); 792 if (map == null) { 793 return; 794 } 795 final ByteMap<ByteMap<TreeMap<Long, byte[]>>> cf = map.get(family); 796 if (cf == null) { 797 return; 798 } 799 final ByteMap<TreeMap<Long, byte[]>> row = cf.get(key); 800 if (row == null) { 801 return; 802 } 803 row.remove(qualifier); 804 } 805 806 /** 807 * Dumps the entire storage hash to stdout in a sort of tree style format with 808 * all byte arrays hex encoded 809 */ dumpToSystemOut()810 public void dumpToSystemOut() { 811 dumpToSystemOut(false); 812 } 813 814 /** 815 * Dumps the entire storage hash to stdout in a sort of tree style format 816 * @param ascii Whether or not the values should be converted to ascii 817 */ dumpToSystemOut(final boolean ascii)818 public void dumpToSystemOut(final boolean ascii) { 819 if (storage.isEmpty()) { 820 System.out.println("Storage is Empty"); 821 return; 822 } 823 824 for (Entry<byte[], ByteMap<ByteMap<ByteMap<TreeMap<Long, byte[]>>>>> table : 825 storage.entrySet()) { 826 System.out.println("[Table] " + new String(table.getKey(), ASCII)); 827 828 for (Entry<byte[], ByteMap<ByteMap<TreeMap<Long, byte[]>>>> cf : 829 table.getValue().entrySet()) { 830 System.out.println(" [CF] " + new String(cf.getKey(), ASCII)); 831 832 for (Entry<byte[], ByteMap<TreeMap<Long, byte[]>>> row : 833 cf.getValue().entrySet()) { 834 System.out.println(" [Row] " + (ascii ? 835 new String(row.getKey(), ASCII) : bytesToString(row.getKey()))); 836 837 for (Map.Entry<byte[], TreeMap<Long, byte[]>> column : row.getValue().entrySet()) { 838 System.out.println(" [Qual] " + (ascii ? 839 "\"" + new String(column.getKey(), ASCII) + "\"" 840 : bytesToString(column.getKey()))); 841 for (Map.Entry<Long, byte[]> cell : column.getValue().entrySet()) { 842 System.out.println(" [TS] " + cell.getKey() + " [Value] " + 843 (ascii ? new String(cell.getValue(), ASCII) 844 : bytesToString(cell.getValue()))); 845 } 846 } 847 } 848 } 849 } 850 } 851 852 /** 853 * Helper to convert an array of bytes to a hexadecimal encoded string. 854 * @param bytes The byte array to convert 855 * @return A hex string 856 */ bytesToString(final byte[] bytes)857 public static String bytesToString(final byte[] bytes) { 858 return DatatypeConverter.printHexBinary(bytes); 859 } 860 861 /** 862 * Helper to convert a hex encoded string into a byte array. 863 * <b>Warning:</b> This method won't pad the string to make sure it's an 864 * even number of bytes. 865 * @param bytes The hex encoded string to convert 866 * @return A byte array from the hex string 867 * @throws IllegalArgumentException if the string contains illegal characters 868 * or can't be converted. 869 */ stringToBytes(final String bytes)870 public static byte[] stringToBytes(final String bytes) { 871 return DatatypeConverter.parseHexBinary(bytes); 872 } 873 874 /** @return Returns the ASCII character set */ ASCII()875 public static Charset ASCII() { 876 return ASCII; 877 } 878 879 /** 880 * Concatenates byte arrays into one big array 881 * @param arrays Any number of arrays to concatenate 882 * @return The concatenated array 883 */ concatByteArrays(final byte[]... arrays)884 public static byte[] concatByteArrays(final byte[]... arrays) { 885 int len = 0; 886 for (final byte[] array : arrays) { 887 len += array.length; 888 } 889 final byte[] result = new byte[len]; 890 len = 0; 891 for (final byte[] array : arrays) { 892 System.arraycopy(array, 0, result, len, array.length); 893 len += array.length; 894 } 895 return result; 896 } 897 898 /** Creates the TSDB and UID tables */ setupDefaultTables()899 private void setupDefaultTables() { 900 final ByteMap<ByteMap<ByteMap<TreeMap<Long, byte[]>>>> tsdb = 901 new ByteMap<ByteMap<ByteMap<TreeMap<Long, byte[]>>>>(); 902 tsdb.put("t".getBytes(ASCII), new ByteMap<ByteMap<TreeMap<Long, byte[]>>>()); 903 storage.put("tsdb".getBytes(ASCII), tsdb); 904 905 final ByteMap<ByteMap<ByteMap<TreeMap<Long, byte[]>>>> tsdb_uid = 906 new ByteMap<ByteMap<ByteMap<TreeMap<Long, byte[]>>>>(); 907 tsdb_uid.put("name".getBytes(ASCII), 908 new ByteMap<ByteMap<TreeMap<Long, byte[]>>>()); 909 tsdb_uid.put("id".getBytes(ASCII), 910 new ByteMap<ByteMap<TreeMap<Long, byte[]>>>()); 911 storage.put("tsdb-uid".getBytes(ASCII), tsdb_uid); 912 } 913 914 /** 915 * Gets one or more columns from a row. If the row does not exist, a null is 916 * returned. If no qualifiers are given, the entire row is returned. 917 * NOTE: all timestamp, value pairs are returned. 918 */ 919 private class MockGet implements Answer<Deferred<ArrayList<KeyValue>>> { 920 @Override answer(InvocationOnMock invocation)921 public Deferred<ArrayList<KeyValue>> answer(InvocationOnMock invocation) 922 throws Throwable { 923 final Object[] args = invocation.getArguments(); 924 final GetRequest get = (GetRequest)args[0]; 925 926 if (exceptions != null) { 927 final Pair<RuntimeException, Boolean> ex = exceptions.get(get.key()); 928 if (ex != null) { 929 if (ex.getValue()) { 930 return Deferred.fromError(ex.getKey()); 931 } else { 932 throw ex.getKey(); 933 } 934 } 935 } 936 937 final ByteMap<ByteMap<ByteMap<TreeMap<Long, byte[]>>>> map = 938 storage.get(get.table()); 939 if (map == null) { 940 return Deferred.fromError(new RuntimeException( 941 "No such table " + Bytes.pretty(get.table()))); 942 } 943 944 // compile a set of qualifiers to use as a filter if necessary 945 final ByteMap<Object> qualifiers = new ByteMap<Object>(); 946 if (get.qualifiers() != null && get.qualifiers().length > 0) { 947 for (byte[] q : get.qualifiers()) { 948 qualifiers.put(q, null); 949 } 950 } 951 952 final ArrayList<KeyValue> kvs = new ArrayList<KeyValue>(); 953 for (final Entry<byte[], ByteMap<ByteMap<TreeMap<Long, byte[]>>>> cf : 954 map.entrySet()) { 955 if (get.family() != null && Bytes.memcmp(get.family(), cf.getKey()) != 0) { 956 continue; 957 } 958 959 final ByteMap<TreeMap<Long, byte[]>> row = cf.getValue().get(get.key()); 960 if (row == null) { 961 continue; 962 } 963 964 for (Entry<byte[], TreeMap<Long, byte[]>> column : row.entrySet()) { 965 if (!qualifiers.isEmpty() && !qualifiers.containsKey(column.getKey())) { 966 continue; 967 } 968 969 // TODO - if we want to support multiple values, iterate over the 970 // tree map. Otherwise Get returns just the latest value. 971 kvs.add(new KeyValue(get.key(), cf.getKey(), column.getKey(), 972 column.getValue().firstKey(), 973 column.getValue().firstEntry().getValue())); 974 } 975 } 976 if (kvs.isEmpty()) { 977 return Deferred.fromResult(null); 978 } 979 return Deferred.fromResult(kvs); 980 } 981 } 982 983 /** 984 * Stores one or more columns in a row. If the row does not exist, it's 985 * created. 986 */ 987 private class MockPut implements Answer<Deferred<Boolean>> { 988 @Override answer(final InvocationOnMock invocation)989 public Deferred<Boolean> answer(final InvocationOnMock invocation) 990 throws Throwable { 991 final Object[] args = invocation.getArguments(); 992 final PutRequest put = (PutRequest)args[0]; 993 994 if (exceptions != null) { 995 final Pair<RuntimeException, Boolean> ex = exceptions.get(put.key()); 996 if (ex != null) { 997 if (ex.getValue()) { 998 return Deferred.fromError(ex.getKey()); 999 } else { 1000 throw ex.getKey(); 1001 } 1002 } 1003 } 1004 1005 final ByteMap<ByteMap<ByteMap<TreeMap<Long, byte[]>>>> map = 1006 storage.get(put.table()); 1007 if (map == null) { 1008 return Deferred.fromError(new RuntimeException( 1009 "No such table " + Bytes.pretty(put.table()))); 1010 } 1011 1012 final ByteMap<ByteMap<TreeMap<Long, byte[]>>> cf = map.get(put.family()); 1013 if (cf == null) { 1014 return Deferred.fromError(new RuntimeException( 1015 "No such CF " + Bytes.pretty(put.table()))); 1016 } 1017 1018 ByteMap<TreeMap<Long, byte[]>> row = cf.get(put.key()); 1019 if (row == null) { 1020 row = new ByteMap<TreeMap<Long, byte[]>>(); 1021 cf.put(put.key(), row); 1022 } 1023 1024 for (int i = 0; i < put.qualifiers().length; i++) { 1025 TreeMap<Long, byte[]> column = row.get(put.qualifiers()[i]); 1026 if (column == null) { 1027 column = new TreeMap<Long, byte[]>(Collections.reverseOrder()); 1028 row.put(put.qualifiers()[i], column); 1029 } 1030 1031 column.put(put.timestamp() != Long.MAX_VALUE ? put.timestamp() : 1032 current_timestamp++, put.values()[i]); 1033 } 1034 1035 return Deferred.fromResult(true); 1036 } 1037 } 1038 1039 /** 1040 * Stores one or more columns in a row. If the row does not exist, it's 1041 * created. 1042 */ 1043 private class MockAppend implements Answer<Deferred<Boolean>> { 1044 @Override answer(final InvocationOnMock invocation)1045 public Deferred<Boolean> answer(final InvocationOnMock invocation) 1046 throws Throwable { 1047 final Object[] args = invocation.getArguments(); 1048 final AppendRequest append = (AppendRequest)args[0]; 1049 1050 if (exceptions != null) { 1051 final Pair<RuntimeException, Boolean> ex = exceptions.get(append.key()); 1052 if (ex != null) { 1053 if (ex.getValue()) { 1054 return Deferred.fromError(ex.getKey()); 1055 } else { 1056 throw ex.getKey(); 1057 } 1058 } 1059 } 1060 1061 final ByteMap<ByteMap<ByteMap<TreeMap<Long, byte[]>>>> map = 1062 storage.get(append.table()); 1063 if (map == null) { 1064 return Deferred.fromError(new RuntimeException( 1065 "No such table " + Bytes.pretty(append.table()))); 1066 } 1067 1068 final ByteMap<ByteMap<TreeMap<Long, byte[]>>> cf = map.get(append.family()); 1069 if (cf == null) { 1070 return Deferred.fromError(new RuntimeException( 1071 "No such CF " + Bytes.pretty(append.table()))); 1072 } 1073 1074 ByteMap<TreeMap<Long, byte[]>> row = cf.get(append.key()); 1075 if (row == null) { 1076 row = new ByteMap<TreeMap<Long, byte[]>>(); 1077 cf.put(append.key(), row); 1078 } 1079 1080 for (int i = 0; i < append.qualifiers().length; i++) { 1081 TreeMap<Long, byte[]> column = row.get(append.qualifiers()[i]); 1082 if (column == null) { 1083 column = new TreeMap<Long, byte[]>(Collections.reverseOrder()); 1084 row.put(append.qualifiers()[i], column); 1085 } 1086 1087 final byte[] values; 1088 long column_timestamp = 0; 1089 if (append.timestamp() != Long.MAX_VALUE) { 1090 values = column.get(append.timestamp()); 1091 column_timestamp = append.timestamp(); 1092 } else { 1093 if (column.isEmpty()) { 1094 values = null; 1095 } else { 1096 values = column.firstEntry().getValue(); 1097 column_timestamp = column.firstKey(); 1098 } 1099 } 1100 if (column_timestamp == 0) { 1101 column_timestamp = current_timestamp++; 1102 } 1103 1104 final int current_len = values != null ? values.length : 0; 1105 final byte[] append_value = new byte[current_len + append.values()[i].length]; 1106 if (current_len > 0) { 1107 System.arraycopy(values, 0, append_value, 0, values.length); 1108 } 1109 1110 System.arraycopy(append.value(), 0, append_value, current_len, 1111 append.values()[i].length); 1112 column.put(column_timestamp, append_value); 1113 } 1114 1115 return Deferred.fromResult(true); 1116 } 1117 } 1118 1119 /** 1120 * Imitates the compareAndSet client call where a {@code PutRequest} is passed 1121 * along with a byte array to compared the stored value against. If the stored 1122 * value doesn't match, the put is ignored and a "false" is returned. If the 1123 * comparator matches, the new put is recorded. 1124 * <b>Warning:</b> While a put works on multiple qualifiers, CAS only works 1125 * with one. So if the put includes more than one qualifier, only the first 1126 * one will be processed in this CAS call. 1127 */ 1128 private class MockCAS implements Answer<Deferred<Boolean>> { 1129 1130 @Override answer(final InvocationOnMock invocation)1131 public Deferred<Boolean> answer(final InvocationOnMock invocation) 1132 throws Throwable { 1133 final Object[] args = invocation.getArguments(); 1134 final PutRequest put = (PutRequest)args[0]; 1135 final byte[] expected = (byte[])args[1]; 1136 1137 if (exceptions != null) { 1138 final Pair<RuntimeException, Boolean> ex = exceptions.get(put.key()); 1139 if (ex != null) { 1140 if (ex.getValue()) { 1141 return Deferred.fromError(ex.getKey()); 1142 } else { 1143 throw ex.getKey(); 1144 } 1145 } 1146 } 1147 1148 final ByteMap<ByteMap<ByteMap<TreeMap<Long, byte[]>>>> map = 1149 storage.get(put.table()); 1150 if (map == null) { 1151 return Deferred.fromError(new RuntimeException( 1152 "No such table " + Bytes.pretty(put.table()))); 1153 } 1154 1155 final ByteMap<ByteMap<TreeMap<Long, byte[]>>> cf = map.get(put.family()); 1156 if (cf == null) { 1157 return Deferred.fromError(new RuntimeException( 1158 "No such CF " + Bytes.pretty(put.table()))); 1159 } 1160 1161 ByteMap<TreeMap<Long, byte[]>> row = cf.get(put.key()); 1162 if (row == null) { 1163 if (expected != null && expected.length > 0) { 1164 return Deferred.fromResult(false); 1165 } 1166 row = new ByteMap<TreeMap<Long, byte[]>>(); 1167 cf.put(put.key(), row); 1168 } 1169 1170 // CAS can only operate on one cell, so if the put request has more than 1171 // one, we ignore any but the first 1172 TreeMap<Long, byte[]> column = row.get(put.qualifiers()[0]); 1173 if (column == null && (expected != null && expected.length > 0)) { 1174 return Deferred.fromResult(false); 1175 } 1176 // if a timestamp was specified, maybe we're CASing against a specific 1177 // cell. Otherwise we deal with the latest value 1178 final byte[] stored = column == null ? null : 1179 put.timestamp() != Long.MAX_VALUE ? column.get(put.timestamp()) : 1180 column.firstEntry().getValue(); 1181 if (stored == null && (expected != null && expected.length > 0)) { 1182 return Deferred.fromResult(false); 1183 } 1184 if (stored != null && (expected == null || expected.length < 1)) { 1185 return Deferred.fromResult(false); 1186 } 1187 if (stored != null && expected != null && 1188 Bytes.memcmp(stored, expected) != 0) { 1189 return Deferred.fromResult(false); 1190 } 1191 1192 // passed CAS! 1193 if (column == null) { 1194 column = new TreeMap<Long, byte[]>(Collections.reverseOrder()); 1195 row.put(put.qualifiers()[0], column); 1196 } 1197 column.put(put.timestamp() != Long.MAX_VALUE ? put.timestamp() : 1198 current_timestamp++, put.value()); 1199 return Deferred.fromResult(true); 1200 } 1201 1202 } 1203 1204 /** 1205 * Deletes one or more columns. If a row no longer has any valid columns, the 1206 * entire row will be removed. 1207 */ 1208 private class MockDelete implements Answer<Deferred<Object>> { 1209 1210 @Override answer(InvocationOnMock invocation)1211 public Deferred<Object> answer(InvocationOnMock invocation) 1212 throws Throwable { 1213 final Object[] args = invocation.getArguments(); 1214 final DeleteRequest delete = (DeleteRequest)args[0]; 1215 1216 if (exceptions != null) { 1217 final Pair<RuntimeException, Boolean> ex = exceptions.get(delete.key()); 1218 if (ex != null) { 1219 if (ex.getValue()) { 1220 return Deferred.fromError(ex.getKey()); 1221 } else { 1222 throw ex.getKey(); 1223 } 1224 } 1225 } 1226 1227 final ByteMap<ByteMap<ByteMap<TreeMap<Long, byte[]>>>> map = 1228 storage.get(delete.table()); 1229 if (map == null) { 1230 return Deferred.fromError(new RuntimeException( 1231 "No such table " + Bytes.pretty(delete.table()))); 1232 } 1233 1234 // if no qualifiers or family, then delete the row from all families 1235 if ((delete.qualifiers() == null || delete.qualifiers().length < 1 || 1236 delete.qualifiers()[0].length < 1) && (delete.family() == null || 1237 delete.family().length < 1)) { 1238 for (final Entry<byte[], ByteMap<ByteMap<TreeMap<Long, byte[]>>>> cf : 1239 map.entrySet()) { 1240 cf.getValue().remove(delete.key()); 1241 } 1242 return Deferred.fromResult(new Object()); 1243 } 1244 1245 final byte[] family = delete.family(); 1246 if (family != null && family.length > 0) { 1247 if (!map.containsKey(family)) { 1248 return Deferred.fromError(new RuntimeException( 1249 "No such CF " + Bytes.pretty(family))); 1250 } 1251 } 1252 1253 // compile a set of qualifiers 1254 ByteMap<Object> qualifiers = new ByteMap<Object>(); 1255 if (delete.qualifiers() != null || delete.qualifiers().length > 0) { 1256 for (byte[] q : delete.qualifiers()) { 1257 qualifiers.put(q, null); 1258 } 1259 } 1260 1261 // TODO - validate the assumption that a delete with a row key and qual 1262 // but without a family would delete the columns in ALL families 1263 1264 // if the request only has a column family and no qualifiers, we delete 1265 // the row from the entire family 1266 if (family != null && qualifiers.isEmpty()) { 1267 final ByteMap<ByteMap<TreeMap<Long, byte[]>>> cf = map.get(delete.family()); 1268 // cf != null validated above 1269 cf.remove(delete.key()); 1270 return Deferred.fromResult(new Object()); 1271 } 1272 1273 for (final Entry<byte[], ByteMap<ByteMap<TreeMap<Long, byte[]>>>> cf : 1274 map.entrySet()) { 1275 1276 // column family filter 1277 if (family != null && family.length > 0 && 1278 !Bytes.equals(family, cf.getKey())) { 1279 continue; 1280 } 1281 1282 ByteMap<TreeMap<Long, byte[]>> row = cf.getValue().get(delete.key()); 1283 if (row == null) { 1284 continue; 1285 } 1286 1287 for (byte[] qualifier : qualifiers.keySet()) { 1288 final TreeMap<Long, byte[]> column = row.get(qualifier); 1289 if (column == null) { 1290 continue; 1291 } 1292 1293 // with this flag we delete a single timestamp 1294 if (delete.deleteAtTimestampOnly()) { 1295 if (column != null) { 1296 column.remove(delete.timestamp()); 1297 if (column.isEmpty()) { 1298 row.remove(qualifier); 1299 } 1300 } 1301 } else { 1302 // otherwise we delete everything less than or equal to the 1303 // delete timestamp 1304 List<Long> column_removals = new ArrayList<Long>(column.size()); 1305 for (Map.Entry<Long, byte[]> cell : column.entrySet()) { 1306 if (cell.getKey() <= delete.timestamp()) { 1307 column_removals.add(cell.getKey()); 1308 } 1309 } 1310 for (Long ts : column_removals) { 1311 column.remove(ts); 1312 } 1313 if (column.isEmpty()) { 1314 row.remove(qualifier); 1315 } 1316 } 1317 } 1318 1319 if (row.isEmpty()) { 1320 cf.getValue().remove(delete.key()); 1321 } 1322 } 1323 return Deferred.fromResult(new Object()); 1324 } 1325 1326 } 1327 1328 /** 1329 * This is a limited implementation of the scanner object. The only fields 1330 * caputred and acted on are: 1331 * <ul><li>KeyRegexp</li> 1332 * <li>StartKey</li> 1333 * <li>StopKey</li> 1334 * <li>Qualifier</li> 1335 * <li>Qualifiers</li></ul> 1336 * Hence timestamps are ignored as are the max number of rows and qualifiers. 1337 * All matching rows/qualifiers will be returned in the first {@code nextRows} 1338 * call. The second {@code nextRows} call will always return null. Multiple 1339 * qualifiers are supported for matching. 1340 * <p> 1341 * The KeyRegexp can be set and it will run against the hex value of the 1342 * row key. In testing it seems to work nicely even with byte patterns. 1343 */ 1344 public class MockScanner implements 1345 Answer<Deferred<ArrayList<ArrayList<KeyValue>>>> { 1346 1347 private final Scanner mock_scanner; 1348 private final byte[] table; 1349 private byte[] start = null; 1350 private byte[] stop = null; 1351 private HashSet<String> scnr_qualifiers = null; 1352 private byte[] family = null; 1353 private ScanFilter filter = null; 1354 private int max_num_rows = Scanner.DEFAULT_MAX_NUM_ROWS; 1355 private ByteMap<Iterator<Entry<byte[], ByteMap<TreeMap<Long, byte[]>>>>> 1356 cursors; 1357 private ByteMap<Entry<byte[], ByteMap<TreeMap<Long, byte[]>>>> cf_rows; 1358 private byte[] last_row; 1359 private String rex; // TEMP 1360 1361 /** 1362 * Default ctor 1363 * @param mock_scanner The scanner we're using 1364 * @param table The table (confirmed to exist) 1365 */ MockScanner(final Scanner mock_scanner, final byte[] table)1366 public MockScanner(final Scanner mock_scanner, final byte[] table) { 1367 this.mock_scanner = mock_scanner; 1368 this.table = table; 1369 1370 // capture the scanner fields when set 1371 doAnswer(new Answer<Object>() { 1372 @Override 1373 public Object answer(InvocationOnMock invocation) throws Throwable { 1374 final Object[] args = invocation.getArguments(); 1375 filter = new KeyRegexpFilter((String)args[0], Const.ASCII_CHARSET); 1376 rex = (String)args[0]; 1377 return null; 1378 } 1379 }).when(mock_scanner).setKeyRegexp(anyString()); 1380 1381 doAnswer(new Answer<Object>() { 1382 @Override 1383 public Object answer(InvocationOnMock invocation) throws Throwable { 1384 final Object[] args = invocation.getArguments(); 1385 filter = new KeyRegexpFilter((String)args[0], (Charset)args[1]); 1386 rex = (String)args[0]; 1387 return null; 1388 } 1389 }).when(mock_scanner).setKeyRegexp(anyString(), (Charset)any()); 1390 1391 doAnswer(new Answer<Object>() { 1392 @Override 1393 public Object answer(InvocationOnMock invocation) throws Throwable { 1394 final Object[] args = invocation.getArguments(); 1395 filter = (ScanFilter)args[0]; 1396 return null; 1397 } 1398 }).when(mock_scanner).setFilter(any(ScanFilter.class)); 1399 1400 doAnswer(new Answer<Object>() { 1401 @Override 1402 public Object answer(InvocationOnMock invocation) throws Throwable { 1403 final Object[] args = invocation.getArguments(); 1404 start = (byte[])args[0]; 1405 return null; 1406 } 1407 }).when(mock_scanner).setStartKey((byte[])any()); 1408 1409 doAnswer(new Answer<Object>() { 1410 @Override 1411 public Object answer(InvocationOnMock invocation) throws Throwable { 1412 final Object[] args = invocation.getArguments(); 1413 stop = (byte[])args[0]; 1414 return null; 1415 } 1416 }).when(mock_scanner).setStopKey((byte[])any()); 1417 1418 doAnswer(new Answer<Object>() { 1419 @Override 1420 public Object answer(InvocationOnMock invocation) throws Throwable { 1421 final Object[] args = invocation.getArguments(); 1422 family = (byte[])args[0]; 1423 return null; 1424 } 1425 }).when(mock_scanner).setFamily((byte[])any()); 1426 1427 doAnswer(new Answer<Object>() { 1428 @Override 1429 public Object answer(InvocationOnMock invocation) throws Throwable { 1430 final Object[] args = invocation.getArguments(); 1431 scnr_qualifiers = new HashSet<String>(1); 1432 scnr_qualifiers.add(bytesToString((byte[])args[0])); 1433 return null; 1434 } 1435 }).when(mock_scanner).setQualifier((byte[])any()); 1436 1437 doAnswer(new Answer<Object>() { 1438 @Override 1439 public Object answer(InvocationOnMock invocation) throws Throwable { 1440 final Object[] args = invocation.getArguments(); 1441 final byte[][] qualifiers = (byte[][])args[0]; 1442 scnr_qualifiers = new HashSet<String>(qualifiers.length); 1443 for (byte[] qualifier : qualifiers) { 1444 scnr_qualifiers.add(bytesToString(qualifier)); 1445 } 1446 return null; 1447 } 1448 }).when(mock_scanner).setQualifiers((byte[][])any()); 1449 1450 doAnswer(new Answer<byte[]>() { 1451 @Override 1452 public byte[] answer(InvocationOnMock invocation) throws Throwable { 1453 return start; 1454 } 1455 }).when(mock_scanner).getCurrentKey(); 1456 1457 when(mock_scanner.nextRows()).thenAnswer(this); 1458 1459 } 1460 1461 @Override answer( final InvocationOnMock invocation)1462 public Deferred<ArrayList<ArrayList<KeyValue>>> answer( 1463 final InvocationOnMock invocation) throws Throwable { 1464 1465 if (cursors == null) { 1466 final ByteMap<ByteMap<ByteMap<TreeMap<Long, byte[]>>>> map = 1467 storage.get(table); 1468 if (map == null) { 1469 return Deferred.fromError( new RuntimeException( 1470 "No such table " + Bytes.pretty(table))); 1471 } 1472 1473 cursors = new ByteMap<Iterator<Entry<byte[], 1474 ByteMap<TreeMap<Long, byte[]>>>>>(); 1475 cf_rows = new ByteMap<Entry<byte[], ByteMap<TreeMap<Long, byte[]>>>>(); 1476 1477 if (family == null || family.length < 1) { 1478 for (final Entry<byte[], ByteMap<ByteMap<TreeMap<Long, byte[]>>>> cf : map) { 1479 final Iterator<Entry<byte[], ByteMap<TreeMap<Long, byte[]>>>> 1480 cursor = cf.getValue().iterator(); 1481 cursors.put(cf.getKey(), cursor); 1482 cf_rows.put(cf.getKey(), null); 1483 } 1484 } else { 1485 final ByteMap<ByteMap<TreeMap<Long, byte[]>>> cf = map.get(family); 1486 if (cf == null) { 1487 return Deferred.fromError(new RuntimeException( 1488 "No such CF " + Bytes.pretty(family))); 1489 } 1490 final Iterator<Entry<byte[], ByteMap<TreeMap<Long, byte[]>>>> 1491 cursor = cf.iterator(); 1492 cursors.put(family, cursor); 1493 cf_rows.put(family, null); 1494 } 1495 } 1496 1497 // If we're out of rows to scan, then you HAVE to return null as the 1498 // HBase client does. 1499 if (!hasNext()) { 1500 return Deferred.fromResult(null); 1501 } 1502 1503 // TODO - fuzzy filter support 1504 // TODO - fix the regex comparator 1505 Pattern pattern = null; 1506 if (rex != null) { 1507 if (!rex.isEmpty()) { 1508 pattern = Pattern.compile(rex); 1509 } 1510 } else if (filter != null) { 1511 KeyRegexpFilter regex_filter = null; 1512 1513 if (filter instanceof KeyRegexpFilter) { 1514 regex_filter = (KeyRegexpFilter)filter; 1515 } else if (filter instanceof FilterList) { 1516 final List<ScanFilter> filters = 1517 Whitebox.getInternalState(filter, "filters"); 1518 for (final ScanFilter f : filters) { 1519 if (f instanceof KeyRegexpFilter) { 1520 regex_filter = (KeyRegexpFilter)f; 1521 } 1522 } 1523 } 1524 1525 if (regex_filter != null) { 1526 try { 1527 final String regexp = new String( 1528 (byte[])Whitebox.getInternalState(regex_filter, "regexp"), 1529 Charset.forName(new String( 1530 (byte[])Whitebox.getInternalState(regex_filter, "charset")))); 1531 if (!regexp.isEmpty()) { 1532 pattern = Pattern.compile(regexp); 1533 } 1534 } catch (PatternSyntaxException e) { 1535 e.printStackTrace(); 1536 return Deferred.fromError(e); 1537 } 1538 } 1539 } 1540 1541 // return all matches 1542 final ArrayList<ArrayList<KeyValue>> results = 1543 new ArrayList<ArrayList<KeyValue>>(); 1544 int rows_read = 0; 1545 while (hasNext()) { 1546 advance(); 1547 1548 // if it's before the start row, after the end row or doesn't 1549 // match the given regex, continue on to the next row 1550 if (start != null && Bytes.memcmp(last_row, start) < 0) { 1551 continue; 1552 } 1553 // asynchbase Scanner's logic: 1554 // - start_key is inclusive, stop key is exclusive 1555 // - when start key is equal to the stop key, 1556 // include the key in scan result 1557 // - if stop key is empty, scan till the end 1558 if (stop != null && stop.length > 0 && 1559 Bytes.memcmp(last_row, stop) >= 0 && 1560 Bytes.memcmp(start, stop) != 0) { 1561 continue; 1562 } 1563 if (pattern != null) { 1564 final String from_bytes = new String(last_row, MockBase.ASCII); 1565 if (!pattern.matcher(from_bytes).find()) { 1566 continue; 1567 } 1568 } 1569 1570 // throws AFTER we match on a row key 1571 if (exceptions != null) { 1572 final Pair<RuntimeException, Boolean> ex = exceptions.get(last_row); 1573 if (ex != null) { 1574 if (ex.getValue()) { 1575 return Deferred.fromError(ex.getKey()); 1576 } else { 1577 throw ex.getKey(); 1578 } 1579 } 1580 } 1581 1582 // loop over the column family rows to see if they match 1583 final ArrayList<KeyValue> kvs = new ArrayList<KeyValue>(); 1584 for (final Entry<byte[], Entry<byte[], ByteMap<TreeMap<Long, byte[]>>>> row : 1585 cf_rows.entrySet()) { 1586 if (row.getValue() == null || 1587 Bytes.memcmp(last_row, row.getValue().getKey()) != 0) { 1588 continue; 1589 } 1590 1591 for (final Entry<byte[], TreeMap<Long, byte[]>> column : 1592 row.getValue().getValue().entrySet()) { 1593 // if the qualifier isn't in the set, continue 1594 if (scnr_qualifiers != null && 1595 !scnr_qualifiers.contains(bytesToString(column.getKey()))) { 1596 continue; 1597 } 1598 1599 kvs.add(new KeyValue(row.getValue().getKey(), row.getKey(), 1600 column.getKey(), column.getValue().firstKey(), 1601 column.getValue().firstEntry().getValue())); 1602 } 1603 } 1604 1605 if (!kvs.isEmpty()) { 1606 results.add(kvs); 1607 } 1608 rows_read++; 1609 1610 if (rows_read >= max_num_rows) { 1611 Thread.sleep(10); // this is here for time based unit tests 1612 break; 1613 } 1614 } 1615 1616 if (results.isEmpty()) { 1617 return Deferred.fromResult(null); 1618 } 1619 return Deferred.fromResult(results); 1620 } 1621 1622 /** @return Returns true if any of the CF iterators have another value */ hasNext()1623 private boolean hasNext() { 1624 for (final Iterator<Entry<byte[], ByteMap<TreeMap<Long, byte[]>>>> cursor : 1625 cursors.values()) { 1626 if (cursor.hasNext()) { 1627 return true; 1628 } 1629 } 1630 return false; 1631 } 1632 1633 /** Insanely inefficient and ugly way of advancing the cursors */ advance()1634 private void advance() { 1635 // first time to get the ceiling 1636 if (last_row == null) { 1637 for (final Entry<byte[], 1638 Iterator<Entry<byte[], ByteMap<TreeMap<Long, byte[]>>>>> iterator : 1639 cursors.entrySet()) { 1640 final Entry<byte[], ByteMap<TreeMap<Long, byte[]>>> row = 1641 iterator.getValue().hasNext() ? iterator.getValue().next() : null; 1642 cf_rows.put(iterator.getKey(), row); 1643 if (last_row == null) { 1644 last_row = row.getKey(); 1645 } else { 1646 if (Bytes.memcmp(last_row, row.getKey()) < 0) { 1647 last_row = row.getKey(); 1648 } 1649 } 1650 } 1651 return; 1652 } 1653 1654 for (final Entry<byte[], Entry<byte[], ByteMap<TreeMap<Long, byte[]>>>> cf : 1655 cf_rows.entrySet()) { 1656 final Entry<byte[], ByteMap<TreeMap<Long, byte[]>>> row = cf.getValue(); 1657 if (row == null) { 1658 continue; 1659 } 1660 1661 if (Bytes.memcmp(last_row, row.getKey()) == 0) { 1662 if (!cursors.get(cf.getKey()).hasNext()) { 1663 cf_rows.put(cf.getKey(), null); // EX? 1664 } else { 1665 cf_rows.put(cf.getKey(), cursors.get(cf.getKey()).next()); 1666 } 1667 } 1668 } 1669 1670 last_row = null; 1671 for (final Entry<byte[], ByteMap<TreeMap<Long, byte[]>>> row : 1672 cf_rows.values()) { 1673 if (row == null) { 1674 continue; 1675 } 1676 1677 if (last_row == null) { 1678 last_row = row.getKey(); 1679 } else { 1680 if (Bytes.memcmp(last_row, row.getKey()) < 0) { 1681 last_row = row.getKey(); 1682 } 1683 } 1684 } 1685 } 1686 1687 /** @return The scanner for this mock */ getScanner()1688 public Scanner getScanner() { 1689 return mock_scanner; 1690 } 1691 1692 /** @return The filter for this mock */ getFilter()1693 public ScanFilter getFilter() { 1694 return filter; 1695 } 1696 } 1697 1698 /** 1699 * Creates or increments (possibly decrements) a Long in the hash table at the 1700 * given location. 1701 */ 1702 private class MockAtomicIncrement implements 1703 Answer<Deferred<Long>> { 1704 1705 @Override answer(InvocationOnMock invocation)1706 public Deferred<Long> answer(InvocationOnMock invocation) throws Throwable { 1707 final Object[] args = invocation.getArguments(); 1708 final AtomicIncrementRequest air = (AtomicIncrementRequest)args[0]; 1709 final long amount = air.getAmount(); 1710 1711 if (exceptions != null) { 1712 final Pair<RuntimeException, Boolean> ex = exceptions.get(air.key()); 1713 if (ex != null) { 1714 if (ex.getValue()) { 1715 return Deferred.fromError(ex.getKey()); 1716 } else { 1717 throw ex.getKey(); 1718 } 1719 } 1720 } 1721 1722 final ByteMap<ByteMap<ByteMap<TreeMap<Long, byte[]>>>> map = 1723 storage.get(air.table()); 1724 if (map == null) { 1725 return Deferred.fromError(new RuntimeException( 1726 "No such table " + Bytes.pretty(air.table()))); 1727 } 1728 1729 final ByteMap<ByteMap<TreeMap<Long, byte[]>>> cf = map.get(air.family()); 1730 if (cf == null) { 1731 return Deferred.fromError(new RuntimeException( 1732 "No such CF " + Bytes.pretty(air.table()))); 1733 } 1734 1735 ByteMap<TreeMap<Long, byte[]>> row = cf.get(air.key()); 1736 if (row == null) { 1737 row = new ByteMap<TreeMap<Long, byte[]>>(); 1738 cf.put(air.key(), row); 1739 } 1740 1741 TreeMap<Long, byte[]> column = row.get(air.qualifier()); 1742 if (column == null) { 1743 column = new TreeMap<Long, byte[]>(Collections.reverseOrder()); 1744 row.put(air.qualifier(), column); 1745 column.put(current_timestamp++, Bytes.fromLong(amount)); 1746 return Deferred.fromResult(amount); 1747 } 1748 1749 long incremented_value = Bytes.getLong(column.firstEntry().getValue()); 1750 incremented_value += amount; 1751 column.put(column.firstKey(), Bytes.fromLong(incremented_value)); 1752 return Deferred.fromResult(incremented_value); 1753 } 1754 1755 } 1756 1757 } 1758