1# -*- coding: utf-8 -*-
2"""DataFrame client for InfluxDB v0.8."""
3
4from __future__ import absolute_import
5from __future__ import division
6from __future__ import print_function
7from __future__ import unicode_literals
8
9import math
10import warnings
11
12from .client import InfluxDBClient
13
14
class DataFrameClient(InfluxDBClient):
    """Primary definition of the DataFrameClient for v0.8.

    The ``DataFrameClient`` object holds information necessary to connect
    to InfluxDB. Requests can be made to InfluxDB directly through the client.
    The client reads and writes from pandas DataFrames.
    """

    def __init__(self, ignore_nan=True, *args, **kwargs):
        """Initialize an instance of the DataFrameClient.

        :param ignore_nan: [Optional, default True] When true, NaN values
            are replaced by ``None`` while DataFrame rows are converted for
            writing.
        :param args: Positional arguments forwarded to ``InfluxDBClient``.
        :param kwargs: Keyword arguments forwarded to ``InfluxDBClient``.
        :raises ImportError: If pandas cannot be imported.

        Note: ``ignore_nan`` is the first positional parameter, so the
        first positional argument is never forwarded to ``InfluxDBClient``;
        passing connection settings by keyword avoids any ambiguity.
        """
        super(DataFrameClient, self).__init__(*args, **kwargs)

        # pandas is imported lazily and published as a module global so the
        # other methods of this class can refer to it as ``pd``.
        try:
            global pd
            import pandas as pd
        except ImportError as ex:
            raise ImportError('DataFrameClient requires Pandas, '
                              '"{ex}" problem importing'.format(ex=str(ex)))

        self.EPOCH = pd.Timestamp('1970-01-01 00:00:00.000+00:00')
        self.ignore_nan = ignore_nan

    def write_points(self, data, *args, **kwargs):
        """Write to multiple time series names.

        :param data: A dictionary mapping series names to pandas DataFrames
        :param time_precision: [Optional, default 's'] Either 's', 'm', 'ms'
            or 'u'. Only read from the keyword arguments.
        :param batch_size: [Optional] Value to write the points in batches
            instead of all at one time. Useful for when doing data dumps from
            one database to another or when doing a massive write operation
        :type batch_size: int
        :returns: ``True`` when writing in batches, otherwise whatever
            ``InfluxDBClient.write_points`` returns.
        """
        batch_size = kwargs.get('batch_size')
        time_precision = kwargs.get('time_precision', 's')

        if batch_size:
            kwargs.pop('batch_size')  # don't hand over to InfluxDBClient
            for key, data_frame in data.items():
                number_batches = int(math.ceil(
                    len(data_frame) / float(batch_size)))
                for batch in range(number_batches):
                    start_index = batch * batch_size
                    end_index = start_index + batch_size
                    # The conversion helper works on its own copy, so the
                    # slice can be handed over directly.
                    outdata = [
                        self._convert_dataframe_to_json(
                            name=key,
                            dataframe=data_frame.iloc[start_index:end_index],
                            time_precision=time_precision)]
                    InfluxDBClient.write_points(self, outdata, *args, **kwargs)
            return True

        outdata = [
            self._convert_dataframe_to_json(name=key, dataframe=dataframe,
                                            time_precision=time_precision)
            for key, dataframe in data.items()]
        return InfluxDBClient.write_points(self, outdata, *args, **kwargs)

    def write_points_with_precision(self, data, time_precision='s'):
        """Write to multiple time series names.

        DEPRECATED: use ``write_points(data, time_precision=...)`` instead.

        :param data: A dictionary mapping series names to pandas DataFrames
        :param time_precision: [Optional, default 's'] Either 's', 'm', 'ms'
            or 'u'.
        """
        warnings.warn(
            "write_points_with_precision is deprecated, and will be removed "
            "in future versions. Please use "
            "``DataFrameClient.write_points(time_precision='..')`` instead.",
            FutureWarning)
        # Forward the caller's precision instead of a hard-coded 's'.
        return self.write_points(data, time_precision=time_precision)

    def query(self, query, time_precision='s', chunked=False):
        """Query data into DataFrames.

        Returns a DataFrame for a single time series and a map for multiple
        time series with the time series as value and its name as key.
        An empty result is returned unchanged.

        :param query: The query string handed to ``InfluxDBClient.query``.
        :param time_precision: [Optional, default 's'] Either 's', 'm', 'ms'
            or 'u'.
        :param chunked: [Optional, default=False] True if the data shall be
            retrieved in chunks, False otherwise.
        """
        result = InfluxDBClient.query(self, query=query,
                                      time_precision=time_precision,
                                      chunked=chunked)
        if len(result) == 0:
            return result
        if len(result) == 1:
            return self._to_dataframe(result[0], time_precision)

        ret = {}
        for time_series in result:
            ret[time_series['name']] = self._to_dataframe(time_series,
                                                          time_precision)
        return ret

    @staticmethod
    def _to_dataframe(json_result, time_precision):
        """Build a time-indexed DataFrame from one series of a query result.

        :param json_result: Mapping with the keys ``'points'`` (rows) and
            ``'columns'`` (column names); a ``'time'`` column is required.
        :param time_precision: Either 's', 'm', 'ms' or 'u'; the unit of the
            values in the ``'time'`` column.
        :returns: DataFrame sorted by time (and by ``sequence_number`` when
            that column is present), indexed by UTC timestamps, with the
            ``'time'`` column removed.
        """
        dataframe = pd.DataFrame(data=json_result['points'],
                                 columns=json_result['columns'])
        sort_columns = ['time']
        if 'sequence_number' in dataframe.keys():
            sort_columns.append('sequence_number')
        dataframe.sort_values(sort_columns, inplace=True)

        # Translate the client's precision names into pandas unit names.
        pandas_time_unit = time_precision
        if time_precision == 'm':
            pandas_time_unit = 'ms'
        elif time_precision == 'u':
            pandas_time_unit = 'us'

        dataframe.index = pd.to_datetime(list(dataframe['time']),
                                         unit=pandas_time_unit,
                                         utc=True)
        del dataframe['time']
        return dataframe

    def _convert_dataframe_to_json(self, dataframe, name, time_precision='s'):
        """Convert a DataFrame into the dict structure used for writing.

        The caller's DataFrame is left untouched; all index normalisation
        and the added ``'time'`` column happen on a copy.

        :param dataframe: DataFrame with a DatetimeIndex or PeriodIndex.
        :param name: Name of the series.
        :param time_precision: Either 's', 'm', 'ms' or 'u'.
        :returns: Dict with the keys ``'name'``, ``'columns'`` and
            ``'points'``; ``'time'`` is the last column.
        :raises TypeError: If ``dataframe`` is not a DataFrame or its index
            is neither a DatetimeIndex nor a PeriodIndex.
        """
        if not isinstance(dataframe, pd.DataFrame):
            raise TypeError('Must be DataFrame, but type was: {0}.'
                            .format(type(dataframe)))
        if not isinstance(dataframe.index,
                          (pd.PeriodIndex, pd.DatetimeIndex)):
            raise TypeError('Must be DataFrame with DatetimeIndex or '
                            'PeriodIndex.')

        # Work on a copy so the index changes and the extra 'time' column
        # do not leak into the DataFrame owned by the caller.
        frame = dataframe.copy()

        if isinstance(frame.index, pd.PeriodIndex):
            frame.index = frame.index.to_timestamp()
        else:
            frame.index = pd.to_datetime(frame.index)

        # Naive timestamps are interpreted as UTC.
        if frame.index.tzinfo is None:
            frame.index = frame.index.tz_localize('UTC')
        frame['time'] = [self._datetime_to_epoch(dt, time_precision)
                         for dt in frame.index]
        return {'name': name,
                'columns': [str(column) for column in frame.columns],
                'points': [self._convert_array(row) for row in frame.values]}

    def _convert_array(self, array):
        """Convert one row of values into a plain list.

        :param array: One row of a DataFrame's ``values``.
        :returns: List of the row's values. When ``self.ignore_nan`` is
            true, every numeric NaN is replaced by ``None``, including in
            rows that also contain non-numeric values.
        :raises ImportError: If numpy cannot be imported.
        """
        try:
            global np
            import numpy as np
        except ImportError as ex:
            raise ImportError('DataFrameClient requires Numpy, '
                              '"{ex}" problem importing'.format(ex=str(ex)))

        if self.ignore_nan:
            number_types = (int, float, np.number)
            # Casting to object dtype turns numpy scalars of numeric arrays
            # into plain Python numbers and leaves object rows as they are.
            values = np.asarray(array, dtype=object)
            # NaN is the only number that is not equal to itself; testing
            # element by element also covers mixed-type and object rows,
            # for which ``np.isnan`` on the whole row is not usable.
            return [None if isinstance(el, number_types) and el != el else el
                    for el in values]

        return list(array)

    def _datetime_to_epoch(self, datetime, time_precision='s'):
        """Convert a timestamp to time since the epoch in the given unit.

        :param datetime: Timezone-aware timestamp; ``self.EPOCH`` is
            subtracted from it.
        :param time_precision: 's' for seconds, 'm' or 'ms' for
            milliseconds, 'u' for microseconds.
        :returns: The elapsed time as a number, or ``None`` when
            ``time_precision`` is none of the supported values.
        """
        seconds = (datetime - self.EPOCH).total_seconds()
        if time_precision == 's':
            return seconds
        if time_precision in ('m', 'ms'):
            return seconds * 1000
        if time_precision == 'u':
            return seconds * 1000000
        # Unsupported precisions yield None, as before.
        return None
178