from distutils.version import LooseVersion
import json

import numpy as np
import pyarrow

from pandas.core.arrays.interval import VALID_CLOSED

_pyarrow_version_ge_015 = LooseVersion(pyarrow.__version__) >= LooseVersion("0.15")


def pyarrow_array_to_numpy_and_mask(arr, dtype):
    """
    Convert a primitive pyarrow.Array to a numpy array and boolean mask based
    on the buffers of the Array.

    Parameters
    ----------
    arr : pyarrow.Array
        A primitive (fixed-width) array whose second buffer holds the values.
    dtype : numpy.dtype
        Anything accepted by ``np.dtype``; its item size must match the width
        of one element in ``arr``'s data buffer.

    Returns
    -------
    (data, mask)
        Tuple of two numpy arrays with the raw data (with specified dtype) and
        a boolean mask (validity mask, so False means missing)
    """
    # Normalize so that ``itemsize`` is available even if a string or scalar
    # type was passed.
    dtype = np.dtype(dtype)

    buflist = arr.buffers()
    # Arrow data buffers may be padded past the last element, and the array
    # may be a slice starting at a non-zero offset. The raw buffer length is
    # therefore not necessarily a multiple of the item size, which
    # np.frombuffer rejects. Slice out exactly the bytes backing this array
    # before handing it to numpy (the slice is zero-copy).
    offset = arr.offset * dtype.itemsize
    length = len(arr) * dtype.itemsize
    data_buf = buflist[1][offset : offset + length]
    data = np.frombuffer(data_buf, dtype=dtype)

    bitmask = buflist[0]
    if bitmask is not None:
        # Reinterpret the validity bitmap as the values buffer of a
        # BooleanArray (respecting the array offset). Let pyarrow unpack it,
        # then convert to numpy.
        mask = pyarrow.BooleanArray.from_buffers(
            pyarrow.bool_(), len(arr), [None, bitmask], offset=arr.offset
        )
        mask = np.asarray(mask)
    else:
        # No validity bitmap: every value is treated as present.
        mask = np.ones(len(arr), dtype=bool)
    return data, mask


if _pyarrow_version_ge_015:
    # the pyarrow extension types are only available for pyarrow 0.15+

    class ArrowPeriodType(pyarrow.ExtensionType):
        """
        pyarrow extension type ("pandas.period") for pandas Period data.

        Values are stored as pyarrow int64 (presumably period ordinals -- TODO
        confirm against the writer). The frequency is carried in the
        serialized extension metadata.
        """

        def __init__(self, freq):
            # attributes need to be set first before calling
            # super init (as that calls serialize)
            self._freq = freq
            pyarrow.ExtensionType.__init__(self, pyarrow.int64(), "pandas.period")

        @property
        def freq(self):
            """The frequency this type was constructed with."""
            return self._freq

        def __arrow_ext_serialize__(self):
            """Serialize the type parameters as UTF-8 encoded JSON bytes."""
            metadata = {"freq": self.freq}
            return json.dumps(metadata).encode()

        @classmethod
        def __arrow_ext_deserialize__(cls, storage_type, serialized):
            """Rebuild the type from the JSON metadata written by serialize."""
            metadata = json.loads(serialized.decode())
            return ArrowPeriodType(metadata["freq"])

        def __eq__(self, other):
            # Equal only to another instance of exactly this type with the
            # same freq; defer to the other operand for non-extension types.
            if isinstance(other, pyarrow.BaseExtensionType):
                return type(self) == type(other) and self.freq == other.freq
            else:
                return NotImplemented

        def __hash__(self):
            # Hash the same fields that __eq__ compares, keeping the two
            # consistent.
            return hash((str(self), self.freq))

        def to_pandas_dtype(self):
            """Return the matching ``pd.PeriodDtype``."""
            # Imported lazily (presumably to avoid a circular import with
            # pandas at module load time).
            import pandas as pd

            return pd.PeriodDtype(freq=self.freq)

    # Register the type with a dummy instance. The concrete freq does not
    # matter: deserialization rebuilds the type from the stored metadata.
    _period_type = ArrowPeriodType("D")
    pyarrow.register_extension_type(_period_type)

    class ArrowIntervalType(pyarrow.ExtensionType):
        """
        pyarrow extension type ("pandas.interval") for pandas Interval data.

        Storage is a struct with "left" and "right" fields, both of
        ``subtype``. The subtype and the ``closed`` side are carried in the
        serialized extension metadata.
        """

        def __init__(self, subtype, closed):
            # attributes need to be set first before calling
            # super init (as that calls serialize)
            assert closed in VALID_CLOSED
            self._closed = closed
            # Accept non-pyarrow subtypes (e.g. numpy dtypes) by mapping their
            # string form through pyarrow's type aliases.
            if not isinstance(subtype, pyarrow.DataType):
                subtype = pyarrow.type_for_alias(str(subtype))
            self._subtype = subtype

            storage_type = pyarrow.struct([("left", subtype), ("right", subtype)])
            pyarrow.ExtensionType.__init__(self, storage_type, "pandas.interval")

        @property
        def subtype(self):
            """The pyarrow type of the left/right endpoints."""
            return self._subtype

        @property
        def closed(self):
            """Which side(s) of the interval are closed (one of VALID_CLOSED)."""
            return self._closed

        def __arrow_ext_serialize__(self):
            """Serialize subtype and closed as UTF-8 encoded JSON bytes."""
            metadata = {"subtype": str(self.subtype), "closed": self.closed}
            return json.dumps(metadata).encode()

        @classmethod
        def __arrow_ext_deserialize__(cls, storage_type, serialized):
            """Rebuild the type from the JSON metadata written by serialize."""
            metadata = json.loads(serialized.decode())
            subtype = pyarrow.type_for_alias(metadata["subtype"])
            closed = metadata["closed"]
            return ArrowIntervalType(subtype, closed)

        def __eq__(self, other):
            # Equal only to another instance of exactly this type with the
            # same subtype and closed side.
            if isinstance(other, pyarrow.BaseExtensionType):
                return (
                    type(self) == type(other)
                    and self.subtype == other.subtype
                    and self.closed == other.closed
                )
            else:
                return NotImplemented

        def __hash__(self):
            # Hash the same fields that __eq__ compares.
            return hash((str(self), str(self.subtype), self.closed))

        def to_pandas_dtype(self):
            """Return a ``pd.IntervalDtype`` for the endpoint subtype."""
            # Imported lazily (presumably to avoid a circular import).
            import pandas as pd

            return pd.IntervalDtype(self.subtype.to_pandas_dtype())

    # Register the type with a dummy instance. The concrete subtype/closed
    # values do not matter: deserialization rebuilds them from the metadata.
    _interval_type = ArrowIntervalType(pyarrow.int64(), "left")
    pyarrow.register_extension_type(_interval_type)