1 // This file is part of OpenTSDB.
2 // Copyright (C) 2014  The OpenTSDB Authors.
3 //
4 // This program is free software: you can redistribute it and/or modify it
5 // under the terms of the GNU Lesser General Public License as published by
6 // the Free Software Foundation, either version 2.1 of the License, or (at your
7 // option) any later version.  This program is distributed in the hope that it
8 // will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty
9 // of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser
10 // General Public License for more details.  You should have received a copy
11 // of the GNU Lesser General Public License along with this program.  If not,
12 // see <http://www.gnu.org/licenses/>.
13 package net.opentsdb.core;
14 
15 import java.util.Calendar;
16 import java.util.NoSuchElementException;
17 
18 import net.opentsdb.utils.DateTime;
19 
20 /**
21  * Iterator that downsamples data points using an {@link Aggregator}.
22  */
23 public class Downsampler implements SeekableView, DataPoint {
24 
25   /** Matches the weekly downsampler as it requires special handling. */
26   protected final static int WEEK_UNIT = DateTime.unitsToCalendarType("w");
27   protected final static int DAY_UNIT = DateTime.unitsToCalendarType("d");
28   protected final static int WEEK_LENGTH = 7;
29 
30   /** The downsampling specification when provided */
31   protected final DownsamplingSpecification specification;
32 
33   /** The start timestamp of the actual query for use with "all" */
34   protected final long query_start;
35 
36   /** The end timestamp of the actual query for use with "all" */
37   protected final long query_end;
38 
39   /** The data source */
40   protected final SeekableView source;
41 
42   /** Iterator to iterate the values of the current interval. */
43   protected final ValuesInInterval values_in_interval;
44 
45   /** Last normalized timestamp */
46   protected long timestamp;
47 
48   /** Last value as a double */
49   protected double value;
50 
51   /** Whether or not to merge all DPs in the source into one vaalue */
52   protected final boolean run_all;
53 
54   /** The interval to use with a calendar */
55   protected final int interval;
56 
57   /** The unit to use with a calendar as a Calendar integer */
58   protected final int unit;
59 
60   /**
61    * Ctor.
62    * @param source The iterator to access the underlying data.
63    * @param interval_ms The interval in milli seconds wanted between each data
64    * point.
65    * @param downsampler The downsampling function to use.
66    * @deprecated as of 2.3
67    */
Downsampler(final SeekableView source, final long interval_ms, final Aggregator downsampler)68   Downsampler(final SeekableView source,
69               final long interval_ms,
70               final Aggregator downsampler) {
71     this.source = source;
72     if (downsampler == Aggregators.NONE) {
73       throw new IllegalArgumentException("cannot use the NONE "
74           + "aggregator for downsampling");
75     }
76     specification = new DownsamplingSpecification(interval_ms, downsampler,
77         DownsamplingSpecification.DEFAULT_FILL_POLICY);
78     values_in_interval = new ValuesInInterval();
79     query_start = 0;
80     query_end = 0;
81     interval = unit = 0;
82     run_all = false;
83   }
84 
85   /**
86    * Ctor.
87    * @param source The iterator to access the underlying data.
88    * @param specification The downsampling spec to use
89    * @param query_start The start timestamp of the actual query for use with "all"
90    * @param query_end The end timestamp of the actual query for use with "all"
91    * @since 2.3
92    */
Downsampler(final SeekableView source, final DownsamplingSpecification specification, final long query_start, final long query_end )93   Downsampler(final SeekableView source,
94               final DownsamplingSpecification specification,
95               final long query_start,
96               final long query_end
97       ) {
98     this.source = source;
99     this.specification = specification;
100     values_in_interval = new ValuesInInterval();
101     this.query_start = query_start;
102     this.query_end = query_end;
103 
104     final String s = specification.getStringInterval();
105     if (s != null && s.toLowerCase().contains("all")) {
106       run_all = true;
107       interval = unit = 0;
108     } else if (s != null && specification.useCalendar()) {
109       if (s.toLowerCase().contains("ms")) {
110         interval = Integer.parseInt(s.substring(0, s.length() - 2));
111         unit = DateTime.unitsToCalendarType(s.substring(s.length() - 2));
112       } else {
113         interval = Integer.parseInt(s.substring(0, s.length() - 1));
114         unit = DateTime.unitsToCalendarType(s.substring(s.length() - 1));
115       }
116       run_all = false;
117     } else {
118       run_all = false;
119       interval = unit = 0;
120     }
121   }
122 
123   // ------------------ //
124   // Iterator interface //
125   // ------------------ //
126 
127   @Override
hasNext()128   public boolean hasNext() {
129     return values_in_interval.hasNextValue();
130   }
131 
132   /**
133    * @throws NoSuchElementException if no data points remain.
134    */
135   @Override
next()136   public DataPoint next() {
137     if (hasNext()) {
138       value = specification.getFunction().runDouble(values_in_interval);
139       timestamp = values_in_interval.getIntervalTimestamp();
140       values_in_interval.moveToNextInterval();
141       return this;
142     }
143     throw new NoSuchElementException("no more data points in " + this);
144   }
145 
146   @Override
remove()147   public void remove() {
148     throw new UnsupportedOperationException();
149   }
150 
151   // ---------------------- //
152   // SeekableView interface //
153   // ---------------------- //
154 
155   @Override
seek(final long timestamp)156   public void seek(final long timestamp) {
157     values_in_interval.seekInterval(timestamp);
158   }
159 
160   // ------------------- //
161   // DataPoint interface //
162   // ------------------- //
163 
164   @Override
timestamp()165   public long timestamp() {
166     if (run_all) {
167       return query_start;
168     }
169     return timestamp;
170   }
171 
172   @Override
isInteger()173   public boolean isInteger() {
174     return false;
175   }
176 
177   @Override
longValue()178   public long longValue() {
179     throw new ClassCastException("Downsampled values are doubles");
180   }
181 
182   @Override
doubleValue()183   public double doubleValue() {
184     return value;
185   }
186 
187   @Override
toDouble()188   public double toDouble() {
189     return value;
190   }
191 
192   @Override
toString()193   public String toString() {
194     final StringBuilder buf = new StringBuilder();
195     buf.append("Downsampler: ")
196        .append(", downsampler=").append(specification)
197        .append(", queryStart=").append(query_start)
198        .append(", queryEnd=").append(query_end)
199        .append(", runAll=").append(run_all)
200        .append(", current data=(timestamp=").append(timestamp)
201        .append(", value=").append(value)
202        .append("), values_in_interval=").append(values_in_interval);
203    return buf.toString();
204   }
205 
206   /** Iterates source values for an interval. */
207   protected class ValuesInInterval implements Aggregator.Doubles {
208 
209     /** An optional calendar set to the current timestamp for the data point */
210     private Calendar previous_calendar;
211 
212     /** An optional calendar set to the end of the interval timestamp */
213     private Calendar next_calendar;
214 
215     /** The end of the current interval. */
216     private long timestamp_end_interval = Long.MIN_VALUE;
217 
218     /** True if the last value was successfully extracted from the source. */
219     private boolean has_next_value_from_source = false;
220 
221     /** The last data point extracted from the source. */
222     private DataPoint next_dp = null;
223 
224     /** True if it is initialized for iterating intervals. */
225     private boolean initialized = false;
226 
227     /**
228      * Constructor.
229      */
ValuesInInterval()230     protected ValuesInInterval() {
231       if (run_all) {
232         timestamp_end_interval = query_end;
233       } else if (!specification.useCalendar()) {
234         timestamp_end_interval = specification.getInterval();
235       }
236     }
237 
238     /** Initializes to iterate intervals. */
initializeIfNotDone()239     protected void initializeIfNotDone() {
240       // NOTE: Delay initialization is required to not access any data point
241       // from the source until a user requests it explicitly to avoid the severe
242       // performance penalty by accessing the unnecessary first data of a span.
243       if (!initialized) {
244         initialized = true;
245         if (source.hasNext()) {
246           moveToNextValue();
247           if (!run_all) {
248             if (specification.useCalendar()) {
249               previous_calendar = DateTime.previousInterval(next_dp.timestamp(),
250                   interval, unit, specification.getTimezone());
251               next_calendar = DateTime.previousInterval(next_dp.timestamp(),
252                   interval, unit, specification.getTimezone());
253               if (unit == WEEK_UNIT) {
254                 next_calendar.add(DAY_UNIT, interval * WEEK_LENGTH);
255               } else {
256                 next_calendar.add(unit, interval);
257               }
258               timestamp_end_interval = next_calendar.getTimeInMillis();
259             } else {
260               timestamp_end_interval = alignTimestamp(next_dp.timestamp()) +
261                   specification.getInterval();
262             }
263           }
264         }
265       }
266     }
267 
268     /** Extracts the next value from the source. */
moveToNextValue()269     private void moveToNextValue() {
270       if (source.hasNext()) {
271         has_next_value_from_source = true;
272         // filter out dps that don't match start and end for run_alls
273         if (run_all) {
274           while (source.hasNext()) {
275             next_dp = source.next();
276             if (next_dp.timestamp() < query_start) {
277               next_dp = null;
278               continue;
279             }
280             if (next_dp.timestamp() >= query_end) {
281               has_next_value_from_source = false;
282             }
283             break;
284           }
285           if (next_dp == null) {
286             has_next_value_from_source = false;
287           }
288         } else {
289           next_dp = source.next();
290         }
291       } else {
292         has_next_value_from_source = false;
293       }
294     }
295 
296     /**
297      * Resets the current interval with the interval of the timestamp of
298      * the next value read from source. It is the first value of the next
299      * interval. */
resetEndOfInterval()300     private void resetEndOfInterval() {
301       if (has_next_value_from_source && !run_all) {
302         if (specification.useCalendar()) {
303           while (next_dp.timestamp() >= timestamp_end_interval) {
304             if (unit == WEEK_UNIT) {
305               previous_calendar.add(DAY_UNIT, interval * WEEK_LENGTH);
306               next_calendar.add(DAY_UNIT, interval * WEEK_LENGTH);
307             } else {
308               previous_calendar.add(unit, interval);
309               next_calendar.add(unit, interval);
310             }
311             timestamp_end_interval = next_calendar.getTimeInMillis();
312           }
313         } else {
314           timestamp_end_interval = alignTimestamp(next_dp.timestamp()) +
315               specification.getInterval();
316         }
317       }
318     }
319 
320     /** Moves to the next available interval. */
moveToNextInterval()321     void moveToNextInterval() {
322       initializeIfNotDone();
323       resetEndOfInterval();
324     }
325 
326     /** Advances the interval iterator to the given timestamp. */
seekInterval(final long timestamp)327     void seekInterval(final long timestamp) {
328       // To make sure that the interval of the given timestamp is fully filled,
329       // rounds up the seeking timestamp to the smallest timestamp that is
330       // a multiple of the interval and is greater than or equal to the given
331       // timestamp..
332       if (run_all) {
333         source.seek(timestamp);
334       } else if (specification.useCalendar()) {
335         final Calendar seek_calendar = DateTime.previousInterval(
336             timestamp, interval, unit, specification.getTimezone());
337         if (timestamp > seek_calendar.getTimeInMillis()) {
338           if (unit == WEEK_UNIT) {
339             seek_calendar.add(DAY_UNIT, interval * WEEK_LENGTH);
340           } else {
341             seek_calendar.add(unit, interval);
342           }
343         }
344         source.seek(seek_calendar.getTimeInMillis());
345       } else {
346         source.seek(alignTimestamp(timestamp + specification.getInterval() - 1));
347       }
348       initialized = false;
349     }
350 
351     /** Returns the representative timestamp of the current interval. */
getIntervalTimestamp()352     protected long getIntervalTimestamp() {
353       // NOTE: It is well-known practice taking the start time of
354       // a downsample interval as a representative timestamp of it. It also
355       // provides the correct context for seek.
356       if (run_all) {
357         return timestamp_end_interval;
358       } else if (specification.useCalendar()) {
359         return previous_calendar.getTimeInMillis();
360       } else {
361         return alignTimestamp(timestamp_end_interval -
362             specification.getInterval());
363       }
364     }
365 
366     /** Returns timestamp aligned by interval. */
alignTimestamp(final long timestamp)367     protected long alignTimestamp(final long timestamp) {
368       return timestamp - (timestamp % specification.getInterval());
369     }
370 
371     // ---------------------- //
372     // Doubles interface //
373     // ---------------------- //
374 
375     @Override
hasNextValue()376     public boolean hasNextValue() {
377       initializeIfNotDone();
378       if (run_all) {
379         return has_next_value_from_source;
380       }
381       return has_next_value_from_source &&
382           next_dp.timestamp() < timestamp_end_interval;
383     }
384 
385     @Override
nextDoubleValue()386     public double nextDoubleValue() {
387       if (hasNextValue()) {
388         double value = next_dp.toDouble();
389         moveToNextValue();
390         return value;
391       }
392       throw new NoSuchElementException("no more values in interval of "
393           + timestamp_end_interval);
394     }
395 
396     @Override
toString()397     public String toString() {
398       final StringBuilder buf = new StringBuilder();
399       buf.append("ValuesInInterval: ")
400          .append(", timestamp_end_interval=").append(timestamp_end_interval)
401          .append(", has_next_value_from_source=")
402          .append(has_next_value_from_source)
403          .append(", previousCalendar=")
404          .append(previous_calendar == null ? "null" : previous_calendar)
405          .append(", nextCalendar=")
406          .append(next_calendar == null ? "null" : next_calendar);
407       if (has_next_value_from_source) {
408         buf.append(", nextValue=(").append(next_dp).append(')');
409       }
410       buf.append(", source=").append(source);
411       return buf.toString();
412     }
413   }
414 }
415