1 // This file is part of OpenTSDB.
2 // Copyright (C) 2014  The OpenTSDB Authors.
3 //
4 // This program is free software: you can redistribute it and/or modify it
5 // under the terms of the GNU Lesser General Public License as published by
6 // the Free Software Foundation, either version 2.1 of the License, or (at your
7 // option) any later version.  This program is distributed in the hope that it
8 // will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty
9 // of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser
10 // General Public License for more details.  You should have received a copy
11 // of the GNU Lesser General Public License along with this program.  If not,
12 // see <http://www.gnu.org/licenses/>.
13 package net.opentsdb.tools;
14 
15 import java.lang.reflect.Field;
16 import java.lang.reflect.Method;
17 import java.nio.charset.Charset;
18 import java.util.ArrayList;
19 import java.util.Arrays;
20 import java.util.List;
21 
22 import net.opentsdb.core.Const;
23 import net.opentsdb.core.Internal;
24 import net.opentsdb.core.RowKey;
25 import net.opentsdb.core.TSDB;
26 import net.opentsdb.uid.UniqueId;
27 
28 import org.hbase.async.Bytes;
29 import org.hbase.async.GetRequest;
30 import org.hbase.async.HBaseClient;
31 import org.hbase.async.HBaseException;
32 import org.hbase.async.KeyValue;
33 import org.hbase.async.Scanner;
34 
35 /**
36  * Various utilities shared amongst the CLI tools.
37  * @since 2.1
38  */
39 final class CliUtils {
40   /** Function used to convert a String to a byte[]. */
41   static final Method toBytes;
42   /** Function used to convert a byte[] to a String. */
43   static final Method fromBytes;
44   /** Charset used to convert Strings to byte arrays and back. */
45   static final Charset CHARSET;
46   /** The single column family used by this class. */
47   static final byte[] ID_FAMILY;
48   /** The single column family used by this class. */
49   static final byte[] NAME_FAMILY;
50   /** Row key of the special row used to track the max ID already assigned. */
51   static final byte[] MAXID_ROW;
52   static {
53     final Class<UniqueId> uidclass = UniqueId.class;
54     try {
55       // Those are all implementation details so they're not part of the
56       // interface.  We access them anyway using reflection.  I think this
57       // is better than marking those and adding a javadoc comment
58       // "THIS IS INTERNAL DO NOT USE".  If only Java had C++'s "friend" or
59       // a less stupid notion of a package.
60       Field f;
61       f = uidclass.getDeclaredField("CHARSET");
62       f.setAccessible(true);
63       CHARSET = (Charset) f.get(null);
64       f = uidclass.getDeclaredField("ID_FAMILY");
65       f.setAccessible(true);
66       ID_FAMILY = (byte[]) f.get(null);
67       f = uidclass.getDeclaredField("NAME_FAMILY");
68       f.setAccessible(true);
69       NAME_FAMILY = (byte[]) f.get(null);
70       f = uidclass.getDeclaredField("MAXID_ROW");
71       f.setAccessible(true);
72       MAXID_ROW = (byte[]) f.get(null);
73       toBytes = uidclass.getDeclaredMethod("toBytes", String.class);
74       toBytes.setAccessible(true);
75       fromBytes = uidclass.getDeclaredMethod("fromBytes", byte[].class);
76       fromBytes.setAccessible(true);
77     } catch (Exception e) {
78       throw new RuntimeException("static initializer failed", e);
79     }
80   }
81   /** Qualifier for metrics meta data */
82   static final byte[] METRICS_META = "metric_meta".getBytes(CHARSET);
83   /** Qualifier for tagk meta data */
84   static final byte[] TAGK_META = "tagk_meta".getBytes(CHARSET);
85   /** Qualifier for tagv meta data */
86   static final byte[] TAGV_META = "tagv_meta".getBytes(CHARSET);
87   /** Qualifier for metrics UIDs */
88   static final byte[] METRICS = "metrics".getBytes(CHARSET);
89   /** Qualifier for tagk UIDs */
90   static final byte[] TAGK = "tagk".getBytes(CHARSET);
91   /** Qualifier for tagv UIDs */
92   static final byte[] TAGV = "tagv".getBytes(CHARSET);
93 
94   /**
95    * Returns the max metric ID from the UID table
96    * @param tsdb The TSDB to use for data access
97    * @return The max metric ID as an integer value, may be 0 if the UID table
98    * hasn't been initialized or is missing the UID row or metrics column.
99    * @throws IllegalStateException if the UID column can't be found or couldn't
100    * be parsed
101    */
getMaxMetricID(final TSDB tsdb)102   static long getMaxMetricID(final TSDB tsdb) {
103     // first up, we need the max metric ID so we can split up the data table
104     // amongst threads.
105     final GetRequest get = new GetRequest(tsdb.uidTable(), new byte[] { 0 });
106     get.family("id".getBytes(CHARSET));
107     get.qualifier("metrics".getBytes(CHARSET));
108     ArrayList<KeyValue> row;
109     try {
110       row = tsdb.getClient().get(get).joinUninterruptibly();
111       if (row == null || row.isEmpty()) {
112         return 0;
113       }
114       final byte[] id_bytes = row.get(0).value();
115       if (id_bytes.length != 8) {
116         throw new IllegalStateException("Invalid metric max UID, wrong # of bytes");
117       }
118       return Bytes.getLong(id_bytes);
119     } catch (Exception e) {
120       throw new RuntimeException("Shouldn't be here", e);
121     }
122   }
123 
124   /**
125    * Returns a scanner set to iterate over a range of metrics in the main
126    * tsdb-data table.
127    * @param tsdb The TSDB to use for data access
128    * @param start_id A metric ID to start scanning on
129    * @param end_id A metric ID to end scanning on
130    * @return A scanner on the "t" CF configured for the specified range
131    * @throws HBaseException if something goes pear shaped
132    */
getDataTableScanner(final TSDB tsdb, final long start_id, final long end_id)133   static final Scanner getDataTableScanner(final TSDB tsdb, final long start_id,
134       final long end_id) throws HBaseException {
135     final short metric_width = TSDB.metrics_width();
136     final byte[] start_row =
137       Arrays.copyOfRange(Bytes.fromLong(start_id), 8 - metric_width, 8);
138     final byte[] end_row =
139       Arrays.copyOfRange(Bytes.fromLong(end_id), 8 - metric_width, 8);
140 
141     final Scanner scanner = tsdb.getClient().newScanner(tsdb.dataTable());
142     scanner.setStartKey(start_row);
143     scanner.setStopKey(end_row);
144     scanner.setFamily(TSDB.FAMILY());
145     return scanner;
146   }
147 
148   /**
149    * Generates a list of Scanners to use for iterating over the full TSDB
150    * data table. If salting is enabled then {@link Const.SaltBukets()} scanners
151    * will be returned. If salting is disabled then {@link num_scanners}
152    * scanners will be returned.
153    * @param tsdb The TSDB to generate scanners from
154    * @param num_scanners The max number of scanners if salting is disabled
155    * @return A list of scanners to use for scanning the table.
156    */
getDataTableScanners(final TSDB tsdb, final int num_scanners)157   static final List<Scanner> getDataTableScanners(final TSDB tsdb,
158       final int num_scanners) {
159     if (num_scanners < 1) {
160       throw new IllegalArgumentException(
161           "Number of scanners must be 1 or more: " + num_scanners);
162     }
163     // TODO - It would be neater to get a list of regions then create scanners
164     // on those boundaries. We'll have to modify AsyncHBase for that to avoid
165     // creating lots of custom HBase logic in here.
166     final short metric_width = TSDB.metrics_width();
167     final List<Scanner> scanners = new ArrayList<Scanner>();
168 
169     if (Const.SALT_WIDTH() > 0) {
170       // salting is enabled so we'll create one scanner per salt for now
171       byte[] start_key = HBaseClient.EMPTY_ARRAY;
172       byte[] stop_key = HBaseClient.EMPTY_ARRAY;
173 
174       for (int i = 1; i < Const.SALT_BUCKETS() + 1; i++) {
175         // move stop key to start key
176         if (i > 1) {
177           start_key = Arrays.copyOf(stop_key, stop_key.length);
178         }
179 
180         if (i >= Const.SALT_BUCKETS()) {
181           stop_key = HBaseClient.EMPTY_ARRAY;
182         } else {
183           stop_key = RowKey.getSaltBytes(i);
184         }
185         final Scanner scanner = tsdb.getClient().newScanner(tsdb.dataTable());
186         scanner.setStartKey(Arrays.copyOf(start_key, start_key.length));
187         scanner.setStopKey(Arrays.copyOf(stop_key, stop_key.length));
188         scanner.setFamily(TSDB.FAMILY());
189         scanners.add(scanner);
190       }
191 
192     } else {
193       // No salt, just go by the max metric ID
194       long max_id = CliUtils.getMaxMetricID(tsdb);
195       if (max_id < 1) {
196         max_id = Internal.getMaxUnsignedValueOnBytes(metric_width);
197       }
198       final long quotient = max_id % num_scanners == 0 ? max_id / num_scanners :
199         (max_id / num_scanners) + 1;
200 
201       byte[] start_key = HBaseClient.EMPTY_ARRAY;
202       byte[] stop_key = new byte[metric_width];
203 
204       for (int i = 0; i < num_scanners; i++) {
205         // move stop key to start key
206         if (i > 0) {
207           start_key = Arrays.copyOf(stop_key, stop_key.length);
208         }
209 
210         // setup the next stop key
211         final byte[] stop_id;
212         if ((i +1) * quotient > max_id) {
213           stop_id = null;
214         } else {
215           stop_id = Bytes.fromLong((i + 1) * quotient);
216         }
217         if ((i +1) * quotient >= max_id) {
218           stop_key = HBaseClient.EMPTY_ARRAY;
219         } else {
220           System.arraycopy(stop_id, stop_id.length - metric_width, stop_key,
221               0, metric_width);
222         }
223 
224         final Scanner scanner = tsdb.getClient().newScanner(tsdb.dataTable());
225         scanner.setStartKey(Arrays.copyOf(start_key, start_key.length));
226         if (stop_key != null) {
227           scanner.setStopKey(Arrays.copyOf(stop_key, stop_key.length));
228         }
229         scanner.setFamily(TSDB.FAMILY());
230         scanners.add(scanner);
231       }
232 
233     }
234 
235     return scanners;
236   }
237 
238   /**
239    * Invokes the reflected {@code UniqueId.toBytes()} method with the given
240    * string  using the UniqueId character set.
241    * @param s The string to convert to a byte array
242    * @return The byte array
243    * @throws RuntimeException if reflection failed
244    */
toBytes(final String s)245   static byte[] toBytes(final String s) {
246     try {
247       return (byte[]) toBytes.invoke(null, s);
248     } catch (Exception e) {
249       throw new RuntimeException("toBytes=" + toBytes, e);
250     }
251   }
252 
253   /**
254    * Invokces the reflected {@code UnqieuiId.fromBytes()} method with the given
255    * byte array using the UniqueId character set.
256    * @param b The byte array to convert to a string
257    * @return The string
258    * @throws RuntimeException if reflection failed
259    */
fromBytes(final byte[] b)260   static String fromBytes(final byte[] b) {
261     try {
262       return (String) fromBytes.invoke(null, b);
263     } catch (Exception e) {
264       throw new RuntimeException("fromBytes=" + fromBytes, e);
265     }
266   }
267 }
268