1 // This file is part of OpenTSDB.
2 // Copyright (C) 2010-2014  The OpenTSDB Authors.
3 //
4 // This program is free software: you can redistribute it and/or modify it
5 // under the terms of the GNU Lesser General Public License as published by
6 // the Free Software Foundation, either version 2.1 of the License, or (at your
7 // option) any later version.  This program is distributed in the hope that it
8 // will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty
9 // of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser
10 // General Public License for more details.  You should have received a copy
11 // of the GNU Lesser General Public License along with this program.  If not,
12 // see <http://www.gnu.org/licenses/>.
13 package net.opentsdb.tools;
14 
15 import java.io.BufferedReader;
16 import java.io.FileInputStream;
17 import java.io.IOException;
18 import java.io.InputStream;
19 import java.io.InputStreamReader;
20 import java.util.HashMap;
21 import java.util.zip.GZIPInputStream;
22 
23 import com.stumbleupon.async.Callback;
24 import com.stumbleupon.async.Deferred;
25 
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28 
29 import org.hbase.async.HBaseClient;
30 import org.hbase.async.HBaseRpc;
31 import org.hbase.async.PleaseThrottleException;
32 import org.hbase.async.PutRequest;
33 
34 import net.opentsdb.core.Tags;
35 import net.opentsdb.core.TSDB;
36 import net.opentsdb.core.WritableDataPoints;
37 import net.opentsdb.stats.StatsCollector;
38 import net.opentsdb.utils.Config;
39 
40 final class TextImporter {
41 
  /** Logger used by this tool to report import progress, throttling and errors. */
  private static final Logger LOG = LoggerFactory.getLogger(TextImporter.class);
43 
44   /** Prints usage and exits with the given retval.  */
usage(final ArgP argp, final int retval)45   static void usage(final ArgP argp, final int retval) {
46     System.err.println("Usage: import path [more paths]");
47     System.err.print(argp.usage());
48     System.err.println("This tool can directly read gzip'ed input files.");
49     System.exit(retval);
50   }
51 
main(String[] args)52   public static void main(String[] args) throws Exception {
53     ArgP argp = new ArgP();
54     CliOptions.addCommon(argp);
55     CliOptions.addAutoMetricFlag(argp);
56     argp.addOption("--skip-errors", "Whether or not to skip exceptions "
57         + "during processing");
58     args = CliOptions.parse(argp, args);
59     if (args == null) {
60       usage(argp, 1);
61     } else if (args.length < 1) {
62       usage(argp, 2);
63     }
64 
65     // get a config object
66     Config config = CliOptions.getConfig(argp);
67 
68     final TSDB tsdb = new TSDB(config);
69     final boolean skip_errors = argp.has("--skip-errors");
70     tsdb.checkNecessaryTablesExist().joinUninterruptibly();
71     argp = null;
72     try {
73       int points = 0;
74       final long start_time = System.nanoTime();
75       for (final String path : args) {
76         points += importFile(tsdb.getClient(), tsdb, path, skip_errors);
77       }
78       final double time_delta = (System.nanoTime() - start_time) / 1000000000.0;
79       LOG.info(String.format("Total: imported %d data points in %.3fs"
80                              + " (%.1f points/s)",
81                              points, time_delta, (points / time_delta)));
82       // TODO(tsuna): Figure out something better than just writing to stderr.
83       tsdb.collectStats(new StatsCollector("tsd") {
84         @Override
85         public final void emit(final String line) {
86           System.err.print(line);
87         }
88       });
89     } finally {
90       try {
91         tsdb.shutdown().joinUninterruptibly();
92       } catch (Exception e) {
93         LOG.error("Unexpected exception", e);
94         System.exit(1);
95       }
96     }
97   }
98 
99   static volatile boolean throttle = false;
100 
importFile(final HBaseClient client, final TSDB tsdb, final String path, final boolean skip_errors)101   private static int importFile(final HBaseClient client,
102                                 final TSDB tsdb,
103                                 final String path,
104                                 final boolean skip_errors) throws IOException {
105     final long start_time = System.nanoTime();
106     long ping_start_time = start_time;
107     final BufferedReader in = open(path);
108     String line = null;
109     int points = 0;
110     try {
111       final class Errback implements Callback<Object, Exception> {
112         public Object call(final Exception arg) {
113           if (arg instanceof PleaseThrottleException) {
114             final PleaseThrottleException e = (PleaseThrottleException) arg;
115             LOG.warn("Need to throttle, HBase isn't keeping up.", e);
116             throttle = true;
117             final HBaseRpc rpc = e.getFailedRpc();
118             if (rpc instanceof PutRequest) {
119               client.put((PutRequest) rpc);  // Don't lose edits.
120             }
121             return null;
122           }
123           LOG.error("Exception caught while processing file "
124                     + path, arg);
125           System.exit(2);
126           return arg;
127         }
128         public String toString() {
129           return "importFile errback";
130         }
131       };
132       final Errback errback = new Errback();
133       LOG.info("reading from file:" + path);
134       while ((line = in.readLine()) != null) {
135         final String[] words = Tags.splitString(line, ' ');
136         final String metric = words[0];
137         if (metric.length() <= 0) {
138           if (skip_errors) {
139             LOG.error("invalid metric: " + metric);
140             LOG.error("error while processing file "
141                       + path + " line=" + line + "... Continuing");
142             continue;
143           } else {
144             throw new RuntimeException("invalid metric: " + metric);
145           }
146         }
147         final long timestamp;
148         try {
149           timestamp = Tags.parseLong(words[1]);
150           if (timestamp <= 0) {
151             if (skip_errors) {
152               LOG.error("invalid timestamp: " + timestamp);
153               LOG.error("error while processing file "
154                         + path + " line=" + line + "... Continuing");
155               continue;
156             } else {
157               throw new RuntimeException("invalid timestamp: " + timestamp);
158             }
159           }
160         } catch (final RuntimeException e) {
161           if (skip_errors) {
162             LOG.error("invalid timestamp: " + e.getMessage());
163             LOG.error("error while processing file "
164                       + path + " line=" + line + "... Continuing");
165             continue;
166           } else {
167             throw e;
168           }
169         }
170 
171         final String value = words[2];
172         if (value.length() <= 0) {
173           if (skip_errors) {
174             LOG.error("invalid value: " + value);
175             LOG.error("error while processing file "
176                       + path + " line=" + line + "... Continuing");
177             continue;
178           } else {
179             throw new RuntimeException("invalid value: " + value);
180           }
181         }
182 
183         try {
184           final HashMap<String, String> tags = new HashMap<String, String>();
185           for (int i = 3; i < words.length; i++) {
186             if (!words[i].isEmpty()) {
187               Tags.parse(tags, words[i]);
188             }
189           }
190 
191           final WritableDataPoints dp = getDataPoints(tsdb, metric, tags);
192           Deferred<Object> d;
193           if (Tags.looksLikeInteger(value)) {
194             d = dp.addPoint(timestamp, Tags.parseLong(value));
195           } else {  // floating point value
196             d = dp.addPoint(timestamp, Float.parseFloat(value));
197           }
198           d.addErrback(errback);
199           points++;
200           if (points % 1000000 == 0) {
201             final long now = System.nanoTime();
202             ping_start_time = (now - ping_start_time) / 1000000;
203             LOG.info(String.format("... %d data points in %dms (%.1f points/s)",
204                                    points, ping_start_time,
205                                    (1000000 * 1000.0 / ping_start_time)));
206             ping_start_time = now;
207           }
208           if (throttle) {
209             LOG.info("Throttling...");
210             long throttle_time = System.nanoTime();
211             try {
212               d.joinUninterruptibly();
213             } catch (final Exception e) {
214               throw new RuntimeException("Should never happen", e);
215             }
216             throttle_time = System.nanoTime() - throttle_time;
217             if (throttle_time < 1000000000L) {
218               LOG.info("Got throttled for only " + throttle_time +
219                   "ns, sleeping a bit now");
220               try {
221                 Thread.sleep(1000);
222               } catch (InterruptedException e) {
223                 throw new RuntimeException("interrupted", e);
224               }
225             }
226             LOG.info("Done throttling...");
227             throttle = false;
228           }
229         } catch (final RuntimeException e) {
230           if (skip_errors) {
231             LOG.error("Exception: " + e.getMessage());
232             LOG.error("error while processing file "
233                       + path + " line=" + line + "... Continuing");
234             continue;
235           } else {
236             throw e;
237           }
238         }
239       }
240     } catch (RuntimeException e) {
241         LOG.error("Exception caught while processing file "
242                   + path + " line=[" + line + "]", e);
243         throw e;
244     } finally {
245       in.close();
246     }
247     final long time_delta = (System.nanoTime() - start_time) / 1000000;
248     LOG.info(String.format("Processed %s in %d ms, %d data points"
249                            + " (%.1f points/s)",
250                            path, time_delta, points,
251                            (points * 1000.0 / time_delta)));
252     return points;
253   }
254 
255   /**
256    * Opens a file for reading, handling gzipped files.
257    * @param path The file to open.
258    * @return A buffered reader to read the file, decompressing it if needed.
259    * @throws IOException when shit happens.
260    */
open(final String path)261   private static BufferedReader open(final String path) throws IOException {
262     if (path.equals("-")) {
263       return new BufferedReader(new InputStreamReader(System.in));
264     }
265 
266     InputStream is = new FileInputStream(path);
267     if (path.endsWith(".gz")) {
268       is = new GZIPInputStream(is);
269     }
270     // I <3 Java's IO library.
271     return new BufferedReader(new InputStreamReader(is));
272   }
273 
274   private static final HashMap<String, WritableDataPoints> datapoints =
275     new HashMap<String, WritableDataPoints>();
276 
277   private static
getDataPoints(final TSDB tsdb, final String metric, final HashMap<String, String> tags)278     WritableDataPoints getDataPoints(final TSDB tsdb,
279                                      final String metric,
280                                      final HashMap<String, String> tags) {
281     final String key = metric + tags;
282     WritableDataPoints dp = datapoints.get(key);
283     if (dp != null) {
284       return dp;
285     }
286     dp = tsdb.newDataPoints();
287     dp.setSeries(metric, tags);
288     dp.setBatchImport(true);
289     datapoints.put(key, dp);
290     return dp;
291   }
292 
293 }
294