import isodate
import datetime
from ..dialect import Dialect
from ..plugin import Plugin
from ..parser import Parser
from ..schema import Schema
from ..field import Field
from .. import helpers


# NOTE:
# We need to ensure that the way we detect a pandas dataframe is good enough.
# We don't want to import pandas and check the type without a good reason.
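#
# A rough sketch of the intended duck-typing check (assuming `helpers.is_type`
# simply compares the class name instead of importing pandas):
#
#     def is_type(value, name):  # hypothetical equivalent
#         return type(value).__name__ == name
#
# This keeps pandas an optional dependency until a dataframe actually shows up.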


# Plugin


class PandasPlugin(Plugin):
    """Plugin for Pandas

    API      | Usage
    -------- | --------
    Public   | `from frictionless.plugins.pandas import PandasPlugin`

    """

    code = "pandas"
    status = "experimental"

    def create_file(self, file):
        if not file.scheme and not file.format and file.memory:
            if helpers.is_type(file.data, "DataFrame"):
                file.scheme = ""
                file.format = "pandas"
                return file

    def create_dialect(self, resource, *, descriptor):
        if resource.format == "pandas":
            return PandasDialect(descriptor)

    def create_parser(self, resource):
        if resource.format == "pandas":
            return PandasParser(resource)
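
# Example (a sketch, not executed here): with this plugin registered, an
# in-memory dataframe is picked up by `create_file` and routed to the pandas
# parser, roughly like:
#
#     import pandas as pd
#     from frictionless import Resource
#
#     df = pd.DataFrame({"id": [1, 2], "name": ["english", "german"]})
#     resource = Resource(df)  # detected as scheme="", format="pandas"
#     print(resource.read_rows())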


# Dialect


class PandasDialect(Dialect):
    """Pandas dialect representation

    API      | Usage
    -------- | --------
    Public   | `from frictionless.plugins.pandas import PandasDialect`

    Parameters:
        descriptor? (str|dict): descriptor

    Raises:
        FrictionlessException: raise any error that occurs during the process

    """

    # Metadata

    metadata_profile = {  # type: ignore
        "type": "object",
        "additionalProperties": False,
    }


# Parser


class PandasParser(Parser):
    """Pandas parser implementation.

    API      | Usage
    -------- | --------
    Public   | `from frictionless.plugins.pandas import PandasParser`

    """

    supported_types = [
        "string",
    ]

    # Read

    def read_list_stream_create(self):
        np = helpers.import_from_plugin("numpy", plugin="pandas")
        dataframe = self.resource.data

        # Schema
        schema = self.__read_convert_schema()
        if not self.resource.schema:
            self.resource.schema = schema

        # Lists
        yield schema.field_names
        for pk, item in dataframe.iterrows():
            cells = []
            for field in schema.fields:
                if field.name in schema.primary_key:
                    pk = pk if isinstance(pk, tuple) else [pk]
                    value = pk[schema.primary_key.index(field.name)]
                else:
                    value = item[field.name]
                if field.type == "number" and np.isnan(value):
                    value = None
                elif field.type == "datetime":
                    value = value.to_pydatetime()
                cells.append(value)
            yield cells

    def __read_convert_schema(self):
        dataframe = self.resource.data
        schema = Schema()

        # Primary key
        for index, name in enumerate(dataframe.index.names):
            if name is not None:
                dtype = dataframe.index.get_level_values(index).dtype
                type = self.__read_convert_type(dtype)
                field = Field(name=name, type=type)
                field.required = True
                schema.fields.append(field)
                schema.primary_key.append(name)

        # Fields
        for name, dtype in dataframe.dtypes.items():
            sample = dataframe[name].iloc[0] if len(dataframe) else None
            type = self.__read_convert_type(dtype, sample=sample)
            field = Field(name=name, type=type)
            schema.fields.append(field)

        # Return schema
        return schema
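
    # For illustration (a sketch): a dataframe with a named index produces a
    # schema whose index levels become required primary-key fields, e.g.
    #
    #     df = pd.DataFrame({"name": ["english"]}, index=pd.Index([1], name="id"))
    #
    # would be described as fields [id: integer (required), name: string]
    # with primaryKey ["id"].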

    def __read_convert_type(self, dtype, sample=None):
        pdc = helpers.import_from_plugin("pandas.core.dtypes.api", plugin="pandas")

        # Pandas types
        if pdc.is_bool_dtype(dtype):
            return "boolean"
        elif pdc.is_datetime64_any_dtype(dtype):
            return "datetime"
        elif pdc.is_integer_dtype(dtype):
            return "integer"
        elif pdc.is_numeric_dtype(dtype):
            return "number"

        # Python types
        if sample is not None:
            if isinstance(sample, (list, tuple)):
                return "array"
            elif isinstance(sample, datetime.date):
                return "date"
            elif isinstance(sample, isodate.Duration):
                return "duration"
            elif isinstance(sample, dict):
                return "object"
            elif isinstance(sample, str):
                return "string"
            elif isinstance(sample, datetime.time):
                return "time"

        # Default
        return "string"

    # Write

    def write_row_stream(self, resource):
        np = helpers.import_from_plugin("numpy", plugin="pandas")
        pd = helpers.import_from_plugin("pandas", plugin="pandas")
        source = resource
        target = self.resource

        # Get data/index
        data_rows = []
        index_rows = []
        fixed_types = {}
        with source:
            for row in source.row_stream:
                data_values = []
                index_values = []
                for field in source.schema.fields:
                    value = row[field.name]
                    if isinstance(value, float) and np.isnan(value):
                        value = None
                    # http://pandas.pydata.org/pandas-docs/stable/gotchas.html#support-for-integer-na
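                    # pandas has no native integer NA, so a missing value in an
                    # integer/number column forces the whole column to float NaN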
                    if value is None and field.type in ("number", "integer"):
                        fixed_types[field.name] = "number"
                        value = np.nan
                    if field.type in ["datetime", "time"] and value is not None:
                        value = value.replace(tzinfo=None)
                    if field.name in source.schema.primary_key:
                        index_values.append(value)
                    else:
                        data_values.append(value)
                if len(source.schema.primary_key) == 1:
                    index_rows.append(index_values[0])
                elif len(source.schema.primary_key) > 1:
                    index_rows.append(tuple(index_values))
                data_rows.append(tuple(data_values))

        # Create index
        index = None
        if source.schema.primary_key:
            if len(source.schema.primary_key) == 1:
                index_class = pd.Index
                index_field = source.schema.get_field(source.schema.primary_key[0])
                index_dtype = self.__write_convert_type(index_field.type)
                if index_field.type in ["datetime", "date"]:
                    index_class = pd.DatetimeIndex
                index = index_class(index_rows, name=index_field.name, dtype=index_dtype)
            elif len(source.schema.primary_key) > 1:
                index = pd.MultiIndex.from_tuples(
                    index_rows, names=source.schema.primary_key
                )

        # Create dtypes/columns
        dtypes = []
        columns = []
        for field in source.schema.fields:
            if field.name not in source.schema.primary_key:
                dtype = self.__write_convert_type(fixed_types.get(field.name, field.type))
                dtypes.append((field.name, dtype))
                columns.append(field.name)

        # Create/set dataframe
        array = np.array(data_rows, dtype=dtypes)
        dataframe = pd.DataFrame(array, index=index, columns=columns)
        target.data = dataframe
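
    # Example (a sketch): conversion in this direction typically goes through a
    # resource-level helper; availability depends on the frictionless version:
    #
    #     from frictionless import Resource
    #
    #     resource = Resource("data/table.csv")
    #     df = resource.to_pandas()  # roughly: write into a pandas-format target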

    def __write_convert_type(self, type=None):
        np = helpers.import_from_plugin("numpy", plugin="pandas")

        # Mapping
        mapping = {
            "array": np.dtype("object"),
            "boolean": np.dtype(bool),
            "datetime": np.dtype("datetime64[ns]"),
            "integer": np.dtype(int),
            "number": np.dtype(float),
            "object": np.dtype("object"),
            "year": np.dtype(int),
        }

        # Return type
        if type:
            return mapping.get(type, np.dtype("O"))

        # Return mapping
        return mapping