1# -*- coding: utf-8 -*- 2"""DataFrame client for InfluxDB v0.8.""" 3 4from __future__ import absolute_import 5from __future__ import division 6from __future__ import print_function 7from __future__ import unicode_literals 8 9import math 10import warnings 11 12from .client import InfluxDBClient 13 14 15class DataFrameClient(InfluxDBClient): 16 """Primary defintion of the DataFrameClient for v0.8. 17 18 The ``DataFrameClient`` object holds information necessary to connect 19 to InfluxDB. Requests can be made to InfluxDB directly through the client. 20 The client reads and writes from pandas DataFrames. 21 """ 22 23 def __init__(self, ignore_nan=True, *args, **kwargs): 24 """Initialize an instance of the DataFrameClient.""" 25 super(DataFrameClient, self).__init__(*args, **kwargs) 26 27 try: 28 global pd 29 import pandas as pd 30 except ImportError as ex: 31 raise ImportError('DataFrameClient requires Pandas, ' 32 '"{ex}" problem importing'.format(ex=str(ex))) 33 34 self.EPOCH = pd.Timestamp('1970-01-01 00:00:00.000+00:00') 35 self.ignore_nan = ignore_nan 36 37 def write_points(self, data, *args, **kwargs): 38 """Write to multiple time series names. 39 40 :param data: A dictionary mapping series names to pandas DataFrames 41 :param time_precision: [Optional, default 's'] Either 's', 'm', 'ms' 42 or 'u'. 43 :param batch_size: [Optional] Value to write the points in batches 44 instead of all at one time. Useful for when doing data dumps from 45 one database to another or when doing a massive write operation 46 :type batch_size: int 47 """ 48 batch_size = kwargs.get('batch_size') 49 time_precision = kwargs.get('time_precision', 's') 50 if batch_size: 51 kwargs.pop('batch_size') # don't hand over to InfluxDBClient 52 for key, data_frame in data.items(): 53 number_batches = int(math.ceil( 54 len(data_frame) / float(batch_size))) 55 for batch in range(number_batches): 56 start_index = batch * batch_size 57 end_index = (batch + 1) * batch_size 58 outdata = [ 59 self._convert_dataframe_to_json( 60 name=key, 61 dataframe=data_frame 62 .iloc[start_index:end_index].copy(), 63 time_precision=time_precision)] 64 InfluxDBClient.write_points(self, outdata, *args, **kwargs) 65 return True 66 67 outdata = [ 68 self._convert_dataframe_to_json(name=key, dataframe=dataframe, 69 time_precision=time_precision) 70 for key, dataframe in data.items()] 71 return InfluxDBClient.write_points(self, outdata, *args, **kwargs) 72 73 def write_points_with_precision(self, data, time_precision='s'): 74 """Write to multiple time series names. 75 76 DEPRECATED 77 """ 78 warnings.warn( 79 "write_points_with_precision is deprecated, and will be removed " 80 "in future versions. Please use " 81 "``DataFrameClient.write_points(time_precision='..')`` instead.", 82 FutureWarning) 83 return self.write_points(data, time_precision='s') 84 85 def query(self, query, time_precision='s', chunked=False): 86 """Query data into DataFrames. 87 88 Returns a DataFrame for a single time series and a map for multiple 89 time series with the time series as value and its name as key. 90 91 :param time_precision: [Optional, default 's'] Either 's', 'm', 'ms' 92 or 'u'. 93 :param chunked: [Optional, default=False] True if the data shall be 94 retrieved in chunks, False otherwise. 95 """ 96 result = InfluxDBClient.query(self, query=query, 97 time_precision=time_precision, 98 chunked=chunked) 99 if len(result) == 0: 100 return result 101 elif len(result) == 1: 102 return self._to_dataframe(result[0], time_precision) 103 else: 104 ret = {} 105 for time_series in result: 106 ret[time_series['name']] = self._to_dataframe(time_series, 107 time_precision) 108 return ret 109 110 @staticmethod 111 def _to_dataframe(json_result, time_precision): 112 dataframe = pd.DataFrame(data=json_result['points'], 113 columns=json_result['columns']) 114 if 'sequence_number' in dataframe.keys(): 115 dataframe.sort_values(['time', 'sequence_number'], inplace=True) 116 else: 117 dataframe.sort_values(['time'], inplace=True) 118 119 pandas_time_unit = time_precision 120 if time_precision == 'm': 121 pandas_time_unit = 'ms' 122 elif time_precision == 'u': 123 pandas_time_unit = 'us' 124 125 dataframe.index = pd.to_datetime(list(dataframe['time']), 126 unit=pandas_time_unit, 127 utc=True) 128 del dataframe['time'] 129 return dataframe 130 131 def _convert_dataframe_to_json(self, dataframe, name, time_precision='s'): 132 if not isinstance(dataframe, pd.DataFrame): 133 raise TypeError('Must be DataFrame, but type was: {0}.' 134 .format(type(dataframe))) 135 if not (isinstance(dataframe.index, pd.PeriodIndex) or 136 isinstance(dataframe.index, pd.DatetimeIndex)): 137 raise TypeError('Must be DataFrame with DatetimeIndex or \ 138 PeriodIndex.') 139 140 if isinstance(dataframe.index, pd.PeriodIndex): 141 dataframe.index = dataframe.index.to_timestamp() 142 else: 143 dataframe.index = pd.to_datetime(dataframe.index) 144 145 if dataframe.index.tzinfo is None: 146 dataframe.index = dataframe.index.tz_localize('UTC') 147 dataframe['time'] = [self._datetime_to_epoch(dt, time_precision) 148 for dt in dataframe.index] 149 data = {'name': name, 150 'columns': [str(column) for column in dataframe.columns], 151 'points': [self._convert_array(x) for x in dataframe.values]} 152 return data 153 154 def _convert_array(self, array): 155 try: 156 global np 157 import numpy as np 158 except ImportError as ex: 159 raise ImportError('DataFrameClient requires Numpy, ' 160 '"{ex}" problem importing'.format(ex=str(ex))) 161 162 if self.ignore_nan: 163 number_types = (int, float, np.number) 164 condition = (all(isinstance(el, number_types) for el in array) and 165 np.isnan(array)) 166 return list(np.where(condition, None, array)) 167 168 return list(array) 169 170 def _datetime_to_epoch(self, datetime, time_precision='s'): 171 seconds = (datetime - self.EPOCH).total_seconds() 172 if time_precision == 's': 173 return seconds 174 elif time_precision == 'm' or time_precision == 'ms': 175 return seconds * 1000 176 elif time_precision == 'u': 177 return seconds * 1000000 178