// This file is part of OpenTSDB.
// Copyright (C) 2010-2014  The OpenTSDB Authors.
//
// This program is free software: you can redistribute it and/or modify it
// under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 2.1 of the License, or (at your
// option) any later version.  This program is distributed in the hope that it
// will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty
// of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser
// General Public License for more details.  You should have received a copy
// of the GNU Lesser General Public License along with this program.  If not,
// see <http://www.gnu.org/licenses/>.
package net.opentsdb.tools;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.zip.GZIPInputStream;

import com.stumbleupon.async.Callback;
import com.stumbleupon.async.Deferred;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.hbase.async.HBaseClient;
import org.hbase.async.HBaseRpc;
import org.hbase.async.PleaseThrottleException;
import org.hbase.async.PutRequest;

import net.opentsdb.core.Tags;
import net.opentsdb.core.TSDB;
import net.opentsdb.core.WritableDataPoints;
import net.opentsdb.stats.StatsCollector;
import net.opentsdb.utils.Config;

/**
 * Command line tool that reads data points from text files (optionally
 * gzip-compressed, or standard input when the path is {@code -}) and writes
 * them through a {@link TSDB} instance.
 * <p>
 * Each input line is split on single spaces into
 * {@code metric timestamp value [tag ...]}.
 */
final class TextImporter {

  private static final Logger LOG = LoggerFactory.getLogger(TextImporter.class);

  /**
   * Prints usage and exits with the given retval.
   * @param argp The argument parser whose option summary is printed.
   * @param retval The exit status passed to {@link System#exit}.
   */
  static void usage(final ArgP argp, final int retval) {
    System.err.println("Usage: import path [more paths]");
    System.err.print(argp.usage());
    System.err.println("This tool can directly read gzip'ed input files.");
    System.exit(retval);
  }

  /**
   * Entry point: parses the command line, imports every given path in order,
   * logs the overall throughput, dumps the TSDB stats to stderr and finally
   * shuts the TSDB down.
   * @param args Command line arguments; after option parsing the remaining
   * ones are the paths to import.
   * @throws Exception if checking the tables or importing a file fails.
   */
  public static void main(String[] args) throws Exception {
    ArgP argp = new ArgP();
    CliOptions.addCommon(argp);
    CliOptions.addAutoMetricFlag(argp);
    argp.addOption("--skip-errors", "Whether or not to skip exceptions "
                   + "during processing");
    args = CliOptions.parse(argp, args);
    if (args == null) {
      usage(argp, 1);
    } else if (args.length < 1) {
      usage(argp, 2);
    }

    // get a config object
    Config config = CliOptions.getConfig(argp);

    final TSDB tsdb = new TSDB(config);
    final boolean skip_errors = argp.has("--skip-errors");
    tsdb.checkNecessaryTablesExist().joinUninterruptibly();
    argp = null;
    try {
      int points = 0;
      final long start_time = System.nanoTime();
      for (final String path : args) {
        points += importFile(tsdb.getClient(), tsdb, path, skip_errors);
      }
      final double time_delta = (System.nanoTime() - start_time) / 1000000000.0;
      LOG.info(String.format("Total: imported %d data points in %.3fs"
                             + " (%.1f points/s)",
                             points, time_delta, (points / time_delta)));
      // TODO(tsuna): Figure out something better than just writing to stderr.
      tsdb.collectStats(new StatsCollector("tsd") {
        @Override
        public final void emit(final String line) {
          System.err.print(line);
        }
      });
    } finally {
      try {
        tsdb.shutdown().joinUninterruptibly();
      } catch (Exception e) {
        LOG.error("Unexpected exception", e);
        System.exit(1);
      }
    }
  }

  /**
   * Set by the write errback when HBase asks us to slow down; read and
   * cleared by the import loop.
   */
  static volatile boolean throttle = false;

  /**
   * Imports every data point found in the given file.
   * <p>
   * When {@code skip_errors} is set, a line that cannot be processed is
   * logged and skipped; otherwise the first bad line aborts the import with
   * a {@link RuntimeException}.
   * @param client The HBase client used to re-send puts that were rejected
   * with a {@link PleaseThrottleException}.
   * @param tsdb The TSDB to write the data points to.
   * @param path The file to read, see {@link #open}.
   * @param skip_errors Whether to skip bad lines instead of failing.
   * @return The number of data points handed to the TSDB.
   * @throws IOException if the file cannot be opened or read.
   */
  private static int importFile(final HBaseClient client,
                                final TSDB tsdb,
                                final String path,
                                final boolean skip_errors) throws IOException {
    final long start_time = System.nanoTime();
    long ping_start_time = start_time;
    final BufferedReader in = open(path);
    String line = null;
    int points = 0;
    try {
      /** Handles asynchronous write failures. */
      final class Errback implements Callback<Object, Exception> {
        @Override
        public Object call(final Exception arg) {
          if (arg instanceof PleaseThrottleException) {
            final PleaseThrottleException e = (PleaseThrottleException) arg;
            LOG.warn("Need to throttle, HBase isn't keeping up.", e);
            throttle = true;
            final HBaseRpc rpc = e.getFailedRpc();
            if (rpc instanceof PutRequest) {
              client.put((PutRequest) rpc);  // Don't lose edits.
            }
            return null;
          }
          // Any other failure is fatal for the whole import.
          LOG.error("Exception caught while processing file "
                    + path, arg);
          System.exit(2);
          return arg;
        }

        @Override
        public String toString() {
          return "importFile errback";
        }
      }

      final Errback errback = new Errback();
      LOG.info("reading from file:" + path);
      while ((line = in.readLine()) != null) {
        final String[] words = Tags.splitString(line, ' ');
        final String metric = words[0];
        if (metric.length() <= 0) {
          if (skip_errors) {
            logSkippedLine("invalid metric: " + metric, path, line);
            continue;
          } else {
            throw new RuntimeException("invalid metric: " + metric);
          }
        }

        // A usable line needs at least a metric, a timestamp and a value.
        // Check up front so that short lines honor skip_errors and produce a
        // readable message instead of a bare array index.
        if (words.length < 3) {
          final String reason = "invalid line: expected at least a metric, a"
            + " timestamp and a value but got " + words.length + " field(s)";
          if (skip_errors) {
            logSkippedLine(reason, path, line);
            continue;
          } else {
            // Same exception type the unchecked array access used to raise,
            // so existing callers observe the same failure type.
            throw new ArrayIndexOutOfBoundsException(reason);
          }
        }

        final long timestamp;
        try {
          timestamp = Tags.parseLong(words[1]);
          if (timestamp <= 0) {
            if (skip_errors) {
              logSkippedLine("invalid timestamp: " + timestamp, path, line);
              continue;
            } else {
              throw new RuntimeException("invalid timestamp: " + timestamp);
            }
          }
        } catch (final RuntimeException e) {
          if (skip_errors) {
            logSkippedLine("invalid timestamp: " + e.getMessage(), path, line);
            continue;
          } else {
            throw e;
          }
        }

        final String value = words[2];
        if (value.length() <= 0) {
          if (skip_errors) {
            logSkippedLine("invalid value: " + value, path, line);
            continue;
          } else {
            throw new RuntimeException("invalid value: " + value);
          }
        }

        try {
          final HashMap<String, String> tags = new HashMap<String, String>();
          for (int i = 3; i < words.length; i++) {
            if (!words[i].isEmpty()) {
              Tags.parse(tags, words[i]);
            }
          }

          final WritableDataPoints dp = getDataPoints(tsdb, metric, tags);
          Deferred<Object> d;
          if (Tags.looksLikeInteger(value)) {
            d = dp.addPoint(timestamp, Tags.parseLong(value));
          } else {  // floating point value
            d = dp.addPoint(timestamp, Float.parseFloat(value));
          }
          d.addErrback(errback);
          points++;
          if (points % 1000000 == 0) {
            // Report the throughput of the last million points.
            final long now = System.nanoTime();
            final long elapsed_ms = (now - ping_start_time) / 1000000;
            LOG.info(String.format("... %d data points in %dms (%.1f points/s)",
                                   points, elapsed_ms,
                                   (1000000 * 1000.0 / elapsed_ms)));
            ping_start_time = now;
          }
          if (throttle) {
            waitWhileThrottled(d);
          }
        } catch (final RuntimeException e) {
          if (skip_errors) {
            logSkippedLine("Exception: " + e.getMessage(), path, line);
            continue;
          } else {
            throw e;
          }
        }
      }
    } catch (RuntimeException e) {
      LOG.error("Exception caught while processing file "
                + path + " line=[" + line + "]", e);
      throw e;
    } finally {
      in.close();
    }
    final long time_delta = (System.nanoTime() - start_time) / 1000000;
    LOG.info(String.format("Processed %s in %d ms, %d data points"
                           + " (%.1f points/s)",
                           path, time_delta, points,
                           (points * 1000.0 / time_delta)));
    return points;
  }

  /**
   * Logs why a line is being skipped, followed by the file and line content.
   * @param reason The message describing what is wrong with the line.
   * @param path The file being imported.
   * @param line The offending line.
   */
  private static void logSkippedLine(final String reason,
                                     final String path,
                                     final String line) {
    LOG.error(reason);
    LOG.error("error while processing file "
              + path + " line=" + line + "... Continuing");
  }

  /**
   * Blocks until the given write completes, then clears {@link #throttle}.
   * If the write completed in less than a second, sleeps one more second.
   * @param d The deferred result of the most recent write.
   * @throws RuntimeException if waiting on the write fails or the sleep is
   * interrupted; in both cases {@link #throttle} is left set.
   */
  private static void waitWhileThrottled(final Deferred<Object> d) {
    LOG.info("Throttling...");
    long throttle_time = System.nanoTime();
    try {
      d.joinUninterruptibly();
    } catch (final Exception e) {
      throw new RuntimeException("Should never happen", e);
    }
    throttle_time = System.nanoTime() - throttle_time;
    if (throttle_time < 1000000000L) {
      LOG.info("Got throttled for only " + throttle_time +
               "ns, sleeping a bit now");
      try {
        Thread.sleep(1000);
      } catch (InterruptedException e) {
        // Restore the interrupt status so code further up the stack can
        // still see that the thread was asked to stop.
        Thread.currentThread().interrupt();
        throw new RuntimeException("interrupted", e);
      }
    }
    LOG.info("Done throttling...");
    throttle = false;
  }

  /**
   * Opens a file for reading, handling gzipped files.
   * @param path The file to open.  {@code -} means standard input, and a
   * path ending in {@code .gz} is decompressed on the fly.
   * @return A buffered reader to read the file, decompressing it if needed.
   * @throws IOException if the file cannot be opened or, for a {@code .gz}
   * path, the gzip stream cannot be initialized.
   */
  private static BufferedReader open(final String path) throws IOException {
    // NOTE(review): the readers below use the platform default charset.
    if (path.equals("-")) {
      return new BufferedReader(new InputStreamReader(System.in));
    }

    final FileInputStream file = new FileInputStream(path);
    InputStream is = file;
    if (path.endsWith(".gz")) {
      try {
        is = new GZIPInputStream(file);
      } catch (IOException e) {
        // Setting up the gzip stream failed: don't leak the file descriptor.
        try {
          file.close();
        } catch (IOException ignored) {
          // The original failure is the one worth reporting.
        }
        throw e;
      }
    }
    return new BufferedReader(new InputStreamReader(is));
  }

  /** Cache of the data points objects created so far, see getDataPoints. */
  private static final HashMap<String, WritableDataPoints> datapoints =
    new HashMap<String, WritableDataPoints>();

  /**
   * Returns the {@link WritableDataPoints} for the given series, creating
   * and caching it (in batch import mode) on first use.
   * @param tsdb The TSDB used to create new data points objects.
   * @param metric The metric name of the series.
   * @param tags The tags of the series.
   * @return The cached or newly created data points object.
   */
  private static
    WritableDataPoints getDataPoints(final TSDB tsdb,
                                     final String metric,
                                     final HashMap<String, String> tags) {
    // The cache key is the metric followed by the tag map's string form.
    final String key = metric + tags;
    WritableDataPoints dp = datapoints.get(key);
    if (dp != null) {
      return dp;
    }
    dp = tsdb.newDataPoints();
    dp.setSeries(metric, tags);
    dp.setBatchImport(true);
    datapoints.put(key, dp);
    return dp;
  }

}