1 // This file is part of OpenTSDB. 2 // Copyright (C) 2010-2012 The OpenTSDB Authors. 3 // 4 // This program is free software: you can redistribute it and/or modify it 5 // under the terms of the GNU Lesser General Public License as published by 6 // the Free Software Foundation, either version 2.1 of the License, or (at your 7 // option) any later version. This program is distributed in the hope that it 8 // will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty 9 // of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser 10 // General Public License for more details. You should have received a copy 11 // of the GNU Lesser General Public License along with this program. If not, 12 // see <http://www.gnu.org/licenses/>. 13 package net.opentsdb.tsd; 14 15 import java.io.IOException; 16 import java.util.ArrayList; 17 import java.util.HashMap; 18 import java.util.List; 19 import java.util.concurrent.TimeUnit; 20 import java.util.concurrent.atomic.AtomicBoolean; 21 import java.util.concurrent.atomic.AtomicLong; 22 23 import com.stumbleupon.async.Callback; 24 import com.stumbleupon.async.Deferred; 25 import com.stumbleupon.async.TimeoutException; 26 27 import org.jboss.netty.channel.Channel; 28 import org.jboss.netty.handler.codec.http.HttpMethod; 29 import org.jboss.netty.handler.codec.http.HttpResponseStatus; 30 import org.jboss.netty.util.Timeout; 31 import org.jboss.netty.util.TimerTask; 32 import org.slf4j.Logger; 33 import org.slf4j.LoggerFactory; 34 35 import net.opentsdb.core.IncomingDataPoint; 36 import net.opentsdb.core.TSDB; 37 import net.opentsdb.core.Tags; 38 import net.opentsdb.stats.StatsCollector; 39 import net.opentsdb.uid.NoSuchUniqueName; 40 41 /** Implements the "put" telnet-style command. */ 42 final class PutDataPointRpc implements TelnetRpc, HttpRpc { 43 private static final Logger LOG = LoggerFactory.getLogger(PutDataPointRpc.class); 44 private static final ArrayList<Boolean> EMPTY_DEFERREDS = 45 new ArrayList<Boolean>(0); 46 private static final AtomicLong requests = new AtomicLong(); 47 private static final AtomicLong hbase_errors = new AtomicLong(); 48 private static final AtomicLong invalid_values = new AtomicLong(); 49 private static final AtomicLong illegal_arguments = new AtomicLong(); 50 private static final AtomicLong unknown_metrics = new AtomicLong(); 51 private static final AtomicLong writes_blocked = new AtomicLong(); 52 private static final AtomicLong writes_timedout = new AtomicLong(); 53 execute(final TSDB tsdb, final Channel chan, final String[] cmd)54 public Deferred<Object> execute(final TSDB tsdb, final Channel chan, 55 final String[] cmd) { 56 requests.incrementAndGet(); 57 String errmsg = null; 58 try { 59 final class PutErrback implements Callback<Exception, Exception> { 60 public Exception call(final Exception arg) { 61 // we handle the storage exceptions here so as to avoid creating yet 62 // another callback object on every data point. 63 handleStorageException(tsdb, getDataPointFromString(cmd), arg); 64 if (chan.isConnected()) { 65 if (chan.isWritable()) { 66 chan.write("put: HBase error: " + arg.getMessage() + '\n'); 67 } else { 68 writes_blocked.incrementAndGet(); 69 } 70 } 71 hbase_errors.incrementAndGet(); 72 return null; 73 } 74 public String toString() { 75 return "report error to channel"; 76 } 77 } 78 return importDataPoint(tsdb, cmd).addErrback(new PutErrback()); 79 } catch (NumberFormatException x) { 80 errmsg = "put: invalid value: " + x.getMessage() + '\n'; 81 invalid_values.incrementAndGet(); 82 } catch (IllegalArgumentException x) { 83 errmsg = "put: illegal argument: " + x.getMessage() + '\n'; 84 illegal_arguments.incrementAndGet(); 85 } catch (NoSuchUniqueName x) { 86 errmsg = "put: unknown metric: " + x.getMessage() + '\n'; 87 unknown_metrics.incrementAndGet(); 88 } 89 if (errmsg != null) { 90 LOG.debug(errmsg); 91 if (chan.isConnected()) { 92 if (chan.isWritable()) { 93 chan.write(errmsg); 94 } else { 95 writes_blocked.incrementAndGet(); 96 } 97 } 98 } 99 return Deferred.fromResult(null); 100 } 101 102 /** 103 * Handles HTTP RPC put requests 104 * @param tsdb The TSDB to which we belong 105 * @param query The HTTP query from the user 106 * @throws IOException if there is an error parsing the query or formatting 107 * the output 108 * @throws BadRequestException if the user supplied bad data 109 * @since 2.0 110 */ execute(final TSDB tsdb, final HttpQuery query)111 public void execute(final TSDB tsdb, final HttpQuery query) 112 throws IOException { 113 requests.incrementAndGet(); 114 115 // only accept POST 116 if (query.method() != HttpMethod.POST) { 117 throw new BadRequestException(HttpResponseStatus.METHOD_NOT_ALLOWED, 118 "Method not allowed", "The HTTP method [" + query.method().getName() + 119 "] is not permitted for this endpoint"); 120 } 121 122 final List<IncomingDataPoint> dps = query.serializer().parsePutV1(); 123 if (dps.size() < 1) { 124 throw new BadRequestException("No datapoints found in content"); 125 } 126 127 final boolean show_details = query.hasQueryStringParam("details"); 128 final boolean show_summary = query.hasQueryStringParam("summary"); 129 final boolean synchronous = query.hasQueryStringParam("sync"); 130 final int sync_timeout = query.hasQueryStringParam("sync_timeout") ? 131 Integer.parseInt(query.getQueryStringParam("sync_timeout")) : 0; 132 // this is used to coordinate timeouts 133 final AtomicBoolean sending_response = new AtomicBoolean(); 134 sending_response.set(false); 135 136 final ArrayList<HashMap<String, Object>> details = show_details 137 ? new ArrayList<HashMap<String, Object>>() : null; 138 int queued = 0; 139 final List<Deferred<Boolean>> deferreds = synchronous ? 140 new ArrayList<Deferred<Boolean>>(dps.size()) : null; 141 for (final IncomingDataPoint dp : dps) { 142 143 /** Handles passing a data point to the storage exception handler if 144 * we were unable to store it for any reason */ 145 final class PutErrback implements Callback<Boolean, Exception> { 146 public Boolean call(final Exception arg) { 147 handleStorageException(tsdb, dp, arg); 148 hbase_errors.incrementAndGet(); 149 150 if (show_details) { 151 details.add(getHttpDetails("Storage exception: " 152 + arg.getMessage(), dp)); 153 } 154 return false; 155 } 156 public String toString() { 157 return "HTTP Put Exception CB"; 158 } 159 } 160 161 /** Simply marks the put as successful */ 162 final class SuccessCB implements Callback<Boolean, Object> { 163 @Override 164 public Boolean call(final Object obj) { 165 return true; 166 } 167 public String toString() { 168 return "HTTP Put success CB"; 169 } 170 } 171 172 try { 173 if (dp.getMetric() == null || dp.getMetric().isEmpty()) { 174 if (show_details) { 175 details.add(this.getHttpDetails("Metric name was empty", dp)); 176 } 177 LOG.warn("Metric name was empty: " + dp); 178 illegal_arguments.incrementAndGet(); 179 continue; 180 } 181 if (dp.getTimestamp() <= 0) { 182 if (show_details) { 183 details.add(this.getHttpDetails("Invalid timestamp", dp)); 184 } 185 LOG.warn("Invalid timestamp: " + dp); 186 illegal_arguments.incrementAndGet(); 187 continue; 188 } 189 if (dp.getValue() == null || dp.getValue().isEmpty()) { 190 if (show_details) { 191 details.add(this.getHttpDetails("Empty value", dp)); 192 } 193 LOG.warn("Empty value: " + dp); 194 invalid_values.incrementAndGet(); 195 continue; 196 } 197 if (dp.getTags() == null || dp.getTags().size() < 1) { 198 if (show_details) { 199 details.add(this.getHttpDetails("Missing tags", dp)); 200 } 201 LOG.warn("Missing tags: " + dp); 202 illegal_arguments.incrementAndGet(); 203 continue; 204 } 205 final Deferred<Object> deferred; 206 if (Tags.looksLikeInteger(dp.getValue())) { 207 deferred = tsdb.addPoint(dp.getMetric(), dp.getTimestamp(), 208 Tags.parseLong(dp.getValue()), dp.getTags()); 209 } else { 210 deferred = tsdb.addPoint(dp.getMetric(), dp.getTimestamp(), 211 Float.parseFloat(dp.getValue()), dp.getTags()); 212 } 213 if (synchronous) { 214 deferreds.add(deferred.addCallback(new SuccessCB())); 215 } 216 deferred.addErrback(new PutErrback()); 217 ++queued; 218 } catch (NumberFormatException x) { 219 if (show_details) { 220 details.add(this.getHttpDetails("Unable to parse value to a number", 221 dp)); 222 } 223 LOG.warn("Unable to parse value to a number: " + dp); 224 invalid_values.incrementAndGet(); 225 } catch (IllegalArgumentException iae) { 226 if (show_details) { 227 details.add(this.getHttpDetails(iae.getMessage(), dp)); 228 } 229 LOG.warn(iae.getMessage() + ": " + dp); 230 illegal_arguments.incrementAndGet(); 231 } catch (NoSuchUniqueName nsu) { 232 if (show_details) { 233 details.add(this.getHttpDetails("Unknown metric", dp)); 234 } 235 LOG.warn("Unknown metric: " + dp); 236 unknown_metrics.incrementAndGet(); 237 } 238 } 239 240 /** A timer task that will respond to the user with the number of timeouts 241 * for synchronous writes. */ 242 class PutTimeout implements TimerTask { 243 final int queued; 244 public PutTimeout(final int queued) { 245 this.queued = queued; 246 } 247 @Override 248 public void run(final Timeout timeout) throws Exception { 249 if (sending_response.get()) { 250 if (LOG.isDebugEnabled()) { 251 LOG.debug("Put data point call " + query + 252 " already responded successfully"); 253 } 254 return; 255 } else { 256 sending_response.set(true); 257 } 258 259 // figure out how many writes are outstanding 260 int good_writes = 0; 261 int failed_writes = 0; 262 int timeouts = 0; 263 for (int i = 0; i < deferreds.size(); i++) { 264 try { 265 if (deferreds.get(i).join(1)) { 266 ++good_writes; 267 } else { 268 ++failed_writes; 269 } 270 } catch (TimeoutException te) { 271 if (show_details) { 272 details.add(getHttpDetails("Write timedout", dps.get(i))); 273 } 274 ++timeouts; 275 } 276 } 277 writes_timedout.addAndGet(timeouts); 278 final int failures = dps.size() - queued; 279 if (!show_summary && !show_details) { 280 throw new BadRequestException(HttpResponseStatus.BAD_REQUEST, 281 "The put call has timedout with " + good_writes 282 + " successful writes, " + failed_writes + " failed writes and " 283 + timeouts + " timed out writes.", 284 "Please see the TSD logs or append \"details\" to the put request"); 285 } else { 286 final HashMap<String, Object> summary = new HashMap<String, Object>(); 287 summary.put("success", good_writes); 288 summary.put("failed", failures + failed_writes); 289 summary.put("timeouts", timeouts); 290 if (show_details) { 291 summary.put("errors", details); 292 } 293 294 query.sendReply(HttpResponseStatus.BAD_REQUEST, 295 query.serializer().formatPutV1(summary)); 296 } 297 } 298 } 299 300 // now after everything has been sent we can schedule a timeout if so 301 // the caller asked for a synchronous write. 302 final Timeout timeout = sync_timeout > 0 ? 303 tsdb.getTimer().newTimeout(new PutTimeout(queued), sync_timeout, 304 TimeUnit.MILLISECONDS) : null; 305 306 /** Serializes the response to the client */ 307 class GroupCB implements Callback<Object, ArrayList<Boolean>> { 308 final int queued; 309 public GroupCB(final int queued) { 310 this.queued = queued; 311 } 312 313 @Override 314 public Object call(final ArrayList<Boolean> results) { 315 if (sending_response.get()) { 316 if (LOG.isDebugEnabled()) { 317 LOG.debug("Put data point call " + query + " was marked as timedout"); 318 } 319 return null; 320 } else { 321 sending_response.set(true); 322 if (timeout != null) { 323 timeout.cancel(); 324 } 325 } 326 int good_writes = 0; 327 int failed_writes = 0; 328 for (final boolean result : results) { 329 if (result) { 330 ++good_writes; 331 } else { 332 ++failed_writes; 333 } 334 } 335 336 final int failures = dps.size() - queued; 337 if (!show_summary && !show_details) { 338 if (failures + failed_writes > 0) { 339 query.sendReply(HttpResponseStatus.BAD_REQUEST, 340 query.serializer().formatErrorV1( 341 new BadRequestException(HttpResponseStatus.BAD_REQUEST, 342 "One or more data points had errors", 343 "Please see the TSD logs or append \"details\" to the put request"))); 344 } else { 345 query.sendReply(HttpResponseStatus.NO_CONTENT, "".getBytes()); 346 } 347 } else { 348 final HashMap<String, Object> summary = new HashMap<String, Object>(); 349 if (sync_timeout > 0) { 350 summary.put("timeouts", 0); 351 } 352 summary.put("success", results.isEmpty() ? queued : good_writes); 353 summary.put("failed", failures + failed_writes); 354 if (show_details) { 355 summary.put("errors", details); 356 } 357 358 if (failures > 0) { 359 query.sendReply(HttpResponseStatus.BAD_REQUEST, 360 query.serializer().formatPutV1(summary)); 361 } else { 362 query.sendReply(query.serializer().formatPutV1(summary)); 363 } 364 } 365 366 return null; 367 } 368 @Override 369 public String toString() { 370 return "put data point serialization callback"; 371 } 372 } 373 374 /** Catches any unexpected exceptions thrown in the callback chain */ 375 class ErrCB implements Callback<Object, Exception> { 376 @Override 377 public Object call(final Exception e) throws Exception { 378 if (sending_response.get()) { 379 if (LOG.isDebugEnabled()) { 380 LOG.debug("Put data point call " + query + " was marked as timedout"); 381 } 382 return null; 383 } else { 384 sending_response.set(true); 385 if (timeout != null) { 386 timeout.cancel(); 387 } 388 } 389 LOG.error("Unexpected exception", e); 390 throw new RuntimeException("Unexpected exception", e); 391 } 392 @Override 393 public String toString() { 394 return "put data point error callback"; 395 } 396 } 397 398 if (synchronous) { 399 Deferred.groupInOrder(deferreds).addCallback(new GroupCB(queued)) 400 .addErrback(new ErrCB()); 401 } else { 402 new GroupCB(queued).call(EMPTY_DEFERREDS); 403 } 404 } 405 406 /** 407 * Collects the stats and metrics tracked by this instance. 408 * @param collector The collector to use. 409 */ collectStats(final StatsCollector collector)410 public static void collectStats(final StatsCollector collector) { 411 collector.record("rpc.received", requests, "type=put"); 412 collector.record("rpc.errors", hbase_errors, "type=hbase_errors"); 413 collector.record("rpc.errors", invalid_values, "type=invalid_values"); 414 collector.record("rpc.errors", illegal_arguments, "type=illegal_arguments"); 415 collector.record("rpc.errors", unknown_metrics, "type=unknown_metrics"); 416 collector.record("rpc.errors", writes_blocked, "type=socket_writes_blocked"); 417 } 418 419 /** 420 * Imports a single data point. 421 * @param tsdb The TSDB to import the data point into. 422 * @param words The words describing the data point to import, in 423 * the following format: {@code [metric, timestamp, value, ..tags..]} 424 * @return A deferred object that indicates the completion of the request. 425 * @throws NumberFormatException if the timestamp or value is invalid. 426 * @throws IllegalArgumentException if any other argument is invalid. 427 * @throws NoSuchUniqueName if the metric isn't registered. 428 */ importDataPoint(final TSDB tsdb, final String[] words)429 private Deferred<Object> importDataPoint(final TSDB tsdb, final String[] words) { 430 words[0] = null; // Ditch the "put". 431 if (words.length < 5) { // Need at least: metric timestamp value tag 432 // ^ 5 and not 4 because words[0] is "put". 433 throw new IllegalArgumentException("not enough arguments" 434 + " (need least 4, got " + (words.length - 1) + ')'); 435 } 436 final String metric = words[1]; 437 if (metric.length() <= 0) { 438 throw new IllegalArgumentException("empty metric name"); 439 } 440 final long timestamp; 441 if (words[2].contains(".")) { 442 timestamp = Tags.parseLong(words[2].replace(".", "")); 443 } else { 444 timestamp = Tags.parseLong(words[2]); 445 } 446 if (timestamp <= 0) { 447 throw new IllegalArgumentException("invalid timestamp: " + timestamp); 448 } 449 final String value = words[3]; 450 if (value.length() <= 0) { 451 throw new IllegalArgumentException("empty value"); 452 } 453 final HashMap<String, String> tags = new HashMap<String, String>(); 454 for (int i = 4; i < words.length; i++) { 455 if (!words[i].isEmpty()) { 456 Tags.parse(tags, words[i]); 457 } 458 } 459 if (Tags.looksLikeInteger(value)) { 460 return tsdb.addPoint(metric, timestamp, Tags.parseLong(value), tags); 461 } else { // floating point value 462 return tsdb.addPoint(metric, timestamp, Float.parseFloat(value), tags); 463 } 464 } 465 466 467 /** 468 * Converts the string array to an IncomingDataPoint. WARNING: This method 469 * does not perform validation. It should only be used by the Telnet style 470 * {@code execute} above within the error callback. At that point it means 471 * the array parsed correctly as per {@code importDataPoint}. 472 * @param words The array of strings representing a data point 473 * @return An incoming data point object. 474 */ getDataPointFromString(final String[] words)475 final private IncomingDataPoint getDataPointFromString(final String[] words) { 476 final IncomingDataPoint dp = new IncomingDataPoint(); 477 dp.setMetric(words[1]); 478 479 if (words[2].contains(".")) { 480 dp.setTimestamp(Tags.parseLong(words[2].replace(".", ""))); 481 } else { 482 dp.setTimestamp(Tags.parseLong(words[2])); 483 } 484 485 dp.setValue(words[3]); 486 487 final HashMap<String, String> tags = new HashMap<String, String>(); 488 for (int i = 4; i < words.length; i++) { 489 if (!words[i].isEmpty()) { 490 Tags.parse(tags, words[i]); 491 } 492 } 493 dp.setTags(tags); 494 return dp; 495 } 496 497 /** 498 * Simple helper to format an error trying to save a data point 499 * @param message The message to return to the user 500 * @param dp The datapoint that caused the error 501 * @return A hashmap with information 502 * @since 2.0 503 */ getHttpDetails(final String message, final IncomingDataPoint dp)504 final private HashMap<String, Object> getHttpDetails(final String message, 505 final IncomingDataPoint dp) { 506 final HashMap<String, Object> map = new HashMap<String, Object>(); 507 map.put("error", message); 508 map.put("datapoint", dp); 509 return map; 510 } 511 512 /** 513 * Passes a data point off to the storage handler plugin if it has been 514 * configured. 515 * @param tsdb The TSDB from which to grab the SEH plugin 516 * @param dp The data point to process 517 * @param e The exception that caused this 518 */ handleStorageException(final TSDB tsdb, final IncomingDataPoint dp, final Exception e)519 void handleStorageException(final TSDB tsdb, final IncomingDataPoint dp, 520 final Exception e) { 521 final StorageExceptionHandler handler = tsdb.getStorageExceptionHandler(); 522 if (handler != null) { 523 handler.handleError(dp, e); 524 } 525 } 526 } 527