1 // This file is part of OpenTSDB. 2 // Copyright (C) 2013 The OpenTSDB Authors. 3 // 4 // This program is free software: you can redistribute it and/or modify it 5 // under the terms of the GNU Lesser General Public License as published by 6 // the Free Software Foundation, either version 2.1 of the License, or (at your 7 // option) any later version. This program is distributed in the hope that it 8 // will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty 9 // of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser 10 // General Public License for more details. You should have received a copy 11 // of the GNU Lesser General Public License along with this program. If not, 12 // see <http://www.gnu.org/licenses/>. 13 package net.opentsdb.tsd; 14 15 import java.io.IOException; 16 import java.util.ArrayList; 17 import java.util.List; 18 19 import org.jboss.netty.handler.codec.http.HttpMethod; 20 import org.jboss.netty.handler.codec.http.HttpResponseStatus; 21 import org.slf4j.Logger; 22 import org.slf4j.LoggerFactory; 23 24 import com.stumbleupon.async.Callback; 25 import com.stumbleupon.async.Deferred; 26 27 import net.opentsdb.core.TSDB; 28 import net.opentsdb.meta.Annotation; 29 import net.opentsdb.uid.UniqueId; 30 import net.opentsdb.utils.DateTime; 31 import net.opentsdb.utils.JSONException; 32 33 /** 34 * Handles create, update, replace and delete calls for individual annotation 35 * objects. Annotations are stored in the data table alongside data points. 36 * Queries will return annotations along with the data if requested. This RPC 37 * is only used for modifying the individual entries. 38 * @since 2.0 39 */ 40 final class AnnotationRpc implements HttpRpc { 41 private static final Logger LOG = LoggerFactory.getLogger(AnnotationRpc.class); 42 43 /** 44 * Performs CRUD methods on individual annotation objects. 45 * @param tsdb The TSD to which we belong 46 * @param query The query to parse and respond to 47 */ execute(final TSDB tsdb, HttpQuery query)48 public void execute(final TSDB tsdb, HttpQuery query) throws IOException { 49 final HttpMethod method = query.getAPIMethod(); 50 51 final String[] uri = query.explodeAPIPath(); 52 final String endpoint = uri.length > 1 ? uri[1] : ""; 53 if (endpoint != null && endpoint.toLowerCase().endsWith("bulk")) { 54 executeBulk(tsdb, method, query); 55 return; 56 } 57 58 final Annotation note; 59 if (query.hasContent()) { 60 note = query.serializer().parseAnnotationV1(); 61 } else { 62 note = parseQS(query); 63 } 64 65 // GET 66 if (method == HttpMethod.GET) { 67 try { 68 if ("annotations".toLowerCase().equals(uri[0])) { 69 fetchMultipleAnnotations(tsdb, note, query); 70 } else { 71 fetchSingleAnnotation(tsdb, note, query); 72 } 73 } catch (BadRequestException e) { 74 throw e; 75 } catch (Exception e) { 76 throw new RuntimeException(e); 77 } 78 // POST 79 } else if (method == HttpMethod.POST || method == HttpMethod.PUT) { 80 81 /** 82 * Storage callback used to determine if the storage call was successful 83 * or not. Also returns the updated object from storage. 84 */ 85 class SyncCB implements Callback<Deferred<Annotation>, Boolean> { 86 87 @Override 88 public Deferred<Annotation> call(Boolean success) throws Exception { 89 if (!success) { 90 throw new BadRequestException( 91 HttpResponseStatus.INTERNAL_SERVER_ERROR, 92 "Failed to save the Annotation to storage", 93 "This may be caused by another process modifying storage data"); 94 } 95 96 return Annotation.getAnnotation(tsdb, note.getTSUID(), 97 note.getStartTime()); 98 } 99 100 } 101 102 try { 103 final Deferred<Annotation> process_meta = note.syncToStorage(tsdb, 104 method == HttpMethod.PUT).addCallbackDeferring(new SyncCB()); 105 final Annotation updated_meta = process_meta.joinUninterruptibly(); 106 tsdb.indexAnnotation(note); 107 query.sendReply(query.serializer().formatAnnotationV1(updated_meta)); 108 } catch (IllegalStateException e) { 109 query.sendStatusOnly(HttpResponseStatus.NOT_MODIFIED); 110 } catch (IllegalArgumentException e) { 111 throw new BadRequestException(e); 112 } catch (Exception e) { 113 throw new RuntimeException(e); 114 } 115 // DELETE 116 } else if (method == HttpMethod.DELETE) { 117 118 try { 119 note.delete(tsdb).joinUninterruptibly(); 120 tsdb.deleteAnnotation(note); 121 } catch (IllegalArgumentException e) { 122 throw new BadRequestException( 123 "Unable to delete Annotation information", e); 124 } catch (Exception e) { 125 throw new RuntimeException(e); 126 } 127 query.sendStatusOnly(HttpResponseStatus.NO_CONTENT); 128 129 } else { 130 throw new BadRequestException(HttpResponseStatus.METHOD_NOT_ALLOWED, 131 "Method not allowed", "The HTTP method [" + method.getName() + 132 "] is not permitted for this endpoint"); 133 } 134 } 135 136 /** 137 * Performs CRUD methods on a list of annotation objects to reduce calls to 138 * the API. 139 * @param tsdb The TSD to which we belong 140 * @param method The request method 141 * @param query The query to parse and respond to 142 */ executeBulk(final TSDB tsdb, final HttpMethod method, HttpQuery query)143 void executeBulk(final TSDB tsdb, final HttpMethod method, HttpQuery query) { 144 if (method == HttpMethod.POST || method == HttpMethod.PUT) { 145 executeBulkUpdate(tsdb, method, query); 146 } else if (method == HttpMethod.DELETE) { 147 executeBulkDelete(tsdb, query); 148 } else { 149 throw new BadRequestException(HttpResponseStatus.METHOD_NOT_ALLOWED, 150 "Method not allowed", "The HTTP method [" + query.method().getName() + 151 "] is not permitted for this endpoint"); 152 } 153 } 154 155 /** 156 * Performs CRU methods on a list of annotation objects to reduce calls to 157 * the API. Only supports body content and adding or updating annotation 158 * objects. Deletions are separate. 159 * @param tsdb The TSD to which we belong 160 * @param method The request method 161 * @param query The query to parse and respond to 162 */ executeBulkUpdate(final TSDB tsdb, final HttpMethod method, HttpQuery query)163 void executeBulkUpdate(final TSDB tsdb, final HttpMethod method, HttpQuery query) { 164 final List<Annotation> notes; 165 try { 166 notes = query.serializer().parseAnnotationsV1(); 167 } catch (IllegalArgumentException e){ 168 throw new BadRequestException(e); 169 } catch (JSONException e){ 170 throw new BadRequestException(e); 171 } 172 final List<Deferred<Annotation>> callbacks = 173 new ArrayList<Deferred<Annotation>>(notes.size()); 174 175 /** 176 * Storage callback used to determine if the storage call was successful 177 * or not. Also returns the updated object from storage. 178 */ 179 class SyncCB implements Callback<Deferred<Annotation>, Boolean> { 180 final private Annotation note; 181 182 public SyncCB(final Annotation note) { 183 this.note = note; 184 } 185 186 @Override 187 public Deferred<Annotation> call(Boolean success) throws Exception { 188 if (!success) { 189 throw new BadRequestException( 190 HttpResponseStatus.INTERNAL_SERVER_ERROR, 191 "Failed to save an Annotation to storage", 192 "This may be caused by another process modifying storage data: " 193 + note); 194 } 195 196 return Annotation.getAnnotation(tsdb, note.getTSUID(), 197 note.getStartTime()); 198 } 199 } 200 201 /** 202 * Simple callback that will index the updated annotation 203 */ 204 class IndexCB implements Callback<Deferred<Annotation>, Annotation> { 205 @Override 206 public Deferred<Annotation> call(final Annotation note) throws Exception { 207 tsdb.indexAnnotation(note); 208 return Deferred.fromResult(note); 209 } 210 } 211 212 for (Annotation note : notes) { 213 try { 214 Deferred<Annotation> deferred = 215 note.syncToStorage(tsdb, method == HttpMethod.PUT) 216 .addCallbackDeferring(new SyncCB(note)); 217 Deferred<Annotation> indexer = 218 deferred.addCallbackDeferring(new IndexCB()); 219 callbacks.add(indexer); 220 } catch (IllegalStateException e) { 221 LOG.info("No changes for annotation: " + note); 222 } catch (IllegalArgumentException e) { 223 throw new BadRequestException(HttpResponseStatus.BAD_REQUEST, 224 e.getMessage(), "Annotation error: " + note, e); 225 } 226 } 227 228 try { 229 // wait untill all of the syncs have completed, then rebuild the list 230 // of annotations using the data synced from storage. 231 Deferred.group(callbacks).joinUninterruptibly(); 232 notes.clear(); 233 for (Deferred<Annotation> note : callbacks) { 234 notes.add(note.joinUninterruptibly()); 235 } 236 query.sendReply(query.serializer().formatAnnotationsV1(notes)); 237 } catch (IllegalArgumentException e) { 238 throw new BadRequestException(e); 239 } catch (Exception e) { 240 throw new RuntimeException(e); 241 } 242 } 243 244 /** 245 * Handles bulk deletions of a range of annotations (local or global) using 246 * query string or body data 247 * @param tsdb The TSD to which we belong 248 * @param query The query to parse and respond to 249 */ executeBulkDelete(final TSDB tsdb, HttpQuery query)250 void executeBulkDelete(final TSDB tsdb, HttpQuery query) { 251 try { 252 final AnnotationBulkDelete delete_request; 253 if (query.hasContent()) { 254 delete_request = query.serializer().parseAnnotationBulkDeleteV1(); 255 } else { 256 delete_request = parseBulkDeleteQS(query); 257 } 258 259 // validate the start time on the string. Users could request a timestamp of 260 // 0 to delete all annotations, BUT we don't want them doing that accidentally 261 if (delete_request.start_time == null || delete_request.start_time.isEmpty()) { 262 throw new BadRequestException(HttpResponseStatus.BAD_REQUEST, 263 "Missing the start time value"); 264 } 265 if (!delete_request.global && 266 (delete_request.tsuids == null || delete_request.tsuids.isEmpty())) { 267 throw new BadRequestException(HttpResponseStatus.BAD_REQUEST, 268 "Missing the TSUIDs or global annotations flag"); 269 } 270 271 final int pre_allocate = delete_request.tsuids != null ? 272 delete_request.tsuids.size() + 1 : 1; 273 List<Deferred<Integer>> deletes = new ArrayList<Deferred<Integer>>(pre_allocate); 274 if (delete_request.global) { 275 deletes.add(Annotation.deleteRange(tsdb, null, 276 delete_request.getStartTime(), delete_request.getEndTime())); 277 } 278 if (delete_request.tsuids != null) { 279 for (String tsuid : delete_request.tsuids) { 280 deletes.add(Annotation.deleteRange(tsdb, UniqueId.stringToUid(tsuid), 281 delete_request.getStartTime(), delete_request.getEndTime())); 282 } 283 } 284 285 Deferred.group(deletes).joinUninterruptibly(); 286 delete_request.total_deleted = 0; // just in case the caller set it 287 for (Deferred<Integer> count : deletes) { 288 delete_request.total_deleted += count.joinUninterruptibly(); 289 } 290 query.sendReply(query.serializer() 291 .formatAnnotationBulkDeleteV1(delete_request)); 292 } catch (BadRequestException e) { 293 throw e; 294 } catch (IllegalArgumentException e) { 295 throw new BadRequestException(e); 296 } catch (RuntimeException e) { 297 throw new BadRequestException(e); 298 } catch (Exception e) { 299 throw new RuntimeException("Shouldn't be here", e); 300 } 301 } 302 303 /** 304 * Parses a query string for annotation information. Note that {@code custom} 305 * key/values are not supported via query string. Users must issue a POST or 306 * PUT with content data. 307 * @param query The query to parse 308 * @return An annotation object if parsing was successful 309 * @throws IllegalArgumentException - if the request was malformed 310 */ parseQS(final HttpQuery query)311 private Annotation parseQS(final HttpQuery query) { 312 final Annotation note = new Annotation(); 313 314 final String tsuid = query.getQueryStringParam("tsuid"); 315 if (tsuid != null) { 316 note.setTSUID(tsuid); 317 } 318 319 final String start = query.getQueryStringParam("start_time"); 320 final Long start_time = DateTime.parseDateTimeString(start, ""); 321 if (start_time < 1) { 322 throw new BadRequestException("Missing start time"); 323 } 324 // TODO - fix for ms support in the future 325 note.setStartTime(start_time / 1000); 326 327 final String end = query.getQueryStringParam("end_time"); 328 final Long end_time = DateTime.parseDateTimeString(end, ""); 329 // TODO - fix for ms support in the future 330 note.setEndTime(end_time / 1000); 331 332 final String description = query.getQueryStringParam("description"); 333 if (description != null) { 334 note.setDescription(description); 335 } 336 337 final String notes = query.getQueryStringParam("notes"); 338 if (notes != null) { 339 note.setNotes(notes); 340 } 341 342 return note; 343 } 344 fetchSingleAnnotation(final TSDB tsdb, final Annotation note, final HttpQuery query)345 private void fetchSingleAnnotation(final TSDB tsdb, final Annotation note, 346 final HttpQuery query) throws Exception { 347 final Annotation stored_annotation = 348 Annotation.getAnnotation(tsdb, note.getTSUID(), note.getStartTime()) 349 .joinUninterruptibly(); 350 if (stored_annotation == null) { 351 throw new BadRequestException(HttpResponseStatus.NOT_FOUND, 352 "Unable to locate annotation in storage"); 353 } 354 query.sendReply(query.serializer().formatAnnotationV1(stored_annotation)); 355 } 356 fetchMultipleAnnotations(final TSDB tsdb, final Annotation note, final HttpQuery query)357 private void fetchMultipleAnnotations(final TSDB tsdb, final Annotation note, 358 final HttpQuery query) throws Exception { 359 if (note.getEndTime() == 0) { 360 note.setEndTime(System.currentTimeMillis()); 361 } 362 final List<Annotation> annotations = 363 Annotation.getGlobalAnnotations(tsdb, note.getStartTime(), note.getEndTime()) 364 .joinUninterruptibly(); 365 if (annotations == null) { 366 throw new BadRequestException(HttpResponseStatus.NOT_FOUND, 367 "Unable to locate annotations in storage"); 368 } 369 query.sendReply(query.serializer().formatAnnotationsV1(annotations)); 370 } 371 372 /** 373 * Parses a query string for a bulk delet request 374 * @param query The query to parse 375 * @return A bulk delete query 376 */ parseBulkDeleteQS(final HttpQuery query)377 private AnnotationBulkDelete parseBulkDeleteQS(final HttpQuery query) { 378 final AnnotationBulkDelete settings = new AnnotationBulkDelete(); 379 settings.start_time = query.getRequiredQueryStringParam("start_time"); 380 settings.end_time = query.getQueryStringParam("end_time"); 381 382 if (query.hasQueryStringParam("tsuids")) { 383 String[] tsuids = query.getQueryStringParam("tsuids").split(","); 384 settings.tsuids = new ArrayList<String>(tsuids.length); 385 for (String tsuid : tsuids) { 386 settings.tsuids.add(tsuid.trim()); 387 } 388 } 389 390 if (query.hasQueryStringParam("global")) { 391 settings.global = true; 392 } 393 return settings; 394 } 395 396 /** 397 * Represents a bulk annotation delete query. Either one or more TSUIDs must 398 * be supplied or the global flag can be set to determine what annotations 399 * are purged. Both may be set in one request. Annotations for the time 400 * between and including the start and end times will be removed based on 401 * the annotation's recorded start time. 402 */ 403 public static class AnnotationBulkDelete { 404 /** The start time, may be relative, absolute or unixy */ 405 private String start_time; 406 /** An option end time. If not set, current time is used */ 407 private String end_time; 408 /** Optional list of TSUIDs */ 409 private List<String> tsuids; 410 /** Optional flag to determine whether global notes for the range should be 411 * purged */ 412 private boolean global; 413 /** Total number of items deleted (for later response to the user) */ 414 private long total_deleted; 415 416 /** 417 * Default ctor for Jackson 418 */ AnnotationBulkDelete()419 public AnnotationBulkDelete() { 420 421 } 422 423 /** @return The start timestamp in milliseconds */ getStartTime()424 public long getStartTime() { 425 return DateTime.parseDateTimeString(start_time, null); 426 } 427 428 /** @return The ending timestamp in milliseconds. If it wasn't set, the 429 * current time is returned */ getEndTime()430 public long getEndTime() { 431 if (end_time == null || end_time.isEmpty()) { 432 return System.currentTimeMillis(); 433 } 434 return DateTime.parseDateTimeString(end_time, null); 435 } 436 437 /** @return List of TSUIDs to delete annotations for (may be NULL) */ getTsuids()438 public List<String> getTsuids() { 439 return tsuids; 440 } 441 442 /** @return Whether or not global annotations for the span should be purged */ getGlobal()443 public boolean getGlobal() { 444 return global; 445 } 446 447 /** @return The total number of annotations matched and deleted */ getTotalDeleted()448 public long getTotalDeleted() { 449 return total_deleted; 450 } 451 452 /** @param start_time Start time for the range. May be relative, absolute 453 * or unixy in seconds or milliseconds */ setStartTime(String start_time)454 public void setStartTime(String start_time) { 455 this.start_time = start_time; 456 } 457 458 /** @param end_time Optional end time to set for the range. Similar to start */ setEndTime(String end_time)459 public void setEndTime(String end_time) { 460 this.end_time = end_time; 461 } 462 463 /** @param tsuids A list of TSUIDs to scan for annotations */ setTsuids(List<String> tsuids)464 public void setTsuids(List<String> tsuids) { 465 this.tsuids = tsuids; 466 } 467 468 /** @param global Whether or not to delete global annotations for the range */ setGlobal(boolean global)469 public void setGlobal(boolean global) { 470 this.global = global; 471 } 472 473 /** @param total_deleted Total number of annotations deleted */ setTotalDeleted(long total_deleted)474 public void setTotalDeleted(long total_deleted) { 475 this.total_deleted = total_deleted; 476 } 477 478 } 479 } 480