#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""

   Copyright 2014-2019 OpenEEmeter contributors

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.

"""
from datetime import datetime, timedelta
from functools import partial

import numpy as np
import pandas as pd
import pytz

from .exceptions import NoBaselineDataError, NoReportingDataError
from .warnings import EEMeterWarning


__all__ = (
    "Term",
    "as_freq",
    "day_counts",
    "get_baseline_data",
    "get_reporting_data",
    "get_terms",
    "remove_duplicates",
    "overwrite_partial_rows_with_nan",
    "clean_caltrack_billing_data",
    "clean_caltrack_billing_daily_data",
)


def overwrite_partial_rows_with_nan(df):
    """Set every value in any row containing a NaN to NaN, preserving the index."""
    return df.dropna().reindex(df.index)


def remove_duplicates(df_or_series):
    """Remove duplicate rows or values by keeping the first of each duplicate.

    Parameters
    ----------
    df_or_series : :any:`pandas.DataFrame` or :any:`pandas.Series`
        Pandas object from which to drop duplicate index values.

    Returns
    -------
    deduplicated : :any:`pandas.DataFrame` or :any:`pandas.Series`
        The deduplicated pandas object.
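
    Examples
    --------
    A minimal illustrative sketch (the index values are hypothetical):

    >>> import pandas as pd
    >>> index = pd.DatetimeIndex(["2017-01-01", "2017-01-01", "2017-01-02"])
    >>> series = pd.Series([1.0, 2.0, 3.0], index=index)
    >>> deduplicated = remove_duplicates(series)  # keeps the first 2017-01-01 value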
    """
    # CalTrack 2.3.2.2
    return df_or_series[~df_or_series.index.duplicated(keep="first")]


def as_freq(
    data_series,
    freq,
    atomic_freq="1 Min",
    series_type="cumulative",
    include_coverage=False,
):
    """Resample data to a different frequency.

    This method can be used to upsample or downsample meter data. The
    assumption it makes to do so is that meter data is constant and averaged
    over the given periods. For instance, to convert billing-period data to
    daily data, this method first upsamples to the atomic frequency
    (1 minute frequency, by default), "spreading" usage evenly across all
    minutes in each period. Then it downsamples to the target frequency
    (daily, in this example) and returns that result. With instantaneous
    series, the data is copied to all contiguous time intervals and the mean
    over `freq` is returned.

    **Caveats**:

     - This method gives a fair amount of flexibility in
       resampling as long as you are OK with the assumption that usage is
       constant over the period (this assumption is generally broken in
       observed data over long enough periods, so this caveat should not be
       taken lightly).

    Parameters
    ----------
    data_series : :any:`pandas.Series`
        Data to resample. Should have a :any:`pandas.DatetimeIndex`.
    freq : :any:`str`
        The frequency to resample to. This should be given in a form recognized
        by the :any:`pandas.Series.resample` method.
    atomic_freq : :any:`str`, optional
        The "atomic" frequency of the intermediate data form. This can be
        adjusted to a coarser frequency to reduce memory usage and increase
        speed.
    series_type : :any:`str`, {'cumulative', 'instantaneous'},
        default 'cumulative'
        Type of data sampling. 'cumulative' data can be spread over smaller
        time intervals and is aggregated using addition (e.g. meter data).
        'instantaneous' data is copied (not spread) over smaller time intervals
        and is aggregated by averaging (e.g. weather data).
    include_coverage : :any:`bool`,
        default `False`
        Whether to return just a series of resampled values or a dataframe
        with an additional column giving the percent coverage of source data
        used for each sample.

    Returns
    -------
    resampled_data : :any:`pandas.Series` or :any:`pandas.DataFrame`
        Data resampled to the given frequency (as a dataframe with a coverage
        column if `include_coverage` is used).
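
    Examples
    --------
    A minimal usage sketch; the billing-period index and values below are
    purely illustrative:

    >>> import pandas as pd
    >>> index = pd.date_range("2017-01-01", periods=3, freq="30D", tz="UTC")
    >>> billing_series = pd.Series([900.0, 940.0, 870.0], index=index)
    >>> daily = as_freq(billing_series, "D")
    >>> daily_with_coverage = as_freq(billing_series, "D", include_coverage=True)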
    """
    # TODO(philngo): make sure this complies with CalTRACK 2.2.2.1
    if not isinstance(data_series, pd.Series):
        raise ValueError(
            "expected series, got object with class {}".format(data_series.__class__)
        )
    if data_series.empty:
        return data_series
    series = remove_duplicates(data_series)
    target_freq = pd.Timedelta(atomic_freq)
    timedeltas = (series.index[1:] - series.index[:-1]).append(
        pd.TimedeltaIndex([pd.NaT])
    )

    if series_type == "cumulative":
        spread_factor = target_freq.total_seconds() / timedeltas.total_seconds()
        series_spread = series * spread_factor
        atomic_series = series_spread.asfreq(atomic_freq, method="ffill")
        resampled = atomic_series.resample(freq).sum()
        resampled_with_nans = atomic_series.resample(freq).first()
        n_coverage = atomic_series.resample(freq).count()
        resampled = resampled[resampled_with_nans.notnull()].reindex(resampled.index)

    elif series_type == "instantaneous":
        atomic_series = series.asfreq(atomic_freq, method="ffill")
        resampled = atomic_series.resample(freq).mean()
        # count atomic periods with data so coverage can be computed for this
        # series type as well
        n_coverage = atomic_series.resample(freq).count()

    if resampled.index[-1] < series.index[-1]:
        # this adds a null at the end using the target frequency
        last_index = pd.date_range(resampled.index[-1], freq=freq, periods=2)[1:]
        resampled = (
            pd.concat([resampled, pd.Series(np.nan, index=last_index)])
            .resample(freq)
            .mean()
        )
    if include_coverage:
        n_total = resampled.resample(atomic_freq).count().resample(freq).count()
        resampled = resampled.to_frame("value")
        resampled["coverage"] = n_coverage / n_total
        return resampled
    else:
        return resampled


def day_counts(index):
    """Days between DatetimeIndex values as a :any:`pandas.Series`.

    Parameters
    ----------
    index : :any:`pandas.DatetimeIndex`
        The index for which to get day counts.

    Returns
    -------
    day_counts : :any:`pandas.Series`
        A :any:`pandas.Series` with counts of days between periods. Counts are
        given on start dates of periods.
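
    Examples
    --------
    An illustrative sketch; the 31-day spacing is arbitrary:

    >>> import pandas as pd
    >>> index = pd.date_range("2017-01-01", periods=3, freq="31D", tz="UTC")
    >>> counts = day_counts(index)  # 31.0, 31.0, then NaN for the final period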
    """
    # don't affect the original data
    index = index.copy()

    if len(index) == 0:
        return pd.Series([], index=index)

    timedeltas = (index[1:] - index[:-1]).append(pd.TimedeltaIndex([pd.NaT]))
    timedelta_days = timedeltas.total_seconds() / (60 * 60 * 24)

    return pd.Series(timedelta_days, index=index)


def _make_baseline_warnings(
    end_inf, start_inf, data_start, data_end, start_limit, end_limit
):
    warnings = []
    # warn if there is a gap at end
    if not end_inf and data_end < end_limit:
        warnings.append(
            EEMeterWarning(
                qualified_name="eemeter.get_baseline_data.gap_at_baseline_end",
                description=(
                    "Data does not have coverage at requested baseline end date."
                ),
                data={
                    "requested_end": end_limit.isoformat(),
                    "data_end": data_end.isoformat(),
                },
            )
        )
    # warn if there is a gap at start
    if not start_inf and start_limit < data_start:
        warnings.append(
            EEMeterWarning(
                qualified_name="eemeter.get_baseline_data.gap_at_baseline_start",
                description=(
                    "Data does not have coverage at requested baseline start date."
                ),
                data={
                    "requested_start": start_limit.isoformat(),
                    "data_start": data_start.isoformat(),
                },
            )
        )
    return warnings


def get_baseline_data(
    data,
    start=None,
    end=None,
    max_days=365,
    allow_billing_period_overshoot=False,
    n_days_billing_period_overshoot=None,
    ignore_billing_period_gap_for_day_count=False,
):
    """Filter down to baseline period data.

    .. note::

        For compliance with CalTRACK, set ``max_days=365`` (section 2.2.1.1).

    Parameters
    ----------
    data : :any:`pandas.DataFrame` or :any:`pandas.Series`
        The data to filter to baseline data. This data will be filtered down
        to an acceptable baseline period according to the dates passed as
        `start` and `end`, or the maximum period specified with `max_days`.
    start : :any:`datetime.datetime`
        A timezone-aware datetime that represents the earliest allowable start
        date for the baseline data. The stricter of this or `max_days` is used
        to determine the earliest allowable baseline period date.
    end : :any:`datetime.datetime`
        A timezone-aware datetime that represents the latest allowable end
        date for the baseline data, i.e., the latest date for which data is
        available before the intervention begins.
    max_days : :any:`int`, default 365
        The maximum length of the period. Ignored if `end` is not set.
        The stricter of this or `start` is used to determine the earliest
        allowable baseline period date.
    allow_billing_period_overshoot : :any:`bool`, default False
        If True, count `max_days` from the end of the last billing data period
        that ends before the `end` date, rather than from the exact `end` date.
        Otherwise use the exact `end` date as the cutoff.
    n_days_billing_period_overshoot : :any:`int`, default None
        If `allow_billing_period_overshoot` is set to True, this determines
        the number of days of overshoot that will be tolerated. A value of
        None implies that any number of days is allowed.
    ignore_billing_period_gap_for_day_count : :any:`bool`, default False
        If True, instead of going back `max_days` from either the
        `end` date or end of the last billing period before that date (depending
        on the value of the `allow_billing_period_overshoot` setting) and
        excluding the last period that began before that date, first check to
        see if excluding or including that period gets closer to a total of
        `max_days` of data.

        For example, with `max_days=365`, if an exact 365-day period would
        begin on Feb 15, but the billing period spanning that date ran from
        Jan 20 to Feb 20, exclude that period for a total of ~360 days of
        data, because that is closer to 365 than the ~390 days that would
        result from including it. If, on the other hand, that period ran from
        Feb 10 to Mar 10, include it, because ~370 days of data is closer to
        365 than ~340.

    Returns
    -------
    baseline_data, warnings : :any:`tuple` of (:any:`pandas.DataFrame` or :any:`pandas.Series`, :any:`list` of :any:`eemeter.EEMeterWarning`)
        Data for only the specified baseline period and any associated warnings.
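
    Examples
    --------
    A minimal illustrative sketch using made-up daily data (in practice,
    ``meter_data`` would come from real meter readings and the end date from
    a known intervention date):

    >>> import pandas as pd
    >>> index = pd.date_range("2016-01-01", periods=400, freq="D", tz="UTC")
    >>> meter_data = pd.DataFrame({"value": 1.0}, index=index)
    >>> intervention_start = index[370]
    >>> baseline_data, warnings = get_baseline_data(
    ...     meter_data, end=intervention_start, max_days=365
    ... )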
    """
    if max_days is not None:
        if start is not None:
            raise ValueError(  # pragma: no cover
                "If max_days is set, start cannot be set: start={}, max_days={}.".format(
                    start, max_days
                )
            )

    start_inf = False
    if start is None:
        # py datetime min/max are out of range of pd.Timestamp min/max
        start_target = pytz.UTC.localize(pd.Timestamp.min) + timedelta(days=1)
        start_inf = True
    else:
        start_target = start

    end_inf = False
    if end is None:
        end_limit = pytz.UTC.localize(pd.Timestamp.max) - timedelta(days=1)
        end_inf = True
    else:
        end_limit = end

    # copying prevents setting on slice warnings
    data_before_end_limit = data[:end_limit].copy()
    data_end = data_before_end_limit.index.max()

    if ignore_billing_period_gap_for_day_count and (
        n_days_billing_period_overshoot is None
        or end_limit - timedelta(days=n_days_billing_period_overshoot) < data_end
    ):
        end_limit = data_before_end_limit.index.max()

    if not end_inf and max_days is not None:
        start_target = end_limit - timedelta(days=max_days)

    if allow_billing_period_overshoot:
        # adjust start limit to get a selection closest to max_days
        # also consider ffill for get_loc method - always picks previous
        try:
            loc = data_before_end_limit.index.get_loc(start_target, method="nearest")
        except (KeyError, IndexError):  # pragma: no cover
            baseline_data = data_before_end_limit
            start_limit = start_target
        else:
            start_limit = data_before_end_limit.index[loc]
            baseline_data = data_before_end_limit[start_limit:].copy()

    else:
        # use hard limit for baseline start
        start_limit = start_target
        baseline_data = data_before_end_limit[start_limit:].copy()

    if baseline_data.dropna().empty:
        raise NoBaselineDataError()

    baseline_data.iloc[-1] = np.nan

    data_end = data.index.max()
    data_start = data.index.min()
    return (
        baseline_data,
        _make_baseline_warnings(
            end_inf, start_inf, data_start, data_end, start_limit, end_limit
        ),
    )


def _make_reporting_warnings(
    end_inf, start_inf, data_start, data_end, start_limit, end_limit
):
    warnings = []
    # warn if there is a gap at end
    if not end_inf and data_end < end_limit:
        warnings.append(
            EEMeterWarning(
                qualified_name="eemeter.get_reporting_data.gap_at_reporting_end",
                description=(
                    "Data does not have coverage at requested reporting end date."
                ),
                data={
                    "requested_end": end_limit.isoformat(),
                    "data_end": data_end.isoformat(),
                },
            )
        )
    # warn if there is a gap at start
    if not start_inf and start_limit < data_start:
        warnings.append(
            EEMeterWarning(
                qualified_name="eemeter.get_reporting_data.gap_at_reporting_start",
                description=(
                    "Data does not have coverage at requested reporting start date."
                ),
                data={
                    "requested_start": start_limit.isoformat(),
                    "data_start": data_start.isoformat(),
                },
            )
        )
    return warnings


def get_reporting_data(
    data,
    start=None,
    end=None,
    max_days=365,
    allow_billing_period_overshoot=False,
    ignore_billing_period_gap_for_day_count=False,
):
    """Filter down to reporting period data.

    Parameters
    ----------
    data : :any:`pandas.DataFrame` or :any:`pandas.Series`
        The data to filter to reporting data. This data will be filtered down
        to an acceptable reporting period according to the dates passed as
        `start` and `end`, or the maximum period specified with `max_days`.
    start : :any:`datetime.datetime`
        A timezone-aware datetime that represents the earliest allowable start
        date for the reporting data, i.e., the earliest date for which data is
        available after the intervention begins.
    end : :any:`datetime.datetime`
        A timezone-aware datetime that represents the latest allowable end
        date for the reporting data. The stricter of this or `max_days` is used
        to determine the latest allowable reporting period date.
    max_days : :any:`int`, default 365
        The maximum length of the period. Ignored if `start` is not set.
        The stricter of this or `end` is used to determine the latest
        allowable reporting period date.
    allow_billing_period_overshoot : :any:`bool`, default False
        If True, count `max_days` from the start of the first billing data period
        that starts after the `start` date, rather than from the exact `start` date.
        Otherwise use the exact `start` date as the cutoff.
    ignore_billing_period_gap_for_day_count : :any:`bool`, default False
        If True, instead of going forward `max_days` from either the
        `start` date or the `start` of the first billing period after that date
        (depending on the value of the `allow_billing_period_overshoot` setting)
        and excluding the first period that ended after that date, first check
        to see if excluding or including that period gets closer to a total of
        `max_days` of data.

        For example, with `max_days=365`, if an exact 365-day period would
        end on Feb 15, but the billing period spanning that date ran from
        Jan 20 to Feb 20, include that period for a total of ~370 days of
        data, because that is closer to 365 than the ~340 days that would
        result from excluding it. If, on the other hand, that period ran from
        Feb 10 to Mar 10, exclude it, because ~360 days of data is closer to
        365 than ~390.

    Returns
    -------
    reporting_data, warnings : :any:`tuple` of (:any:`pandas.DataFrame` or :any:`pandas.Series`, :any:`list` of :any:`eemeter.EEMeterWarning`)
        Data for only the specified reporting period and any associated warnings.
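
    Examples
    --------
    A minimal illustrative sketch using made-up daily data (in practice,
    ``meter_data`` would come from real meter readings and the start date
    from a known intervention date):

    >>> import pandas as pd
    >>> index = pd.date_range("2017-01-01", periods=400, freq="D", tz="UTC")
    >>> meter_data = pd.DataFrame({"value": 1.0}, index=index)
    >>> intervention_end = index[10]
    >>> reporting_data, warnings = get_reporting_data(
    ...     meter_data, start=intervention_end, max_days=365
    ... )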
    """
    if max_days is not None:
        if end is not None:
            raise ValueError(  # pragma: no cover
                "If max_days is set, end cannot be set: end={}, max_days={}.".format(
                    end, max_days
                )
            )

    start_inf = False
    if start is None:
        # py datetime min/max are out of range of pd.Timestamp min/max
        start_limit = pytz.UTC.localize(pd.Timestamp.min) + timedelta(days=1)
        start_inf = True
    else:
        start_limit = start

    end_inf = False
    if end is None:
        end_target = pytz.UTC.localize(pd.Timestamp.max) - timedelta(days=1)
        end_inf = True
    else:
        end_target = end

    # copying prevents setting on slice warnings
    data_after_start_limit = data[start_limit:].copy()

    if ignore_billing_period_gap_for_day_count:
        start_limit = data_after_start_limit.index.min()

    if not start_inf and max_days is not None:
        end_target = start_limit + timedelta(days=max_days)

    if allow_billing_period_overshoot:
        # adjust end limit to get a selection closest to max_days
        # also consider bfill for get_loc method - always picks next
        try:
            loc = data_after_start_limit.index.get_loc(end_target, method="nearest")
        except (KeyError, IndexError):  # pragma: no cover
            reporting_data = data_after_start_limit
            end_limit = end_target
        else:
            end_limit = data_after_start_limit.index[loc]
            reporting_data = data_after_start_limit[:end_limit].copy()

    else:
        # use hard limit for reporting end
        end_limit = end_target
        reporting_data = data_after_start_limit[:end_limit].copy()

    if reporting_data.dropna().empty:
        raise NoReportingDataError()

    reporting_data.iloc[-1] = np.nan

    data_end = data.index.max()
    data_start = data.index.min()
    return (
        reporting_data,
        _make_reporting_warnings(
            end_inf, start_inf, data_start, data_end, start_limit, end_limit
        ),
    )


class Term(object):
    """
    The term object represents a subset of an index.

    Attributes
    ----------
    index : :any:`pandas.DatetimeIndex`
        The index of the term. Includes one extra period at the end which is
        meant to hold a NaN value.
    label : :any:`str`
        The label for the term.
    target_start_date : :any:`pandas.Timestamp` or :any:`datetime.datetime`
        The start date inferred for this term from the start date and target term
        lengths.
    target_end_date : :any:`pandas.Timestamp` or :any:`datetime.datetime`
        The end date inferred for this term from the start date and target term
        lengths.
    target_term_length_days : :any:`int`
        The number of days targeted for this term.
    actual_start_date : :any:`pandas.Timestamp`
        The first date in the index.
    actual_end_date : :any:`pandas.Timestamp`
        The last date in the index.
    actual_term_length_days : :any:`int`
        The number of days between the actual start date and actual end date.
    complete : :any:`bool`
        True if this term is conclusively complete, such that additional data added
        to the series would not add more data to this term.

    """

    def __init__(
        self,
        index,
        label,
        target_start_date,
        target_end_date,
        target_term_length_days,
        actual_start_date,
        actual_end_date,
        actual_term_length_days,
        complete,
    ):
        self.index = index
        self.label = label
        self.target_start_date = target_start_date
        self.target_end_date = target_end_date
        self.target_term_length_days = target_term_length_days
        self.actual_start_date = actual_start_date
        self.actual_end_date = actual_end_date
        self.actual_term_length_days = actual_term_length_days
        self.complete = complete

    def __repr__(self):
        return (
            "Term(label={}, target_term_length_days={}, actual_term_length_days={},"
            " complete={})"
        ).format(
            self.label,
            self.target_term_length_days,
            self.actual_term_length_days,
            self.complete,
        )


def get_terms(index, term_lengths, term_labels=None, start=None, method="strict"):
    """Breaks a :any:`pandas.DatetimeIndex` into consecutive terms of specified
    lengths.

    Parameters
    ----------
    index : :any:`pandas.DatetimeIndex`
        The index to split into terms, generally `meter_data.index`
        or `temperature_data.index`.
    term_lengths : :any:`list` of :any:`int`
        The lengths (in days) of the terms into which to split the data.
    term_labels : :any:`list` of :any:`str`, default None
        Labels to use for each term. List must be the same length as the
        `term_lengths` list.
    start : :any:`datetime.datetime`, default None
        A timezone-aware datetime that represents the earliest allowable start
        date for the terms. If None, use the first element of the index.
    method : one of ['strict', 'nearest'], default 'strict'
        The method to use to get terms.

        - "strict": each term ends on or before its target end date, so a term
          is never longer than its target length.
        - "nearest": each term ends at the index value nearest its target end
          date, which may overshoot the target length.

    Returns
    -------
    terms : :any:`list` of :any:`eemeter.Term`
        A list of :any:`eemeter.Term` objects, each holding the portion of the
        index that falls within that term. This can be used to filter the
        original data into terms of approximately the desired lengths.
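
    Examples
    --------
    A minimal illustrative sketch; the index and term lengths are made up:

    >>> import pandas as pd
    >>> index = pd.date_range("2017-01-01", periods=365, freq="D", tz="UTC")
    >>> terms = get_terms(
    ...     index, term_lengths=[180, 185], term_labels=["first_term", "second_term"]
    ... )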
    """
    if method == "strict":
        get_loc_method = "pad"
    elif method == "nearest":
        get_loc_method = "nearest"
    else:
        raise ValueError(
            "method {} not supported - use either 'strict' or 'nearest'".format(method)
        )

    if not index.is_monotonic_increasing:
        raise ValueError("get_terms requires a sorted index")

    if term_labels is None:
        term_labels = [
            "term_{:03d}".format(i + 1) for i, term_length in enumerate(term_lengths)
        ]

    elif len(term_labels) != len(term_lengths):
        raise ValueError(
            "term_labels (len {}) must be the same length as term_lengths (len {})".format(
                len(term_labels), len(term_lengths)
            )
        )

    if start is None:
        prev_start = index.min()
    else:
        prev_start = start

    term_end_targets = [
        prev_start + timedelta(days=sum(term_lengths[: i + 1]))
        for i in range(len(term_lengths))
    ]

    terms = []
    remaining_index = index[index >= prev_start]

    for label, target_term_length, end_target in zip(
        term_labels, term_lengths, term_end_targets
    ):
        if len(remaining_index) <= 1:
            break

        next_index = remaining_index.get_loc(end_target, method=get_loc_method)

        # keep one extra index point for the end NaN - this could be confusing, but
        # helps identify the full range of the last data point
        term_index = remaining_index[: next_index + 1]

        # find the next start
        next_start = remaining_index[next_index]

        # reset the remaining index
        remaining_index = remaining_index[next_index:]

        # There may be a better way to tell if the term is conclusively complete,
        # but the logic here is that if there's more than one remaining point then
        # the term must be complete - since that final point was a worse candidate
        # than the one before it which was chosen.
        complete = len(remaining_index) > 1

        terms.append(
            Term(
                index=term_index,
                label=label,
                target_start_date=prev_start,
                target_end_date=end_target,
                target_term_length_days=target_term_length,
                actual_start_date=term_index[0],
                actual_end_date=term_index[-1],
                actual_term_length_days=(term_index[-1] - term_index[0]).days,
                complete=complete,
            )
        )

        # reset the previous start
        prev_start = next_start

    return terms


def clean_caltrack_billing_data(data, source_interval):
    """Perform basic CalTRACK cleaning of billing-period data, filtering out
    periods of implausible length and folding estimated reads into the
    subsequent actual read."""
    # check for empty data
    if data["value"].dropna().empty:
        return data[:0]

    if source_interval.startswith("billing"):
        diff = list((data.index[1:] - data.index[:-1]).days)
        filter_ = pd.Series(diff + [np.nan], index=data.index)

        # CalTRACK 2.2.3.4, 2.2.3.5
        if source_interval == "billing_monthly":
            data = data[
                (filter_ <= 35) & (filter_ >= 25)  # keep these, inclusive
            ].reindex(data.index)

        # CalTRACK 2.2.3.4, 2.2.3.5
        if source_interval == "billing_bimonthly":
            data = data[
                (filter_ <= 70) & (filter_ >= 25)  # keep these, inclusive
            ].reindex(data.index)

        # CalTRACK 2.2.3.1
        """
        Adds an estimated read to the subsequent actual read, provided there is
        no more than one estimated read in a row, and then removes the estimated row.

        Input:
        index   value   estimated
        1       2       False
        2       3       False
        3       5       True
        4       4       False
        5       6       True
        6       3       True
        7       4       False
        8       NaN     NaN

        Output:
        index   value
        1       2
        2       3
        4       9
        5       NaN
        7       7
        8       NaN
        """
        add_estimated = []
        remove_estimated_fixed_rows = []
        orig_data = data.copy()
        if "estimated" in data.columns:
            data["unestimated_value"] = (
                data[:-1].value[(data[:-1].estimated == False)].reindex(data.index)
            )
            data["estimated_value"] = (
                data[:-1].value[(data[:-1].estimated)].reindex(data.index)
            )
            for i, (index, row) in enumerate(data[:-1].iterrows()):
                # ensures there is a prev_row and previous row value is null
                if i > 0 and pd.isnull(prev_row["unestimated_value"]):
                    # current row value is not null
                    add_estimated.append(prev_row["estimated_value"])
                    if not pd.isnull(row["unestimated_value"]):
                        # get all rows that had only estimated reads that will be
                        # added to the subsequent row meaning this row
                        # needs to be removed
                        remove_estimated_fixed_rows.append(prev_index)
                else:
                    add_estimated.append(0)
                prev_row = row
                prev_index = index
            add_estimated.append(np.nan)
            data["value"] = data["unestimated_value"] + add_estimated
            data = data[~data.index.isin(remove_estimated_fixed_rows)]
            data = data[["value"]]  # remove the estimated column

    # check again for empty data
    if data.dropna().empty:
        return data[:0]

    return data


def downsample_and_clean_caltrack_daily_data(data):
    """Downsample finer-than-daily meter data to daily values, scaling up
    partially-covered days and masking days with insufficient coverage."""
    data = as_freq(data.value, "D", include_coverage=True)

    # CalTRACK 2.2.2.1 - interpolate with average of non-null values
    data.value[data.coverage > 0.5] = (
        data[data.coverage > 0.5].value / data[data.coverage > 0.5].coverage
    )

    # CalTRACK 2.2.2.1 - discard days with less than 50% coverage
    return data[data.coverage > 0.5].reindex(data.index)[["value"]]


def clean_caltrack_billing_daily_data(data, source_interval):
    """Clean meter data for the CalTRACK billing and daily methods, dispatching
    on the source interval."""
    # billing data is cleaned but not resampled
    if source_interval.startswith("billing"):
        # CalTRACK 2.2.3.4, 2.2.3.5
        return clean_caltrack_billing_data(data, source_interval)

    # daily data is passed through unchanged; finer intervals
    # (hourly, 30min, 15min) are downsampled to daily
    elif source_interval == "daily":
        return data
    else:
        return downsample_and_clean_caltrack_daily_data(data)