1 // This file is part of OpenTSDB.
2 // Copyright (C) 2013  The OpenTSDB Authors.
3 //
4 // This program is free software: you can redistribute it and/or modify it
5 // under the terms of the GNU Lesser General Public License as published by
6 // the Free Software Foundation, either version 2.1 of the License, or (at your
7 // option) any later version.  This program is distributed in the hope that it
8 // will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty
9 // of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser
10 // General Public License for more details.  You should have received a copy
11 // of the GNU Lesser General Public License along with this program.  If not,
12 // see <http://www.gnu.org/licenses/>.
13 package net.opentsdb.tsd;
14 
15 import java.util.ArrayList;
16 import java.util.Arrays;
17 import java.util.HashMap;
18 import java.util.List;
19 import java.util.Map;
20 import java.util.concurrent.ConcurrentHashMap;
21 
22 import org.jboss.netty.handler.codec.http.HttpMethod;
23 import org.jboss.netty.handler.codec.http.HttpResponseStatus;
24 
25 import com.stumbleupon.async.Callback;
26 import com.stumbleupon.async.Deferred;
27 import com.stumbleupon.async.DeferredGroupException;
28 
29 import net.opentsdb.core.RowKey;
30 import net.opentsdb.core.TSDB;
31 import net.opentsdb.core.Tags;
32 import net.opentsdb.search.SearchQuery;
33 import net.opentsdb.search.TimeSeriesLookup;
34 import net.opentsdb.search.SearchQuery.SearchType;
35 import net.opentsdb.uid.NoSuchUniqueId;
36 import net.opentsdb.uid.NoSuchUniqueName;
37 import net.opentsdb.uid.UniqueId;
38 import net.opentsdb.uid.UniqueId.UniqueIdType;
39 import net.opentsdb.utils.Exceptions;
40 import net.opentsdb.utils.Pair;
41 
42 /**
43  * Handles very basic search calls by passing the user's query to the configured
44  * search plugin and pushing the response back through the serializers.
45  * Also allows for time series lookups given a metric, tag name, tag value or
46  * combination thereof using the tsdb-meta table.
47  * @since 2.0
48  */
49 final class SearchRpc implements HttpRpc {
50 
51   /**
52    * Handles the /api/search/&lt;type&gt; endpoint
53    * @param tsdb The TSDB to which we belong
54    * @param query The HTTP query to work with
55    */
56   @Override
execute(TSDB tsdb, HttpQuery query)57   public void execute(TSDB tsdb, HttpQuery query) {
58 
59     final HttpMethod method = query.getAPIMethod();
60     if (method != HttpMethod.GET && method != HttpMethod.POST) {
61       throw new BadRequestException("Unsupported method: " + method.getName());
62     }
63 
64     // the uri will be /api/vX/search/<type> or /api/search/<type>
65     final String[] uri = query.explodeAPIPath();
66     final String endpoint = uri.length > 1 ? uri[1] : "";
67     final SearchType type;
68     final SearchQuery search_query;
69 
70     try {
71       type = SearchQuery.parseSearchType(endpoint);
72     } catch (IllegalArgumentException e) {
73       throw new BadRequestException("Invalid search query type supplied", e);
74     }
75 
76     if (query.hasContent()) {
77       search_query = query.serializer().parseSearchQueryV1();
78     } else {
79       search_query = parseQueryString(query, type);
80     }
81 
82     search_query.setType(type);
83 
84     if (type == SearchType.LOOKUP) {
85       processLookup(tsdb, query, search_query);
86       return;
87     }
88 
89     try {
90       final SearchQuery results =
91         tsdb.executeSearch(search_query).joinUninterruptibly();
92       query.sendReply(query.serializer().formatSearchResultsV1(results));
93     } catch (IllegalStateException e) {
94       throw new BadRequestException("Searching is not enabled", e);
95     } catch (Exception e) {
96       throw new RuntimeException(e);
97     }
98   }
99 
100   /**
101    * Parses required search values from the query string
102    * @param query The HTTP query to work with
103    * @param type The type of search query requested
104    * @return A parsed SearchQuery object
105    */
parseQueryString(final HttpQuery query, final SearchType type)106   private final SearchQuery parseQueryString(final HttpQuery query,
107       final SearchType type) {
108     final SearchQuery search_query = new SearchQuery();
109 
110     if (type == SearchType.LOOKUP) {
111       final String query_string = query.getRequiredQueryStringParam("m");
112       search_query.setTags(new ArrayList<Pair<String, String>>());
113 
114       try {
115       search_query.setMetric(Tags.parseWithMetric(query_string,
116           search_query.getTags()));
117       } catch (IllegalArgumentException e) {
118         throw new BadRequestException("Unable to parse query", e);
119       }
120       if (query.hasQueryStringParam("limit")) {
121         final String limit = query.getQueryStringParam("limit");
122         try {
123           search_query.setLimit(Integer.parseInt(limit));
124         } catch (NumberFormatException e) {
125           throw new BadRequestException(
126                   "Unable to convert 'limit' to a valid number");
127         }
128       }
129       return search_query;
130     }
131 
132     // process a regular search query
133     search_query.setQuery(query.getRequiredQueryStringParam("query"));
134 
135     if (query.hasQueryStringParam("limit")) {
136       final String limit = query.getQueryStringParam("limit");
137       try {
138         search_query.setLimit(Integer.parseInt(limit));
139       } catch (NumberFormatException e) {
140         throw new BadRequestException(
141             "Unable to convert 'limit' to a valid number");
142       }
143     }
144 
145     if (query.hasQueryStringParam("start_index")) {
146       final String idx = query.getQueryStringParam("start_index");
147       try {
148         search_query.setStartIndex(Integer.parseInt(idx));
149       } catch (NumberFormatException e) {
150         throw new BadRequestException(
151             "Unable to convert 'start_index' to a valid number");
152       }
153     }
154 
155     return search_query;
156   }
157 
158   /**
159    * Processes a lookup query against the tsdb-meta table, returning (and
160    * resolving) the TSUIDs of any series that matched the query.
161    * @param tsdb The TSDB to which we belong
162    * @param query The HTTP query to work with
163    * @param search_query A search query configured with at least a metric
164    * or a list of tag pairs. If neither are set, the method will throw an error.
165    * @throws BadRequestException if the metric and tags are null or empty or
166    * a UID fails to resolve.
167    * @since 2.1
168    */
processLookup(final TSDB tsdb, final HttpQuery query, final SearchQuery search_query)169   private void processLookup(final TSDB tsdb, final HttpQuery query,
170       final SearchQuery search_query) {
171     if (search_query.getMetric() == null &&
172         (search_query.getTags() == null || search_query.getTags().size() < 1)) {
173       throw new BadRequestException(
174           "Missing metric and tags. Please supply at least one value.");
175     }
176     final long start = System.currentTimeMillis();
177 
178     class MetricCB implements Callback<Object, String> {
179       final Map<String, Object> series;
180       MetricCB(final Map<String, Object> series) {
181         this.series = series;
182       }
183 
184       @Override
185       public Object call(final String name) throws Exception {
186         series.put("metric", name);
187         return null;
188       }
189     }
190 
191     class TagsCB implements Callback<Object, HashMap<String, String>> {
192       final Map<String, Object> series;
193       TagsCB(final Map<String, Object> series) {
194         this.series = series;
195       }
196 
197       @Override
198       public Object call(final HashMap<String, String> names) throws Exception {
199         series.put("tags", names);
200         return null;
201       }
202     }
203 
204     class Serialize implements Callback<Object, ArrayList<Object>> {
205       final List<Object> results;
206       Serialize(final List<Object> results) {
207         this.results = results;
208       }
209 
210       @Override
211       public Object call(final ArrayList<Object> ignored) throws Exception {
212         search_query.setResults(results);
213         search_query.setTime(System.currentTimeMillis() - start);
214         query.sendReply(query.serializer().formatSearchResultsV1(search_query));
215         return null;
216       }
217     }
218 
219     class LookupCB implements Callback<Deferred<Object>, List<byte[]>> {
220       @Override
221       public Deferred<Object> call(final List<byte[]> tsuids) throws Exception {
222         final List<Object> results = new ArrayList<Object>(tsuids.size());
223         search_query.setTotalResults(tsuids.size());
224 
225         final ArrayList<Deferred<Object>> deferreds =
226             new ArrayList<Deferred<Object>>(tsuids.size());
227 
228         for (final byte[] tsuid : tsuids) {
229           // has to be concurrent if the uid table is split across servers
230           final Map<String, Object> series =
231               new ConcurrentHashMap<String, Object>(3);
232           results.add(series);
233 
234           series.put("tsuid", UniqueId.uidToString(tsuid));
235           byte[] metric_uid = Arrays.copyOfRange(tsuid, 0, TSDB.metrics_width());
236           deferreds.add(tsdb.getUidName(UniqueIdType.METRIC, metric_uid)
237               .addCallback(new MetricCB(series)));
238 
239           final List<byte[]> tag_ids = UniqueId.getTagPairsFromTSUID(tsuid);
240           deferreds.add(Tags.resolveIdsAsync(tsdb, tag_ids)
241               .addCallback(new TagsCB(series)));
242         }
243 
244         return Deferred.group(deferreds).addCallback(new Serialize(results));
245       }
246     }
247 
248     class ErrCB implements Callback<Object, Exception> {
249       @Override
250       public Object call(final Exception e) throws Exception {
251         if (e instanceof NoSuchUniqueId) {
252           query.sendReply(HttpResponseStatus.NOT_FOUND,
253               query.serializer().formatErrorV1(
254                   new BadRequestException(HttpResponseStatus.NOT_FOUND,
255               "Unable to resolve one or more TSUIDs", (NoSuchUniqueId)e)));
256         } else if (e instanceof NoSuchUniqueName) {
257           query.sendReply(HttpResponseStatus.NOT_FOUND,
258             query.serializer().formatErrorV1(
259               new BadRequestException(HttpResponseStatus.NOT_FOUND,
260               "Unable to resolve one or more UIDs", (NoSuchUniqueName)e)));
261         } else if (e instanceof DeferredGroupException) {
262           final Throwable ex = Exceptions.getCause((DeferredGroupException)e);
263           if (ex instanceof NoSuchUniqueId) {
264             query.sendReply(HttpResponseStatus.NOT_FOUND,
265                 query.serializer().formatErrorV1(
266                     new BadRequestException(HttpResponseStatus.NOT_FOUND,
267                 "Unable to resolve one or more TSUIDs", (NoSuchUniqueId)ex)));
268           } else if (ex instanceof NoSuchUniqueName) {
269             query.sendReply(HttpResponseStatus.NOT_FOUND,
270               query.serializer().formatErrorV1(
271                 new BadRequestException(HttpResponseStatus.NOT_FOUND,
272                 "Unable to resolve one or more UIDs", (NoSuchUniqueName)ex)));
273           } else {
274             query.sendReply(HttpResponseStatus.INTERNAL_SERVER_ERROR,
275                 query.serializer().formatErrorV1(
276                     new BadRequestException(HttpResponseStatus.INTERNAL_SERVER_ERROR,
277                 "Unexpected exception", ex)));
278           }
279         } else {
280           query.sendReply(HttpResponseStatus.INTERNAL_SERVER_ERROR,
281               query.serializer().formatErrorV1(
282                   new BadRequestException(HttpResponseStatus.INTERNAL_SERVER_ERROR,
283               "Unexpected exception", e)));
284         }
285         return null;
286       }
287     }
288 
289     new TimeSeriesLookup(tsdb, search_query).lookupAsync()
290       .addCallback(new LookupCB())
291       .addErrback(new ErrCB());
292   }
293 }
294