1 // This file is part of OpenTSDB.
2 // Copyright (C) 2013  The OpenTSDB Authors.
3 //
4 // This program is free software: you can redistribute it and/or modify it
5 // under the terms of the GNU Lesser General Public License as published by
6 // the Free Software Foundation, either version 2.1 of the License, or (at your
7 // option) any later version.  This program is distributed in the hope that it
8 // will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty
9 // of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser
10 // General Public License for more details.  You should have received a copy
11 // of the GNU Lesser General Public License along with this program.  If not,
12 // see <http://www.gnu.org/licenses/>.
13 package net.opentsdb.tsd;
14 
15 import java.io.IOException;
16 import java.util.ArrayList;
17 import java.util.List;
18 
19 import org.jboss.netty.handler.codec.http.HttpMethod;
20 import org.jboss.netty.handler.codec.http.HttpResponseStatus;
21 import org.slf4j.Logger;
22 import org.slf4j.LoggerFactory;
23 
24 import com.stumbleupon.async.Callback;
25 import com.stumbleupon.async.Deferred;
26 
27 import net.opentsdb.core.TSDB;
28 import net.opentsdb.meta.Annotation;
29 import net.opentsdb.uid.UniqueId;
30 import net.opentsdb.utils.DateTime;
31 import net.opentsdb.utils.JSONException;
32 
33 /**
34  * Handles create, update, replace and delete calls for individual annotation
35  * objects. Annotations are stored in the data table alongside data points.
36  * Queries will return annotations along with the data if requested. This RPC
37  * is only used for modifying the individual entries.
38  * @since 2.0
39  */
40 final class AnnotationRpc implements HttpRpc {
41   private static final Logger LOG = LoggerFactory.getLogger(AnnotationRpc.class);
42 
43   /**
44    * Performs CRUD methods on individual annotation objects.
45    * @param tsdb The TSD to which we belong
46    * @param query The query to parse and respond to
47    */
execute(final TSDB tsdb, HttpQuery query)48   public void execute(final TSDB tsdb, HttpQuery query) throws IOException {
49     final HttpMethod method = query.getAPIMethod();
50 
51     final String[] uri = query.explodeAPIPath();
52     final String endpoint = uri.length > 1 ? uri[1] : "";
53     if (endpoint != null && endpoint.toLowerCase().endsWith("bulk")) {
54       executeBulk(tsdb, method, query);
55       return;
56     }
57 
58     final Annotation note;
59     if (query.hasContent()) {
60       note = query.serializer().parseAnnotationV1();
61     } else {
62       note = parseQS(query);
63     }
64 
65     // GET
66     if (method == HttpMethod.GET) {
67       try {
68         if ("annotations".toLowerCase().equals(uri[0])) {
69           fetchMultipleAnnotations(tsdb, note, query);
70         } else {
71           fetchSingleAnnotation(tsdb, note, query);
72         }
73       } catch (BadRequestException e) {
74         throw e;
75       } catch (Exception e) {
76         throw new RuntimeException(e);
77       }
78     // POST
79     } else if (method == HttpMethod.POST || method == HttpMethod.PUT) {
80 
81       /**
82        * Storage callback used to determine if the storage call was successful
83        * or not. Also returns the updated object from storage.
84        */
85       class SyncCB implements Callback<Deferred<Annotation>, Boolean> {
86 
87         @Override
88         public Deferred<Annotation> call(Boolean success) throws Exception {
89           if (!success) {
90             throw new BadRequestException(
91                 HttpResponseStatus.INTERNAL_SERVER_ERROR,
92                 "Failed to save the Annotation to storage",
93                 "This may be caused by another process modifying storage data");
94           }
95 
96           return Annotation.getAnnotation(tsdb, note.getTSUID(),
97               note.getStartTime());
98         }
99 
100       }
101 
102       try {
103         final Deferred<Annotation> process_meta = note.syncToStorage(tsdb,
104             method == HttpMethod.PUT).addCallbackDeferring(new SyncCB());
105         final Annotation updated_meta = process_meta.joinUninterruptibly();
106         tsdb.indexAnnotation(note);
107         query.sendReply(query.serializer().formatAnnotationV1(updated_meta));
108       } catch (IllegalStateException e) {
109         query.sendStatusOnly(HttpResponseStatus.NOT_MODIFIED);
110       } catch (IllegalArgumentException e) {
111         throw new BadRequestException(e);
112       } catch (Exception e) {
113         throw new RuntimeException(e);
114       }
115     // DELETE
116     } else if (method == HttpMethod.DELETE) {
117 
118       try {
119         note.delete(tsdb).joinUninterruptibly();
120         tsdb.deleteAnnotation(note);
121       } catch (IllegalArgumentException e) {
122         throw new BadRequestException(
123             "Unable to delete Annotation information", e);
124       } catch (Exception e) {
125         throw new RuntimeException(e);
126       }
127       query.sendStatusOnly(HttpResponseStatus.NO_CONTENT);
128 
129     } else {
130       throw new BadRequestException(HttpResponseStatus.METHOD_NOT_ALLOWED,
131           "Method not allowed", "The HTTP method [" + method.getName() +
132           "] is not permitted for this endpoint");
133     }
134   }
135 
136   /**
137    * Performs CRUD methods on a list of annotation objects to reduce calls to
138    * the API.
139    * @param tsdb The TSD to which we belong
140    * @param method The request method
141    * @param query The query to parse and respond to
142    */
executeBulk(final TSDB tsdb, final HttpMethod method, HttpQuery query)143   void executeBulk(final TSDB tsdb, final HttpMethod method, HttpQuery query) {
144     if (method == HttpMethod.POST || method == HttpMethod.PUT) {
145       executeBulkUpdate(tsdb, method, query);
146     } else if (method == HttpMethod.DELETE) {
147       executeBulkDelete(tsdb, query);
148     } else {
149       throw new BadRequestException(HttpResponseStatus.METHOD_NOT_ALLOWED,
150           "Method not allowed", "The HTTP method [" + query.method().getName() +
151           "] is not permitted for this endpoint");
152     }
153   }
154 
155   /**
156    * Performs CRU methods on a list of annotation objects to reduce calls to
157    * the API. Only supports body content and adding or updating annotation
158    * objects. Deletions are separate.
159    * @param tsdb The TSD to which we belong
160    * @param method The request method
161    * @param query The query to parse and respond to
162    */
executeBulkUpdate(final TSDB tsdb, final HttpMethod method, HttpQuery query)163   void executeBulkUpdate(final TSDB tsdb, final HttpMethod method, HttpQuery query) {
164     final List<Annotation> notes;
165     try {
166       notes = query.serializer().parseAnnotationsV1();
167     } catch (IllegalArgumentException e){
168       throw new BadRequestException(e);
169     } catch (JSONException e){
170       throw new BadRequestException(e);
171     }
172     final List<Deferred<Annotation>> callbacks =
173         new ArrayList<Deferred<Annotation>>(notes.size());
174 
175     /**
176      * Storage callback used to determine if the storage call was successful
177      * or not. Also returns the updated object from storage.
178      */
179     class SyncCB implements Callback<Deferred<Annotation>, Boolean> {
180       final private Annotation note;
181 
182       public SyncCB(final Annotation note) {
183         this.note = note;
184       }
185 
186       @Override
187       public Deferred<Annotation> call(Boolean success) throws Exception {
188         if (!success) {
189           throw new BadRequestException(
190               HttpResponseStatus.INTERNAL_SERVER_ERROR,
191               "Failed to save an Annotation to storage",
192               "This may be caused by another process modifying storage data: "
193               + note);
194         }
195 
196         return Annotation.getAnnotation(tsdb, note.getTSUID(),
197             note.getStartTime());
198       }
199     }
200 
201     /**
202      * Simple callback that will index the updated annotation
203      */
204     class IndexCB implements Callback<Deferred<Annotation>, Annotation> {
205       @Override
206       public Deferred<Annotation> call(final Annotation note) throws Exception {
207         tsdb.indexAnnotation(note);
208         return Deferred.fromResult(note);
209       }
210     }
211 
212     for (Annotation note : notes) {
213       try {
214         Deferred<Annotation> deferred =
215             note.syncToStorage(tsdb, method == HttpMethod.PUT)
216             .addCallbackDeferring(new SyncCB(note));
217         Deferred<Annotation> indexer =
218             deferred.addCallbackDeferring(new IndexCB());
219         callbacks.add(indexer);
220       } catch (IllegalStateException e) {
221         LOG.info("No changes for annotation: " + note);
222       } catch (IllegalArgumentException e) {
223         throw new BadRequestException(HttpResponseStatus.BAD_REQUEST,
224             e.getMessage(), "Annotation error: " + note, e);
225       }
226     }
227 
228     try {
229       // wait untill all of the syncs have completed, then rebuild the list
230       // of annotations using the data synced from storage.
231       Deferred.group(callbacks).joinUninterruptibly();
232       notes.clear();
233       for (Deferred<Annotation> note : callbacks) {
234         notes.add(note.joinUninterruptibly());
235       }
236       query.sendReply(query.serializer().formatAnnotationsV1(notes));
237     } catch (IllegalArgumentException e) {
238       throw new BadRequestException(e);
239     } catch (Exception e) {
240       throw new RuntimeException(e);
241     }
242   }
243 
244   /**
245    * Handles bulk deletions of a range of annotations (local or global) using
246    * query string or body data
247    * @param tsdb The TSD to which we belong
248    * @param query The query to parse and respond to
249    */
executeBulkDelete(final TSDB tsdb, HttpQuery query)250   void executeBulkDelete(final TSDB tsdb, HttpQuery query) {
251     try {
252       final AnnotationBulkDelete delete_request;
253       if (query.hasContent()) {
254         delete_request = query.serializer().parseAnnotationBulkDeleteV1();
255       } else {
256         delete_request = parseBulkDeleteQS(query);
257       }
258 
259       // validate the start time on the string. Users could request a timestamp of
260       // 0 to delete all annotations, BUT we don't want them doing that accidentally
261       if (delete_request.start_time == null || delete_request.start_time.isEmpty()) {
262         throw new BadRequestException(HttpResponseStatus.BAD_REQUEST,
263             "Missing the start time value");
264       }
265       if (!delete_request.global &&
266           (delete_request.tsuids == null || delete_request.tsuids.isEmpty())) {
267         throw new BadRequestException(HttpResponseStatus.BAD_REQUEST,
268             "Missing the TSUIDs or global annotations flag");
269       }
270 
271       final int pre_allocate = delete_request.tsuids != null ?
272           delete_request.tsuids.size() + 1 : 1;
273       List<Deferred<Integer>> deletes = new ArrayList<Deferred<Integer>>(pre_allocate);
274       if (delete_request.global) {
275         deletes.add(Annotation.deleteRange(tsdb, null,
276             delete_request.getStartTime(), delete_request.getEndTime()));
277       }
278       if (delete_request.tsuids != null) {
279         for (String tsuid : delete_request.tsuids) {
280           deletes.add(Annotation.deleteRange(tsdb, UniqueId.stringToUid(tsuid),
281               delete_request.getStartTime(), delete_request.getEndTime()));
282         }
283       }
284 
285       Deferred.group(deletes).joinUninterruptibly();
286       delete_request.total_deleted = 0; // just in case the caller set it
287       for (Deferred<Integer> count : deletes) {
288         delete_request.total_deleted += count.joinUninterruptibly();
289       }
290       query.sendReply(query.serializer()
291           .formatAnnotationBulkDeleteV1(delete_request));
292     } catch (BadRequestException e) {
293       throw e;
294     } catch (IllegalArgumentException e) {
295       throw new BadRequestException(e);
296     } catch (RuntimeException e) {
297       throw new BadRequestException(e);
298     } catch (Exception e) {
299       throw new RuntimeException("Shouldn't be here", e);
300     }
301   }
302 
303   /**
304    * Parses a query string for annotation information. Note that {@code custom}
305    * key/values are not supported via query string. Users must issue a POST or
306    * PUT with content data.
307    * @param query The query to parse
308    * @return An annotation object if parsing was successful
309    * @throws IllegalArgumentException - if the request was malformed
310    */
parseQS(final HttpQuery query)311   private Annotation parseQS(final HttpQuery query) {
312     final Annotation note = new Annotation();
313 
314     final String tsuid = query.getQueryStringParam("tsuid");
315     if (tsuid != null) {
316       note.setTSUID(tsuid);
317     }
318 
319     final String start = query.getQueryStringParam("start_time");
320     final Long start_time = DateTime.parseDateTimeString(start, "");
321     if (start_time < 1) {
322       throw new BadRequestException("Missing start time");
323     }
324     // TODO - fix for ms support in the future
325     note.setStartTime(start_time / 1000);
326 
327     final String end = query.getQueryStringParam("end_time");
328     final Long end_time = DateTime.parseDateTimeString(end, "");
329     // TODO - fix for ms support in the future
330     note.setEndTime(end_time / 1000);
331 
332     final String description = query.getQueryStringParam("description");
333     if (description != null) {
334       note.setDescription(description);
335     }
336 
337     final String notes = query.getQueryStringParam("notes");
338     if (notes != null) {
339       note.setNotes(notes);
340     }
341 
342     return note;
343   }
344 
fetchSingleAnnotation(final TSDB tsdb, final Annotation note, final HttpQuery query)345   private void fetchSingleAnnotation(final TSDB tsdb, final Annotation note,
346                                      final HttpQuery query) throws Exception {
347     final Annotation stored_annotation =
348       Annotation.getAnnotation(tsdb, note.getTSUID(), note.getStartTime())
349         .joinUninterruptibly();
350     if (stored_annotation == null) {
351       throw new BadRequestException(HttpResponseStatus.NOT_FOUND,
352           "Unable to locate annotation in storage");
353     }
354     query.sendReply(query.serializer().formatAnnotationV1(stored_annotation));
355   }
356 
fetchMultipleAnnotations(final TSDB tsdb, final Annotation note, final HttpQuery query)357   private void fetchMultipleAnnotations(final TSDB tsdb, final Annotation note,
358                                         final HttpQuery query) throws Exception {
359     if (note.getEndTime() == 0) {
360       note.setEndTime(System.currentTimeMillis());
361     }
362     final List<Annotation> annotations =
363       Annotation.getGlobalAnnotations(tsdb, note.getStartTime(), note.getEndTime())
364         .joinUninterruptibly();
365     if (annotations == null) {
366       throw new BadRequestException(HttpResponseStatus.NOT_FOUND,
367           "Unable to locate annotations in storage");
368     }
369     query.sendReply(query.serializer().formatAnnotationsV1(annotations));
370   }
371 
372   /**
373    * Parses a query string for a bulk delet request
374    * @param query The query to parse
375    * @return A bulk delete query
376    */
parseBulkDeleteQS(final HttpQuery query)377   private AnnotationBulkDelete parseBulkDeleteQS(final HttpQuery query) {
378     final AnnotationBulkDelete settings = new AnnotationBulkDelete();
379     settings.start_time = query.getRequiredQueryStringParam("start_time");
380     settings.end_time = query.getQueryStringParam("end_time");
381 
382     if (query.hasQueryStringParam("tsuids")) {
383       String[] tsuids = query.getQueryStringParam("tsuids").split(",");
384       settings.tsuids = new ArrayList<String>(tsuids.length);
385       for (String tsuid : tsuids) {
386         settings.tsuids.add(tsuid.trim());
387       }
388     }
389 
390     if (query.hasQueryStringParam("global")) {
391       settings.global = true;
392     }
393     return settings;
394   }
395 
396   /**
397    * Represents a bulk annotation delete query. Either one or more TSUIDs must
398    * be supplied or the global flag can be set to determine what annotations
399    * are purged. Both may be set in one request. Annotations for the time
400    * between and including the start and end times will be removed based on
401    * the annotation's recorded start time.
402    */
403   public static class AnnotationBulkDelete {
404     /** The start time, may be relative, absolute or unixy */
405     private String start_time;
406     /** An option end time. If not set, current time is used */
407     private String end_time;
408     /** Optional list of TSUIDs */
409     private List<String> tsuids;
410     /** Optional flag to determine whether global notes for the range should be
411      *  purged */
412     private boolean global;
413     /** Total number of items deleted (for later response to the user) */
414     private long total_deleted;
415 
416     /**
417      * Default ctor for Jackson
418      */
AnnotationBulkDelete()419     public AnnotationBulkDelete() {
420 
421     }
422 
423     /** @return The start timestamp in milliseconds */
getStartTime()424     public long getStartTime() {
425       return DateTime.parseDateTimeString(start_time, null);
426     }
427 
428     /** @return The ending timestamp in milliseconds. If it wasn't set, the
429      * current time is returned */
getEndTime()430     public long getEndTime() {
431       if (end_time == null || end_time.isEmpty()) {
432         return System.currentTimeMillis();
433       }
434       return DateTime.parseDateTimeString(end_time, null);
435     }
436 
437     /** @return List of TSUIDs to delete annotations for (may be NULL) */
getTsuids()438     public List<String> getTsuids() {
439       return tsuids;
440     }
441 
442     /** @return Whether or not global annotations for the span should be purged */
getGlobal()443     public boolean getGlobal() {
444       return global;
445     }
446 
447     /** @return The total number of annotations matched and deleted */
getTotalDeleted()448     public long getTotalDeleted() {
449       return total_deleted;
450     }
451 
452     /** @param start_time Start time for the range. May be relative, absolute
453      * or unixy in seconds or milliseconds */
setStartTime(String start_time)454     public void setStartTime(String start_time) {
455       this.start_time = start_time;
456     }
457 
458     /** @param end_time Optional end time to set for the range. Similar to start */
setEndTime(String end_time)459     public void setEndTime(String end_time) {
460       this.end_time = end_time;
461     }
462 
463     /** @param tsuids A list of TSUIDs to scan for annotations */
setTsuids(List<String> tsuids)464     public void setTsuids(List<String> tsuids) {
465       this.tsuids = tsuids;
466     }
467 
468     /** @param global Whether or not to delete global annotations for the range */
setGlobal(boolean global)469     public void setGlobal(boolean global) {
470       this.global = global;
471     }
472 
473     /** @param total_deleted Total number of annotations deleted */
setTotalDeleted(long total_deleted)474     public void setTotalDeleted(long total_deleted) {
475       this.total_deleted = total_deleted;
476     }
477 
478   }
479 }
480