1 // This file is part of OpenTSDB.
2 // Copyright (C) 2015  The OpenTSDB Authors.
3 //
4 // This program is free software: you can redistribute it and/or modify it
5 // under the terms of the GNU Lesser General Public License as published by
6 // the Free Software Foundation, either version 2.1 of the License, or (at your
7 // option) any later version.  This program is distributed in the hope that it
8 // will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty
9 // of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser
10 // General Public License for more details.  You should have received a copy
11 // of the GNU Lesser General Public License along with this program.  If not,
12 // see <http://www.gnu.org/licenses/>.
13 package net.opentsdb.core;
14 
15 import java.util.Calendar;
16 import java.util.NoSuchElementException;
17 
18 import net.opentsdb.utils.DateTime;
19 
20 /**
21  * A specialized downsampler that returns special values, based on the fill
22  * policy, for intervals for which no data could be found. The default
23  * implementation, {@link Downsampler}, simply skips intervals that have no
24  * data, which causes the {@link AggregationIterator} up the chain to
25  * interpolate.
26  * @since 2.2
27  */
28 public class FillingDownsampler extends Downsampler {
29   /** Track when the downsampled data should end. */
30   protected long end_timestamp;
31 
32   /** An optional calendar set to the current timestamp for the data point */
33   private final Calendar previous_calendar;
34 
35   /** An optional calendar set to the end of the interval timestamp */
36   private final Calendar next_calendar;
37 
38   /**
39    * Create a new nulling downsampler.
40    * @param source The iterator to access the underlying data.
41    * @param start_time The time in milliseconds at which the data begins.
42    * @param end_time The time in milliseconds at which the data ends.
43    * @param interval_ms The interval in milli seconds wanted between each data
44    * point.
45    * @param downsampler The downsampling function to use.
46    * @param fill_policy Policy specifying whether to interpolate or to fill
47    * missing intervals with special values.
48    * @throws IllegalArgumentException if fill_policy is interpolation.
49    * @deprecated as of 2.3
50    */
FillingDownsampler(final SeekableView source, final long start_time, final long end_time, final long interval_ms, final Aggregator downsampler, final FillPolicy fill_policy)51   FillingDownsampler(final SeekableView source, final long start_time,
52       final long end_time, final long interval_ms,
53       final Aggregator downsampler, final FillPolicy fill_policy) {
54     this(source, start_time, end_time,
55         new DownsamplingSpecification(interval_ms, downsampler, fill_policy)
56         , 0, 0);
57   }
58 
59   /**
60    * Create a new filling downsampler.
61    * @param source The iterator to access the underlying data.
62    * @param start_time The time in milliseconds at which the data begins.
63    * @param end_time The time in milliseconds at which the data ends.
64    * @param specification The downsampling spec to use
65    * @param query_start The start timestamp of the actual query for use with "all"
66    * @param query_end The end timestamp of the actual query for use with "all"
67    * @throws IllegalArgumentException if fill_policy is interpolation.
68    * @since 2.3
69    */
FillingDownsampler(final SeekableView source, final long start_time, final long end_time, final DownsamplingSpecification specification, final long query_start, final long end_start)70   FillingDownsampler(final SeekableView source, final long start_time,
71       final long end_time, final DownsamplingSpecification specification,
72       final long query_start, final long end_start) {
73     // Lean on the superclass implementation.
74     super(source, specification, query_start, end_start);
75 
76     // Ensure we aren't given a bogus fill policy.
77     if (FillPolicy.NONE == specification.getFillPolicy()) {
78       throw new IllegalArgumentException("Cannot instantiate this class with" +
79         " linear-interpolation fill policy");
80     }
81 
82     // Use the values-in-interval object to align the timestamps at which we
83     // expect data to arrive for the first and last intervals.
84     if (run_all) {
85       timestamp = start_time;
86       end_timestamp = end_time;
87       previous_calendar = next_calendar = null;
88     } else if (specification.useCalendar()) {
89       previous_calendar = DateTime.previousInterval(start_time, interval, unit,
90           specification.getTimezone());
91       if (unit == WEEK_UNIT) {
92         previous_calendar.add(DAY_UNIT, -(interval * WEEK_LENGTH));
93       } else {
94         previous_calendar.add(unit, -interval);
95       }
96       next_calendar = DateTime.previousInterval(start_time, interval, unit,
97           specification.getTimezone());
98 
99       final Calendar end_calendar = DateTime.previousInterval(
100           end_time, interval, unit, specification.getTimezone());
101       if (end_calendar.getTimeInMillis() == next_calendar.getTimeInMillis()) {
102         // advance once
103         if (unit == WEEK_UNIT) {
104           end_calendar.add(DAY_UNIT, interval * WEEK_LENGTH);
105         } else {
106           end_calendar.add(unit, interval);
107         }
108       }
109       timestamp = next_calendar.getTimeInMillis();
110       end_timestamp = end_calendar.getTimeInMillis();
111     } else {
112       // Use the values-in-interval object to align the timestamps at which we
113       // expect data to arrive for the first and last intervals.
114       timestamp = values_in_interval.alignTimestamp(start_time);
115       end_timestamp = values_in_interval.alignTimestamp(end_time);
116       previous_calendar = next_calendar = null;
117     }
118   }
119 
120   /**
121    * Please note that when this method returns true, the value yielded by the
122    * object returned by {@link #next()} might be NaN, which indicates no data
123    * could be found for the current interval.
124    * @return true if this iterator has not yet reached the end of the specified
125    * range of data; otherwise, false.
126    */
127   @Override
hasNext()128   public boolean hasNext() {
129     // No matter the state of the values-in-interval object, if our current
130     // timestamp hasn't reached the end of the requested overall interval, then
131     // we still have iterating to do.
132     if (run_all) {
133       return values_in_interval.hasNextValue();
134     }
135     return timestamp < end_timestamp;
136   }
137 
138   /**
139    * Please note that the object returned by this method may return the value
140    * NaN, which indicates that no data count be found for the interval. This is
141    * intentional. Future intervals, if any, may still hava data and thus yield
142    * non-NaN values.
143    * @return the next data point, which might yield a NaN value.
144    * @throws NoSuchElementException if no more intervals remain.
145    */
146   @Override
next()147   public DataPoint next() {
148     // Don't proceed if we've already completed iteration.
149     if (hasNext()) {
150       // Ensure that the timestamp we request is valid.
151       values_in_interval.initializeIfNotDone();
152 
153       // Skip any leading data outside the query bounds.
154       long actual = values_in_interval.hasNextValue() ?
155           values_in_interval.getIntervalTimestamp() : Long.MAX_VALUE;
156 
157       while (!run_all && values_in_interval.hasNextValue()
158           && actual < timestamp) {
159         // The actual timestamp precedes our expected, so there's data in the
160         // values-in-interval object that we wish to ignore.
161         specification.getFunction().runDouble(values_in_interval);
162         values_in_interval.moveToNextInterval();
163         actual = values_in_interval.getIntervalTimestamp();
164       }
165 
166       // Check whether the timestamp of the calculation interval matches what
167       // we expect.
168       if (run_all || actual == timestamp) {
169         // The calculated interval timestamp matches what we expect, so we can
170         // do normal processing.
171         value = specification.getFunction().runDouble(values_in_interval);
172         values_in_interval.moveToNextInterval();
173       } else {
174         // Our expected timestamp precedes the actual, so the interval is
175         // missing. We will use a special value, based on the fill policy, to
176         // represent this case.
177         switch (specification.getFillPolicy()) {
178         case NOT_A_NUMBER:
179         case NULL:
180           value = Double.NaN;
181           break;
182 
183         case ZERO:
184           value = 0.0;
185           break;
186 
187         default:
188           throw new RuntimeException("unhandled fill policy");
189         }
190       }
191 
192       // Advance the expected timestamp to the next interval.
193       if (!run_all) {
194         if (specification.useCalendar()) {
195           if (unit == WEEK_UNIT) {
196             previous_calendar.add(DAY_UNIT, interval * WEEK_LENGTH);
197             next_calendar.add(DAY_UNIT, interval * WEEK_LENGTH);
198           } else {
199             previous_calendar.add(unit, interval);
200             next_calendar.add(unit, interval);
201           }
202           timestamp = next_calendar.getTimeInMillis();
203         } else {
204           timestamp += specification.getInterval();
205         }
206       }
207 
208       // This object also represents the data.
209       return this;
210     }
211 
212     // Ideally, the user will not call this method when no data remains, but
213     // we can't enforce that.
214     throw new NoSuchElementException("no more data points in " + this);
215   }
216 
217   @Override
timestamp()218   public long timestamp() {
219     if (run_all) {
220       return query_start;
221     } else if (specification.useCalendar()) {
222       return previous_calendar.getTimeInMillis();
223     }
224     return timestamp - specification.getInterval();
225   }
226 }
227 
228