1# -*- coding: utf-8 -*-
2"""DataFrame client for InfluxDB."""
3
4from __future__ import absolute_import
5from __future__ import division
6from __future__ import print_function
7from __future__ import unicode_literals
8
9import math
10from collections import defaultdict
11
12import pandas as pd
13
14from .client import InfluxDBClient
15from .line_protocol import _escape_tag
16
17
18def _pandas_time_unit(time_precision):
19    unit = time_precision
20    if time_precision == 'm':
21        unit = 'ms'
22    elif time_precision == 'u':
23        unit = 'us'
24    elif time_precision == 'n':
25        unit = 'ns'
26    assert unit in ('s', 'ms', 'us', 'ns')
27    return unit
28
29
def _escape_pandas_series(s):
    """Return *s* with every value escaped for use as a line-protocol tag."""
    # ``apply`` can take the escaping function directly; no lambda needed.
    return s.apply(_escape_tag)
32
33
class DataFrameClient(InfluxDBClient):
    """DataFrameClient instantiates InfluxDBClient to connect to the backend.

    The ``DataFrameClient`` object holds information necessary to connect
    to InfluxDB. Requests can be made to InfluxDB directly through the client.
    The client reads and writes from pandas DataFrames.
    """

    # Unix epoch as a timezone-aware Timestamp; used by _datetime_to_epoch
    # to convert aware datetimes into integer epoch offsets.
    EPOCH = pd.Timestamp('1970-01-01 00:00:00.000+00:00')

    def write_points(self,
                     dataframe,
                     measurement,
                     tags=None,
                     tag_columns=None,
                     field_columns=None,
                     time_precision=None,
                     database=None,
                     retention_policy=None,
                     batch_size=None,
                     protocol='line',
                     numeric_precision=None):
        """Write to multiple time series names.

        :param dataframe: data points in a DataFrame
        :param measurement: name of measurement
        :param tags: dictionary of tags, with string key-values
        :param tag_columns: [Optional, default None] DataFrame columns to be
            written as tags rather than fields
        :param field_columns: [Optional, default None] DataFrame columns to
            be written as fields; defaults to all columns not in
            ``tag_columns``
        :param time_precision: [Optional, default None] Either 's', 'ms', 'u'
            or 'n'.
        :param database: [Optional] database to write to; defaults to the
            client's current database
        :param retention_policy: [Optional] retention policy to write to
        :param batch_size: [Optional] Value to write the points in batches
            instead of all at one time. Useful for when doing data dumps from
            one database to another or when doing a massive write operation
        :type batch_size: int
        :param protocol: Protocol for writing data. Either 'line' or 'json'.
        :param numeric_precision: Precision for floating point values.
            Either None, 'full' or some int, where int is the desired decimal
            precision. 'full' preserves full precision for int and float
            datatypes. Defaults to None, which preserves 14-15 significant
            figures for float and all significant figures for int datatypes.
        """
        if tag_columns is None:
            tag_columns = []

        if field_columns is None:
            field_columns = []

        if batch_size:
            number_batches = int(math.ceil(len(dataframe) / float(batch_size)))

            for batch in range(number_batches):
                start_index = batch * batch_size
                end_index = (batch + 1) * batch_size

                if protocol == 'line':
                    points = self._convert_dataframe_to_lines(
                        dataframe.iloc[start_index:end_index].copy(),
                        measurement=measurement,
                        global_tags=tags,
                        time_precision=time_precision,
                        tag_columns=tag_columns,
                        field_columns=field_columns,
                        numeric_precision=numeric_precision)
                else:
                    points = self._convert_dataframe_to_json(
                        dataframe.iloc[start_index:end_index].copy(),
                        measurement=measurement,
                        tags=tags,
                        time_precision=time_precision,
                        tag_columns=tag_columns,
                        field_columns=field_columns)

                super(DataFrameClient, self).write_points(
                    points,
                    time_precision,
                    database,
                    retention_policy,
                    protocol=protocol)

            return True

        if protocol == 'line':
            points = self._convert_dataframe_to_lines(
                dataframe,
                measurement=measurement,
                global_tags=tags,
                tag_columns=tag_columns,
                field_columns=field_columns,
                time_precision=time_precision,
                numeric_precision=numeric_precision)
        else:
            points = self._convert_dataframe_to_json(
                dataframe,
                measurement=measurement,
                tags=tags,
                time_precision=time_precision,
                tag_columns=tag_columns,
                field_columns=field_columns)

        super(DataFrameClient, self).write_points(
            points,
            time_precision,
            database,
            retention_policy,
            protocol=protocol)

        return True

    def query(self,
              query,
              params=None,
              epoch=None,
              expected_response_code=200,
              database=None,
              raise_errors=True,
              chunked=False,
              chunk_size=0,
              dropna=True):
        """
        Query data into a DataFrame.

        :param query: the actual query string
        :param params: additional parameters for the request, defaults to {}
        :param epoch: response timestamps to be in epoch format either 'h',
            'm', 's', 'ms', 'u', or 'ns',defaults to `None` which is
            RFC3339 UTC format with nanosecond precision
        :param expected_response_code: the expected status code of response,
            defaults to 200
        :param database: database to query, defaults to None
        :param raise_errors: Whether or not to raise exceptions when InfluxDB
            returns errors, defaults to True
        :param chunked: Enable to use chunked responses from InfluxDB.
            With ``chunked`` enabled, one ResultSet is returned per chunk
            containing all results within that chunk
        :param chunk_size: Size of each chunk to tell InfluxDB to use.
        :param dropna: drop columns where all values are missing
        :returns: the queried data
        :rtype: :class:`~.ResultSet`
        """
        query_args = dict(params=params,
                          epoch=epoch,
                          expected_response_code=expected_response_code,
                          raise_errors=raise_errors,
                          chunked=chunked,
                          chunk_size=chunk_size)
        results = super(DataFrameClient, self).query(query, **query_args)
        # Only SELECT responses carry series to convert; management
        # statements (CREATE, DROP, ...) are returned untouched.
        if query.strip().upper().startswith("SELECT"):
            if len(results) > 0:
                return self._to_dataframe(results, dropna)
            else:
                return {}
        else:
            return results

    def _to_dataframe(self, rs, dropna=True):
        """Convert a ResultSet (or a list of them) into DataFrames.

        Returns a dict mapping the measurement name (or a
        ``(name, sorted-tags)`` tuple) to a UTC-indexed DataFrame.
        """
        result = defaultdict(list)
        if isinstance(rs, list):
            # Propagate ``dropna``; previously the default was silently
            # used for every element of the list.
            return map(lambda r: self._to_dataframe(r, dropna), rs)

        for key, data in rs.items():
            name, tags = key
            if tags is None:
                key = name
            else:
                # Sorted tag tuples give a deterministic, hashable dict key.
                key = (name, tuple(sorted(tags.items())))
            df = pd.DataFrame(data)
            df.time = pd.to_datetime(df.time)
            df.set_index('time', inplace=True)
            df.index = df.index.tz_localize('UTC')
            df.index.name = None
            result[key].append(df)
        for key, data in result.items():
            df = pd.concat(data).sort_index()
            if dropna:
                # Drop columns whose values are all missing.
                df.dropna(how='all', axis=1, inplace=True)
            result[key] = df

        return result

    @staticmethod
    def _convert_dataframe_to_json(dataframe,
                                   measurement,
                                   tags=None,
                                   tag_columns=None,
                                   field_columns=None,
                                   time_precision=None):
        """Convert a DataFrame into a list of JSON-protocol point dicts.

        :param dataframe: DataFrame with a DatetimeIndex or PeriodIndex
        :param measurement: name of measurement
        :param tags: global tags merged into every point's tag dict
        :param tag_columns: columns written as tags
        :param field_columns: columns written as fields; defaults to all
            columns not in ``tag_columns``
        :param time_precision: target precision for the integer timestamps
        :raises TypeError: if ``dataframe`` is not a DataFrame or its index
            is neither a DatetimeIndex nor a PeriodIndex
        """
        if not isinstance(dataframe, pd.DataFrame):
            raise TypeError('Must be DataFrame, but type was: {0}.'
                            .format(type(dataframe)))
        if not (isinstance(dataframe.index, pd.PeriodIndex) or
                isinstance(dataframe.index, pd.DatetimeIndex)):
            raise TypeError('Must be DataFrame with DatetimeIndex or '
                            'PeriodIndex.')

        # Make sure tags and tag columns are correctly typed
        tag_columns = tag_columns if tag_columns is not None else []
        field_columns = field_columns if field_columns is not None else []
        tags = tags if tags is not None else {}
        # Assume field columns are all columns not included in tag columns
        if not field_columns:
            field_columns = list(
                set(dataframe.columns).difference(set(tag_columns)))

        # ``DatetimeIndex.to_datetime`` was removed from pandas; convert a
        # PeriodIndex explicitly and normalize anything else through
        # ``pd.to_datetime``.
        if isinstance(dataframe.index, pd.PeriodIndex):
            dataframe.index = dataframe.index.to_timestamp()
        else:
            dataframe.index = pd.to_datetime(dataframe.index)

        if dataframe.index.tzinfo is None:
            dataframe.index = dataframe.index.tz_localize('UTC')

        # Convert column to strings
        dataframe.columns = dataframe.columns.astype('str')

        # Convert dtype for json serialization
        dataframe = dataframe.astype('object')

        # Integer factors + floor division keep full int64 nanosecond
        # precision; float division would lose precision above 2**53.
        precision_factor = {
            "n": 1,
            "u": 10 ** 3,
            "ms": 10 ** 6,
            "s": 10 ** 9,
            "m": 10 ** 9 * 60,
            "h": 10 ** 9 * 3600,
        }.get(time_precision, 1)

        points = [
            {'measurement': measurement,
             'tags': dict(list(tag.items()) + list(tags.items())),
             'fields': rec,
             'time': ts.value // precision_factor}
            for ts, tag, rec in zip(
                dataframe.index,
                dataframe[tag_columns].to_dict('records'),
                dataframe[field_columns].to_dict('records'))
        ]

        return points

    def _convert_dataframe_to_lines(self,
                                    dataframe,
                                    measurement,
                                    field_columns=None,
                                    tag_columns=None,
                                    global_tags=None,
                                    time_precision=None,
                                    numeric_precision=None):
        """Convert a DataFrame into a list of line-protocol strings.

        :param dataframe: DataFrame with a DatetimeIndex or PeriodIndex
        :param measurement: name of measurement
        :param field_columns: columns written as fields
        :param tag_columns: columns written as tags
        :param global_tags: tags added to every line
        :param time_precision: target precision for the integer timestamps
        :param numeric_precision: see :meth:`write_points`
        :raises TypeError: if ``dataframe`` is not a DataFrame or its index
            is neither a DatetimeIndex nor a PeriodIndex
        """
        if not isinstance(dataframe, pd.DataFrame):
            raise TypeError('Must be DataFrame, but type was: {0}.'
                            .format(type(dataframe)))
        if not (isinstance(dataframe.index, pd.PeriodIndex) or
                isinstance(dataframe.index, pd.DatetimeIndex)):
            raise TypeError('Must be DataFrame with DatetimeIndex or '
                            'PeriodIndex.')

        # Create a Series of columns for easier indexing
        column_series = pd.Series(dataframe.columns)

        if field_columns is None:
            field_columns = []

        if tag_columns is None:
            tag_columns = []

        if global_tags is None:
            global_tags = {}

        # Copy into fresh lists so the caller's arguments are never mutated.
        field_columns = list(field_columns)
        tag_columns = list(tag_columns)

        # If field columns but no tag columns, assume rest of columns are tags
        if field_columns and (not tag_columns):
            tag_columns = list(column_series[~column_series.isin(
                field_columns)])

        # If no field columns, assume non-tag columns are fields
        if not field_columns:
            field_columns = list(column_series[~column_series.isin(
                tag_columns)])

        # Integer factors + floor division keep full int64 nanosecond
        # precision; float division would lose precision above 2**53.
        precision_factor = {
            "n": 1,
            "u": 10 ** 3,
            "ms": 10 ** 6,
            "s": 10 ** 9,
            "m": 10 ** 9 * 60,
            "h": 10 ** 9 * 3600,
        }.get(time_precision, 1)

        # Make array of timestamp ints
        if isinstance(dataframe.index, pd.PeriodIndex):
            nanoseconds = (dataframe.index.to_timestamp()
                           .values.astype('int64'))
        else:
            nanoseconds = (pd.to_datetime(dataframe.index)
                           .values.astype('int64'))
        time = (nanoseconds // precision_factor).astype(str)

        # If tag columns exist, make an array of formatted tag keys and values
        if tag_columns:
            # Merge global tags in as constant columns on a local copy;
            # previously they were written back into the caller's DataFrame.
            tag_df = dataframe[tag_columns].copy()
            if global_tags:
                for tag, value in global_tags.items():
                    tag_df[tag] = value

            tag_df = tag_df.fillna('')  # replace NA with empty string
            tag_df = tag_df.sort_index(axis=1)
            tag_df = self._stringify_dataframe(
                tag_df, numeric_precision, datatype='tag')

            # join prepended tags, leaving empty values out
            tags = tag_df.apply(
                lambda s: [',' + s.name + '=' + v if v else '' for v in s])
            tags = tags.sum(axis=1)

            del tag_df
        elif global_tags:
            tag_string = ''.join(
                [",{}={}".format(k, _escape_tag(v)) if v else ''
                 for k, v in sorted(global_tags.items())]
            )
            tags = pd.Series(tag_string, index=dataframe.index)
        else:
            tags = ''

        # Make an array of formatted field keys and values
        field_df = dataframe[field_columns]
        field_df = self._stringify_dataframe(field_df,
                                             numeric_precision,
                                             datatype='field')
        # Prepend each column name ("col=") to its values, then comma-join
        # the columns into one "k1=v1,k2=v2,..." Series.
        field_df = (field_df.columns.values + '=').tolist() + field_df
        field_df[field_df.columns[1:]] = ',' + field_df[field_df.columns[1:]]
        fields = field_df.sum(axis=1)
        del field_df

        # Generate line protocol string
        points = (measurement + tags + ' ' + fields + ' ' + time).tolist()
        return points

    @staticmethod
    def _stringify_dataframe(dframe, numeric_precision, datatype='field'):
        """Convert every value of *dframe* to its line-protocol string form.

        :param dframe: DataFrame of tag or field values
        :param numeric_precision: None, 'full' or an int decimal precision
        :param datatype: 'field' (ints suffixed with 'i', strings quoted)
            or 'tag' (values escaped for line protocol)
        :raises ValueError: on an unrecognized ``numeric_precision``
        """
        # Find int and string columns for field-type data
        int_columns = dframe.select_dtypes(include=['integer']).columns
        string_columns = dframe.select_dtypes(include=['object']).columns

        # Convert dframe to string
        if numeric_precision is None:
            # If no precision specified, convert directly to string (fast)
            dframe = dframe.astype(str)
        elif numeric_precision == 'full':
            # If full precision, use repr to get full float precision
            float_columns = (dframe.select_dtypes(
                include=['floating']).columns)
            nonfloat_columns = dframe.columns[~dframe.columns.isin(
                float_columns)]
            dframe[float_columns] = dframe[float_columns].applymap(repr)
            dframe[nonfloat_columns] = (dframe[nonfloat_columns].astype(str))
        elif isinstance(numeric_precision, int):
            # If precision is specified, round to appropriate precision
            float_columns = (dframe.select_dtypes(
                include=['floating']).columns)
            nonfloat_columns = dframe.columns[~dframe.columns.isin(
                float_columns)]
            dframe[float_columns] = (dframe[float_columns].round(
                numeric_precision))

            # If desired precision is > 10 decimal places, need to use repr
            if numeric_precision > 10:
                dframe[float_columns] = (dframe[float_columns].applymap(repr))
                dframe[nonfloat_columns] = (dframe[nonfloat_columns]
                                            .astype(str))
            else:
                dframe = dframe.astype(str)
        else:
            raise ValueError('Invalid numeric precision.')

        if datatype == 'field':
            # If dealing with fields, format ints and strings correctly
            dframe[int_columns] += 'i'
            dframe[string_columns] = '"' + dframe[string_columns] + '"'
        elif datatype == 'tag':
            dframe = dframe.apply(_escape_pandas_series)

        dframe.columns = dframe.columns.astype(str)
        return dframe

    def _datetime_to_epoch(self, datetime, time_precision='s'):
        """Convert a timezone-aware datetime to an epoch number.

        :param datetime: timezone-aware datetime/Timestamp
        :param time_precision: one of 'h', 'm', 's', 'ms', 'u', 'n'
        :raises ValueError: on an unrecognized precision (previously the
            method silently returned ``None``)
        """
        seconds = (datetime - self.EPOCH).total_seconds()
        if time_precision == 'h':
            return seconds / 3600
        elif time_precision == 'm':
            return seconds / 60
        elif time_precision == 's':
            return seconds
        elif time_precision == 'ms':
            return seconds * 1e3
        elif time_precision == 'u':
            return seconds * 1e6
        elif time_precision == 'n':
            return seconds * 1e9
        raise ValueError(
            'Invalid time precision: {0!r}.'.format(time_precision))
433