#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""

   Copyright 2014-2019 OpenEEmeter contributors

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.

"""
from datetime import datetime, timedelta
from functools import partial

import numpy as np
import pandas as pd
import pytz

from .exceptions import NoBaselineDataError, NoReportingDataError
from .warnings import EEMeterWarning


__all__ = (
    "Term",
    "as_freq",
    "day_counts",
    "get_baseline_data",
    "get_reporting_data",
    "get_terms",
    "remove_duplicates",
    "overwrite_partial_rows_with_nan",
    "clean_caltrack_billing_data",
    "clean_caltrack_billing_daily_data",
)


def _get_index_loc(index, target, method):
    """Return the integer position in ``index`` that matches ``target``.

    Stand-in for ``index.get_loc(target, method=method)``; the ``method``
    argument of ``get_loc`` was deprecated in pandas 1.4 and removed in
    pandas 2.0, whereas ``get_indexer`` accepts it in all versions.

    Parameters
    ----------
    index : :any:`pandas.Index`
        The index to search.
    target : scalar
        The label to look up.
    method : :any:`str`
        Lookup method understood by :any:`pandas.Index.get_indexer`
        (e.g., ``"pad"`` or ``"nearest"``).

    Returns
    -------
    loc : :any:`int`
        Position of the matching label.

    Raises
    ------
    KeyError
        If no label matches under the given method.
    """
    loc = index.get_indexer([target], method=method)[0]
    if loc == -1:
        # get_indexer signals "no match" with -1; get_loc raised KeyError.
        raise KeyError(target)
    return loc


def overwrite_partial_rows_with_nan(df):
    """Replace every row that contains at least one null with an all-null row.

    Parameters
    ----------
    df : :any:`pandas.DataFrame`
        The data to process.

    Returns
    -------
    df : :any:`pandas.DataFrame`
        Data with the original index in which rows that had any null value
        are null in every column.
    """
    return df.dropna().reindex(df.index)


def remove_duplicates(df_or_series):
    """Remove duplicate rows or values by keeping the first of each duplicate.

    Parameters
    ----------
    df_or_series : :any:`pandas.DataFrame` or :any:`pandas.Series`
        Pandas object from which to drop duplicate index values.

    Returns
    -------
    deduplicated : :any:`pandas.DataFrame` or :any:`pandas.Series`
        The deduplicated pandas object.
    """
    # CalTrack 2.3.2.2
    return df_or_series[~df_or_series.index.duplicated(keep="first")]


def as_freq(
    data_series,
    freq,
    atomic_freq="1 Min",
    series_type="cumulative",
    include_coverage=False,
):
    """Resample data to a different frequency.

    This method can be used to upsample or downsample meter data. The
    assumption it makes to do so is that meter data is constant and averaged
    over the given periods. For instance, to convert billing-period data to
    daily data, this method first upsamples to the atomic frequency
    (1 minute frequency, by default), "spreading" usage evenly across all
    minutes in each period. Then it downsamples to the requested frequency
    (`freq`) and returns that result. With instantaneous series, the data is
    copied to all contiguous time intervals and the mean over `freq` is
    returned.

    **Caveats**:

     - This method gives a fair amount of flexibility in
       resampling as long as you are OK with the assumption that usage is
       constant over the period (this assumption is generally broken in
       observed data at large enough frequencies, so this caveat should not be
       taken lightly).

    Parameters
    ----------
    data_series : :any:`pandas.Series`
        Data to resample. Should have a :any:`pandas.DatetimeIndex`.
    freq : :any:`str`
        The frequency to resample to. This should be given in a form recognized
        by the :any:`pandas.Series.resample` method.
    atomic_freq : :any:`str`, optional
        The "atomic" frequency of the intermediate data form. A coarser
        atomic frequency produces fewer intermediate samples.
    series_type : :any:`str`, {'cumulative', 'instantaneous'},
        default 'cumulative'
        Type of data sampling. 'cumulative' data can be spread over smaller
        time intervals and is aggregated using addition (e.g. meter data).
        'instantaneous' data is copied (not spread) over smaller time intervals
        and is aggregated by averaging (e.g. weather data).
    include_coverage: :any:`bool`,
        default `False`
        Option of whether to return a series with just the resampled values
        or a dataframe with a column that includes percent coverage of source data
        used for each sample.

    Returns
    -------
    resampled_data : :any:`pandas.Series` or :any:`pandas.DataFrame`
        Data resampled to the given frequency (optionally as a dataframe with
        ``value`` and ``coverage`` columns if `include_coverage` is used).
        Empty input is returned unchanged.

    Raises
    ------
    ValueError
        If `data_series` is not a :any:`pandas.Series` or if `series_type` is
        not one of the supported values.
    """
    # TODO(philngo): make sure this complies with CalTRACK 2.2.2.1
    if not isinstance(data_series, pd.Series):
        raise ValueError(
            "expected series, got object with class {}".format(data_series.__class__)
        )
    if data_series.empty:
        return data_series
    if series_type not in ("cumulative", "instantaneous"):
        # Previously an unknown type fell through both branches and failed
        # later with an unbound-variable error.
        raise ValueError(
            "series_type must be 'cumulative' or 'instantaneous', got {!r}".format(
                series_type
            )
        )

    series = remove_duplicates(data_series)
    target_freq = pd.Timedelta(atomic_freq)
    # Length of each source period; the final period has unknown length (NaT).
    timedeltas = (series.index[1:] - series.index[:-1]).append(
        pd.TimedeltaIndex([pd.NaT])
    )

    if series_type == "cumulative":
        # Scale each value to the share that belongs to one atomic interval.
        spread_factor = target_freq.total_seconds() / timedeltas.total_seconds()
        series_spread = series * spread_factor
        atomic_series = series_spread.asfreq(atomic_freq, method="ffill")
        resampled = atomic_series.resample(freq).sum()
        # sum() turns all-null periods into 0; restore nulls for periods whose
        # first atomic sample is null.
        resampled_with_nans = atomic_series.resample(freq).first()
        resampled = resampled[resampled_with_nans.notnull()].reindex(resampled.index)
    else:  # instantaneous
        atomic_series = series.asfreq(atomic_freq, method="ffill")
        resampled = atomic_series.resample(freq).mean()

    # Number of non-null atomic samples per output period. Computed for both
    # series types so that include_coverage works with either.
    n_coverage = atomic_series.resample(freq).count()

    if resampled.index[-1] < series.index[-1]:
        # this adds a null at the end using the target frequency
        last_index = pd.date_range(resampled.index[-1], freq=freq, periods=2)[1:]
        resampled = (
            pd.concat([resampled, pd.Series(np.nan, index=last_index)])
            .resample(freq)
            .mean()
        )

    if include_coverage:
        # Number of atomic slots in each output period.
        n_total = resampled.resample(atomic_freq).count().resample(freq).count()
        resampled = resampled.to_frame("value")
        resampled["coverage"] = n_coverage / n_total
    return resampled


def day_counts(index):
    """Days between DatetimeIndex values as a :any:`pandas.Series`.

    Parameters
    ----------
    index : :any:`pandas.DatetimeIndex`
        The index for which to get day counts.

    Returns
    -------
    day_counts : :any:`pandas.Series`
        A :any:`pandas.Series` with counts of days between periods. Counts are
        given on start dates of periods. The last value is always null
        because the final period has no end.
    """
    # dont affect the original data
    index = index.copy()

    if len(index) == 0:
        # explicit dtype keeps the result numeric like the non-empty case
        return pd.Series([], index=index, dtype=float)

    timedeltas = (index[1:] - index[:-1]).append(pd.TimedeltaIndex([pd.NaT]))
    timedelta_days = timedeltas.total_seconds() / (60 * 60 * 24)

    return pd.Series(timedelta_days, index=index)


def _make_baseline_warnings(
    end_inf, start_inf, data_start, data_end, start_limit, end_limit
):
    """Build warnings for data gaps at the edges of a baseline period.

    A warning is added when a finite end limit lies after `data_end`, and
    when a finite start limit lies before `data_start`.

    Returns
    -------
    warnings : :any:`list` of :any:`eemeter.EEMeterWarning`
    """
    warnings = []
    # warn if there is a gap at end
    if not end_inf and data_end < end_limit:
        warnings.append(
            EEMeterWarning(
                qualified_name="eemeter.get_baseline_data.gap_at_baseline_end",
                description=(
                    "Data does not have coverage at requested baseline end date."
                ),
                data={
                    "requested_end": end_limit.isoformat(),
                    "data_end": data_end.isoformat(),
                },
            )
        )
    # warn if there is a gap at start
    if not start_inf and start_limit < data_start:
        warnings.append(
            EEMeterWarning(
                qualified_name="eemeter.get_baseline_data.gap_at_baseline_start",
                description=(
                    "Data does not have coverage at requested baseline start date."
                ),
                data={
                    "requested_start": start_limit.isoformat(),
                    "data_start": data_start.isoformat(),
                },
            )
        )
    return warnings


def get_baseline_data(
    data,
    start=None,
    end=None,
    max_days=365,
    allow_billing_period_overshoot=False,
    n_days_billing_period_overshoot=None,
    ignore_billing_period_gap_for_day_count=False,
):
    """Filter down to baseline period data.

    .. note::

        For compliance with CalTRACK, set ``max_days=365`` (section 2.2.1.1).

    Parameters
    ----------
    data : :any:`pandas.DataFrame` or :any:`pandas.Series`
        The data to filter to baseline data. This data will be filtered down
        to an acceptable baseline period according to the dates passed as
        `start` and `end`, or the maximum period specified with `max_days`.
    start : :any:`datetime.datetime`
        A timezone-aware datetime that represents the earliest allowable start
        date for the baseline data. Cannot be combined with `max_days`; pass
        ``max_days=None`` to use an explicit `start`.
    end : :any:`datetime.datetime`
        A timezone-aware datetime that represents the latest allowable end
        date for the baseline data, i.e., the latest date for which data is
        available before the intervention begins.
    max_days : :any:`int`, default 365
        The maximum length of the period. Ignored if `end` is not set.
        Must be None if `start` is given.
    allow_billing_period_overshoot : :any:`bool`, default False
        If True, start the baseline at the data point nearest to the target
        start date (which may fall before or after it) rather than cutting
        off exactly at the target start date.
    n_days_billing_period_overshoot: :any:`int`, default None
        The number of days of overshoot that will be tolerated. A value of
        None implies that any number of days is allowed.
    ignore_billing_period_gap_for_day_count : :any:`bool`, default False
        If True, instead of going back `max_days` from either the
        `end` date or end of the last billing period before that date (depending
        on the value of the `allow_billing_period_overshoot` setting) and
        excluding the last period that began before that date, first check to
        see if excluding or including that period gets closer to a total of
        `max_days` of data.

        For example, with `max_days=365`, if an exact 365 period would target
        Feb 15, but the billing period went from Jan 20 to Feb 20, exclude that
        period for a total of ~360 days of data, because that's closer to 365
        than ~390 days, which would be the total if that period was included.
        If, on the other hand, that period started Feb 10 and went to Mar 10,
        include the period, because ~370 days of data is closer to 365 than
        ~340.

    Returns
    -------
    baseline_data, warnings : :any:`tuple` of (:any:`pandas.DataFrame` or :any:`pandas.Series`, :any:`list` of :any:`eemeter.EEMeterWarning`)
        Data for only the specified baseline period and any associated warnings.
        The last row of the returned data is set to null.

    Raises
    ------
    ValueError
        If both `start` and `max_days` are set.
    NoBaselineDataError
        If the selected baseline period contains no non-null data.
    """
    if max_days is not None:
        if start is not None:
            raise ValueError(  # pragma: no cover
                "If max_days is set, start cannot be set: start={}, max_days={}.".format(
                    start, max_days
                )
            )

    start_inf = False
    if start is None:
        # py datetime min/max are out of range of pd.Timestamp min/max
        start_target = pytz.UTC.localize(pd.Timestamp.min) + timedelta(days=1)
        start_inf = True
    else:
        start_target = start

    end_inf = False
    if end is None:
        end_limit = pytz.UTC.localize(pd.Timestamp.max) - timedelta(days=1)
        end_inf = True
    else:
        end_limit = end

    # copying prevents setting on slice warnings
    data_before_end_limit = data[:end_limit].copy()
    data_end = data_before_end_limit.index.max()

    # NOTE(review): the overshoot tolerance is applied here, gated by
    # ignore_billing_period_gap_for_day_count rather than by
    # allow_billing_period_overshoot - confirm this is the intended flag.
    if ignore_billing_period_gap_for_day_count and (
        n_days_billing_period_overshoot is None
        or end_limit - timedelta(days=n_days_billing_period_overshoot) < data_end
    ):
        end_limit = data_before_end_limit.index.max()

    if not end_inf and max_days is not None:
        start_target = end_limit - timedelta(days=max_days)

    if allow_billing_period_overshoot:
        # adjust start limit to get a selection closest to max_days
        # also consider ffill for get_loc method - always picks previous
        try:
            loc = _get_index_loc(
                data_before_end_limit.index, start_target, method="nearest"
            )
        except (KeyError, IndexError):  # pragma: no cover
            baseline_data = data_before_end_limit
            start_limit = start_target
        else:
            start_limit = data_before_end_limit.index[loc]
            baseline_data = data_before_end_limit[start_limit:].copy()

    else:
        # use hard limit for baseline start
        start_limit = start_target
        baseline_data = data_before_end_limit[start_limit:].copy()

    if baseline_data.dropna().empty:
        raise NoBaselineDataError()

    # the final point only marks the end of the last period
    baseline_data.iloc[-1] = np.nan

    data_end = data.index.max()
    data_start = data.index.min()
    return (
        baseline_data,
        _make_baseline_warnings(
            end_inf, start_inf, data_start, data_end, start_limit, end_limit
        ),
    )


def _make_reporting_warnings(
    end_inf, start_inf, data_start, data_end, start_limit, end_limit
):
    """Build warnings for data gaps at the edges of a reporting period.

    A warning is added when a finite end limit lies after `data_end`, and
    when a finite start limit lies before `data_start`.

    Returns
    -------
    warnings : :any:`list` of :any:`eemeter.EEMeterWarning`
    """
    warnings = []
    # warn if there is a gap at end
    if not end_inf and data_end < end_limit:
        warnings.append(
            EEMeterWarning(
                qualified_name="eemeter.get_reporting_data.gap_at_reporting_end",
                description=(
                    "Data does not have coverage at requested reporting end date."
                ),
                data={
                    "requested_end": end_limit.isoformat(),
                    "data_end": data_end.isoformat(),
                },
            )
        )
    # warn if there is a gap at start
    if not start_inf and start_limit < data_start:
        warnings.append(
            EEMeterWarning(
                qualified_name="eemeter.get_reporting_data.gap_at_reporting_start",
                description=(
                    "Data does not have coverage at requested reporting start date."
                ),
                data={
                    "requested_start": start_limit.isoformat(),
                    "data_start": data_start.isoformat(),
                },
            )
        )
    return warnings


def get_reporting_data(
    data,
    start=None,
    end=None,
    max_days=365,
    allow_billing_period_overshoot=False,
    ignore_billing_period_gap_for_day_count=False,
):
    """Filter down to reporting period data.

    Parameters
    ----------
    data : :any:`pandas.DataFrame` or :any:`pandas.Series`
        The data to filter to reporting data. This data will be filtered down
        to an acceptable reporting period according to the dates passed as
        `start` and `end`, or the maximum period specified with `max_days`.
    start : :any:`datetime.datetime`
        A timezone-aware datetime that represents the earliest allowable start
        date for the reporting data, i.e., the earliest date for which data is
        available after the intervention begins.
    end : :any:`datetime.datetime`
        A timezone-aware datetime that represents the latest allowable end
        date for the reporting data. Cannot be combined with `max_days`; pass
        ``max_days=None`` to use an explicit `end`.
    max_days : :any:`int`, default 365
        The maximum length of the period. Ignored if `start` is not set.
        Must be None if `end` is given.
    allow_billing_period_overshoot : :any:`bool`, default False
        If True, end the reporting period at the data point nearest to the
        target end date (which may fall before or after it) rather than
        cutting off exactly at the target end date.
    ignore_billing_period_gap_for_day_count : :any:`bool`, default False
        If True, instead of going forward `max_days` from either the
        `start` date or the `start` of the first billing period after that date
        (depending on the value of the `allow_billing_period_overshoot` setting)
        and excluding the first period that ended after that date, first check
        to see if excluding or including that period gets closer to a total of
        `max_days` of data.

        For example, with `max_days=365`, if an exact 365 period would target
        Feb 15, but the billing period went from Jan 20 to Feb 20, include that
        period for a total of ~370 days of data, because that's closer to 365
        than ~340 days, which would be the total if that period was excluded.
        If, on the other hand, that period started Feb 10 and went to Mar 10,
        exclude the period, because ~360 days of data is closer to 365 than
        ~390.

    Returns
    -------
    reporting_data, warnings : :any:`tuple` of (:any:`pandas.DataFrame` or
        :any:`pandas.Series`, :any:`list` of :any:`eemeter.EEMeterWarning`)
        Data for only the specified reporting period and any associated warnings.
        The last row of the returned data is set to null.

    Raises
    ------
    ValueError
        If both `end` and `max_days` are set.
    NoReportingDataError
        If the selected reporting period contains no non-null data.
    """
    if max_days is not None:
        if end is not None:
            raise ValueError(  # pragma: no cover
                "If max_days is set, end cannot be set: end={}, max_days={}.".format(
                    end, max_days
                )
            )

    start_inf = False
    if start is None:
        # py datetime min/max are out of range of pd.Timestamp min/max
        start_limit = pytz.UTC.localize(pd.Timestamp.min) + timedelta(days=1)
        start_inf = True
    else:
        start_limit = start

    end_inf = False
    if end is None:
        end_target = pytz.UTC.localize(pd.Timestamp.max) - timedelta(days=1)
        end_inf = True
    else:
        end_target = end

    # copying prevents setting on slice warnings
    data_after_start_limit = data[start_limit:].copy()

    if ignore_billing_period_gap_for_day_count:
        start_limit = data_after_start_limit.index.min()

    if not start_inf and max_days is not None:
        end_target = start_limit + timedelta(days=max_days)

    if allow_billing_period_overshoot:
        # adjust end limit to get a selection closest to max_days
        # also consider bfill for get_loc method - always picks next
        try:
            loc = _get_index_loc(
                data_after_start_limit.index, end_target, method="nearest"
            )
        except (KeyError, IndexError):  # pragma: no cover
            reporting_data = data_after_start_limit
            end_limit = end_target
        else:
            end_limit = data_after_start_limit.index[loc]
            reporting_data = data_after_start_limit[:end_limit].copy()

    else:
        # use hard limit for reporting end
        end_limit = end_target
        reporting_data = data_after_start_limit[:end_limit].copy()

    if reporting_data.dropna().empty:
        raise NoReportingDataError()

    # the final point only marks the end of the last period
    reporting_data.iloc[-1] = np.nan

    data_end = data.index.max()
    data_start = data.index.min()
    return (
        reporting_data,
        _make_reporting_warnings(
            end_inf, start_inf, data_start, data_end, start_limit, end_limit
        ),
    )


class Term(object):
    """
    The term object represents a subset of an index.

    Attributes
    ----------
    index : :any:`pandas.DatetimeIndex`
        The index of the term. Includes a period at the end meant to be NaN-value.
    label : :any:`str`
        The label for the term.
    target_start_date : :any:`pandas.Timestamp` or :any:`datetime.datetime`
        The start date inferred for this term from the start date and target term
        lengths.
    target_end_date : :any:`pandas.Timestamp` or :any:`datetime.datetime`
        The end date inferred for this term from the start date and target term
        lengths.
    target_term_length_days : :any:`int`
        The number of days targeted for this term.
    actual_start_date : :any:`pandas.Timestamp`
        The first date in the index.
    actual_end_date : :any:`pandas.Timestamp`
        The last date in the index.
    actual_term_length_days : :any:`int`
        The number of days between the actual start date and actual end date.
    complete : :any:`bool`
        True if this term is conclusively complete, such that additional data added
        to the series would not add more data to this term.

    """

    def __init__(
        self,
        index,
        label,
        target_start_date,
        target_end_date,
        target_term_length_days,
        actual_start_date,
        actual_end_date,
        actual_term_length_days,
        complete,
    ):
        self.index = index
        self.label = label
        self.target_start_date = target_start_date
        self.target_end_date = target_end_date
        self.target_term_length_days = target_term_length_days
        self.actual_start_date = actual_start_date
        self.actual_end_date = actual_end_date
        self.actual_term_length_days = actual_term_length_days
        self.complete = complete

    def __repr__(self):
        return (
            "Term(label={}, target_term_length_days={}, actual_term_length_days={},"
            " complete={})"
        ).format(
            self.label,
            self.target_term_length_days,
            self.actual_term_length_days,
            self.complete,
        )


def get_terms(index, term_lengths, term_labels=None, start=None, method="strict"):
    """Breaks a :any:`pandas.DatetimeIndex` into consecutive terms of specified
    lengths.

    Parameters
    ----------
    index : :any:`pandas.DatetimeIndex`
        The index to split into terms, generally `meter_data.index`
        or `temperature_data.index`. Must be sorted in increasing order.
    term_lengths : :any:`list` of :any:`int`
        The lengths (in days) of the terms into which to split the data.
    term_labels : :any:`list` of :any:`str`, default None
        Labels to use for each term. List must be the same length as the
        `term_lengths` list. If None, labels of the form ``term_001``,
        ``term_002``, ... are generated.
    start : :any:`datetime.datetime`, default None
        A timezone-aware datetime that represents the earliest allowable start
        date for the terms. If None, use the first element of the index.
    method: one of ['strict', 'nearest'], default 'strict'
        The method to use to get terms.

        - "strict": Ensures that the term end will come on or before the
          target end date of the term (the start date plus the cumulative
          term lengths) by picking the last index value at or before it.
        - "nearest": Picks the index value nearest to the target end date of
          the term, which may fall after the target.

    Returns
    -------
    terms : :any:`list` of :any:`eemeter.Term`
        One term per requested term length, in order. Each term carries the
        slice of `index` that belongs to it, which can be used to filter the
        original data into terms of approximately the desired length. Fewer
        terms are returned if the index runs out of points.

    Raises
    ------
    ValueError
        If `method` is not supported, if `index` is not sorted, or if
        `term_labels` and `term_lengths` differ in length.
    """
    if method == "strict":
        get_loc_method = "pad"
    elif method == "nearest":
        get_loc_method = "nearest"
    else:
        raise ValueError(
            "method {} not supported - use either 'strict' or 'nearest'".format(method)
        )

    if not index.is_monotonic_increasing:
        raise ValueError("get_terms requires a sorted index")

    if term_labels is None:
        term_labels = ["term_{:03d}".format(i + 1) for i in range(len(term_lengths))]

    elif len(term_labels) != len(term_lengths):
        raise ValueError(
            "term_labels (len {}) must be the same length as term_length (len {})".format(
                len(term_labels), len(term_lengths)
            )
        )

    if start is None:
        prev_start = index.min()
    else:
        prev_start = start

    # Targets are cumulative from the overall start, not from each actual
    # term start, so errors do not accumulate from term to term.
    term_end_targets = [
        prev_start + timedelta(days=sum(term_lengths[: i + 1]))
        for i in range(len(term_lengths))
    ]

    terms = []
    remaining_index = index[index >= prev_start]

    for label, target_term_length, end_target in zip(
        term_labels, term_lengths, term_end_targets
    ):
        if len(remaining_index) <= 1:
            break

        next_index = _get_index_loc(remaining_index, end_target, get_loc_method)

        # keep one extra index point for the end NaN - this could be confusing, but
        # helps identify the full range of the last data point
        term_index = remaining_index[: next_index + 1]

        # find the next start
        next_start = remaining_index[next_index]

        # reset the remaining index
        remaining_index = remaining_index[next_index:]

        # There may be a better way to tell if the term is conclusively complete,
        # but the logic here is that if there's more than one remaining point then
        # the term must be complete - since that final point was a worse candidate
        # than the one before it which was chosen.
        complete = len(remaining_index) > 1

        terms.append(
            Term(
                index=term_index,
                label=label,
                target_start_date=prev_start,
                target_end_date=end_target,
                target_term_length_days=target_term_length,
                actual_start_date=term_index[0],
                actual_end_date=term_index[-1],
                actual_term_length_days=(term_index[-1] - term_index[0]).days,
                complete=complete,
            )
        )

        # reset the previous start
        prev_start = next_start

    return terms


def clean_caltrack_billing_data(data, source_interval):
    """Apply CalTRACK billing-period cleaning rules to billing data.

    For ``billing_monthly`` data, periods shorter than 25 or longer than 35
    days are nulled; for ``billing_bimonthly`` data, periods shorter than 25
    or longer than 70 days are nulled. If an ``estimated`` column is present,
    estimated reads are folded into the subsequent read where possible.

    The passed-in `data` object is not modified.

    Parameters
    ----------
    data : :any:`pandas.DataFrame`
        Billing data with a :any:`pandas.DatetimeIndex`, a ``value`` column
        and, optionally, an ``estimated`` column.
    source_interval : :any:`str`
        The interval of the source data, e.g., ``billing_monthly`` or
        ``billing_bimonthly``.

    Returns
    -------
    data : :any:`pandas.DataFrame`
        The cleaned data, or an empty dataframe if no non-null data remains.
    """
    # check for empty data
    if data["value"].dropna().empty:
        return data[:0]

    if source_interval.startswith("billing"):
        # period lengths in days; the final period has no known length
        diff = list((data.index[1:] - data.index[:-1]).days)
        filter_ = pd.Series(diff + [np.nan], index=data.index)

        # CalTRACK 2.2.3.4, 2.2.3.5
        if source_interval == "billing_monthly":
            data = data[
                (filter_ <= 35) & (filter_ >= 25)  # keep these, inclusive
            ].reindex(data.index)

        # CalTRACK 2.2.3.4, 2.2.3.5
        if source_interval == "billing_bimonthly":
            data = data[
                (filter_ <= 70) & (filter_ >= 25)  # keep these, inclusive
            ].reindex(data.index)

        # CalTRACK 2.2.3.1
        #
        # Adds estimate to subsequent read if there aren't more than one
        # estimate in a row and then removes the estimated row.
        #
        # Input:
        #     index   value   estimated
        #     1       2       False
        #     2       3       False
        #     3       5       True
        #     4       4       False
        #     5       6       True
        #     6       3       True
        #     7       4       False
        #     8       NaN     NaN
        #
        # Output:
        #     index   value
        #     1       2
        #     2       3
        #     4       9
        #     5       NaN
        #     7       7
        #     8       NaN
        add_estimated = []
        remove_estimated_fixed_rows = []
        if "estimated" in data.columns:
            # Work on a copy so the helper columns added below never leak
            # into the dataframe owned by the caller.
            data = data.copy()
            data["unestimated_value"] = (
                data[:-1].value[(data[:-1].estimated == False)].reindex(data.index)
            )
            data["estimated_value"] = (
                data[:-1].value[(data[:-1].estimated)].reindex(data.index)
            )
            for i, (index, row) in enumerate(data[:-1].iterrows()):
                # ensures there is a prev_row and previous row value is null
                if i > 0 and pd.isnull(prev_row["unestimated_value"]):
                    # carry the previous (estimated) value forward
                    add_estimated.append(prev_row["estimated_value"])
                    if not pd.isnull(row["unestimated_value"]):
                        # get all rows that had only estimated reads that will be
                        # added to the subsequent row meaning this row
                        # needs to be removed
                        remove_estimated_fixed_rows.append(prev_index)
                else:
                    add_estimated.append(0)
                prev_row = row
                prev_index = index
            # the final row only marks the end of the last period
            add_estimated.append(np.nan)
            data["value"] = data["unestimated_value"] + add_estimated
            data = data[~data.index.isin(remove_estimated_fixed_rows)]
            data = data[["value"]]  # remove the estimated column

    # check again for empty data
    if data.dropna().empty:
        return data[:0]

    return data


def downsample_and_clean_caltrack_daily_data(data):
    """Resample sub-daily data to daily data following CalTRACK 2.2.2.1.

    Days with more than 50% coverage are scaled up by their coverage; days
    with 50% coverage or less are set to null.

    Parameters
    ----------
    data : :any:`pandas.DataFrame`
        Data with a :any:`pandas.DatetimeIndex` and a ``value`` column.

    Returns
    -------
    data : :any:`pandas.DataFrame`
        Daily data with a single ``value`` column.
    """
    if data.empty:
        # as_freq returns empty input unchanged (a Series without a coverage
        # column), so there is nothing to resample or scale.
        return data[["value"]]

    data = as_freq(data.value, "D", include_coverage=True)
    sufficient = data.coverage > 0.5

    # CalTRACK 2.2.2.1 - interpolate with average of non-null values
    # (.loc avoids chained assignment, which may silently write to a copy)
    data.loc[sufficient, "value"] = (
        data.loc[sufficient, "value"] / data.loc[sufficient, "coverage"]
    )

    # CalTRACK 2.2.2.1 - discard days with less than 50% coverage
    return data[sufficient].reindex(data.index)[["value"]]


def clean_caltrack_billing_daily_data(data, source_interval):
    """Clean billing or daily data according to its source interval.

    Parameters
    ----------
    data : :any:`pandas.DataFrame`
        Data with a :any:`pandas.DatetimeIndex` and a ``value`` column.
    source_interval : :any:`str`
        The interval of the source data. Values starting with ``billing`` are
        cleaned but not resampled, ``daily`` is returned unchanged, and
        anything else is downsampled to daily data.

    Returns
    -------
    data : :any:`pandas.DataFrame`
        The cleaned data.
    """
    # billing data is cleaned but not resampled
    if source_interval.startswith("billing"):
        # CalTRACK 2.2.3.4, 2.2.3.5
        return clean_caltrack_billing_data(data, source_interval)

    # daily data is passed through unchanged; higher-frequency data
    # (hourly, 30min, 15min) is downsampled to daily
    elif source_interval == "daily":
        return data
    else:
        return downsample_and_clean_caltrack_daily_data(data)