1# -*- coding: utf-8 -*- 2"""DataFrame client for InfluxDB.""" 3 4from __future__ import absolute_import 5from __future__ import division 6from __future__ import print_function 7from __future__ import unicode_literals 8 9import math 10from collections import defaultdict 11 12import pandas as pd 13 14from .client import InfluxDBClient 15from .line_protocol import _escape_tag 16 17 18def _pandas_time_unit(time_precision): 19 unit = time_precision 20 if time_precision == 'm': 21 unit = 'ms' 22 elif time_precision == 'u': 23 unit = 'us' 24 elif time_precision == 'n': 25 unit = 'ns' 26 assert unit in ('s', 'ms', 'us', 'ns') 27 return unit 28 29 30def _escape_pandas_series(s): 31 return s.apply(lambda v: _escape_tag(v)) 32 33 34class DataFrameClient(InfluxDBClient): 35 """DataFrameClient instantiates InfluxDBClient to connect to the backend. 36 37 The ``DataFrameClient`` object holds information necessary to connect 38 to InfluxDB. Requests can be made to InfluxDB directly through the client. 39 The client reads and writes from pandas DataFrames. 40 """ 41 42 EPOCH = pd.Timestamp('1970-01-01 00:00:00.000+00:00') 43 44 def write_points(self, 45 dataframe, 46 measurement, 47 tags=None, 48 tag_columns=None, 49 field_columns=None, 50 time_precision=None, 51 database=None, 52 retention_policy=None, 53 batch_size=None, 54 protocol='line', 55 numeric_precision=None): 56 """Write to multiple time series names. 57 58 :param dataframe: data points in a DataFrame 59 :param measurement: name of measurement 60 :param tags: dictionary of tags, with string key-values 61 :param time_precision: [Optional, default None] Either 's', 'ms', 'u' 62 or 'n'. 63 :param batch_size: [Optional] Value to write the points in batches 64 instead of all at one time. Useful for when doing data dumps from 65 one database to another or when doing a massive write operation 66 :type batch_size: int 67 :param protocol: Protocol for writing data. Either 'line' or 'json'. 68 :param numeric_precision: Precision for floating point values. 69 Either None, 'full' or some int, where int is the desired decimal 70 precision. 'full' preserves full precision for int and float 71 datatypes. Defaults to None, which preserves 14-15 significant 72 figures for float and all significant figures for int datatypes. 73 """ 74 if tag_columns is None: 75 tag_columns = [] 76 77 if field_columns is None: 78 field_columns = [] 79 80 if batch_size: 81 number_batches = int(math.ceil(len(dataframe) / float(batch_size))) 82 83 for batch in range(number_batches): 84 start_index = batch * batch_size 85 end_index = (batch + 1) * batch_size 86 87 if protocol == 'line': 88 points = self._convert_dataframe_to_lines( 89 dataframe.iloc[start_index:end_index].copy(), 90 measurement=measurement, 91 global_tags=tags, 92 time_precision=time_precision, 93 tag_columns=tag_columns, 94 field_columns=field_columns, 95 numeric_precision=numeric_precision) 96 else: 97 points = self._convert_dataframe_to_json( 98 dataframe.iloc[start_index:end_index].copy(), 99 measurement=measurement, 100 tags=tags, 101 time_precision=time_precision, 102 tag_columns=tag_columns, 103 field_columns=field_columns) 104 105 super(DataFrameClient, self).write_points( 106 points, 107 time_precision, 108 database, 109 retention_policy, 110 protocol=protocol) 111 112 return True 113 114 if protocol == 'line': 115 points = self._convert_dataframe_to_lines( 116 dataframe, 117 measurement=measurement, 118 global_tags=tags, 119 tag_columns=tag_columns, 120 field_columns=field_columns, 121 time_precision=time_precision, 122 numeric_precision=numeric_precision) 123 else: 124 points = self._convert_dataframe_to_json( 125 dataframe, 126 measurement=measurement, 127 tags=tags, 128 time_precision=time_precision, 129 tag_columns=tag_columns, 130 field_columns=field_columns) 131 132 super(DataFrameClient, self).write_points( 133 points, 134 time_precision, 135 database, 136 retention_policy, 137 protocol=protocol) 138 139 return True 140 141 def query(self, 142 query, 143 params=None, 144 epoch=None, 145 expected_response_code=200, 146 database=None, 147 raise_errors=True, 148 chunked=False, 149 chunk_size=0, 150 dropna=True): 151 """ 152 Quering data into a DataFrame. 153 154 :param query: the actual query string 155 :param params: additional parameters for the request, defaults to {} 156 :param epoch: response timestamps to be in epoch format either 'h', 157 'm', 's', 'ms', 'u', or 'ns',defaults to `None` which is 158 RFC3339 UTC format with nanosecond precision 159 :param expected_response_code: the expected status code of response, 160 defaults to 200 161 :param database: database to query, defaults to None 162 :param raise_errors: Whether or not to raise exceptions when InfluxDB 163 returns errors, defaults to True 164 :param chunked: Enable to use chunked responses from InfluxDB. 165 With ``chunked`` enabled, one ResultSet is returned per chunk 166 containing all results within that chunk 167 :param chunk_size: Size of each chunk to tell InfluxDB to use. 168 :param dropna: drop columns where all values are missing 169 :returns: the queried data 170 :rtype: :class:`~.ResultSet` 171 """ 172 query_args = dict(params=params, 173 epoch=epoch, 174 expected_response_code=expected_response_code, 175 raise_errors=raise_errors, 176 chunked=chunked, 177 chunk_size=chunk_size) 178 results = super(DataFrameClient, self).query(query, **query_args) 179 if query.strip().upper().startswith("SELECT"): 180 if len(results) > 0: 181 return self._to_dataframe(results, dropna) 182 else: 183 return {} 184 else: 185 return results 186 187 def _to_dataframe(self, rs, dropna=True): 188 result = defaultdict(list) 189 if isinstance(rs, list): 190 return map(self._to_dataframe, rs) 191 192 for key, data in rs.items(): 193 name, tags = key 194 if tags is None: 195 key = name 196 else: 197 key = (name, tuple(sorted(tags.items()))) 198 df = pd.DataFrame(data) 199 df.time = pd.to_datetime(df.time) 200 df.set_index('time', inplace=True) 201 df.index = df.index.tz_localize('UTC') 202 df.index.name = None 203 result[key].append(df) 204 for key, data in result.items(): 205 df = pd.concat(data).sort_index() 206 if dropna: 207 df.dropna(how='all', axis=1, inplace=True) 208 result[key] = df 209 210 return result 211 212 @staticmethod 213 def _convert_dataframe_to_json(dataframe, 214 measurement, 215 tags=None, 216 tag_columns=None, 217 field_columns=None, 218 time_precision=None): 219 220 if not isinstance(dataframe, pd.DataFrame): 221 raise TypeError('Must be DataFrame, but type was: {0}.' 222 .format(type(dataframe))) 223 if not (isinstance(dataframe.index, pd.PeriodIndex) or 224 isinstance(dataframe.index, pd.DatetimeIndex)): 225 raise TypeError('Must be DataFrame with DatetimeIndex or ' 226 'PeriodIndex.') 227 228 # Make sure tags and tag columns are correctly typed 229 tag_columns = tag_columns if tag_columns is not None else [] 230 field_columns = field_columns if field_columns is not None else [] 231 tags = tags if tags is not None else {} 232 # Assume field columns are all columns not included in tag columns 233 if not field_columns: 234 field_columns = list( 235 set(dataframe.columns).difference(set(tag_columns))) 236 237 dataframe.index = dataframe.index.to_datetime() 238 if dataframe.index.tzinfo is None: 239 dataframe.index = dataframe.index.tz_localize('UTC') 240 241 # Convert column to strings 242 dataframe.columns = dataframe.columns.astype('str') 243 244 # Convert dtype for json serialization 245 dataframe = dataframe.astype('object') 246 247 precision_factor = { 248 "n": 1, 249 "u": 1e3, 250 "ms": 1e6, 251 "s": 1e9, 252 "m": 1e9 * 60, 253 "h": 1e9 * 3600, 254 }.get(time_precision, 1) 255 256 points = [ 257 {'measurement': measurement, 258 'tags': dict(list(tag.items()) + list(tags.items())), 259 'fields': rec, 260 'time': int(ts.value / precision_factor)} 261 for ts, tag, rec in zip(dataframe.index, 262 dataframe[tag_columns].to_dict('record'), 263 dataframe[field_columns].to_dict('record')) 264 ] 265 266 return points 267 268 def _convert_dataframe_to_lines(self, 269 dataframe, 270 measurement, 271 field_columns=None, 272 tag_columns=None, 273 global_tags=None, 274 time_precision=None, 275 numeric_precision=None): 276 277 if not isinstance(dataframe, pd.DataFrame): 278 raise TypeError('Must be DataFrame, but type was: {0}.' 279 .format(type(dataframe))) 280 if not (isinstance(dataframe.index, pd.PeriodIndex) or 281 isinstance(dataframe.index, pd.DatetimeIndex)): 282 raise TypeError('Must be DataFrame with DatetimeIndex or ' 283 'PeriodIndex.') 284 285 # Create a Series of columns for easier indexing 286 column_series = pd.Series(dataframe.columns) 287 288 if field_columns is None: 289 field_columns = [] 290 291 if tag_columns is None: 292 tag_columns = [] 293 294 if global_tags is None: 295 global_tags = {} 296 297 # Make sure field_columns and tag_columns are lists 298 field_columns = list(field_columns) if list(field_columns) else [] 299 tag_columns = list(tag_columns) if list(tag_columns) else [] 300 301 # If field columns but no tag columns, assume rest of columns are tags 302 if field_columns and (not tag_columns): 303 tag_columns = list(column_series[~column_series.isin( 304 field_columns)]) 305 306 # If no field columns, assume non-tag columns are fields 307 if not field_columns: 308 field_columns = list(column_series[~column_series.isin( 309 tag_columns)]) 310 311 precision_factor = { 312 "n": 1, 313 "u": 1e3, 314 "ms": 1e6, 315 "s": 1e9, 316 "m": 1e9 * 60, 317 "h": 1e9 * 3600, 318 }.get(time_precision, 1) 319 320 # Make array of timestamp ints 321 if isinstance(dataframe.index, pd.PeriodIndex): 322 time = ((dataframe.index.to_timestamp().values.astype(int) / 323 precision_factor).astype(int).astype(str)) 324 else: 325 time = ((pd.to_datetime(dataframe.index).values.astype(int) / 326 precision_factor).astype(int).astype(str)) 327 328 # If tag columns exist, make an array of formatted tag keys and values 329 if tag_columns: 330 331 # Make global_tags as tag_columns 332 if global_tags: 333 for tag in global_tags: 334 dataframe[tag] = global_tags[tag] 335 tag_columns.append(tag) 336 337 tag_df = dataframe[tag_columns] 338 tag_df = tag_df.fillna('') # replace NA with empty string 339 tag_df = tag_df.sort_index(axis=1) 340 tag_df = self._stringify_dataframe( 341 tag_df, numeric_precision, datatype='tag') 342 343 # join preprendded tags, leaving None values out 344 tags = tag_df.apply( 345 lambda s: [',' + s.name + '=' + v if v else '' for v in s]) 346 tags = tags.sum(axis=1) 347 348 del tag_df 349 elif global_tags: 350 tag_string = ''.join( 351 [",{}={}".format(k, _escape_tag(v)) if v else '' 352 for k, v in sorted(global_tags.items())] 353 ) 354 tags = pd.Series(tag_string, index=dataframe.index) 355 else: 356 tags = '' 357 358 # Make an array of formatted field keys and values 359 field_df = dataframe[field_columns] 360 field_df = self._stringify_dataframe(field_df, 361 numeric_precision, 362 datatype='field') 363 field_df = (field_df.columns.values + '=').tolist() + field_df 364 field_df[field_df.columns[1:]] = ',' + field_df[field_df.columns[1:]] 365 fields = field_df.sum(axis=1) 366 del field_df 367 368 # Generate line protocol string 369 points = (measurement + tags + ' ' + fields + ' ' + time).tolist() 370 return points 371 372 @staticmethod 373 def _stringify_dataframe(dframe, numeric_precision, datatype='field'): 374 # Find int and string columns for field-type data 375 int_columns = dframe.select_dtypes(include=['integer']).columns 376 string_columns = dframe.select_dtypes(include=['object']).columns 377 378 # Convert dframe to string 379 if numeric_precision is None: 380 # If no precision specified, convert directly to string (fast) 381 dframe = dframe.astype(str) 382 elif numeric_precision == 'full': 383 # If full precision, use repr to get full float precision 384 float_columns = (dframe.select_dtypes( 385 include=['floating']).columns) 386 nonfloat_columns = dframe.columns[~dframe.columns.isin( 387 float_columns)] 388 dframe[float_columns] = dframe[float_columns].applymap(repr) 389 dframe[nonfloat_columns] = (dframe[nonfloat_columns].astype(str)) 390 elif isinstance(numeric_precision, int): 391 # If precision is specified, round to appropriate precision 392 float_columns = (dframe.select_dtypes( 393 include=['floating']).columns) 394 nonfloat_columns = dframe.columns[~dframe.columns.isin( 395 float_columns)] 396 dframe[float_columns] = (dframe[float_columns].round( 397 numeric_precision)) 398 399 # If desired precision is > 10 decimal places, need to use repr 400 if numeric_precision > 10: 401 dframe[float_columns] = (dframe[float_columns].applymap(repr)) 402 dframe[nonfloat_columns] = (dframe[nonfloat_columns] 403 .astype(str)) 404 else: 405 dframe = dframe.astype(str) 406 else: 407 raise ValueError('Invalid numeric precision.') 408 409 if datatype == 'field': 410 # If dealing with fields, format ints and strings correctly 411 dframe[int_columns] += 'i' 412 dframe[string_columns] = '"' + dframe[string_columns] + '"' 413 elif datatype == 'tag': 414 dframe = dframe.apply(_escape_pandas_series) 415 416 dframe.columns = dframe.columns.astype(str) 417 return dframe 418 419 def _datetime_to_epoch(self, datetime, time_precision='s'): 420 seconds = (datetime - self.EPOCH).total_seconds() 421 if time_precision == 'h': 422 return seconds / 3600 423 elif time_precision == 'm': 424 return seconds / 60 425 elif time_precision == 's': 426 return seconds 427 elif time_precision == 'ms': 428 return seconds * 1e3 429 elif time_precision == 'u': 430 return seconds * 1e6 431 elif time_precision == 'n': 432 return seconds * 1e9 433