// This file is part of OpenTSDB.
// Copyright (C) 2013 The OpenTSDB Authors.
//
// This program is free software: you can redistribute it and/or modify it
// under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 2.1 of the License, or (at your
// option) any later version. This program is distributed in the hope that it
// will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty
// of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
// General Public License for more details. You should have received a copy
// of the GNU Lesser General Public License along with this program. If not,
// see <http://www.gnu.org/licenses/>.
package net.opentsdb.tsd;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;

import com.stumbleupon.async.Callback;
import com.stumbleupon.async.Deferred;
import com.stumbleupon.async.DeferredGroupException;

import net.opentsdb.core.RowKey;
import net.opentsdb.core.TSDB;
import net.opentsdb.core.Tags;
import net.opentsdb.search.SearchQuery;
import net.opentsdb.search.TimeSeriesLookup;
import net.opentsdb.search.SearchQuery.SearchType;
import net.opentsdb.uid.NoSuchUniqueId;
import net.opentsdb.uid.NoSuchUniqueName;
import net.opentsdb.uid.UniqueId;
import net.opentsdb.uid.UniqueId.UniqueIdType;
import net.opentsdb.utils.Exceptions;
import net.opentsdb.utils.Pair;

/**
 * Handles very basic search calls by passing the user's query to the configured
 * search plugin and pushing the response back through the serializers.
 * Also allows for time series lookups given a metric, tag name, tag value or
 * combination thereof using the tsdb-meta table.
 * @since 2.0
 */
final class SearchRpc implements HttpRpc {

  /**
   * Handles the {@code /api/search/<type>} endpoint.
   * <p>
   * Only GET and POST are accepted. The search type is taken from the second
   * element of the exploded API path. When the request carries a body it is
   * parsed by the serializer, otherwise the query string is used. Lookup
   * queries are handed to {@link #processLookup}; every other type is passed
   * to {@link TSDB#executeSearch} and this method blocks until it completes.
   * @param tsdb The TSDB to which we belong
   * @param query The HTTP query to work with
   * @throws BadRequestException if the method is unsupported, the search type
   * is invalid, the query cannot be parsed or searching is not enabled
   */
  @Override
  public void execute(TSDB tsdb, HttpQuery query) {

    final HttpMethod method = query.getAPIMethod();
    if (method != HttpMethod.GET && method != HttpMethod.POST) {
      throw new BadRequestException("Unsupported method: " + method.getName());
    }

    // the uri will be /api/vX/search/<type> or /api/search/<type>
    final String[] uri = query.explodeAPIPath();
    final String endpoint = uri.length > 1 ? uri[1] : "";
    final SearchType type;
    final SearchQuery search_query;

    try {
      type = SearchQuery.parseSearchType(endpoint);
    } catch (IllegalArgumentException e) {
      throw new BadRequestException("Invalid search query type supplied", e);
    }

    if (query.hasContent()) {
      search_query = query.serializer().parseSearchQueryV1();
    } else {
      search_query = parseQueryString(query, type);
    }

    search_query.setType(type);

    if (type == SearchType.LOOKUP) {
      processLookup(tsdb, query, search_query);
      return;
    }

    try {
      final SearchQuery results =
        tsdb.executeSearch(search_query).joinUninterruptibly();
      query.sendReply(query.serializer().formatSearchResultsV1(results));
    } catch (IllegalStateException e) {
      throw new BadRequestException("Searching is not enabled", e);
    } catch (BadRequestException e) {
      // already carries the HTTP status to report; wrapping it in a plain
      // RuntimeException below would discard that status
      throw e;
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Parses required search values from the query string.
   * <p>
   * For lookups the required {@code m} parameter is split into a metric and a
   * list of tag pairs and only the optional {@code limit} is read. For every
   * other type the required {@code query} parameter is read along with the
   * optional {@code limit} and {@code start_index}.
   * @param query The HTTP query to work with
   * @param type The type of search query requested
   * @return A parsed SearchQuery object
   * @throws BadRequestException if a required parameter is missing, the
   * {@code m} parameter cannot be parsed or a numeric parameter is not a
   * valid integer
   */
  private final SearchQuery parseQueryString(final HttpQuery query,
      final SearchType type) {
    final SearchQuery search_query = new SearchQuery();

    if (type == SearchType.LOOKUP) {
      final String query_string = query.getRequiredQueryStringParam("m");
      search_query.setTags(new ArrayList<Pair<String, String>>());

      try {
        // the tag pairs are appended to the list set just above
        search_query.setMetric(Tags.parseWithMetric(query_string,
            search_query.getTags()));
      } catch (IllegalArgumentException e) {
        throw new BadRequestException("Unable to parse query", e);
      }
      if (query.hasQueryStringParam("limit")) {
        search_query.setLimit(parseIntParam(query, "limit"));
      }
      return search_query;
    }

    // process a regular search query
    search_query.setQuery(query.getRequiredQueryStringParam("query"));

    if (query.hasQueryStringParam("limit")) {
      search_query.setLimit(parseIntParam(query, "limit"));
    }

    if (query.hasQueryStringParam("start_index")) {
      search_query.setStartIndex(parseIntParam(query, "start_index"));
    }

    return search_query;
  }

  /**
   * Reads a query string parameter and converts it to an integer.
   * @param query The HTTP query to read the parameter from
   * @param name The name of the query string parameter
   * @return The parsed integer value
   * @throws BadRequestException if the value is not a valid integer; the
   * original {@link NumberFormatException} is attached as the cause
   */
  private static int parseIntParam(final HttpQuery query, final String name) {
    final String value = query.getQueryStringParam(name);
    try {
      return Integer.parseInt(value);
    } catch (NumberFormatException e) {
      throw new BadRequestException(
          "Unable to convert '" + name + "' to a valid number", e);
    }
  }

  /**
   * Processes a lookup query against the tsdb-meta table, returning (and
   * resolving) the TSUIDs of any series that matched the query.
   * <p>
   * The lookup runs asynchronously: the reply (or an error reply) is sent from
   * the callbacks attached to the lookup rather than from this method.
   * @param tsdb The TSDB to which we belong
   * @param query The HTTP query to work with
   * @param search_query A search query configured with at least a metric
   * or a list of tag pairs. If neither are set, the method will throw an error.
   * @throws BadRequestException if the metric and tags are null or empty or
   * a UID fails to resolve.
   * @since 2.1
   */
  private void processLookup(final TSDB tsdb, final HttpQuery query,
      final SearchQuery search_query) {
    if (search_query.getMetric() == null &&
        (search_query.getTags() == null || search_query.getTags().size() < 1)) {
      throw new BadRequestException(
          "Missing metric and tags. Please supply at least one value.");
    }
    final long start = System.currentTimeMillis();

    /** Stores the resolved metric name in the series map. */
    class MetricCB implements Callback<Object, String> {
      final Map<String, Object> series;
      MetricCB(final Map<String, Object> series) {
        this.series = series;
      }

      @Override
      public Object call(final String name) throws Exception {
        series.put("metric", name);
        return null;
      }
    }

    /** Stores the resolved tag name/value pairs in the series map. */
    class TagsCB implements Callback<Object, HashMap<String, String>> {
      final Map<String, Object> series;
      TagsCB(final Map<String, Object> series) {
        this.series = series;
      }

      @Override
      public Object call(final HashMap<String, String> names) throws Exception {
        series.put("tags", names);
        return null;
      }
    }

    /** Sends the reply once every grouped resolution has completed. */
    class Serialize implements Callback<Object, ArrayList<Object>> {
      final List<Object> results;
      Serialize(final List<Object> results) {
        this.results = results;
      }

      @Override
      public Object call(final ArrayList<Object> ignored) throws Exception {
        search_query.setResults(results);
        search_query.setTime(System.currentTimeMillis() - start);
        query.sendReply(query.serializer().formatSearchResultsV1(search_query));
        return null;
      }
    }

    /** Kicks off name resolution for the metric and tags of each TSUID. */
    class LookupCB implements Callback<Deferred<Object>, List<byte[]>> {
      @Override
      public Deferred<Object> call(final List<byte[]> tsuids) throws Exception {
        final List<Object> results = new ArrayList<Object>(tsuids.size());
        search_query.setTotalResults(tsuids.size());

        final ArrayList<Deferred<Object>> deferreds =
            new ArrayList<Deferred<Object>>(tsuids.size());

        for (final byte[] tsuid : tsuids) {
          // has to be concurrent if the uid table is split across servers
          final Map<String, Object> series =
              new ConcurrentHashMap<String, Object>(3);
          results.add(series);

          series.put("tsuid", UniqueId.uidToString(tsuid));
          // the metric UID is the leading metrics_width() bytes of the TSUID
          byte[] metric_uid = Arrays.copyOfRange(tsuid, 0, TSDB.metrics_width());
          deferreds.add(tsdb.getUidName(UniqueIdType.METRIC, metric_uid)
              .addCallback(new MetricCB(series)));

          final List<byte[]> tag_ids = UniqueId.getTagPairsFromTSUID(tsuid);
          deferreds.add(Tags.resolveIdsAsync(tsdb, tag_ids)
              .addCallback(new TagsCB(series)));
        }

        return Deferred.group(deferreds).addCallback(new Serialize(results));
      }
    }

    /** Translates a failure anywhere in the chain into an HTTP error reply. */
    class ErrCB implements Callback<Object, Exception> {
      @Override
      public Object call(final Exception e) throws Exception {
        // failures from the grouped resolutions may arrive wrapped in a
        // DeferredGroupException, so dispatch on the underlying cause
        final Throwable cause = e instanceof DeferredGroupException
            ? Exceptions.getCause((DeferredGroupException) e) : e;

        final HttpResponseStatus status;
        final String message;
        if (cause instanceof NoSuchUniqueId) {
          status = HttpResponseStatus.NOT_FOUND;
          message = "Unable to resolve one or more TSUIDs";
        } else if (cause instanceof NoSuchUniqueName) {
          status = HttpResponseStatus.NOT_FOUND;
          message = "Unable to resolve one or more UIDs";
        } else {
          status = HttpResponseStatus.INTERNAL_SERVER_ERROR;
          message = "Unexpected exception";
        }

        query.sendReply(status,
            query.serializer().formatErrorV1(
                new BadRequestException(status, message, cause)));
        return null;
      }
    }

    new TimeSeriesLookup(tsdb, search_query).lookupAsync()
      .addCallback(new LookupCB())
      .addErrback(new ErrCB());
  }
}