1 // This file is part of OpenTSDB.
2 // Copyright (C) 2013  The OpenTSDB Authors.
3 //
4 // This program is free software: you can redistribute it and/or modify it
5 // under the terms of the GNU Lesser General Public License as published by
6 // the Free Software Foundation, either version 2.1 of the License, or (at your
7 // option) any later version.  This program is distributed in the hope that it
8 // will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty
9 // of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser
10 // General Public License for more details.  You should have received a copy
11 // of the GNU Lesser General Public License along with this program.  If not,
12 // see <http://www.gnu.org/licenses/>.
13 package net.opentsdb.storage;
14 
15 import static org.mockito.Matchers.any;
16 import static org.mockito.Matchers.anyString;
17 import static org.mockito.Mockito.doAnswer;
18 import static org.mockito.Mockito.when;
19 import static org.powermock.api.mockito.PowerMockito.mock;
20 
21 import java.nio.charset.Charset;
22 import java.util.ArrayList;
23 import java.util.Collections;
24 import java.util.HashSet;
25 import java.util.Iterator;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.Map.Entry;
29 import java.util.Set;
30 import java.util.TreeMap;
31 import java.util.regex.Pattern;
32 import java.util.regex.PatternSyntaxException;
33 
34 import javax.xml.bind.DatatypeConverter;
35 
36 import net.opentsdb.core.Const;
37 import net.opentsdb.core.TSDB;
38 import net.opentsdb.utils.Pair;
39 
40 import org.hbase.async.AtomicIncrementRequest;
41 import org.hbase.async.Bytes;
42 import org.hbase.async.Bytes.ByteMap;
43 import org.hbase.async.AppendRequest;
44 import org.hbase.async.DeleteRequest;
45 import org.hbase.async.FilterList;
46 import org.hbase.async.GetRequest;
47 import org.hbase.async.HBaseClient;
48 import org.hbase.async.KeyRegexpFilter;
49 import org.hbase.async.KeyValue;
50 import org.hbase.async.PutRequest;
51 import org.hbase.async.ScanFilter;
52 import org.hbase.async.Scanner;
53 import org.junit.Ignore;
54 import org.mockito.invocation.InvocationOnMock;
55 import org.mockito.stubbing.Answer;
56 import org.powermock.reflect.Whitebox;
57 
58 import com.stumbleupon.async.Deferred;
59 
60 /**
61  * Mock HBase implementation useful in testing calls to and from storage with
62  * actual pretend data. The underlying data store is an incredibly ugly nesting
63  * of ByteMaps from AsyncHbase so it stores and orders byte arrays similar to
64  * HBase. It supports tables and column families along with timestamps but
65  * doesn't deal with TTLs or other features.
66  * <p>
67  * By default we configure the "'tsdb', {NAME => 't'}" and
68  * "'tsdb-uid', {NAME => 'id'}, {NAME => 'name'}" tables. If you need more, just
69  * add em.
70  *
71  * <p>
72  * It's not a perfect mock but is useful for the majority of unit tests. Gets,
73  * puts, cas, deletes and scans are currently supported. See notes for each
74  * inner class below about what does and doesn't work.
75  * <p>
76  * Regarding timestamps, whenever you execute an RPC request, the
77  * {@code current_timestamp} will be incremented by one millisecond. By default
78  * the timestamp starts at 1/1/2014 00:00:00 but you can set it to any value
79  * at any time. If a PutRequest comes in with a specific time, that time will
80  * be stored and the timestamp will not be incremented.
81  * <p>
82  * <b>Warning:</b> To use this class, you need to prepare the classes for testing
83  * with the @PrepareForTest annotation. The classes you need to prepare are:
84  * <ul><li>TSDB</li>
85  * <li>HBaseClient</li>
86  * <li>GetRequest</li>
87  * <li>PutRequest</li>
88  * <li>AppendRequest</li>
89  * <li>KeyValue</li>
90  * <li>Scanner</li>
91  * <li>DeleteRequest</li>
92  * <li>AtomicIncrementRequest</li></ul>
93  * @since 2.0
94  */
95 @Ignore
96 public final class MockBase {
  /** Charset used to turn table and family names into bytes. Note that despite
   * the constant's name this is ISO-8859-1, not US-ASCII. */
  private static final Charset ASCII = Charset.forName("ISO-8859-1");
  /** The TSDB given to the constructor; its "client" field is replaced with
   * the supplied client. */
  private TSDB tsdb;

  /** Gross huh? <table, <cf, <row, <qual, <ts, value>>>>>
   * Why is CF before row? Because we want to throw exceptions if a CF hasn't
   * been "configured"
   * The innermost <ts, value> maps are created in reverse order so the most
   * recent timestamp comes first.
   */
  private ByteMap<ByteMap<ByteMap<ByteMap<TreeMap<Long, byte[]>>>>>
  storage = new ByteMap<ByteMap<ByteMap<ByteMap<TreeMap<Long, byte[]>>>>>();
  /** One MockScanner per newScanner() call answered by the default scan stub */
  private HashSet<MockScanner> scanners = new HashSet<MockScanner>(2);

  /** The default family for shortcuts */
  private byte[] default_family;

  /** The default table for shortcuts */
  private byte[] default_table;

  /** Incremented every time a new value is stored (without a timestamp).
   * The initial value is 1388534400000 ms, i.e. 1/1/2014 00:00:00 UTC. */
  private long current_timestamp = 1388534400000L;

  /** A list of exceptions that can be thrown when working with a row key.
   * Created lazily by throwException(), so it is null until the first
   * exception is registered. The Boolean is the "as_result" flag. */
  private ByteMap<Pair<RuntimeException, Boolean>> exceptions;
119 
120   /**
   * Sets up mock intercepts for all of the calls. Depending on the given
122    * flags, some mocks may not be enabled, allowing local unit tests to setup
123    * their own mocks.
   * @param tsdb A real TSDB (not mocked) that should have its client set with
125    * the given mock
126    * @param client A mock client that may have been instantiated and should be
127    * captured for use with MockBase
128    * @param default_get Enable the default .get() mock
129    * @param default_put Enable the default .put() and .compareAndSet() mocks
130    * @param default_delete Enable the default .delete() mock
131    * @param default_scan Enable the Scanner mock implementation
132    */
MockBase( final TSDB tsdb, final HBaseClient client, final boolean default_get, final boolean default_put, final boolean default_delete, final boolean default_scan)133   public MockBase(
134       final TSDB tsdb, final HBaseClient client,
135       final boolean default_get,
136       final boolean default_put,
137       final boolean default_delete,
138       final boolean default_scan) {
139     this.tsdb = tsdb;
140 
141     default_family = "t".getBytes(ASCII);
142     default_table = "tsdb".getBytes(ASCII);
143     setupDefaultTables();
144 
145     // replace the "real" field objects with mocks
146     Whitebox.setInternalState(tsdb, "client", client);
147 
148     // Default get answer will return one or more columns from the requested row
149     if (default_get) {
150       when(client.get((GetRequest)any())).thenAnswer(new MockGet());
151     }
152 
153     // Default put answer will store the given values in the proper location.
154     if (default_put) {
155       when(client.put((PutRequest)any())).thenAnswer(new MockPut());
156       when(client.compareAndSet((PutRequest)any(), (byte[])any()))
157         .thenAnswer(new MockCAS());
158     }
159 
160     if (default_delete) {
161       when(client.delete((DeleteRequest)any())).thenAnswer(new MockDelete());
162     }
163 
164     if (default_scan) {
165       // to facilitate unit tests where more than one scanner is used (i.e. in a
166       // callback chain) we have to provide a new mock scanner for each new
167       // scanner request. That's the way the mock scanner method knows when a
168       // second call has been issued and it should return a null.
169       when(client.newScanner((byte[]) any())).thenAnswer(new Answer<Scanner>() {
170 
171         @Override
172         public Scanner answer(InvocationOnMock arg0) throws Throwable {
173           final Scanner scanner = mock(Scanner.class);
174           final byte[] table = (byte[])arg0.getArguments()[0];
175           scanners.add(new MockScanner(scanner, table));
176           return scanner;
177         }
178 
179       });
180 
181     }
182 
183     when(client.atomicIncrement((AtomicIncrementRequest)any()))
184       .then(new MockAtomicIncrement());
185     when(client.bufferAtomicIncrement((AtomicIncrementRequest)any()))
186       .then(new MockAtomicIncrement());
187     when(client.append((AppendRequest)any())).thenAnswer(new MockAppend());
188   }
189 
190   /**
191    * Add a table with families to the data store. If the table or family
   * exists, it's a no-op. Give real values as we don't check 'em.
   * @param table The table to add
   * @param families A list of one or more families to add to the table
195    */
addTable(final byte[] table, final List<byte[]> families)196   public void addTable(final byte[] table, final List<byte[]> families) {
197     ByteMap<ByteMap<ByteMap<TreeMap<Long, byte[]>>>> map = storage.get(table);
198     if (map == null) {
199       map = new ByteMap<ByteMap<ByteMap<TreeMap<Long, byte[]>>>>();
200       storage.put(table, map);
201     }
202     for (final byte[] family : families) {
203       if (!map.containsKey(family)) {
204         map.put(family, new ByteMap<ByteMap<TreeMap<Long, byte[]>>>());
205       }
206     }
207   }
208 
209   /**
210    * Pops the table out of the map
211    * @param table The table to pop
212    * @return True if the table was there, false if it wasn't.
213    */
deleteTable(final byte[] table)214   public boolean deleteTable(final byte[] table) {
215     return storage.remove(table) != null;
216   }
217 
218   /** @param family Sets the default family for calls that need it */
setFamily(final byte[] family)219   public void setFamily(final byte[] family) {
220     default_family = family;
221   }
222 
223   /** @param table Sets the default table for calls that need it */
setDefaultTable(final byte[] table)224   public void setDefaultTable(final byte[] table) {
225     default_table = table;
226   }
227 
228   /** @param timestamp The timestamp to use for further storage increments */
setCurrentTimestamp(final long timestamp)229   public void setCurrentTimestamp(final long timestamp) {
230     this.current_timestamp = timestamp;
231   }
232 
233   /** @return the incrementing timestamp */
getCurrentTimestamp()234   public long getCurrentTimestamp() {
235     return current_timestamp;
236   }
237 
238   /**
239    * Add a column to the hash table using the default column family.
240    * The proper row will be created if it doesn't exist. If the column already
241    * exists, the original value will be overwritten with the new data.
242    * Uses the default table and family
243    * @param key The row key
244    * @param qualifier The qualifier
245    * @param value The value to store
246    */
addColumn(final byte[] key, final byte[] qualifier, final byte[] value)247   public void addColumn(final byte[] key, final byte[] qualifier,
248       final byte[] value) {
249     addColumn(default_table, key, default_family, qualifier, value,
250         current_timestamp++);
251   }
252 
253   /**
254    * Add a column to the hash table
255    * The proper row will be created if it doesn't exist. If the column already
256    * exists, the original value will be overwritten with the new data.
257    * Uses the default table.
258    * @param key The row key
259    * @param family The column family to store the value in
260    * @param qualifier The qualifier
261    * @param value The value to store
262    */
addColumn(final byte[] key, final byte[] family, final byte[] qualifier, final byte[] value)263   public void addColumn(final byte[] key, final byte[] family,
264       final byte[] qualifier, final byte[] value) {
265     addColumn(default_table, key, family, qualifier, value, current_timestamp++);
266   }
267 
268   /**
269    * Add a column to the hash table
270    * The proper row will be created if it doesn't exist. If the column already
271    * exists, the original value will be overwritten with the new data
272    * @param table The table
273    * @param key The row key
274    * @param family The column family to store the value in
275    * @param qualifier The qualifier
276    * @param value The value to store
277    */
addColumn(final byte[] table, final byte[] key, final byte[] family, final byte[] qualifier, final byte[] value)278   public void addColumn(final byte[] table, final byte[] key, final byte[] family,
279       final byte[] qualifier, final byte[] value) {
280     addColumn(table, key, family, qualifier, value, current_timestamp++);
281   }
282 
283   /**
284    * Add a column to the hash table
285    * The proper row will be created if it doesn't exist. If the column already
286    * exists, the original value will be overwritten with the new data
287    * @param table The table
288    * @param key The row key
289    * @param family The column family to store the value in
290    * @param qualifier The qualifier
291    * @param value The value to store
292    * @param timestamp The timestamp to store
293    */
addColumn(final byte[] table, final byte[] key, final byte[] family, final byte[] qualifier, final byte[] value, final long timestamp)294   public void addColumn(final byte[] table, final byte[] key, final byte[] family,
295       final byte[] qualifier, final byte[] value, final long timestamp) {
296     // AsyncHBase will throw an NPE if the user tries to write a NULL value
297     // so we better do the same. An empty value is ok though, i.e. new byte[] {}
298     if (value == null) {
299       throw new NullPointerException();
300     }
301     final ByteMap<ByteMap<ByteMap<TreeMap<Long, byte[]>>>> map = storage.get(table);
302     if (map == null) {
303       throw new RuntimeException(
304           "No such table " + Bytes.pretty(table));
305     }
306     final ByteMap<ByteMap<TreeMap<Long, byte[]>>> cf = map.get(family);
307     if (cf == null) {
308       throw new RuntimeException(
309           "No such CF " + Bytes.pretty(family));
310     }
311 
312     ByteMap<TreeMap<Long, byte[]>> row = cf.get(key);
313     if (row == null) {
314       row = new ByteMap<TreeMap<Long, byte[]>>();
315       cf.put(key, row);
316     }
317 
318     TreeMap<Long, byte[]> column = row.get(qualifier);
319     if (column == null) {
320       // remember, most recent at the top!
321       column = new TreeMap<Long, byte[]>(Collections.reverseOrder());
322       row.put(qualifier, column);
323     }
324     column.put(timestamp, value);
325   }
326 
327   /**
328    * Stores an exception so that any operation on the given key will cause it
329    * to be thrown.
330    * @param key The key to go pear shaped on
331    * @param exception The exception to throw
332    */
throwException(final byte[] key, final RuntimeException exception)333   public void throwException(final byte[] key, final RuntimeException exception) {
334     throwException(key, exception, true);
335   }
336 
337   /**
338    * Stores an exception so that any operation on the given key will cause it
339    * to be thrown.
340    * @param key The key to go pear shaped on
341    * @param exception The exception to throw
342    * @param as_result Whether or not to return the exception in the deferred
343    * result or throw it outright.
344    */
throwException(final byte[] key, final RuntimeException exception, final boolean as_result)345   public void throwException(final byte[] key, final RuntimeException exception,
346       final boolean as_result) {
347     if (exceptions == null) {
348       exceptions = new ByteMap<Pair<RuntimeException, Boolean>>();
349     }
350     exceptions.put(key, new Pair<RuntimeException, Boolean>(exception, as_result));
351   }
352 
353   /** Removes all exceptions from the exception list */
clearExceptions()354   public void clearExceptions() {
355     exceptions.clear();
356   }
357 
358   /** @return Total number of unique rows in the default table. Returns 0 if the
359    * default table does not exist */
numRows()360   public int numRows() {
361     return numRows(default_table);
362   }
363 
364   /**
   * Total number of rows in the given table. Returns 0 if the table does not exist.
366    * @param table The table to scan
367    * @return The number of rows
368    */
numRows(final byte[] table)369   public int numRows(final byte[] table) {
370     final ByteMap<ByteMap<ByteMap<TreeMap<Long, byte[]>>>> map =
371         storage.get(table);
372     if (map == null) {
373       return 0;
374     }
375     final ByteMap<Void> unique_rows = new ByteMap<Void>();
376     for (final ByteMap<ByteMap<TreeMap<Long, byte[]>>> cf : map.values()) {
377       for (final byte[] key : cf.keySet()) {
378         unique_rows.put(key, null);
379       }
380     }
381     return unique_rows.size();
382   }
383 
384   /**
385    * Return the total number of column families for the row in the default table
386    * @param key The row to search for
387    * @return -1 if the table or row did not exist, otherwise the number of
388    * column families.
389    */
numColumnFamilies(final byte[] key)390   public int numColumnFamilies(final byte[] key) {
391     return numColumnFamilies(default_table, key);
392   }
393 
394   /**
395    * Return the number of column families for the given row key in the given table.
396    * @param table The table to iterate over
397    * @param key The row to search for
398    * @return -1 if the table or row did not exist, otherwise the number of
399    * column families.
400    */
numColumnFamilies(final byte[] table, final byte[] key)401   public int numColumnFamilies(final byte[] table, final byte[] key) {
402     final ByteMap<ByteMap<ByteMap<TreeMap<Long, byte[]>>>> map =
403         storage.get(table);
404     if (map == null) {
405       return -1;
406     }
407     int sum = 0;
408     for (final ByteMap<ByteMap<TreeMap<Long, byte[]>>> cf : map.values()) {
409       if (cf.containsKey(key)) {
410         ++sum;
411       }
412     }
413     return sum == 0 ? -1 : sum;
414   }
415 
416   /**
417    * Total number of columns in the given row across all column families in the
418    * default table
419    * @param key The row to search for
420    * @return -1 if the row did not exist, otherwise the number of columns.
421    */
numColumns(final byte[] key)422   public long numColumns(final byte[] key) {
423     return numColumns(default_table, key);
424   }
425 
426   /**
   * Total number of columns in the given row across all column families in the
   * given table
429    * @param table The table to iterate over
430    * @param key The row to search for
431    * @return -1 if the row did not exist, otherwise the number of columns.
432    */
numColumns(final byte[] table, final byte[] key)433   public long numColumns(final byte[] table, final byte[] key) {
434     final ByteMap<ByteMap<ByteMap<TreeMap<Long, byte[]>>>> map =
435         storage.get(table);
436     if (map == null) {
437       return -1;
438     }
439     long size = 0;
440     for (final ByteMap<ByteMap<TreeMap<Long, byte[]>>> cf : map.values()) {
441       final ByteMap<TreeMap<Long, byte[]>> row = cf.get(key);
442       if (row != null) {
443         size += row.size();
444       }
445     }
446     return size == 0 ? -1 : size;
447   }
448 
449   /**
450    * Return the total number of columns for a specific row and family in the
451    * default table
452    * @param key The row to search for
453    * @param family The column family to search for
454    * @return -1 if the row did not exist, otherwise the number of columns.
455    */
numColumnsInFamily(final byte[] key, final byte[] family)456   public int numColumnsInFamily(final byte[] key, final byte[] family) {
457     return numColumnsInFamily(default_table, key, family);
458   }
459 
460   /**
   * Return the total number of columns for a specific row and family
   * @param table The table to search in
462    * @param key The row to search for
463    * @param family The column family to search for
464    * @return -1 if the row did not exist, otherwise the number of columns.
465    */
numColumnsInFamily(final byte[] table, final byte[] key, final byte[] family)466   public int numColumnsInFamily(final byte[] table, final byte[] key,
467       final byte[] family) {
468     final ByteMap<ByteMap<ByteMap<TreeMap<Long, byte[]>>>> map =
469         storage.get(table);
470     if (map == null) {
471       return -1;
472     }
473     final ByteMap<ByteMap<TreeMap<Long, byte[]>>> cf = map.get(family);
474     if (cf == null) {
475       return -1;
476     }
477     return cf.size();
478   }
479 
480   /**
481    * Retrieve the most recent contents of a single column with the default family
482    * and in the default table
483    * @param key The row key of the column
484    * @param qualifier The column qualifier
485    * @return The byte array of data or null if not found
486    */
getColumn(final byte[] key, final byte[] qualifier)487   public byte[] getColumn(final byte[] key, final byte[] qualifier) {
488     return getColumn(default_table, key, default_family, qualifier);
489   }
490 
491   /**
492    * Retrieve the most recent contents of a single column with the default table
493    * @param key The row key of the column
494    * @param family The column family
495    * @param qualifier The column qualifier
496    * @return The byte array of data or null if not found
497    */
getColumn(final byte[] key, final byte[] family, final byte[] qualifier)498   public byte[] getColumn(final byte[] key, final byte[] family,
499       final byte[] qualifier) {
500     return getColumn(default_table, key, family, qualifier);
501   }
502 
503   /**
504    * Retrieve the most recent contents of a single column
505    * @param table The table to fetch from
506    * @param key The row key of the column
507    * @param family The column family
508    * @param qualifier The column qualifier
509    * @return The byte array of data or null if not found
510    */
getColumn(final byte[] table, final byte[] key, final byte[] family, final byte[] qualifier)511   public byte[] getColumn(final byte[] table, final byte[] key,
512       final byte[] family, final byte[] qualifier) {
513     final ByteMap<ByteMap<ByteMap<TreeMap<Long, byte[]>>>> map =
514         storage.get(table);
515     if (map == null) {
516       return null;
517     }
518     final ByteMap<ByteMap<TreeMap<Long, byte[]>>> cf = map.get(family);
519     if (cf == null) {
520       return null;
521     }
522     final ByteMap<TreeMap<Long, byte[]>> row = cf.get(key);
523     if (row == null) {
524       return null;
525     }
526     final TreeMap<Long, byte[]> column = row.get(qualifier);
527     if (column == null) {
528       return null;
529     }
530     return column.firstEntry().getValue();
531   }
532 
533   /**
534    * Retrieve the full map of timestamps and values of a single column with
535    * the default family and default table
536    * @param key The row key of the column
537    * @param qualifier The column qualifier
   * @return The tree map of timestamps and values or null if not found
539    */
getFullColumn(final byte[] key, final byte[] qualifier)540   public TreeMap<Long, byte[]> getFullColumn(final byte[] key,
541       final byte[] qualifier) {
542     return getFullColumn(default_table, key, default_family, qualifier);
543   }
544 
545   /**
546    * Retrieve the full map of timestamps and values of a single column
547    * @param table The table to fetch from
548    * @param key The row key of the column
549    * @param family The column family
550    * @param qualifier The column qualifier
551    * @return The tree map of timestamps and values or null if not found
552    */
getFullColumn(final byte[] table, final byte[] key, final byte[] family, final byte[] qualifier)553   public TreeMap<Long, byte[]> getFullColumn(final byte[] table, final byte[] key,
554       final byte[] family, final byte[] qualifier) {
555     final ByteMap<ByteMap<ByteMap<TreeMap<Long, byte[]>>>> map =
556         storage.get(table);
557     if (map == null) {
558       return null;
559     }
560     final ByteMap<ByteMap<TreeMap<Long, byte[]>>> cf = map.get(family);
561     if (cf == null) {
562       return null;
563     }
564     final ByteMap<TreeMap<Long, byte[]>> row = cf.get(key);
565     if (row == null) {
566       return null;
567     }
568     return row.get(qualifier);
569   }
570 
571   /**
572    * Returns the most recent value from all columns for a given column family
573    * in the default table
574    * @param key The row key
575    * @param family The column family ID
   * @return A map of qualifiers to their most recent values, or null if the
   * table, CF or row was not found
577    */
getColumnFamily(final byte[] key, final byte[] family)578   public ByteMap<byte[]> getColumnFamily(final byte[] key,
579       final byte[] family) {
580     return getColumnFamily(default_table, key , family);
581   }
582 
583   /**
584    * Returns the most recent value from all columns for a given column family
585    * @param table The table to fetch from
586    * @param key The row key
587    * @param family The column family ID
   * @return A map of qualifiers to their most recent values, or null if the
   * table, CF or row was not found
589    */
getColumnFamily(final byte[] table, final byte[] key, final byte[] family)590   public ByteMap<byte[]> getColumnFamily(final byte[] table, final byte[] key,
591       final byte[] family) {
592     final ByteMap<ByteMap<ByteMap<TreeMap<Long, byte[]>>>> map =
593         storage.get(table);
594     if (map == null) {
595       return null;
596     }
597     final ByteMap<ByteMap<TreeMap<Long, byte[]>>> cf = map.get(family);
598     if (cf == null) {
599       return null;
600     }
601     final ByteMap<TreeMap<Long, byte[]>> row = cf.get(key);
602     if (row == null) {
603       return null;
604     }
605     // convert to a <qualifier, value> byte map
606     final ByteMap<byte[]> columns = new ByteMap<byte[]>();
607     for (Entry<byte[], TreeMap<Long, byte[]>> entry : row.entrySet()) {
608       // the <timestamp, value> map should never be null
609       columns.put(entry.getKey(), entry.getValue().firstEntry().getValue());
610     }
611     return columns;
612   }
613 
614   /** @return the list of keys stored in the default table for all CFs */
getKeys()615   public Set<byte[]> getKeys() {
616     return getKeys(default_table);
617   }
618 
619   /**
620    * Return the list of unique keys in the given table for all CFs
621    * @param table The table to pull from
622    * @return A list of keys. May be null if the table doesn't exist
623    */
getKeys(final byte[] table)624   public Set<byte[]> getKeys(final byte[] table) {
625     final ByteMap<ByteMap<ByteMap<TreeMap<Long, byte[]>>>> map =
626         storage.get(table);
627     if (map == null) {
628       return null;
629     }
630     final ByteMap<Void> unique_rows = new ByteMap<Void>();
631     for (final ByteMap<ByteMap<TreeMap<Long, byte[]>>> cf : map.values()) {
632       for (final byte[] key : cf.keySet()) {
633         unique_rows.put(key, null);
634       }
635     }
636     return unique_rows.keySet();
637   }
638 
639   /** @return The set of scanners configured by the caller */
getScanners()640   public HashSet<MockScanner> getScanners() {
641     return scanners;
642   }
643 
644   /**
   * Return the TSDB object that was given to the constructor, to use for
   * HBaseClient access
   * @return The TSDB instance this mock was set up with
647    */
getTSDB()648   public TSDB getTSDB() {
649     return tsdb;
650   }
651 
652   /**
653    * Runs through all rows in the "tsdb" table and compacts them by making a
   * call to the {@code TSDB.compact} method. It will delete any columns
655    * that were compacted and leave others untouched, just as the normal
656    * method does.
657    * And only iterates over the 't' family.
658    * @throws Exception if Whitebox couldn't access the compact method
659    */
tsdbCompactAllRows()660   public void tsdbCompactAllRows() throws Exception {
661     final ByteMap<ByteMap<ByteMap<TreeMap<Long, byte[]>>>> map =
662         storage.get("tsdb".getBytes(ASCII));
663     if (map == null) {
664       return;
665     }
666     final ByteMap<ByteMap<TreeMap<Long, byte[]>>> cf = map.get("t".getBytes(ASCII));
667     if (cf == null) {
668       return;
669     }
670 
671     for (Entry<byte[], ByteMap<TreeMap<Long, byte[]>>> entry : cf.entrySet()) {
672       final byte[] key = entry.getKey();
673 
674       final ByteMap<TreeMap<Long, byte[]>> row = entry.getValue();
675       ArrayList<KeyValue> kvs = new ArrayList<KeyValue>(row.size());
676       final Set<byte[]> deletes = new HashSet<byte[]>();
677       for (Map.Entry<byte[], TreeMap<Long, byte[]>> column : row.entrySet()) {
678         if (column.getKey().length % 2 == 0) {
679           kvs.add(new KeyValue(key, default_family, column.getKey(),
680               column.getValue().firstKey(),
681               column.getValue().firstEntry().getValue()));
682           deletes.add(column.getKey());
683         }
684       }
685       if (kvs.size() > 0) {
686         for (final byte[] k : deletes) {
687           row.remove(k);
688         }
689         final KeyValue compacted =
690             Whitebox.invokeMethod(tsdb, "compact", kvs, Collections.EMPTY_LIST);
691         final TreeMap<Long, byte[]> compacted_value = new TreeMap<Long, byte[]>();
692         compacted_value.put(current_timestamp++, compacted.value());
693         row.put(compacted.qualifier(), compacted_value);
694       }
695     }
696   }
697 
698   /**
699    * Clears out all rows from storage but doesn't delete the tables or families.
700    */
flushStorage()701   public void flushStorage() {
702     for (final ByteMap<ByteMap<ByteMap<TreeMap<Long, byte[]>>>> table :
703       storage.values()) {
704       for (final ByteMap<ByteMap<TreeMap<Long, byte[]>>> cf : table.values()) {
705         cf.clear();
706       }
707     }
708   }
709 
710   /**
711    * Clears out all rows for a given table
712    * @param table The table to empty out
713    */
flushStorage(final byte[] table)714   public void flushStorage(final byte[] table) {
715     final ByteMap<ByteMap<ByteMap<TreeMap<Long, byte[]>>>> map = storage.get(table);
716     if (map == null) {
717       return;
718     }
719     for (final ByteMap<ByteMap<TreeMap<Long, byte[]>>> cf : map.values()) {
720       cf.clear();
721     }
722   }
723 
724   /**
725    * Removes the entire row from the default table for all column families
726    * @param key The row to remove
727    */
flushRow(final byte[] key)728   public void flushRow(final byte[] key) {
729     flushRow(default_table, key);
730   }
731 
732   /**
733    * Removes the entire row from the table for all column families
734    * @param table The table to purge
735    * @param key The row to remove
736    */
flushRow(final byte[] table, final byte[] key)737   public void flushRow(final byte[] table, final byte[] key) {
738     final ByteMap<ByteMap<ByteMap<TreeMap<Long, byte[]>>>> map = storage.get(table);
739     if (map == null) {
740       return;
741     }
742     for (final ByteMap<ByteMap<TreeMap<Long, byte[]>>> cf : map.values()) {
743       cf.remove(key);
744     }
745   }
746 
747   /**
748    * Removes all rows from the default table for the given column family
749    * @param family The family to remove
750    */
flushFamily(final byte[] family)751   public void flushFamily(final byte[] family) {
752     flushFamily(default_table, family);
753   }
754 
755   /**
   * Removes all rows from the given table for the given column family
757    * @param table The table to purge from
758    * @param family The family to remove
759    */
flushFamily(final byte[] table, final byte[] family)760   public void flushFamily(final byte[] table, final byte[] family) {
761     final ByteMap<ByteMap<ByteMap<TreeMap<Long, byte[]>>>> map = storage.get(table);
762     if (map == null) {
763       return;
764     }
765     final ByteMap<ByteMap<TreeMap<Long, byte[]>>> cf = map.get(family);
766     if (cf != null) {
767       cf.clear();
768     }
769   }
770 
771   /**
772    * Removes the given column from the default table
773    * @param key Row key
774    * @param family Column family
775    * @param qualifier Column qualifier
776    */
flushColumn(final byte[] key, final byte[] family, final byte[] qualifier)777   public void flushColumn(final byte[] key, final byte[] family,
778       final byte[] qualifier) {
779     flushColumn(default_table, key, family, qualifier);
780   }
781 
782   /**
783    * Removes the given column from the table
784    * @param table The table to purge from
785    * @param key Row key
786    * @param family Column family
787    * @param qualifier Column qualifier
788    */
flushColumn(final byte[] table, final byte[] key, final byte[] family, final byte[] qualifier)789   public void flushColumn(final byte[] table, final byte[] key,
790       final byte[] family, final byte[] qualifier) {
791     final ByteMap<ByteMap<ByteMap<TreeMap<Long, byte[]>>>> map = storage.get(table);
792     if (map == null) {
793       return;
794     }
795     final ByteMap<ByteMap<TreeMap<Long, byte[]>>> cf = map.get(family);
796     if (cf == null) {
797       return;
798     }
799     final ByteMap<TreeMap<Long, byte[]>> row = cf.get(key);
800     if (row == null) {
801       return;
802     }
803     row.remove(qualifier);
804   }
805 
806   /**
807    * Dumps the entire storage hash to stdout in a sort of tree style format with
808    * all byte arrays hex encoded
809    */
dumpToSystemOut()810   public void dumpToSystemOut() {
811     dumpToSystemOut(false);
812   }
813 
814   /**
815    * Dumps the entire storage hash to stdout in a sort of tree style format
816    * @param ascii Whether or not the values should be converted to ascii
817    */
dumpToSystemOut(final boolean ascii)818   public void dumpToSystemOut(final boolean ascii) {
819     if (storage.isEmpty()) {
820       System.out.println("Storage is Empty");
821       return;
822     }
823 
824     for (Entry<byte[], ByteMap<ByteMap<ByteMap<TreeMap<Long, byte[]>>>>> table :
825       storage.entrySet()) {
826       System.out.println("[Table] " + new String(table.getKey(), ASCII));
827 
828       for (Entry<byte[], ByteMap<ByteMap<TreeMap<Long, byte[]>>>> cf :
829         table.getValue().entrySet()) {
830         System.out.println("  [CF] " + new String(cf.getKey(), ASCII));
831 
832         for (Entry<byte[], ByteMap<TreeMap<Long, byte[]>>> row :
833           cf.getValue().entrySet()) {
834           System.out.println("    [Row] " + (ascii ?
835               new String(row.getKey(), ASCII) : bytesToString(row.getKey())));
836 
837           for (Map.Entry<byte[], TreeMap<Long, byte[]>> column : row.getValue().entrySet()) {
838             System.out.println("      [Qual] " + (ascii ?
839                 "\"" + new String(column.getKey(), ASCII) + "\""
840                 : bytesToString(column.getKey())));
841             for (Map.Entry<Long, byte[]> cell : column.getValue().entrySet()) {
842               System.out.println("        [TS] " + cell.getKey() + "  [Value] " +
843                   (ascii ?  new String(cell.getValue(), ASCII)
844                   : bytesToString(cell.getValue())));
845             }
846           }
847         }
848       }
849     }
850   }
851 
852   /**
853    * Helper to convert an array of bytes to a hexadecimal encoded string.
854    * @param bytes The byte array to convert
855    * @return A hex string
856    */
bytesToString(final byte[] bytes)857   public static String bytesToString(final byte[] bytes) {
858     return DatatypeConverter.printHexBinary(bytes);
859   }
860 
861   /**
862    * Helper to convert a hex encoded string into a byte array.
863    * <b>Warning:</b> This method won't pad the string to make sure it's an
864    * even number of bytes.
865    * @param bytes The hex encoded string to convert
866    * @return A byte array from the hex string
867    * @throws IllegalArgumentException if the string contains illegal characters
868    * or can't be converted.
869    */
stringToBytes(final String bytes)870   public static byte[] stringToBytes(final String bytes) {
871     return DatatypeConverter.parseHexBinary(bytes);
872   }
873 
874   /** @return Returns the ASCII character set */
ASCII()875   public static Charset ASCII() {
876     return ASCII;
877   }
878 
879   /**
880    * Concatenates byte arrays into one big array
881    * @param arrays Any number of arrays to concatenate
882    * @return The concatenated array
883    */
concatByteArrays(final byte[]... arrays)884   public static byte[] concatByteArrays(final byte[]... arrays) {
885     int len = 0;
886     for (final byte[] array : arrays) {
887       len += array.length;
888     }
889     final byte[] result = new byte[len];
890     len = 0;
891     for (final byte[] array : arrays) {
892       System.arraycopy(array, 0, result, len, array.length);
893       len += array.length;
894     }
895     return result;
896   }
897 
898   /** Creates the TSDB and UID tables */
setupDefaultTables()899   private void setupDefaultTables() {
900     final ByteMap<ByteMap<ByteMap<TreeMap<Long, byte[]>>>> tsdb =
901         new ByteMap<ByteMap<ByteMap<TreeMap<Long, byte[]>>>>();
902     tsdb.put("t".getBytes(ASCII), new ByteMap<ByteMap<TreeMap<Long, byte[]>>>());
903     storage.put("tsdb".getBytes(ASCII), tsdb);
904 
905     final ByteMap<ByteMap<ByteMap<TreeMap<Long, byte[]>>>> tsdb_uid =
906         new ByteMap<ByteMap<ByteMap<TreeMap<Long, byte[]>>>>();
907     tsdb_uid.put("name".getBytes(ASCII),
908         new ByteMap<ByteMap<TreeMap<Long, byte[]>>>());
909     tsdb_uid.put("id".getBytes(ASCII),
910         new ByteMap<ByteMap<TreeMap<Long, byte[]>>>());
911     storage.put("tsdb-uid".getBytes(ASCII), tsdb_uid);
912   }
913 
914   /**
915    * Gets one or more columns from a row. If the row does not exist, a null is
916    * returned. If no qualifiers are given, the entire row is returned.
917    * NOTE: all timestamp, value pairs are returned.
918    */
919   private class MockGet implements Answer<Deferred<ArrayList<KeyValue>>> {
920     @Override
answer(InvocationOnMock invocation)921     public Deferred<ArrayList<KeyValue>> answer(InvocationOnMock invocation)
922         throws Throwable {
923       final Object[] args = invocation.getArguments();
924       final GetRequest get = (GetRequest)args[0];
925 
926       if (exceptions != null) {
927         final Pair<RuntimeException, Boolean> ex = exceptions.get(get.key());
928         if (ex != null) {
929           if (ex.getValue()) {
930             return Deferred.fromError(ex.getKey());
931           } else {
932             throw ex.getKey();
933           }
934         }
935       }
936 
937       final ByteMap<ByteMap<ByteMap<TreeMap<Long, byte[]>>>> map =
938           storage.get(get.table());
939       if (map == null) {
940         return Deferred.fromError(new RuntimeException(
941           "No such table " + Bytes.pretty(get.table())));
942       }
943 
944       // compile a set of qualifiers to use as a filter if necessary
945       final ByteMap<Object> qualifiers = new ByteMap<Object>();
946       if (get.qualifiers() != null && get.qualifiers().length > 0) {
947         for (byte[] q : get.qualifiers()) {
948           qualifiers.put(q, null);
949         }
950       }
951 
952       final ArrayList<KeyValue> kvs = new ArrayList<KeyValue>();
953       for (final Entry<byte[], ByteMap<ByteMap<TreeMap<Long, byte[]>>>> cf :
954           map.entrySet()) {
955         if (get.family() != null && Bytes.memcmp(get.family(), cf.getKey()) != 0) {
956           continue;
957         }
958 
959         final ByteMap<TreeMap<Long, byte[]>> row = cf.getValue().get(get.key());
960         if (row == null) {
961           continue;
962         }
963 
964         for (Entry<byte[], TreeMap<Long, byte[]>> column : row.entrySet()) {
965           if (!qualifiers.isEmpty() && !qualifiers.containsKey(column.getKey())) {
966             continue;
967           }
968 
969           // TODO - if we want to support multiple values, iterate over the
970           // tree map. Otherwise Get returns just the latest value.
971           kvs.add(new KeyValue(get.key(), cf.getKey(), column.getKey(),
972               column.getValue().firstKey(),
973               column.getValue().firstEntry().getValue()));
974         }
975       }
976       if (kvs.isEmpty()) {
977         return Deferred.fromResult(null);
978       }
979       return Deferred.fromResult(kvs);
980     }
981   }
982 
983   /**
984    * Stores one or more columns in a row. If the row does not exist, it's
985    * created.
986    */
987   private class MockPut implements Answer<Deferred<Boolean>> {
988     @Override
answer(final InvocationOnMock invocation)989     public Deferred<Boolean> answer(final InvocationOnMock invocation)
990       throws Throwable {
991       final Object[] args = invocation.getArguments();
992       final PutRequest put = (PutRequest)args[0];
993 
994       if (exceptions != null) {
995         final Pair<RuntimeException, Boolean> ex = exceptions.get(put.key());
996         if (ex != null) {
997           if (ex.getValue()) {
998             return Deferred.fromError(ex.getKey());
999           } else {
1000             throw ex.getKey();
1001           }
1002         }
1003       }
1004 
1005       final ByteMap<ByteMap<ByteMap<TreeMap<Long, byte[]>>>> map =
1006           storage.get(put.table());
1007       if (map == null) {
1008         return Deferred.fromError(new RuntimeException(
1009           "No such table " + Bytes.pretty(put.table())));
1010       }
1011 
1012       final ByteMap<ByteMap<TreeMap<Long, byte[]>>> cf = map.get(put.family());
1013       if (cf == null) {
1014         return Deferred.fromError(new RuntimeException(
1015             "No such CF " + Bytes.pretty(put.table())));
1016       }
1017 
1018       ByteMap<TreeMap<Long, byte[]>> row = cf.get(put.key());
1019       if (row == null) {
1020         row = new ByteMap<TreeMap<Long, byte[]>>();
1021         cf.put(put.key(), row);
1022       }
1023 
1024       for (int i = 0; i < put.qualifiers().length; i++) {
1025         TreeMap<Long, byte[]> column = row.get(put.qualifiers()[i]);
1026         if (column == null) {
1027           column = new TreeMap<Long, byte[]>(Collections.reverseOrder());
1028           row.put(put.qualifiers()[i], column);
1029         }
1030 
1031         column.put(put.timestamp() != Long.MAX_VALUE ? put.timestamp() :
1032           current_timestamp++, put.values()[i]);
1033       }
1034 
1035       return Deferred.fromResult(true);
1036     }
1037   }
1038 
1039   /**
1040    * Stores one or more columns in a row. If the row does not exist, it's
1041    * created.
1042    */
1043   private class MockAppend implements Answer<Deferred<Boolean>> {
1044     @Override
answer(final InvocationOnMock invocation)1045     public Deferred<Boolean> answer(final InvocationOnMock invocation)
1046       throws Throwable {
1047       final Object[] args = invocation.getArguments();
1048       final AppendRequest append = (AppendRequest)args[0];
1049 
1050       if (exceptions != null) {
1051         final Pair<RuntimeException, Boolean> ex = exceptions.get(append.key());
1052         if (ex != null) {
1053           if (ex.getValue()) {
1054             return Deferred.fromError(ex.getKey());
1055           } else {
1056             throw ex.getKey();
1057           }
1058         }
1059       }
1060 
1061       final ByteMap<ByteMap<ByteMap<TreeMap<Long, byte[]>>>> map =
1062           storage.get(append.table());
1063       if (map == null) {
1064         return Deferred.fromError(new RuntimeException(
1065           "No such table " + Bytes.pretty(append.table())));
1066       }
1067 
1068       final ByteMap<ByteMap<TreeMap<Long, byte[]>>> cf = map.get(append.family());
1069       if (cf == null) {
1070         return Deferred.fromError(new RuntimeException(
1071             "No such CF " + Bytes.pretty(append.table())));
1072       }
1073 
1074       ByteMap<TreeMap<Long, byte[]>> row = cf.get(append.key());
1075       if (row == null) {
1076         row = new ByteMap<TreeMap<Long, byte[]>>();
1077         cf.put(append.key(), row);
1078       }
1079 
1080       for (int i = 0; i < append.qualifiers().length; i++) {
1081         TreeMap<Long, byte[]> column = row.get(append.qualifiers()[i]);
1082         if (column == null) {
1083           column = new TreeMap<Long, byte[]>(Collections.reverseOrder());
1084           row.put(append.qualifiers()[i], column);
1085         }
1086 
1087         final byte[] values;
1088         long column_timestamp = 0;
1089         if (append.timestamp() != Long.MAX_VALUE) {
1090           values = column.get(append.timestamp());
1091           column_timestamp = append.timestamp();
1092         } else {
1093           if (column.isEmpty()) {
1094             values = null;
1095           } else {
1096             values = column.firstEntry().getValue();
1097             column_timestamp = column.firstKey();
1098           }
1099         }
1100         if (column_timestamp == 0) {
1101           column_timestamp = current_timestamp++;
1102         }
1103 
1104         final int current_len = values != null ? values.length : 0;
1105         final byte[] append_value = new byte[current_len + append.values()[i].length];
1106         if (current_len > 0) {
1107           System.arraycopy(values, 0, append_value, 0, values.length);
1108         }
1109 
1110         System.arraycopy(append.value(), 0, append_value, current_len,
1111             append.values()[i].length);
1112         column.put(column_timestamp, append_value);
1113       }
1114 
1115       return Deferred.fromResult(true);
1116     }
1117   }
1118 
1119   /**
1120    * Imitates the compareAndSet client call where a {@code PutRequest} is passed
1121    * along with a byte array to compared the stored value against. If the stored
1122    * value doesn't match, the put is ignored and a "false" is returned. If the
1123    * comparator matches, the new put is recorded.
1124    * <b>Warning:</b> While a put works on multiple qualifiers, CAS only works
1125    * with one. So if the put includes more than one qualifier, only the first
1126    * one will be processed in this CAS call.
1127    */
1128   private class MockCAS implements Answer<Deferred<Boolean>> {
1129 
1130     @Override
answer(final InvocationOnMock invocation)1131     public Deferred<Boolean> answer(final InvocationOnMock invocation)
1132       throws Throwable {
1133       final Object[] args = invocation.getArguments();
1134       final PutRequest put = (PutRequest)args[0];
1135       final byte[] expected = (byte[])args[1];
1136 
1137       if (exceptions != null) {
1138         final Pair<RuntimeException, Boolean> ex = exceptions.get(put.key());
1139         if (ex != null) {
1140           if (ex.getValue()) {
1141             return Deferred.fromError(ex.getKey());
1142           } else {
1143             throw ex.getKey();
1144           }
1145         }
1146       }
1147 
1148       final ByteMap<ByteMap<ByteMap<TreeMap<Long, byte[]>>>> map =
1149           storage.get(put.table());
1150       if (map == null) {
1151         return Deferred.fromError(new RuntimeException(
1152           "No such table " + Bytes.pretty(put.table())));
1153       }
1154 
1155       final ByteMap<ByteMap<TreeMap<Long, byte[]>>> cf = map.get(put.family());
1156       if (cf == null) {
1157         return Deferred.fromError(new RuntimeException(
1158             "No such CF " + Bytes.pretty(put.table())));
1159       }
1160 
1161       ByteMap<TreeMap<Long, byte[]>> row = cf.get(put.key());
1162       if (row == null) {
1163         if (expected != null && expected.length > 0) {
1164           return Deferred.fromResult(false);
1165         }
1166         row = new ByteMap<TreeMap<Long, byte[]>>();
1167         cf.put(put.key(), row);
1168       }
1169 
1170       // CAS can only operate on one cell, so if the put request has more than
1171       // one, we ignore any but the first
1172       TreeMap<Long, byte[]> column = row.get(put.qualifiers()[0]);
1173       if (column == null && (expected != null && expected.length > 0)) {
1174         return Deferred.fromResult(false);
1175       }
1176       // if a timestamp was specified, maybe we're CASing against a specific
1177       // cell. Otherwise we deal with the latest value
1178       final byte[] stored = column == null ? null :
1179         put.timestamp() != Long.MAX_VALUE ? column.get(put.timestamp()) :
1180         column.firstEntry().getValue();
1181       if (stored == null && (expected != null && expected.length > 0)) {
1182         return Deferred.fromResult(false);
1183       }
1184       if (stored != null && (expected == null || expected.length < 1)) {
1185         return Deferred.fromResult(false);
1186       }
1187       if (stored != null && expected != null &&
1188           Bytes.memcmp(stored, expected) != 0) {
1189         return Deferred.fromResult(false);
1190       }
1191 
1192       // passed CAS!
1193       if (column == null) {
1194         column = new TreeMap<Long, byte[]>(Collections.reverseOrder());
1195         row.put(put.qualifiers()[0], column);
1196       }
1197       column.put(put.timestamp() != Long.MAX_VALUE ? put.timestamp() :
1198         current_timestamp++, put.value());
1199       return Deferred.fromResult(true);
1200     }
1201 
1202   }
1203 
1204   /**
1205    * Deletes one or more columns. If a row no longer has any valid columns, the
1206    * entire row will be removed.
1207    */
1208   private class MockDelete implements Answer<Deferred<Object>> {
1209 
1210     @Override
answer(InvocationOnMock invocation)1211     public Deferred<Object> answer(InvocationOnMock invocation)
1212         throws Throwable {
1213       final Object[] args = invocation.getArguments();
1214       final DeleteRequest delete = (DeleteRequest)args[0];
1215 
1216       if (exceptions != null) {
1217         final Pair<RuntimeException, Boolean> ex = exceptions.get(delete.key());
1218         if (ex != null) {
1219           if (ex.getValue()) {
1220             return Deferred.fromError(ex.getKey());
1221           } else {
1222             throw ex.getKey();
1223           }
1224         }
1225       }
1226 
1227       final ByteMap<ByteMap<ByteMap<TreeMap<Long, byte[]>>>> map =
1228           storage.get(delete.table());
1229       if (map == null) {
1230         return Deferred.fromError(new RuntimeException(
1231           "No such table " + Bytes.pretty(delete.table())));
1232       }
1233 
1234       // if no qualifiers or family, then delete the row from all families
1235       if ((delete.qualifiers() == null || delete.qualifiers().length < 1 ||
1236           delete.qualifiers()[0].length < 1) && (delete.family() == null ||
1237           delete.family().length < 1)) {
1238         for (final Entry<byte[], ByteMap<ByteMap<TreeMap<Long, byte[]>>>> cf :
1239           map.entrySet()) {
1240           cf.getValue().remove(delete.key());
1241         }
1242         return Deferred.fromResult(new Object());
1243       }
1244 
1245       final byte[] family = delete.family();
1246       if (family != null && family.length > 0) {
1247         if (!map.containsKey(family)) {
1248           return Deferred.fromError(new RuntimeException(
1249               "No such CF " + Bytes.pretty(family)));
1250         }
1251       }
1252 
1253       // compile a set of qualifiers
1254       ByteMap<Object> qualifiers = new ByteMap<Object>();
1255       if (delete.qualifiers() != null || delete.qualifiers().length > 0) {
1256         for (byte[] q : delete.qualifiers()) {
1257           qualifiers.put(q, null);
1258         }
1259       }
1260 
1261       // TODO - validate the assumption that a delete with a row key and qual
1262       // but without a family would delete the columns in ALL families
1263 
1264       // if the request only has a column family and no qualifiers, we delete
1265       // the row from the entire family
1266       if (family != null && qualifiers.isEmpty()) {
1267         final ByteMap<ByteMap<TreeMap<Long, byte[]>>> cf = map.get(delete.family());
1268         // cf != null validated above
1269         cf.remove(delete.key());
1270         return Deferred.fromResult(new Object());
1271       }
1272 
1273       for (final Entry<byte[], ByteMap<ByteMap<TreeMap<Long, byte[]>>>> cf :
1274         map.entrySet()) {
1275 
1276         // column family filter
1277         if (family != null && family.length > 0 &&
1278             !Bytes.equals(family, cf.getKey())) {
1279           continue;
1280         }
1281 
1282         ByteMap<TreeMap<Long, byte[]>> row = cf.getValue().get(delete.key());
1283         if (row == null) {
1284           continue;
1285         }
1286 
1287         for (byte[] qualifier : qualifiers.keySet()) {
1288           final TreeMap<Long, byte[]> column = row.get(qualifier);
1289           if (column == null) {
1290             continue;
1291           }
1292 
1293           // with this flag we delete a single timestamp
1294           if (delete.deleteAtTimestampOnly()) {
1295             if (column != null) {
1296               column.remove(delete.timestamp());
1297               if (column.isEmpty()) {
1298                 row.remove(qualifier);
1299               }
1300             }
1301           } else {
1302             // otherwise we delete everything less than or equal to the
1303             // delete timestamp
1304             List<Long> column_removals = new ArrayList<Long>(column.size());
1305             for (Map.Entry<Long, byte[]> cell : column.entrySet()) {
1306               if (cell.getKey() <= delete.timestamp()) {
1307                 column_removals.add(cell.getKey());
1308               }
1309             }
1310             for (Long ts : column_removals) {
1311               column.remove(ts);
1312             }
1313             if (column.isEmpty()) {
1314               row.remove(qualifier);
1315             }
1316           }
1317         }
1318 
1319         if (row.isEmpty()) {
1320           cf.getValue().remove(delete.key());
1321         }
1322       }
1323       return Deferred.fromResult(new Object());
1324     }
1325 
1326   }
1327 
1328   /**
1329    * This is a limited implementation of the scanner object. The only fields
   * captured and acted on are:
1331    * <ul><li>KeyRegexp</li>
1332    * <li>StartKey</li>
1333    * <li>StopKey</li>
1334    * <li>Qualifier</li>
1335    * <li>Qualifiers</li></ul>
1336    * Hence timestamps are ignored as are the max number of rows and qualifiers.
1337    * All matching rows/qualifiers will be returned in the first {@code nextRows}
1338    * call. The second {@code nextRows} call will always return null. Multiple
1339    * qualifiers are supported for matching.
1340    * <p>
1341    * The KeyRegexp can be set and it will run against the hex value of the
1342    * row key. In testing it seems to work nicely even with byte patterns.
1343    */
1344   public class MockScanner implements
1345     Answer<Deferred<ArrayList<ArrayList<KeyValue>>>> {
1346 
1347     private final Scanner mock_scanner;
1348     private final byte[] table;
1349     private byte[] start = null;
1350     private byte[] stop = null;
1351     private HashSet<String> scnr_qualifiers = null;
1352     private byte[] family = null;
1353     private ScanFilter filter = null;
1354     private int max_num_rows = Scanner.DEFAULT_MAX_NUM_ROWS;
1355     private ByteMap<Iterator<Entry<byte[], ByteMap<TreeMap<Long, byte[]>>>>>
1356       cursors;
1357     private ByteMap<Entry<byte[], ByteMap<TreeMap<Long, byte[]>>>> cf_rows;
1358     private byte[] last_row;
1359     private String rex; // TEMP
1360 
1361     /**
1362      * Default ctor
1363      * @param mock_scanner The scanner we're using
1364      * @param table The table (confirmed to exist)
1365      */
MockScanner(final Scanner mock_scanner, final byte[] table)1366     public MockScanner(final Scanner mock_scanner, final byte[] table) {
1367       this.mock_scanner = mock_scanner;
1368       this.table = table;
1369 
1370       // capture the scanner fields when set
1371       doAnswer(new Answer<Object>() {
1372         @Override
1373         public Object answer(InvocationOnMock invocation) throws Throwable {
1374           final Object[] args = invocation.getArguments();
1375           filter = new KeyRegexpFilter((String)args[0], Const.ASCII_CHARSET);
1376           rex = (String)args[0];
1377           return null;
1378         }
1379       }).when(mock_scanner).setKeyRegexp(anyString());
1380 
1381       doAnswer(new Answer<Object>() {
1382         @Override
1383         public Object answer(InvocationOnMock invocation) throws Throwable {
1384           final Object[] args = invocation.getArguments();
1385           filter = new KeyRegexpFilter((String)args[0], (Charset)args[1]);
1386           rex = (String)args[0];
1387           return null;
1388         }
1389       }).when(mock_scanner).setKeyRegexp(anyString(), (Charset)any());
1390 
1391       doAnswer(new Answer<Object>() {
1392         @Override
1393         public Object answer(InvocationOnMock invocation) throws Throwable {
1394           final Object[] args = invocation.getArguments();
1395           filter = (ScanFilter)args[0];
1396           return null;
1397         }
1398       }).when(mock_scanner).setFilter(any(ScanFilter.class));
1399 
1400       doAnswer(new Answer<Object>() {
1401         @Override
1402         public Object answer(InvocationOnMock invocation) throws Throwable {
1403           final Object[] args = invocation.getArguments();
1404           start = (byte[])args[0];
1405           return null;
1406         }
1407       }).when(mock_scanner).setStartKey((byte[])any());
1408 
1409       doAnswer(new Answer<Object>() {
1410         @Override
1411         public Object answer(InvocationOnMock invocation) throws Throwable {
1412           final Object[] args = invocation.getArguments();
1413           stop = (byte[])args[0];
1414           return null;
1415         }
1416       }).when(mock_scanner).setStopKey((byte[])any());
1417 
1418       doAnswer(new Answer<Object>() {
1419         @Override
1420         public Object answer(InvocationOnMock invocation) throws Throwable {
1421           final Object[] args = invocation.getArguments();
1422           family = (byte[])args[0];
1423           return null;
1424         }
1425       }).when(mock_scanner).setFamily((byte[])any());
1426 
1427       doAnswer(new Answer<Object>() {
1428         @Override
1429         public Object answer(InvocationOnMock invocation) throws Throwable {
1430           final Object[] args = invocation.getArguments();
1431           scnr_qualifiers = new HashSet<String>(1);
1432           scnr_qualifiers.add(bytesToString((byte[])args[0]));
1433           return null;
1434         }
1435       }).when(mock_scanner).setQualifier((byte[])any());
1436 
1437       doAnswer(new Answer<Object>() {
1438         @Override
1439         public Object answer(InvocationOnMock invocation) throws Throwable {
1440           final Object[] args = invocation.getArguments();
1441           final byte[][] qualifiers = (byte[][])args[0];
1442           scnr_qualifiers = new HashSet<String>(qualifiers.length);
1443           for (byte[] qualifier : qualifiers) {
1444             scnr_qualifiers.add(bytesToString(qualifier));
1445           }
1446           return null;
1447         }
1448       }).when(mock_scanner).setQualifiers((byte[][])any());
1449 
1450       doAnswer(new Answer<byte[]>() {
1451         @Override
1452         public byte[] answer(InvocationOnMock invocation) throws Throwable {
1453           return start;
1454         }
1455       }).when(mock_scanner).getCurrentKey();
1456 
1457       when(mock_scanner.nextRows()).thenAnswer(this);
1458 
1459     }
1460 
1461     @Override
answer( final InvocationOnMock invocation)1462     public Deferred<ArrayList<ArrayList<KeyValue>>> answer(
1463         final InvocationOnMock invocation) throws Throwable {
1464 
1465       if (cursors == null) {
1466         final ByteMap<ByteMap<ByteMap<TreeMap<Long, byte[]>>>> map =
1467             storage.get(table);
1468         if (map == null) {
1469           return Deferred.fromError( new RuntimeException(
1470               "No such table " + Bytes.pretty(table)));
1471         }
1472 
1473         cursors = new ByteMap<Iterator<Entry<byte[],
1474             ByteMap<TreeMap<Long, byte[]>>>>>();
1475         cf_rows = new ByteMap<Entry<byte[], ByteMap<TreeMap<Long, byte[]>>>>();
1476 
1477         if (family == null || family.length < 1) {
1478           for (final Entry<byte[], ByteMap<ByteMap<TreeMap<Long, byte[]>>>> cf : map) {
1479             final Iterator<Entry<byte[], ByteMap<TreeMap<Long, byte[]>>>>
1480             cursor = cf.getValue().iterator();
1481             cursors.put(cf.getKey(), cursor);
1482             cf_rows.put(cf.getKey(), null);
1483           }
1484         } else {
1485           final ByteMap<ByteMap<TreeMap<Long, byte[]>>> cf = map.get(family);
1486           if (cf == null) {
1487             return Deferred.fromError(new RuntimeException(
1488             "No such CF " + Bytes.pretty(family)));
1489           }
1490           final Iterator<Entry<byte[], ByteMap<TreeMap<Long, byte[]>>>>
1491             cursor = cf.iterator();
1492           cursors.put(family, cursor);
1493           cf_rows.put(family, null);
1494         }
1495       }
1496 
1497       // If we're out of rows to scan, then you HAVE to return null as the
1498       // HBase client does.
1499       if (!hasNext()) {
1500         return Deferred.fromResult(null);
1501       }
1502 
1503       // TODO - fuzzy filter support
1504       // TODO - fix the regex comparator
1505       Pattern pattern = null;
1506       if (rex != null) {
1507         if (!rex.isEmpty()) {
1508           pattern = Pattern.compile(rex);
1509         }
1510       } else if (filter != null) {
1511         KeyRegexpFilter regex_filter = null;
1512 
1513         if (filter instanceof KeyRegexpFilter) {
1514           regex_filter = (KeyRegexpFilter)filter;
1515         } else if (filter instanceof FilterList) {
1516           final List<ScanFilter> filters =
1517               Whitebox.getInternalState(filter, "filters");
1518           for (final ScanFilter f : filters) {
1519             if (f instanceof KeyRegexpFilter) {
1520               regex_filter = (KeyRegexpFilter)f;
1521             }
1522           }
1523         }
1524 
1525         if (regex_filter != null) {
1526           try {
1527             final String regexp = new String(
1528                 (byte[])Whitebox.getInternalState(regex_filter, "regexp"),
1529                 Charset.forName(new String(
1530                     (byte[])Whitebox.getInternalState(regex_filter, "charset"))));
1531             if (!regexp.isEmpty()) {
1532               pattern = Pattern.compile(regexp);
1533             }
1534           } catch (PatternSyntaxException e) {
1535             e.printStackTrace();
1536             return Deferred.fromError(e);
1537           }
1538         }
1539       }
1540 
1541       // return all matches
1542       final ArrayList<ArrayList<KeyValue>> results =
1543         new ArrayList<ArrayList<KeyValue>>();
1544       int rows_read = 0;
1545       while (hasNext()) {
1546         advance();
1547 
1548         // if it's before the start row, after the end row or doesn't
1549         // match the given regex, continue on to the next row
1550         if (start != null && Bytes.memcmp(last_row, start) < 0) {
1551           continue;
1552         }
1553         // asynchbase Scanner's logic:
1554         // - start_key is inclusive, stop key is exclusive
1555         // - when start key is equal to the stop key,
1556         //   include the key in scan result
1557         // - if stop key is empty, scan till the end
1558         if (stop != null && stop.length > 0 &&
1559             Bytes.memcmp(last_row, stop) >= 0 &&
1560             Bytes.memcmp(start, stop) != 0) {
1561           continue;
1562         }
1563         if (pattern != null) {
1564           final String from_bytes = new String(last_row, MockBase.ASCII);
1565           if (!pattern.matcher(from_bytes).find()) {
1566             continue;
1567           }
1568         }
1569 
1570         // throws AFTER we match on a row key
1571         if (exceptions != null) {
1572           final Pair<RuntimeException, Boolean> ex = exceptions.get(last_row);
1573           if (ex != null) {
1574             if (ex.getValue()) {
1575               return Deferred.fromError(ex.getKey());
1576             } else {
1577               throw ex.getKey();
1578             }
1579           }
1580         }
1581 
1582         // loop over the column family rows to see if they match
1583         final ArrayList<KeyValue> kvs = new ArrayList<KeyValue>();
1584         for (final Entry<byte[], Entry<byte[], ByteMap<TreeMap<Long, byte[]>>>> row :
1585           cf_rows.entrySet()) {
1586           if (row.getValue() == null ||
1587               Bytes.memcmp(last_row, row.getValue().getKey()) != 0) {
1588             continue;
1589           }
1590 
1591           for (final Entry<byte[], TreeMap<Long, byte[]>> column :
1592             row.getValue().getValue().entrySet()) {
1593             // if the qualifier isn't in the set, continue
1594             if (scnr_qualifiers != null &&
1595                 !scnr_qualifiers.contains(bytesToString(column.getKey()))) {
1596               continue;
1597             }
1598 
1599             kvs.add(new KeyValue(row.getValue().getKey(), row.getKey(),
1600                 column.getKey(), column.getValue().firstKey(),
1601                 column.getValue().firstEntry().getValue()));
1602           }
1603         }
1604 
1605         if (!kvs.isEmpty()) {
1606           results.add(kvs);
1607         }
1608         rows_read++;
1609 
1610         if (rows_read >= max_num_rows) {
1611           Thread.sleep(10); // this is here for time based unit tests
1612           break;
1613         }
1614       }
1615 
1616       if (results.isEmpty()) {
1617         return Deferred.fromResult(null);
1618       }
1619       return Deferred.fromResult(results);
1620     }
1621 
1622     /** @return Returns true if any of the CF iterators have another value */
hasNext()1623     private boolean hasNext() {
1624       for (final Iterator<Entry<byte[], ByteMap<TreeMap<Long, byte[]>>>> cursor :
1625           cursors.values()) {
1626         if (cursor.hasNext()) {
1627           return true;
1628         }
1629       }
1630       return false;
1631     }
1632 
1633     /** Insanely inefficient and ugly way of advancing the cursors */
advance()1634     private void advance() {
1635       // first time to get the ceiling
1636       if (last_row == null) {
1637         for (final Entry<byte[],
1638             Iterator<Entry<byte[], ByteMap<TreeMap<Long, byte[]>>>>> iterator :
1639           cursors.entrySet()) {
1640           final Entry<byte[], ByteMap<TreeMap<Long, byte[]>>> row =
1641               iterator.getValue().hasNext() ? iterator.getValue().next() : null;
1642           cf_rows.put(iterator.getKey(), row);
1643           if (last_row == null) {
1644             last_row = row.getKey();
1645           } else {
1646             if (Bytes.memcmp(last_row, row.getKey()) < 0) {
1647               last_row = row.getKey();
1648             }
1649           }
1650         }
1651         return;
1652       }
1653 
1654       for (final Entry<byte[], Entry<byte[], ByteMap<TreeMap<Long, byte[]>>>> cf :
1655         cf_rows.entrySet()) {
1656         final Entry<byte[], ByteMap<TreeMap<Long, byte[]>>> row = cf.getValue();
1657         if (row == null) {
1658           continue;
1659         }
1660 
1661         if (Bytes.memcmp(last_row, row.getKey()) == 0) {
1662           if (!cursors.get(cf.getKey()).hasNext()) {
1663             cf_rows.put(cf.getKey(), null); // EX?
1664           } else {
1665             cf_rows.put(cf.getKey(), cursors.get(cf.getKey()).next());
1666           }
1667         }
1668       }
1669 
1670       last_row = null;
1671       for (final Entry<byte[], ByteMap<TreeMap<Long, byte[]>>> row :
1672         cf_rows.values()) {
1673         if (row == null) {
1674           continue;
1675         }
1676 
1677         if (last_row == null) {
1678           last_row = row.getKey();
1679         } else {
1680           if (Bytes.memcmp(last_row, row.getKey()) < 0) {
1681             last_row = row.getKey();
1682           }
1683         }
1684       }
1685     }
1686 
1687     /** @return The scanner for this mock */
getScanner()1688     public Scanner getScanner() {
1689       return mock_scanner;
1690     }
1691 
1692     /** @return The filter for this mock */
getFilter()1693     public ScanFilter getFilter() {
1694       return filter;
1695     }
1696   }
1697 
1698   /**
1699    * Creates or increments (possibly decrements) a Long in the hash table at the
1700    * given location.
1701    */
1702   private class MockAtomicIncrement implements
1703     Answer<Deferred<Long>> {
1704 
1705     @Override
answer(InvocationOnMock invocation)1706     public Deferred<Long> answer(InvocationOnMock invocation) throws Throwable {
1707       final Object[] args = invocation.getArguments();
1708       final AtomicIncrementRequest air = (AtomicIncrementRequest)args[0];
1709       final long amount = air.getAmount();
1710 
1711       if (exceptions != null) {
1712         final Pair<RuntimeException, Boolean> ex = exceptions.get(air.key());
1713         if (ex != null) {
1714           if (ex.getValue()) {
1715             return Deferred.fromError(ex.getKey());
1716           } else {
1717             throw ex.getKey();
1718           }
1719         }
1720       }
1721 
1722       final ByteMap<ByteMap<ByteMap<TreeMap<Long, byte[]>>>> map =
1723           storage.get(air.table());
1724       if (map == null) {
1725         return Deferred.fromError(new RuntimeException(
1726           "No such table " + Bytes.pretty(air.table())));
1727       }
1728 
1729       final ByteMap<ByteMap<TreeMap<Long, byte[]>>> cf = map.get(air.family());
1730       if (cf == null) {
1731         return Deferred.fromError(new RuntimeException(
1732             "No such CF " + Bytes.pretty(air.table())));
1733       }
1734 
1735       ByteMap<TreeMap<Long, byte[]>> row = cf.get(air.key());
1736       if (row == null) {
1737         row = new ByteMap<TreeMap<Long, byte[]>>();
1738         cf.put(air.key(), row);
1739       }
1740 
1741       TreeMap<Long, byte[]> column = row.get(air.qualifier());
1742       if (column == null) {
1743         column = new TreeMap<Long, byte[]>(Collections.reverseOrder());
1744         row.put(air.qualifier(), column);
1745         column.put(current_timestamp++, Bytes.fromLong(amount));
1746         return Deferred.fromResult(amount);
1747       }
1748 
1749       long incremented_value = Bytes.getLong(column.firstEntry().getValue());
1750       incremented_value += amount;
1751       column.put(column.firstKey(), Bytes.fromLong(incremented_value));
1752       return Deferred.fromResult(incremented_value);
1753     }
1754 
1755   }
1756 
1757 }
1758