// This file is part of OpenTSDB.
// Copyright (C) 2014  The OpenTSDB Authors.
//
// This program is free software: you can redistribute it and/or modify it
// under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 2.1 of the License, or (at your
// option) any later version.  This program is distributed in the hope that it
// will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty
// of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser
// General Public License for more details.  You should have received a copy
// of the GNU Lesser General Public License along with this program.  If not,
// see <http://www.gnu.org/licenses/>.
package net.opentsdb.tools;

import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import net.opentsdb.core.Const;
import net.opentsdb.core.Internal;
import net.opentsdb.core.RowKey;
import net.opentsdb.core.TSDB;
import net.opentsdb.uid.UniqueId;

import org.hbase.async.Bytes;
import org.hbase.async.GetRequest;
import org.hbase.async.HBaseClient;
import org.hbase.async.HBaseException;
import org.hbase.async.KeyValue;
import org.hbase.async.Scanner;

/**
 * Various utilities shared amongst the CLI tools.
 * @since 2.1
 */
final class CliUtils {
  /** Function used to convert a String to a byte[]. */
  static final Method toBytes;
  /** Function used to convert a byte[] to a String. */
  static final Method fromBytes;
  /** Charset used to convert Strings to byte arrays and back. */
  static final Charset CHARSET;
  /** Value of the {@code ID_FAMILY} column family declared by UniqueId. */
  static final byte[] ID_FAMILY;
  /** Value of the {@code NAME_FAMILY} column family declared by UniqueId. */
  static final byte[] NAME_FAMILY;
  /** Row key of the special row used to track the max ID already assigned. */
  static final byte[] MAXID_ROW;
  static {
    final Class<UniqueId> uidclass = UniqueId.class;
    try {
      // Those are all implementation details so they're not part of the
      // interface.  We access them anyway using reflection.  I think this
      // is better than marking those and adding a javadoc comment
      // "THIS IS INTERNAL DO NOT USE".  If only Java had C++'s "friend" or
      // a less stupid notion of a package.
      Field f;
      f = uidclass.getDeclaredField("CHARSET");
      f.setAccessible(true);
      CHARSET = (Charset) f.get(null);
      f = uidclass.getDeclaredField("ID_FAMILY");
      f.setAccessible(true);
      ID_FAMILY = (byte[]) f.get(null);
      f = uidclass.getDeclaredField("NAME_FAMILY");
      f.setAccessible(true);
      NAME_FAMILY = (byte[]) f.get(null);
      f = uidclass.getDeclaredField("MAXID_ROW");
      f.setAccessible(true);
      MAXID_ROW = (byte[]) f.get(null);
      toBytes = uidclass.getDeclaredMethod("toBytes", String.class);
      toBytes.setAccessible(true);
      fromBytes = uidclass.getDeclaredMethod("fromBytes", byte[].class);
      fromBytes.setAccessible(true);
    } catch (Exception e) {
      throw new RuntimeException("static initializer failed", e);
    }
  }
  /** Qualifier for metrics meta data */
  static final byte[] METRICS_META = "metric_meta".getBytes(CHARSET);
  /** Qualifier for tagk meta data */
  static final byte[] TAGK_META = "tagk_meta".getBytes(CHARSET);
  /** Qualifier for tagv meta data */
  static final byte[] TAGV_META = "tagv_meta".getBytes(CHARSET);
  /** Qualifier for metrics UIDs */
  static final byte[] METRICS = "metrics".getBytes(CHARSET);
  /** Qualifier for tagk UIDs */
  static final byte[] TAGK = "tagk".getBytes(CHARSET);
  /** Qualifier for tagv UIDs */
  static final byte[] TAGV = "tagv".getBytes(CHARSET);

  /**
   * Returns the max metric ID from the UID table
   * @param tsdb The TSDB to use for data access
   * @return The max metric ID as an integer value, may be 0 if the UID table
   * hasn't been initialized or is missing the UID row or metrics column.
   * @throws IllegalStateException if the stored max ID is not exactly 8 bytes
   * wide and therefore can't be parsed
   * @throws RuntimeException if the lookup against the UID table fails
   */
  static long getMaxMetricID(final TSDB tsdb) {
    // first up, we need the max metric ID so we can split up the data table
    // amongst threads.
    final GetRequest get = new GetRequest(tsdb.uidTable(), new byte[] { 0 });
    get.family("id".getBytes(CHARSET));
    get.qualifier(METRICS);

    final ArrayList<KeyValue> row;
    try {
      row = tsdb.getClient().get(get).joinUninterruptibly();
    } catch (Exception e) {
      throw new RuntimeException("Shouldn't be here", e);
    }

    // Validation happens outside of the try block on purpose: the documented
    // IllegalStateException must reach the caller as-is instead of being
    // re-wrapped by the generic catch above.
    if (row == null || row.isEmpty()) {
      return 0;
    }
    final byte[] id_bytes = row.get(0).value();
    if (id_bytes.length != 8) {
      throw new IllegalStateException("Invalid metric max UID, wrong # of bytes");
    }
    return Bytes.getLong(id_bytes);
  }

  /**
   * Returns a scanner set to iterate over a range of metrics in the main
   * tsdb-data table.
   * @param tsdb The TSDB to use for data access
   * @param start_id A metric ID to start scanning on
   * @param end_id A metric ID to end scanning on
   * @return A scanner on the data table's column family configured for the
   * specified range
   * @throws HBaseException if something goes pear shaped
   */
  static final Scanner getDataTableScanner(final TSDB tsdb, final long start_id,
      final long end_id) throws HBaseException {
    final short metric_width = TSDB.metrics_width();
    // Bytes.fromLong() yields 8 bytes; keep only the trailing metric_width
    // bytes so the keys match the width of a metric UID.
    final byte[] start_row =
      Arrays.copyOfRange(Bytes.fromLong(start_id), 8 - metric_width, 8);
    final byte[] end_row =
      Arrays.copyOfRange(Bytes.fromLong(end_id), 8 - metric_width, 8);

    final Scanner scanner = tsdb.getClient().newScanner(tsdb.dataTable());
    scanner.setStartKey(start_row);
    scanner.setStopKey(end_row);
    scanner.setFamily(TSDB.FAMILY());
    return scanner;
  }

  /**
   * Generates a list of Scanners to use for iterating over the full TSDB
   * data table. If salting is enabled then {@link Const#SALT_BUCKETS()}
   * scanners will be returned. If salting is disabled then at most
   * {@code num_scanners} scanners will be returned; fewer are returned when
   * the metric ID space cannot be split into that many non-empty ranges.
   * The returned scanners cover disjoint, contiguous key ranges that together
   * span the whole table.
   * @param tsdb The TSDB to generate scanners from
   * @param num_scanners The max number of scanners if salting is disabled
   * @return A list of scanners to use for scanning the table.
   * @throws IllegalArgumentException if {@code num_scanners} is less than 1
   */
  static final List<Scanner> getDataTableScanners(final TSDB tsdb,
      final int num_scanners) {
    if (num_scanners < 1) {
      throw new IllegalArgumentException(
          "Number of scanners must be 1 or more: " + num_scanners);
    }
    // TODO - It would be neater to get a list of regions then create scanners
    // on those boundaries. We'll have to modify AsyncHBase for that to avoid
    // creating lots of custom HBase logic in here.
    final short metric_width = TSDB.metrics_width();
    final List<Scanner> scanners = new ArrayList<Scanner>();

    if (Const.SALT_WIDTH() > 0) {
      // salting is enabled so we'll create one scanner per salt for now
      final int buckets = Const.SALT_BUCKETS();
      byte[] start_key = HBaseClient.EMPTY_ARRAY;

      for (int i = 1; i <= buckets; i++) {
        // the final bucket is left open-ended so it runs to the end of the
        // table
        final byte[] stop_key = i >= buckets
            ? HBaseClient.EMPTY_ARRAY : RowKey.getSaltBytes(i);

        final Scanner scanner = tsdb.getClient().newScanner(tsdb.dataTable());
        scanner.setStartKey(Arrays.copyOf(start_key, start_key.length));
        scanner.setStopKey(Arrays.copyOf(stop_key, stop_key.length));
        scanner.setFamily(TSDB.FAMILY());
        scanners.add(scanner);

        // the next scanner picks up where this one stops
        start_key = stop_key;
      }

    } else {
      // No salt, just go by the max metric ID
      long max_id = CliUtils.getMaxMetricID(tsdb);
      if (max_id < 1) {
        max_id = Internal.getMaxUnsignedValueOnBytes(metric_width);
      }
      // number of metric IDs per scanner, rounded up
      final long quotient = max_id % num_scanners == 0 ? max_id / num_scanners :
        (max_id / num_scanners) + 1;

      byte[] start_key = HBaseClient.EMPTY_ARRAY;

      for (int i = 0; i < num_scanners; i++) {
        final long boundary = (i + 1) * quotient;
        final boolean last = boundary >= max_id;

        // the last range is open-ended so it runs to the end of the table
        final byte[] stop_key = last
            ? HBaseClient.EMPTY_ARRAY
            : Arrays.copyOfRange(Bytes.fromLong(boundary), 8 - metric_width, 8);

        final Scanner scanner = tsdb.getClient().newScanner(tsdb.dataTable());
        scanner.setStartKey(Arrays.copyOf(start_key, start_key.length));
        scanner.setStopKey(Arrays.copyOf(stop_key, stop_key.length));
        scanner.setFamily(TSDB.FAMILY());
        scanners.add(scanner);

        if (last) {
          // Everything up to the end of the table is covered now. Creating
          // further scanners would start from this empty stop key, i.e. each
          // of them would redundantly scan the entire table again.
          break;
        }
        // the next scanner picks up where this one stops
        start_key = stop_key;
      }

    }

    return scanners;
  }

  /**
   * Invokes the reflected {@code UniqueId.toBytes()} method with the given
   * string using the UniqueId character set.
   * @param s The string to convert to a byte array
   * @return The byte array
   * @throws RuntimeException if reflection failed
   */
  static byte[] toBytes(final String s) {
    try {
      return (byte[]) toBytes.invoke(null, s);
    } catch (Exception e) {
      throw new RuntimeException("toBytes=" + toBytes, e);
    }
  }

  /**
   * Invokes the reflected {@code UniqueId.fromBytes()} method with the given
   * byte array using the UniqueId character set.
   * @param b The byte array to convert to a string
   * @return The string
   * @throws RuntimeException if reflection failed
   */
  static String fromBytes(final byte[] b) {
    try {
      return (String) fromBytes.invoke(null, b);
    } catch (Exception e) {
      throw new RuntimeException("fromBytes=" + fromBytes, e);
    }
  }
}