1 // This file is part of OpenTSDB. 2 // Copyright (C) 2014 The OpenTSDB Authors. 3 // 4 // This program is free software: you can redistribute it and/or modify it 5 // under the terms of the GNU Lesser General Public License as published by 6 // the Free Software Foundation, either version 2.1 of the License, or (at your 7 // option) any later version. This program is distributed in the hope that it 8 // will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty 9 // of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser 10 // General Public License for more details. You should have received a copy 11 // of the GNU Lesser General Public License along with this program. If not, 12 // see <http://www.gnu.org/licenses/>. 13 package net.opentsdb.core; 14 15 import java.util.Calendar; 16 import java.util.NoSuchElementException; 17 18 import net.opentsdb.utils.DateTime; 19 20 /** 21 * Iterator that downsamples data points using an {@link Aggregator}. 22 */ 23 public class Downsampler implements SeekableView, DataPoint { 24 25 /** Matches the weekly downsampler as it requires special handling. */ 26 protected final static int WEEK_UNIT = DateTime.unitsToCalendarType("w"); 27 protected final static int DAY_UNIT = DateTime.unitsToCalendarType("d"); 28 protected final static int WEEK_LENGTH = 7; 29 30 /** The downsampling specification when provided */ 31 protected final DownsamplingSpecification specification; 32 33 /** The start timestamp of the actual query for use with "all" */ 34 protected final long query_start; 35 36 /** The end timestamp of the actual query for use with "all" */ 37 protected final long query_end; 38 39 /** The data source */ 40 protected final SeekableView source; 41 42 /** Iterator to iterate the values of the current interval. */ 43 protected final ValuesInInterval values_in_interval; 44 45 /** Last normalized timestamp */ 46 protected long timestamp; 47 48 /** Last value as a double */ 49 protected double value; 50 51 /** Whether or not to merge all DPs in the source into one vaalue */ 52 protected final boolean run_all; 53 54 /** The interval to use with a calendar */ 55 protected final int interval; 56 57 /** The unit to use with a calendar as a Calendar integer */ 58 protected final int unit; 59 60 /** 61 * Ctor. 62 * @param source The iterator to access the underlying data. 63 * @param interval_ms The interval in milli seconds wanted between each data 64 * point. 65 * @param downsampler The downsampling function to use. 66 * @deprecated as of 2.3 67 */ Downsampler(final SeekableView source, final long interval_ms, final Aggregator downsampler)68 Downsampler(final SeekableView source, 69 final long interval_ms, 70 final Aggregator downsampler) { 71 this.source = source; 72 if (downsampler == Aggregators.NONE) { 73 throw new IllegalArgumentException("cannot use the NONE " 74 + "aggregator for downsampling"); 75 } 76 specification = new DownsamplingSpecification(interval_ms, downsampler, 77 DownsamplingSpecification.DEFAULT_FILL_POLICY); 78 values_in_interval = new ValuesInInterval(); 79 query_start = 0; 80 query_end = 0; 81 interval = unit = 0; 82 run_all = false; 83 } 84 85 /** 86 * Ctor. 87 * @param source The iterator to access the underlying data. 88 * @param specification The downsampling spec to use 89 * @param query_start The start timestamp of the actual query for use with "all" 90 * @param query_end The end timestamp of the actual query for use with "all" 91 * @since 2.3 92 */ Downsampler(final SeekableView source, final DownsamplingSpecification specification, final long query_start, final long query_end )93 Downsampler(final SeekableView source, 94 final DownsamplingSpecification specification, 95 final long query_start, 96 final long query_end 97 ) { 98 this.source = source; 99 this.specification = specification; 100 values_in_interval = new ValuesInInterval(); 101 this.query_start = query_start; 102 this.query_end = query_end; 103 104 final String s = specification.getStringInterval(); 105 if (s != null && s.toLowerCase().contains("all")) { 106 run_all = true; 107 interval = unit = 0; 108 } else if (s != null && specification.useCalendar()) { 109 if (s.toLowerCase().contains("ms")) { 110 interval = Integer.parseInt(s.substring(0, s.length() - 2)); 111 unit = DateTime.unitsToCalendarType(s.substring(s.length() - 2)); 112 } else { 113 interval = Integer.parseInt(s.substring(0, s.length() - 1)); 114 unit = DateTime.unitsToCalendarType(s.substring(s.length() - 1)); 115 } 116 run_all = false; 117 } else { 118 run_all = false; 119 interval = unit = 0; 120 } 121 } 122 123 // ------------------ // 124 // Iterator interface // 125 // ------------------ // 126 127 @Override hasNext()128 public boolean hasNext() { 129 return values_in_interval.hasNextValue(); 130 } 131 132 /** 133 * @throws NoSuchElementException if no data points remain. 134 */ 135 @Override next()136 public DataPoint next() { 137 if (hasNext()) { 138 value = specification.getFunction().runDouble(values_in_interval); 139 timestamp = values_in_interval.getIntervalTimestamp(); 140 values_in_interval.moveToNextInterval(); 141 return this; 142 } 143 throw new NoSuchElementException("no more data points in " + this); 144 } 145 146 @Override remove()147 public void remove() { 148 throw new UnsupportedOperationException(); 149 } 150 151 // ---------------------- // 152 // SeekableView interface // 153 // ---------------------- // 154 155 @Override seek(final long timestamp)156 public void seek(final long timestamp) { 157 values_in_interval.seekInterval(timestamp); 158 } 159 160 // ------------------- // 161 // DataPoint interface // 162 // ------------------- // 163 164 @Override timestamp()165 public long timestamp() { 166 if (run_all) { 167 return query_start; 168 } 169 return timestamp; 170 } 171 172 @Override isInteger()173 public boolean isInteger() { 174 return false; 175 } 176 177 @Override longValue()178 public long longValue() { 179 throw new ClassCastException("Downsampled values are doubles"); 180 } 181 182 @Override doubleValue()183 public double doubleValue() { 184 return value; 185 } 186 187 @Override toDouble()188 public double toDouble() { 189 return value; 190 } 191 192 @Override toString()193 public String toString() { 194 final StringBuilder buf = new StringBuilder(); 195 buf.append("Downsampler: ") 196 .append(", downsampler=").append(specification) 197 .append(", queryStart=").append(query_start) 198 .append(", queryEnd=").append(query_end) 199 .append(", runAll=").append(run_all) 200 .append(", current data=(timestamp=").append(timestamp) 201 .append(", value=").append(value) 202 .append("), values_in_interval=").append(values_in_interval); 203 return buf.toString(); 204 } 205 206 /** Iterates source values for an interval. */ 207 protected class ValuesInInterval implements Aggregator.Doubles { 208 209 /** An optional calendar set to the current timestamp for the data point */ 210 private Calendar previous_calendar; 211 212 /** An optional calendar set to the end of the interval timestamp */ 213 private Calendar next_calendar; 214 215 /** The end of the current interval. */ 216 private long timestamp_end_interval = Long.MIN_VALUE; 217 218 /** True if the last value was successfully extracted from the source. */ 219 private boolean has_next_value_from_source = false; 220 221 /** The last data point extracted from the source. */ 222 private DataPoint next_dp = null; 223 224 /** True if it is initialized for iterating intervals. */ 225 private boolean initialized = false; 226 227 /** 228 * Constructor. 229 */ ValuesInInterval()230 protected ValuesInInterval() { 231 if (run_all) { 232 timestamp_end_interval = query_end; 233 } else if (!specification.useCalendar()) { 234 timestamp_end_interval = specification.getInterval(); 235 } 236 } 237 238 /** Initializes to iterate intervals. */ initializeIfNotDone()239 protected void initializeIfNotDone() { 240 // NOTE: Delay initialization is required to not access any data point 241 // from the source until a user requests it explicitly to avoid the severe 242 // performance penalty by accessing the unnecessary first data of a span. 243 if (!initialized) { 244 initialized = true; 245 if (source.hasNext()) { 246 moveToNextValue(); 247 if (!run_all) { 248 if (specification.useCalendar()) { 249 previous_calendar = DateTime.previousInterval(next_dp.timestamp(), 250 interval, unit, specification.getTimezone()); 251 next_calendar = DateTime.previousInterval(next_dp.timestamp(), 252 interval, unit, specification.getTimezone()); 253 if (unit == WEEK_UNIT) { 254 next_calendar.add(DAY_UNIT, interval * WEEK_LENGTH); 255 } else { 256 next_calendar.add(unit, interval); 257 } 258 timestamp_end_interval = next_calendar.getTimeInMillis(); 259 } else { 260 timestamp_end_interval = alignTimestamp(next_dp.timestamp()) + 261 specification.getInterval(); 262 } 263 } 264 } 265 } 266 } 267 268 /** Extracts the next value from the source. */ moveToNextValue()269 private void moveToNextValue() { 270 if (source.hasNext()) { 271 has_next_value_from_source = true; 272 // filter out dps that don't match start and end for run_alls 273 if (run_all) { 274 while (source.hasNext()) { 275 next_dp = source.next(); 276 if (next_dp.timestamp() < query_start) { 277 next_dp = null; 278 continue; 279 } 280 if (next_dp.timestamp() >= query_end) { 281 has_next_value_from_source = false; 282 } 283 break; 284 } 285 if (next_dp == null) { 286 has_next_value_from_source = false; 287 } 288 } else { 289 next_dp = source.next(); 290 } 291 } else { 292 has_next_value_from_source = false; 293 } 294 } 295 296 /** 297 * Resets the current interval with the interval of the timestamp of 298 * the next value read from source. It is the first value of the next 299 * interval. */ resetEndOfInterval()300 private void resetEndOfInterval() { 301 if (has_next_value_from_source && !run_all) { 302 if (specification.useCalendar()) { 303 while (next_dp.timestamp() >= timestamp_end_interval) { 304 if (unit == WEEK_UNIT) { 305 previous_calendar.add(DAY_UNIT, interval * WEEK_LENGTH); 306 next_calendar.add(DAY_UNIT, interval * WEEK_LENGTH); 307 } else { 308 previous_calendar.add(unit, interval); 309 next_calendar.add(unit, interval); 310 } 311 timestamp_end_interval = next_calendar.getTimeInMillis(); 312 } 313 } else { 314 timestamp_end_interval = alignTimestamp(next_dp.timestamp()) + 315 specification.getInterval(); 316 } 317 } 318 } 319 320 /** Moves to the next available interval. */ moveToNextInterval()321 void moveToNextInterval() { 322 initializeIfNotDone(); 323 resetEndOfInterval(); 324 } 325 326 /** Advances the interval iterator to the given timestamp. */ seekInterval(final long timestamp)327 void seekInterval(final long timestamp) { 328 // To make sure that the interval of the given timestamp is fully filled, 329 // rounds up the seeking timestamp to the smallest timestamp that is 330 // a multiple of the interval and is greater than or equal to the given 331 // timestamp.. 332 if (run_all) { 333 source.seek(timestamp); 334 } else if (specification.useCalendar()) { 335 final Calendar seek_calendar = DateTime.previousInterval( 336 timestamp, interval, unit, specification.getTimezone()); 337 if (timestamp > seek_calendar.getTimeInMillis()) { 338 if (unit == WEEK_UNIT) { 339 seek_calendar.add(DAY_UNIT, interval * WEEK_LENGTH); 340 } else { 341 seek_calendar.add(unit, interval); 342 } 343 } 344 source.seek(seek_calendar.getTimeInMillis()); 345 } else { 346 source.seek(alignTimestamp(timestamp + specification.getInterval() - 1)); 347 } 348 initialized = false; 349 } 350 351 /** Returns the representative timestamp of the current interval. */ getIntervalTimestamp()352 protected long getIntervalTimestamp() { 353 // NOTE: It is well-known practice taking the start time of 354 // a downsample interval as a representative timestamp of it. It also 355 // provides the correct context for seek. 356 if (run_all) { 357 return timestamp_end_interval; 358 } else if (specification.useCalendar()) { 359 return previous_calendar.getTimeInMillis(); 360 } else { 361 return alignTimestamp(timestamp_end_interval - 362 specification.getInterval()); 363 } 364 } 365 366 /** Returns timestamp aligned by interval. */ alignTimestamp(final long timestamp)367 protected long alignTimestamp(final long timestamp) { 368 return timestamp - (timestamp % specification.getInterval()); 369 } 370 371 // ---------------------- // 372 // Doubles interface // 373 // ---------------------- // 374 375 @Override hasNextValue()376 public boolean hasNextValue() { 377 initializeIfNotDone(); 378 if (run_all) { 379 return has_next_value_from_source; 380 } 381 return has_next_value_from_source && 382 next_dp.timestamp() < timestamp_end_interval; 383 } 384 385 @Override nextDoubleValue()386 public double nextDoubleValue() { 387 if (hasNextValue()) { 388 double value = next_dp.toDouble(); 389 moveToNextValue(); 390 return value; 391 } 392 throw new NoSuchElementException("no more values in interval of " 393 + timestamp_end_interval); 394 } 395 396 @Override toString()397 public String toString() { 398 final StringBuilder buf = new StringBuilder(); 399 buf.append("ValuesInInterval: ") 400 .append(", timestamp_end_interval=").append(timestamp_end_interval) 401 .append(", has_next_value_from_source=") 402 .append(has_next_value_from_source) 403 .append(", previousCalendar=") 404 .append(previous_calendar == null ? "null" : previous_calendar) 405 .append(", nextCalendar=") 406 .append(next_calendar == null ? "null" : next_calendar); 407 if (has_next_value_from_source) { 408 buf.append(", nextValue=(").append(next_dp).append(')'); 409 } 410 buf.append(", source=").append(source); 411 return buf.toString(); 412 } 413 } 414 } 415