import isodate
import datetime
from ..dialect import Dialect
from ..plugin import Plugin
from ..parser import Parser
from ..schema import Schema
from ..field import Field
from .. import helpers


# NOTE:
# We need to ensure that the way we detect a pandas DataFrame is good enough.
# We don't want to import pandas and check the type without a good reason


# Plugin


class PandasPlugin(Plugin):
    """Plugin for Pandas

    API      | Usage
    -------- | --------
    Public   | `from frictionless.plugins.pandas import PandasPlugin`

    """

    code = "pandas"
    status = "experimental"

    def create_file(self, file):
        if not file.scheme and not file.format and file.memory:
            if helpers.is_type(file.data, "DataFrame"):
                file.scheme = ""
                file.format = "pandas"
                return file

    def create_dialect(self, resource, *, descriptor):
        if resource.format == "pandas":
            return PandasDialect(descriptor)

    def create_parser(self, resource):
        if resource.format == "pandas":
            return PandasParser(resource)


# Dialect


class PandasDialect(Dialect):
    """Pandas dialect representation

    API      | Usage
    -------- | --------
    Public   | `from frictionless.plugins.pandas import PandasDialect`

    Parameters:
        descriptor? (str|dict): descriptor

    Raises:
        FrictionlessException: raise any error that occurs during the process

    """

    # Metadata

    metadata_profile = {  # type: ignore
        "type": "object",
        "additionalProperties": False,
    }


# Parser


class PandasParser(Parser):
    """Pandas parser implementation

    API      | Usage
    -------- | --------
    Public   | `from frictionless.plugins.pandas import PandasParser`

    """

    supported_types = [
        "string",
    ]

    # Read

    def read_list_stream_create(self):
        np = helpers.import_from_plugin("numpy", plugin="pandas")
        dataframe = self.resource.data

        # Schema
        schema = self.__read_convert_schema()
        if not self.resource.schema:
            self.resource.schema = schema

        # Lists
        yield schema.field_names
        for pk, item in dataframe.iterrows():
            cells = []
            for field in schema.fields:
                # Primary key values come from the dataframe index
                if field.name in schema.primary_key:
                    pk = pk if isinstance(pk, tuple) else [pk]
                    value = pk[schema.primary_key.index(field.name)]
                else:
                    value = item[field.name]
                if field.type == "number" and np.isnan(value):
                    value = None
                elif field.type == "datetime":
                    value = value.to_pydatetime()
                cells.append(value)
            yield cells

    def __read_convert_schema(self):
        dataframe = self.resource.data
        schema = Schema()

        # Primary key
        for index, name in enumerate(dataframe.index.names):
            if name is not None:
                dtype = dataframe.index.get_level_values(index).dtype
                type = self.__read_convert_type(dtype)
                field = Field(name=name, type=type)
                field.required = True
                schema.fields.append(field)
                schema.primary_key.append(name)

        # Fields
        for name, dtype in dataframe.dtypes.items():
            sample = dataframe[name].iloc[0] if len(dataframe) else None
            type = self.__read_convert_type(dtype, sample=sample)
            field = Field(name=name, type=type)
            schema.fields.append(field)

        # Return schema
        return schema

    def __read_convert_type(self, dtype, sample=None):
        pdc = helpers.import_from_plugin("pandas.core.dtypes.api", plugin="pandas")

        # Pandas types
        if pdc.is_bool_dtype(dtype):
            return "boolean"
        elif pdc.is_datetime64_any_dtype(dtype):
            return "datetime"
        elif pdc.is_integer_dtype(dtype):
            return "integer"
        elif pdc.is_numeric_dtype(dtype):
            return "number"

        # Python types
        if sample is not None:
            if isinstance(sample, (list, tuple)):
                return "array"
            elif isinstance(sample, datetime.date):
                return "date"
            elif isinstance(sample, isodate.Duration):
                return "duration"
            elif isinstance(sample, dict):
                return "object"
            elif isinstance(sample, str):
                return "string"
            elif isinstance(sample, datetime.time):
                return "time"

        # Default
        return "string"

    # Write

    def write_row_stream(self, resource):
        np = helpers.import_from_plugin("numpy", plugin="pandas")
        pd = helpers.import_from_plugin("pandas", plugin="pandas")
        source = resource
        target = self.resource

        # Get data/index
        data_rows = []
        index_rows = []
        fixed_types = {}
        with source:
            for row in source.row_stream:
                data_values = []
                index_values = []
                for field in source.schema.fields:
                    value = row[field.name]
                    if isinstance(value, float) and np.isnan(value):
                        value = None
                    # http://pandas.pydata.org/pandas-docs/stable/gotchas.html#support-for-integer-na
                    if value is None and field.type in ("number", "integer"):
                        fixed_types[field.name] = "number"
                        value = np.nan
                    if field.type in ["datetime", "time"] and value is not None:
                        value = value.replace(tzinfo=None)
                    if field.name in source.schema.primary_key:
                        index_values.append(value)
                    else:
                        data_values.append(value)
                if len(source.schema.primary_key) == 1:
                    index_rows.append(index_values[0])
                elif len(source.schema.primary_key) > 1:
                    index_rows.append(tuple(index_values))
                data_rows.append(tuple(data_values))

        # Create index
        index = None
        if source.schema.primary_key:
            if len(source.schema.primary_key) == 1:
                index_class = pd.Index
                index_field = source.schema.get_field(source.schema.primary_key[0])
                index_dtype = self.__write_convert_type(index_field.type)
                if index_field.type in ["datetime", "date"]:
                    index_class = pd.DatetimeIndex
                index = index_class(index_rows, name=index_field.name, dtype=index_dtype)
            elif len(source.schema.primary_key) > 1:
                index = pd.MultiIndex.from_tuples(
                    index_rows, names=source.schema.primary_key
                )

        # Create dtypes/columns
        dtypes = []
        columns = []
        for field in source.schema.fields:
            if field.name not in source.schema.primary_key:
                dtype = self.__write_convert_type(fixed_types.get(field.name, field.type))
                dtypes.append((field.name, dtype))
                columns.append(field.name)

        # Create/set dataframe
        array = np.array(data_rows, dtype=dtypes)
        dataframe = pd.DataFrame(array, index=index, columns=columns)
        target.data = dataframe

    def __write_convert_type(self, type=None):
        np = helpers.import_from_plugin("numpy", plugin="pandas")

        # Mapping
        mapping = {
            "array": np.dtype(list),
            "boolean": np.dtype(bool),
            "datetime": np.dtype("datetime64[ns]"),
            "integer": np.dtype(int),
            "number": np.dtype(float),
            "object": np.dtype(dict),
            "year": np.dtype(int),
        }

        # Return type
        if type:
            return mapping.get(type, np.dtype("O"))

        # Return mapping
        return mapping
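

# Example (illustrative sketch only, not part of the plugin API): reading rows
# from an in-memory DataFrame. It assumes pandas is installed and that the
# framework routes a DataFrame source through `PandasPlugin.create_file` above;
# the exact `Resource` construction may vary between frictionless versions.
if __name__ == "__main__":
    import pandas as pd
    from frictionless import Resource

    df = pd.DataFrame({"id": [1, 2], "name": ["english", "german"]})
    resource = Resource(df)  # detected as format "pandas" by PandasPlugin.create_file
    print(resource.read_rows())  # rows produced by PandasParser.read_list_stream_create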