1 // This file is part of OpenTSDB. 2 // Copyright (C) 2015 The OpenTSDB Authors. 3 // 4 // This program is free software: you can redistribute it and/or modify it 5 // under the terms of the GNU Lesser General Public License as published by 6 // the Free Software Foundation, either version 2.1 of the License, or (at your 7 // option) any later version. This program is distributed in the hope that it 8 // will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty 9 // of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser 10 // General Public License for more details. You should have received a copy 11 // of the GNU Lesser General Public License along with this program. If not, 12 // see <http://www.gnu.org/licenses/>. 13 package net.opentsdb.core; 14 15 import java.util.Calendar; 16 import java.util.NoSuchElementException; 17 18 import net.opentsdb.utils.DateTime; 19 20 /** 21 * A specialized downsampler that returns special values, based on the fill 22 * policy, for intervals for which no data could be found. The default 23 * implementation, {@link Downsampler}, simply skips intervals that have no 24 * data, which causes the {@link AggregationIterator} up the chain to 25 * interpolate. 26 * @since 2.2 27 */ 28 public class FillingDownsampler extends Downsampler { 29 /** Track when the downsampled data should end. */ 30 protected long end_timestamp; 31 32 /** An optional calendar set to the current timestamp for the data point */ 33 private final Calendar previous_calendar; 34 35 /** An optional calendar set to the end of the interval timestamp */ 36 private final Calendar next_calendar; 37 38 /** 39 * Create a new nulling downsampler. 40 * @param source The iterator to access the underlying data. 41 * @param start_time The time in milliseconds at which the data begins. 42 * @param end_time The time in milliseconds at which the data ends. 43 * @param interval_ms The interval in milli seconds wanted between each data 44 * point. 45 * @param downsampler The downsampling function to use. 46 * @param fill_policy Policy specifying whether to interpolate or to fill 47 * missing intervals with special values. 48 * @throws IllegalArgumentException if fill_policy is interpolation. 49 * @deprecated as of 2.3 50 */ FillingDownsampler(final SeekableView source, final long start_time, final long end_time, final long interval_ms, final Aggregator downsampler, final FillPolicy fill_policy)51 FillingDownsampler(final SeekableView source, final long start_time, 52 final long end_time, final long interval_ms, 53 final Aggregator downsampler, final FillPolicy fill_policy) { 54 this(source, start_time, end_time, 55 new DownsamplingSpecification(interval_ms, downsampler, fill_policy) 56 , 0, 0); 57 } 58 59 /** 60 * Create a new filling downsampler. 61 * @param source The iterator to access the underlying data. 62 * @param start_time The time in milliseconds at which the data begins. 63 * @param end_time The time in milliseconds at which the data ends. 64 * @param specification The downsampling spec to use 65 * @param query_start The start timestamp of the actual query for use with "all" 66 * @param query_end The end timestamp of the actual query for use with "all" 67 * @throws IllegalArgumentException if fill_policy is interpolation. 68 * @since 2.3 69 */ FillingDownsampler(final SeekableView source, final long start_time, final long end_time, final DownsamplingSpecification specification, final long query_start, final long end_start)70 FillingDownsampler(final SeekableView source, final long start_time, 71 final long end_time, final DownsamplingSpecification specification, 72 final long query_start, final long end_start) { 73 // Lean on the superclass implementation. 74 super(source, specification, query_start, end_start); 75 76 // Ensure we aren't given a bogus fill policy. 77 if (FillPolicy.NONE == specification.getFillPolicy()) { 78 throw new IllegalArgumentException("Cannot instantiate this class with" + 79 " linear-interpolation fill policy"); 80 } 81 82 // Use the values-in-interval object to align the timestamps at which we 83 // expect data to arrive for the first and last intervals. 84 if (run_all) { 85 timestamp = start_time; 86 end_timestamp = end_time; 87 previous_calendar = next_calendar = null; 88 } else if (specification.useCalendar()) { 89 previous_calendar = DateTime.previousInterval(start_time, interval, unit, 90 specification.getTimezone()); 91 if (unit == WEEK_UNIT) { 92 previous_calendar.add(DAY_UNIT, -(interval * WEEK_LENGTH)); 93 } else { 94 previous_calendar.add(unit, -interval); 95 } 96 next_calendar = DateTime.previousInterval(start_time, interval, unit, 97 specification.getTimezone()); 98 99 final Calendar end_calendar = DateTime.previousInterval( 100 end_time, interval, unit, specification.getTimezone()); 101 if (end_calendar.getTimeInMillis() == next_calendar.getTimeInMillis()) { 102 // advance once 103 if (unit == WEEK_UNIT) { 104 end_calendar.add(DAY_UNIT, interval * WEEK_LENGTH); 105 } else { 106 end_calendar.add(unit, interval); 107 } 108 } 109 timestamp = next_calendar.getTimeInMillis(); 110 end_timestamp = end_calendar.getTimeInMillis(); 111 } else { 112 // Use the values-in-interval object to align the timestamps at which we 113 // expect data to arrive for the first and last intervals. 114 timestamp = values_in_interval.alignTimestamp(start_time); 115 end_timestamp = values_in_interval.alignTimestamp(end_time); 116 previous_calendar = next_calendar = null; 117 } 118 } 119 120 /** 121 * Please note that when this method returns true, the value yielded by the 122 * object returned by {@link #next()} might be NaN, which indicates no data 123 * could be found for the current interval. 124 * @return true if this iterator has not yet reached the end of the specified 125 * range of data; otherwise, false. 126 */ 127 @Override hasNext()128 public boolean hasNext() { 129 // No matter the state of the values-in-interval object, if our current 130 // timestamp hasn't reached the end of the requested overall interval, then 131 // we still have iterating to do. 132 if (run_all) { 133 return values_in_interval.hasNextValue(); 134 } 135 return timestamp < end_timestamp; 136 } 137 138 /** 139 * Please note that the object returned by this method may return the value 140 * NaN, which indicates that no data count be found for the interval. This is 141 * intentional. Future intervals, if any, may still hava data and thus yield 142 * non-NaN values. 143 * @return the next data point, which might yield a NaN value. 144 * @throws NoSuchElementException if no more intervals remain. 145 */ 146 @Override next()147 public DataPoint next() { 148 // Don't proceed if we've already completed iteration. 149 if (hasNext()) { 150 // Ensure that the timestamp we request is valid. 151 values_in_interval.initializeIfNotDone(); 152 153 // Skip any leading data outside the query bounds. 154 long actual = values_in_interval.hasNextValue() ? 155 values_in_interval.getIntervalTimestamp() : Long.MAX_VALUE; 156 157 while (!run_all && values_in_interval.hasNextValue() 158 && actual < timestamp) { 159 // The actual timestamp precedes our expected, so there's data in the 160 // values-in-interval object that we wish to ignore. 161 specification.getFunction().runDouble(values_in_interval); 162 values_in_interval.moveToNextInterval(); 163 actual = values_in_interval.getIntervalTimestamp(); 164 } 165 166 // Check whether the timestamp of the calculation interval matches what 167 // we expect. 168 if (run_all || actual == timestamp) { 169 // The calculated interval timestamp matches what we expect, so we can 170 // do normal processing. 171 value = specification.getFunction().runDouble(values_in_interval); 172 values_in_interval.moveToNextInterval(); 173 } else { 174 // Our expected timestamp precedes the actual, so the interval is 175 // missing. We will use a special value, based on the fill policy, to 176 // represent this case. 177 switch (specification.getFillPolicy()) { 178 case NOT_A_NUMBER: 179 case NULL: 180 value = Double.NaN; 181 break; 182 183 case ZERO: 184 value = 0.0; 185 break; 186 187 default: 188 throw new RuntimeException("unhandled fill policy"); 189 } 190 } 191 192 // Advance the expected timestamp to the next interval. 193 if (!run_all) { 194 if (specification.useCalendar()) { 195 if (unit == WEEK_UNIT) { 196 previous_calendar.add(DAY_UNIT, interval * WEEK_LENGTH); 197 next_calendar.add(DAY_UNIT, interval * WEEK_LENGTH); 198 } else { 199 previous_calendar.add(unit, interval); 200 next_calendar.add(unit, interval); 201 } 202 timestamp = next_calendar.getTimeInMillis(); 203 } else { 204 timestamp += specification.getInterval(); 205 } 206 } 207 208 // This object also represents the data. 209 return this; 210 } 211 212 // Ideally, the user will not call this method when no data remains, but 213 // we can't enforce that. 214 throw new NoSuchElementException("no more data points in " + this); 215 } 216 217 @Override timestamp()218 public long timestamp() { 219 if (run_all) { 220 return query_start; 221 } else if (specification.useCalendar()) { 222 return previous_calendar.getTimeInMillis(); 223 } 224 return timestamp - specification.getInterval(); 225 } 226 } 227 228