1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements.  See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership.  The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License.  You may obtain a copy of the License at
8 //
9 //   http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied.  See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17 
18 #include "arrow/python/serialize.h"
19 #include "arrow/python/numpy_interop.h"
20 
21 #include <cstdint>
22 #include <limits>
23 #include <memory>
24 #include <sstream>
25 #include <string>
26 #include <vector>
27 
28 #include <numpy/arrayobject.h>
29 #include <numpy/arrayscalars.h>
30 
31 #include "arrow/array.h"
32 #include "arrow/array/builder_binary.h"
33 #include "arrow/array/builder_nested.h"
34 #include "arrow/array/builder_primitive.h"
35 #include "arrow/array/builder_union.h"
36 #include "arrow/io/interfaces.h"
37 #include "arrow/io/memory.h"
38 #include "arrow/ipc/util.h"
39 #include "arrow/ipc/writer.h"
40 #include "arrow/record_batch.h"
41 #include "arrow/result.h"
42 #include "arrow/tensor.h"
43 #include "arrow/util/logging.h"
44 
45 #include "arrow/python/common.h"
46 #include "arrow/python/datetime.h"
47 #include "arrow/python/helpers.h"
48 #include "arrow/python/iterators.h"
49 #include "arrow/python/numpy_convert.h"
50 #include "arrow/python/platform.h"
51 #include "arrow/python/pyarrow.h"
52 
// Upper bound on container nesting (lists, tuples, sets, dicts) that the
// serializer will descend into. Once the depth reaches this value the
// container appenders give up with a NotImplemented status, which is what
// stops an object that contains itself from recursing forever.
constexpr int32_t kMaxRecursionDepth = 100;
54 
55 namespace arrow {
56 
57 using internal::checked_cast;
58 
59 namespace py {
60 
61 class SequenceBuilder;
62 class DictBuilder;
63 
64 Status Append(PyObject* context, PyObject* elem, SequenceBuilder* builder,
65               int32_t recursion_depth, SerializedPyObject* blobs_out);
66 
67 // A Sequence is a heterogeneous collections of elements. It can contain
68 // scalar Python types, lists, tuples, dictionaries, tensors and sparse tensors.
69 class SequenceBuilder {
70  public:
SequenceBuilder(MemoryPool * pool=default_memory_pool ())71   explicit SequenceBuilder(MemoryPool* pool = default_memory_pool())
72       : pool_(pool),
73         types_(::arrow::int8(), pool),
74         offsets_(::arrow::int32(), pool),
75         type_map_(PythonType::NUM_PYTHON_TYPES, -1) {
76     auto null_builder = std::make_shared<NullBuilder>(pool);
77     auto initial_ty = dense_union({field("0", null())});
78     builder_.reset(new DenseUnionBuilder(pool, {null_builder}, initial_ty));
79   }
80 
81   // Appending a none to the sequence
AppendNone()82   Status AppendNone() { return builder_->AppendNull(); }
83 
84   template <typename BuilderType, typename MakeBuilderFn>
CreateAndUpdate(std::shared_ptr<BuilderType> * child_builder,int8_t tag,MakeBuilderFn make_builder)85   Status CreateAndUpdate(std::shared_ptr<BuilderType>* child_builder, int8_t tag,
86                          MakeBuilderFn make_builder) {
87     if (!*child_builder) {
88       child_builder->reset(make_builder());
89       std::ostringstream convert;
90       convert.imbue(std::locale::classic());
91       convert << static_cast<int>(tag);
92       type_map_[tag] = builder_->AppendChild(*child_builder, convert.str());
93     }
94     return builder_->Append(type_map_[tag]);
95   }
96 
97   template <typename BuilderType, typename T>
AppendPrimitive(std::shared_ptr<BuilderType> * child_builder,const T val,int8_t tag)98   Status AppendPrimitive(std::shared_ptr<BuilderType>* child_builder, const T val,
99                          int8_t tag) {
100     RETURN_NOT_OK(
101         CreateAndUpdate(child_builder, tag, [this]() { return new BuilderType(pool_); }));
102     return (*child_builder)->Append(val);
103   }
104 
105   // Appending a boolean to the sequence
AppendBool(const bool data)106   Status AppendBool(const bool data) {
107     return AppendPrimitive(&bools_, data, PythonType::BOOL);
108   }
109 
110   // Appending an int64_t to the sequence
AppendInt64(const int64_t data)111   Status AppendInt64(const int64_t data) {
112     return AppendPrimitive(&ints_, data, PythonType::INT);
113   }
114 
115   // Append a list of bytes to the sequence
AppendBytes(const uint8_t * data,int32_t length)116   Status AppendBytes(const uint8_t* data, int32_t length) {
117     RETURN_NOT_OK(CreateAndUpdate(&bytes_, PythonType::BYTES,
118                                   [this]() { return new BinaryBuilder(pool_); }));
119     return bytes_->Append(data, length);
120   }
121 
122   // Appending a string to the sequence
AppendString(const char * data,int32_t length)123   Status AppendString(const char* data, int32_t length) {
124     RETURN_NOT_OK(CreateAndUpdate(&strings_, PythonType::STRING,
125                                   [this]() { return new StringBuilder(pool_); }));
126     return strings_->Append(data, length);
127   }
128 
129   // Appending a half_float to the sequence
AppendHalfFloat(const npy_half data)130   Status AppendHalfFloat(const npy_half data) {
131     return AppendPrimitive(&half_floats_, data, PythonType::HALF_FLOAT);
132   }
133 
134   // Appending a float to the sequence
AppendFloat(const float data)135   Status AppendFloat(const float data) {
136     return AppendPrimitive(&floats_, data, PythonType::FLOAT);
137   }
138 
139   // Appending a double to the sequence
AppendDouble(const double data)140   Status AppendDouble(const double data) {
141     return AppendPrimitive(&doubles_, data, PythonType::DOUBLE);
142   }
143 
144   // Appending a Date64 timestamp to the sequence
AppendDate64(const int64_t timestamp)145   Status AppendDate64(const int64_t timestamp) {
146     return AppendPrimitive(&date64s_, timestamp, PythonType::DATE64);
147   }
148 
149   // Appending a tensor to the sequence
150   //
151   // \param tensor_index Index of the tensor in the object.
AppendTensor(const int32_t tensor_index)152   Status AppendTensor(const int32_t tensor_index) {
153     RETURN_NOT_OK(CreateAndUpdate(&tensor_indices_, PythonType::TENSOR,
154                                   [this]() { return new Int32Builder(pool_); }));
155     return tensor_indices_->Append(tensor_index);
156   }
157 
158   // Appending a sparse coo tensor to the sequence
159   //
160   // \param sparse_coo_tensor_index Index of the sparse coo tensor in the object.
AppendSparseCOOTensor(const int32_t sparse_coo_tensor_index)161   Status AppendSparseCOOTensor(const int32_t sparse_coo_tensor_index) {
162     RETURN_NOT_OK(CreateAndUpdate(&sparse_coo_tensor_indices_,
163                                   PythonType::SPARSECOOTENSOR,
164                                   [this]() { return new Int32Builder(pool_); }));
165     return sparse_coo_tensor_indices_->Append(sparse_coo_tensor_index);
166   }
167 
168   // Appending a sparse csr matrix to the sequence
169   //
170   // \param sparse_csr_matrix_index Index of the sparse csr matrix in the object.
AppendSparseCSRMatrix(const int32_t sparse_csr_matrix_index)171   Status AppendSparseCSRMatrix(const int32_t sparse_csr_matrix_index) {
172     RETURN_NOT_OK(CreateAndUpdate(&sparse_csr_matrix_indices_,
173                                   PythonType::SPARSECSRMATRIX,
174                                   [this]() { return new Int32Builder(pool_); }));
175     return sparse_csr_matrix_indices_->Append(sparse_csr_matrix_index);
176   }
177 
178   // Appending a sparse csc matrix to the sequence
179   //
180   // \param sparse_csc_matrix_index Index of the sparse csc matrix in the object.
AppendSparseCSCMatrix(const int32_t sparse_csc_matrix_index)181   Status AppendSparseCSCMatrix(const int32_t sparse_csc_matrix_index) {
182     RETURN_NOT_OK(CreateAndUpdate(&sparse_csc_matrix_indices_,
183                                   PythonType::SPARSECSCMATRIX,
184                                   [this]() { return new Int32Builder(pool_); }));
185     return sparse_csc_matrix_indices_->Append(sparse_csc_matrix_index);
186   }
187 
188   // Appending a sparse csf tensor to the sequence
189   //
190   // \param sparse_csf_tensor_index Index of the sparse csf tensor in the object.
AppendSparseCSFTensor(const int32_t sparse_csf_tensor_index)191   Status AppendSparseCSFTensor(const int32_t sparse_csf_tensor_index) {
192     RETURN_NOT_OK(CreateAndUpdate(&sparse_csf_tensor_indices_,
193                                   PythonType::SPARSECSFTENSOR,
194                                   [this]() { return new Int32Builder(pool_); }));
195     return sparse_csf_tensor_indices_->Append(sparse_csf_tensor_index);
196   }
197 
198   // Appending a numpy ndarray to the sequence
199   //
200   // \param tensor_index Index of the tensor in the object.
AppendNdarray(const int32_t ndarray_index)201   Status AppendNdarray(const int32_t ndarray_index) {
202     RETURN_NOT_OK(CreateAndUpdate(&ndarray_indices_, PythonType::NDARRAY,
203                                   [this]() { return new Int32Builder(pool_); }));
204     return ndarray_indices_->Append(ndarray_index);
205   }
206 
207   // Appending a buffer to the sequence
208   //
209   // \param buffer_index Index of the buffer in the object.
AppendBuffer(const int32_t buffer_index)210   Status AppendBuffer(const int32_t buffer_index) {
211     RETURN_NOT_OK(CreateAndUpdate(&buffer_indices_, PythonType::BUFFER,
212                                   [this]() { return new Int32Builder(pool_); }));
213     return buffer_indices_->Append(buffer_index);
214   }
215 
AppendSequence(PyObject * context,PyObject * sequence,int8_t tag,std::shared_ptr<ListBuilder> & target_sequence,std::unique_ptr<SequenceBuilder> & values,int32_t recursion_depth,SerializedPyObject * blobs_out)216   Status AppendSequence(PyObject* context, PyObject* sequence, int8_t tag,
217                         std::shared_ptr<ListBuilder>& target_sequence,
218                         std::unique_ptr<SequenceBuilder>& values, int32_t recursion_depth,
219                         SerializedPyObject* blobs_out) {
220     if (recursion_depth >= kMaxRecursionDepth) {
221       return Status::NotImplemented(
222           "This object exceeds the maximum recursion depth. It may contain itself "
223           "recursively.");
224     }
225     RETURN_NOT_OK(CreateAndUpdate(&target_sequence, tag, [this, &values]() {
226       values.reset(new SequenceBuilder(pool_));
227       return new ListBuilder(pool_, values->builder());
228     }));
229     RETURN_NOT_OK(target_sequence->Append());
230     return internal::VisitIterable(
231         sequence, [&](PyObject* obj, bool* keep_going /* unused */) {
232           return Append(context, obj, values.get(), recursion_depth, blobs_out);
233         });
234   }
235 
AppendList(PyObject * context,PyObject * list,int32_t recursion_depth,SerializedPyObject * blobs_out)236   Status AppendList(PyObject* context, PyObject* list, int32_t recursion_depth,
237                     SerializedPyObject* blobs_out) {
238     return AppendSequence(context, list, PythonType::LIST, lists_, list_values_,
239                           recursion_depth + 1, blobs_out);
240   }
241 
AppendTuple(PyObject * context,PyObject * tuple,int32_t recursion_depth,SerializedPyObject * blobs_out)242   Status AppendTuple(PyObject* context, PyObject* tuple, int32_t recursion_depth,
243                      SerializedPyObject* blobs_out) {
244     return AppendSequence(context, tuple, PythonType::TUPLE, tuples_, tuple_values_,
245                           recursion_depth + 1, blobs_out);
246   }
247 
AppendSet(PyObject * context,PyObject * set,int32_t recursion_depth,SerializedPyObject * blobs_out)248   Status AppendSet(PyObject* context, PyObject* set, int32_t recursion_depth,
249                    SerializedPyObject* blobs_out) {
250     return AppendSequence(context, set, PythonType::SET, sets_, set_values_,
251                           recursion_depth + 1, blobs_out);
252   }
253 
254   Status AppendDict(PyObject* context, PyObject* dict, int32_t recursion_depth,
255                     SerializedPyObject* blobs_out);
256 
257   // Finish building the sequence and return the result.
258   // Input arrays may be nullptr
Finish(std::shared_ptr<Array> * out)259   Status Finish(std::shared_ptr<Array>* out) { return builder_->Finish(out); }
260 
builder()261   std::shared_ptr<DenseUnionBuilder> builder() { return builder_; }
262 
263  private:
264   MemoryPool* pool_;
265 
266   Int8Builder types_;
267   Int32Builder offsets_;
268 
269   /// Mapping from PythonType to child index
270   std::vector<int8_t> type_map_;
271 
272   std::shared_ptr<BooleanBuilder> bools_;
273   std::shared_ptr<Int64Builder> ints_;
274   std::shared_ptr<BinaryBuilder> bytes_;
275   std::shared_ptr<StringBuilder> strings_;
276   std::shared_ptr<HalfFloatBuilder> half_floats_;
277   std::shared_ptr<FloatBuilder> floats_;
278   std::shared_ptr<DoubleBuilder> doubles_;
279   std::shared_ptr<Date64Builder> date64s_;
280 
281   std::unique_ptr<SequenceBuilder> list_values_;
282   std::shared_ptr<ListBuilder> lists_;
283   std::unique_ptr<DictBuilder> dict_values_;
284   std::shared_ptr<ListBuilder> dicts_;
285   std::unique_ptr<SequenceBuilder> tuple_values_;
286   std::shared_ptr<ListBuilder> tuples_;
287   std::unique_ptr<SequenceBuilder> set_values_;
288   std::shared_ptr<ListBuilder> sets_;
289 
290   std::shared_ptr<Int32Builder> tensor_indices_;
291   std::shared_ptr<Int32Builder> sparse_coo_tensor_indices_;
292   std::shared_ptr<Int32Builder> sparse_csr_matrix_indices_;
293   std::shared_ptr<Int32Builder> sparse_csc_matrix_indices_;
294   std::shared_ptr<Int32Builder> sparse_csf_tensor_indices_;
295   std::shared_ptr<Int32Builder> ndarray_indices_;
296   std::shared_ptr<Int32Builder> buffer_indices_;
297 
298   std::shared_ptr<DenseUnionBuilder> builder_;
299 };
300 
301 // Constructing dictionaries of key/value pairs. Sequences of
302 // keys and values are built separately using a pair of
303 // SequenceBuilders. The resulting Arrow representation
304 // can be obtained via the Finish method.
305 class DictBuilder {
306  public:
DictBuilder(MemoryPool * pool=nullptr)307   explicit DictBuilder(MemoryPool* pool = nullptr) : keys_(pool), vals_(pool) {
308     builder_.reset(new StructBuilder(struct_({field("keys", dense_union(FieldVector{})),
309                                               field("vals", dense_union(FieldVector{}))}),
310                                      pool, {keys_.builder(), vals_.builder()}));
311   }
312 
313   // Builder for the keys of the dictionary
keys()314   SequenceBuilder& keys() { return keys_; }
315   // Builder for the values of the dictionary
vals()316   SequenceBuilder& vals() { return vals_; }
317 
318   // Construct an Arrow StructArray representing the dictionary.
319   // Contains a field "keys" for the keys and "vals" for the values.
Finish(std::shared_ptr<Array> * out)320   Status Finish(std::shared_ptr<Array>* out) { return builder_->Finish(out); }
321 
builder()322   std::shared_ptr<StructBuilder> builder() { return builder_; }
323 
324  private:
325   SequenceBuilder keys_;
326   SequenceBuilder vals_;
327   std::shared_ptr<StructBuilder> builder_;
328 };
329 
AppendDict(PyObject * context,PyObject * dict,int32_t recursion_depth,SerializedPyObject * blobs_out)330 Status SequenceBuilder::AppendDict(PyObject* context, PyObject* dict,
331                                    int32_t recursion_depth,
332                                    SerializedPyObject* blobs_out) {
333   if (recursion_depth >= kMaxRecursionDepth) {
334     return Status::NotImplemented(
335         "This object exceeds the maximum recursion depth. It may contain itself "
336         "recursively.");
337   }
338   RETURN_NOT_OK(CreateAndUpdate(&dicts_, PythonType::DICT, [this]() {
339     dict_values_.reset(new DictBuilder(pool_));
340     return new ListBuilder(pool_, dict_values_->builder());
341   }));
342   RETURN_NOT_OK(dicts_->Append());
343   PyObject* key;
344   PyObject* value;
345   Py_ssize_t pos = 0;
346   while (PyDict_Next(dict, &pos, &key, &value)) {
347     RETURN_NOT_OK(dict_values_->builder()->Append());
348     RETURN_NOT_OK(
349         Append(context, key, &dict_values_->keys(), recursion_depth + 1, blobs_out));
350     RETURN_NOT_OK(
351         Append(context, value, &dict_values_->vals(), recursion_depth + 1, blobs_out));
352   }
353 
354   // This block is used to decrement the reference counts of the results
355   // returned by the serialization callback, which is called in AppendArray,
356   // in DeserializeDict and in Append
357   static PyObject* py_type = PyUnicode_FromString("_pytype_");
358   if (PyDict_Contains(dict, py_type)) {
359     // If the dictionary contains the key "_pytype_", then the user has to
360     // have registered a callback.
361     if (context == Py_None) {
362       return Status::Invalid("No serialization callback set");
363     }
364     Py_XDECREF(dict);
365   }
366   return Status::OK();
367 }
368 
CallCustomCallback(PyObject * context,PyObject * method_name,PyObject * elem,PyObject ** result)369 Status CallCustomCallback(PyObject* context, PyObject* method_name, PyObject* elem,
370                           PyObject** result) {
371   if (context == Py_None) {
372     *result = NULL;
373     return Status::SerializationError("error while calling callback on ",
374                                       internal::PyObject_StdStringRepr(elem),
375                                       ": handler not registered");
376   } else {
377     *result = PyObject_CallMethodObjArgs(context, method_name, elem, NULL);
378     return CheckPyError();
379   }
380 }
381 
CallSerializeCallback(PyObject * context,PyObject * value,PyObject ** serialized_object)382 Status CallSerializeCallback(PyObject* context, PyObject* value,
383                              PyObject** serialized_object) {
384   OwnedRef method_name(PyUnicode_FromString("_serialize_callback"));
385   RETURN_NOT_OK(CallCustomCallback(context, method_name.obj(), value, serialized_object));
386   if (!PyDict_Check(*serialized_object)) {
387     return Status::TypeError("serialization callback must return a valid dictionary");
388   }
389   return Status::OK();
390 }
391 
CallDeserializeCallback(PyObject * context,PyObject * value,PyObject ** deserialized_object)392 Status CallDeserializeCallback(PyObject* context, PyObject* value,
393                                PyObject** deserialized_object) {
394   OwnedRef method_name(PyUnicode_FromString("_deserialize_callback"));
395   return CallCustomCallback(context, method_name.obj(), value, deserialized_object);
396 }
397 
398 Status AppendArray(PyObject* context, PyArrayObject* array, SequenceBuilder* builder,
399                    int32_t recursion_depth, SerializedPyObject* blobs_out);
400 
401 template <typename NumpyScalarObject>
AppendIntegerScalar(PyObject * obj,SequenceBuilder * builder)402 Status AppendIntegerScalar(PyObject* obj, SequenceBuilder* builder) {
403   int64_t value = reinterpret_cast<NumpyScalarObject*>(obj)->obval;
404   return builder->AppendInt64(value);
405 }
406 
407 // Append a potentially 64-bit wide unsigned Numpy scalar.
408 // Must check for overflow as we reinterpret it as signed int64.
409 template <typename NumpyScalarObject>
AppendLargeUnsignedScalar(PyObject * obj,SequenceBuilder * builder)410 Status AppendLargeUnsignedScalar(PyObject* obj, SequenceBuilder* builder) {
411   constexpr uint64_t max_value = std::numeric_limits<int64_t>::max();
412 
413   uint64_t value = reinterpret_cast<NumpyScalarObject*>(obj)->obval;
414   if (value > max_value) {
415     return Status::Invalid("cannot serialize Numpy uint64 scalar >= 2**63");
416   }
417   return builder->AppendInt64(static_cast<int64_t>(value));
418 }
419 
AppendScalar(PyObject * obj,SequenceBuilder * builder)420 Status AppendScalar(PyObject* obj, SequenceBuilder* builder) {
421   if (PyArray_IsScalar(obj, Bool)) {
422     return builder->AppendBool(reinterpret_cast<PyBoolScalarObject*>(obj)->obval != 0);
423   } else if (PyArray_IsScalar(obj, Half)) {
424     return builder->AppendHalfFloat(reinterpret_cast<PyHalfScalarObject*>(obj)->obval);
425   } else if (PyArray_IsScalar(obj, Float)) {
426     return builder->AppendFloat(reinterpret_cast<PyFloatScalarObject*>(obj)->obval);
427   } else if (PyArray_IsScalar(obj, Double)) {
428     return builder->AppendDouble(reinterpret_cast<PyDoubleScalarObject*>(obj)->obval);
429   }
430   if (PyArray_IsScalar(obj, Byte)) {
431     return AppendIntegerScalar<PyByteScalarObject>(obj, builder);
432   } else if (PyArray_IsScalar(obj, Short)) {
433     return AppendIntegerScalar<PyShortScalarObject>(obj, builder);
434   } else if (PyArray_IsScalar(obj, Int)) {
435     return AppendIntegerScalar<PyIntScalarObject>(obj, builder);
436   } else if (PyArray_IsScalar(obj, Long)) {
437     return AppendIntegerScalar<PyLongScalarObject>(obj, builder);
438   } else if (PyArray_IsScalar(obj, LongLong)) {
439     return AppendIntegerScalar<PyLongLongScalarObject>(obj, builder);
440   } else if (PyArray_IsScalar(obj, Int64)) {
441     return AppendIntegerScalar<PyInt64ScalarObject>(obj, builder);
442   } else if (PyArray_IsScalar(obj, UByte)) {
443     return AppendIntegerScalar<PyUByteScalarObject>(obj, builder);
444   } else if (PyArray_IsScalar(obj, UShort)) {
445     return AppendIntegerScalar<PyUShortScalarObject>(obj, builder);
446   } else if (PyArray_IsScalar(obj, UInt)) {
447     return AppendIntegerScalar<PyUIntScalarObject>(obj, builder);
448   } else if (PyArray_IsScalar(obj, ULong)) {
449     return AppendLargeUnsignedScalar<PyULongScalarObject>(obj, builder);
450   } else if (PyArray_IsScalar(obj, ULongLong)) {
451     return AppendLargeUnsignedScalar<PyULongLongScalarObject>(obj, builder);
452   } else if (PyArray_IsScalar(obj, UInt64)) {
453     return AppendLargeUnsignedScalar<PyUInt64ScalarObject>(obj, builder);
454   }
455   return Status::NotImplemented("Numpy scalar type not recognized");
456 }
457 
Append(PyObject * context,PyObject * elem,SequenceBuilder * builder,int32_t recursion_depth,SerializedPyObject * blobs_out)458 Status Append(PyObject* context, PyObject* elem, SequenceBuilder* builder,
459               int32_t recursion_depth, SerializedPyObject* blobs_out) {
460   // The bool case must precede the int case (PyInt_Check passes for bools)
461   if (PyBool_Check(elem)) {
462     RETURN_NOT_OK(builder->AppendBool(elem == Py_True));
463   } else if (PyArray_DescrFromScalar(elem)->type_num == NPY_HALF) {
464     npy_half halffloat = reinterpret_cast<PyHalfScalarObject*>(elem)->obval;
465     RETURN_NOT_OK(builder->AppendHalfFloat(halffloat));
466   } else if (PyFloat_Check(elem)) {
467     RETURN_NOT_OK(builder->AppendDouble(PyFloat_AS_DOUBLE(elem)));
468   } else if (PyLong_Check(elem)) {
469     int overflow = 0;
470     int64_t data = PyLong_AsLongLongAndOverflow(elem, &overflow);
471     if (!overflow) {
472       RETURN_NOT_OK(builder->AppendInt64(data));
473     } else {
474       // Attempt to serialize the object using the custom callback.
475       PyObject* serialized_object;
476       // The reference count of serialized_object will be decremented in SerializeDict
477       RETURN_NOT_OK(CallSerializeCallback(context, elem, &serialized_object));
478       RETURN_NOT_OK(
479           builder->AppendDict(context, serialized_object, recursion_depth, blobs_out));
480     }
481   } else if (PyBytes_Check(elem)) {
482     auto data = reinterpret_cast<uint8_t*>(PyBytes_AS_STRING(elem));
483     int32_t size = -1;
484     RETURN_NOT_OK(internal::CastSize(PyBytes_GET_SIZE(elem), &size));
485     RETURN_NOT_OK(builder->AppendBytes(data, size));
486   } else if (PyUnicode_Check(elem)) {
487     ARROW_ASSIGN_OR_RAISE(auto view, PyBytesView::FromUnicode(elem));
488     int32_t size = -1;
489     RETURN_NOT_OK(internal::CastSize(view.size, &size));
490     RETURN_NOT_OK(builder->AppendString(view.bytes, size));
491   } else if (PyList_CheckExact(elem)) {
492     RETURN_NOT_OK(builder->AppendList(context, elem, recursion_depth, blobs_out));
493   } else if (PyDict_CheckExact(elem)) {
494     RETURN_NOT_OK(builder->AppendDict(context, elem, recursion_depth, blobs_out));
495   } else if (PyTuple_CheckExact(elem)) {
496     RETURN_NOT_OK(builder->AppendTuple(context, elem, recursion_depth, blobs_out));
497   } else if (PySet_Check(elem)) {
498     RETURN_NOT_OK(builder->AppendSet(context, elem, recursion_depth, blobs_out));
499   } else if (PyArray_IsScalar(elem, Generic)) {
500     RETURN_NOT_OK(AppendScalar(elem, builder));
501   } else if (PyArray_CheckExact(elem)) {
502     RETURN_NOT_OK(AppendArray(context, reinterpret_cast<PyArrayObject*>(elem), builder,
503                               recursion_depth, blobs_out));
504   } else if (elem == Py_None) {
505     RETURN_NOT_OK(builder->AppendNone());
506   } else if (PyDateTime_Check(elem)) {
507     PyDateTime_DateTime* datetime = reinterpret_cast<PyDateTime_DateTime*>(elem);
508     RETURN_NOT_OK(builder->AppendDate64(internal::PyDateTime_to_us(datetime)));
509   } else if (is_buffer(elem)) {
510     RETURN_NOT_OK(builder->AppendBuffer(static_cast<int32_t>(blobs_out->buffers.size())));
511     ARROW_ASSIGN_OR_RAISE(auto buffer, unwrap_buffer(elem));
512     blobs_out->buffers.push_back(buffer);
513   } else if (is_tensor(elem)) {
514     RETURN_NOT_OK(builder->AppendTensor(static_cast<int32_t>(blobs_out->tensors.size())));
515     ARROW_ASSIGN_OR_RAISE(auto tensor, unwrap_tensor(elem));
516     blobs_out->tensors.push_back(tensor);
517   } else if (is_sparse_coo_tensor(elem)) {
518     RETURN_NOT_OK(builder->AppendSparseCOOTensor(
519         static_cast<int32_t>(blobs_out->sparse_tensors.size())));
520     ARROW_ASSIGN_OR_RAISE(auto tensor, unwrap_sparse_coo_tensor(elem));
521     blobs_out->sparse_tensors.push_back(tensor);
522   } else if (is_sparse_csr_matrix(elem)) {
523     RETURN_NOT_OK(builder->AppendSparseCSRMatrix(
524         static_cast<int32_t>(blobs_out->sparse_tensors.size())));
525     ARROW_ASSIGN_OR_RAISE(auto matrix, unwrap_sparse_csr_matrix(elem));
526     blobs_out->sparse_tensors.push_back(matrix);
527   } else if (is_sparse_csc_matrix(elem)) {
528     RETURN_NOT_OK(builder->AppendSparseCSCMatrix(
529         static_cast<int32_t>(blobs_out->sparse_tensors.size())));
530     ARROW_ASSIGN_OR_RAISE(auto matrix, unwrap_sparse_csc_matrix(elem));
531     blobs_out->sparse_tensors.push_back(matrix);
532   } else if (is_sparse_csf_tensor(elem)) {
533     RETURN_NOT_OK(builder->AppendSparseCSFTensor(
534         static_cast<int32_t>(blobs_out->sparse_tensors.size())));
535     ARROW_ASSIGN_OR_RAISE(auto tensor, unwrap_sparse_csf_tensor(elem));
536     blobs_out->sparse_tensors.push_back(tensor);
537   } else {
538     // Attempt to serialize the object using the custom callback.
539     PyObject* serialized_object;
540     // The reference count of serialized_object will be decremented in SerializeDict
541     RETURN_NOT_OK(CallSerializeCallback(context, elem, &serialized_object));
542     RETURN_NOT_OK(
543         builder->AppendDict(context, serialized_object, recursion_depth, blobs_out));
544   }
545   return Status::OK();
546 }
547 
AppendArray(PyObject * context,PyArrayObject * array,SequenceBuilder * builder,int32_t recursion_depth,SerializedPyObject * blobs_out)548 Status AppendArray(PyObject* context, PyArrayObject* array, SequenceBuilder* builder,
549                    int32_t recursion_depth, SerializedPyObject* blobs_out) {
550   int dtype = PyArray_TYPE(array);
551   switch (dtype) {
552     case NPY_UINT8:
553     case NPY_INT8:
554     case NPY_UINT16:
555     case NPY_INT16:
556     case NPY_UINT32:
557     case NPY_INT32:
558     case NPY_UINT64:
559     case NPY_INT64:
560     case NPY_HALF:
561     case NPY_FLOAT:
562     case NPY_DOUBLE: {
563       RETURN_NOT_OK(
564           builder->AppendNdarray(static_cast<int32_t>(blobs_out->ndarrays.size())));
565       std::shared_ptr<Tensor> tensor;
566       RETURN_NOT_OK(NdarrayToTensor(default_memory_pool(),
567                                     reinterpret_cast<PyObject*>(array), {}, &tensor));
568       blobs_out->ndarrays.push_back(tensor);
569     } break;
570     default: {
571       PyObject* serialized_object;
572       // The reference count of serialized_object will be decremented in SerializeDict
573       RETURN_NOT_OK(CallSerializeCallback(context, reinterpret_cast<PyObject*>(array),
574                                           &serialized_object));
575       RETURN_NOT_OK(builder->AppendDict(context, serialized_object, recursion_depth + 1,
576                                         blobs_out));
577     }
578   }
579   return Status::OK();
580 }
581 
MakeBatch(std::shared_ptr<Array> data)582 std::shared_ptr<RecordBatch> MakeBatch(std::shared_ptr<Array> data) {
583   auto field = std::make_shared<Field>("list", data->type());
584   auto schema = ::arrow::schema({field});
585   return RecordBatch::Make(schema, data->length(), {data});
586 }
587 
SerializeObject(PyObject * context,PyObject * sequence,SerializedPyObject * out)588 Status SerializeObject(PyObject* context, PyObject* sequence, SerializedPyObject* out) {
589   PyAcquireGIL lock;
590   SequenceBuilder builder;
591   RETURN_NOT_OK(internal::VisitIterable(
592       sequence, [&](PyObject* obj, bool* keep_going /* unused */) {
593         return Append(context, obj, &builder, 0, out);
594       }));
595   std::shared_ptr<Array> array;
596   RETURN_NOT_OK(builder.Finish(&array));
597   out->batch = MakeBatch(array);
598   return Status::OK();
599 }
600 
SerializeNdarray(std::shared_ptr<Tensor> tensor,SerializedPyObject * out)601 Status SerializeNdarray(std::shared_ptr<Tensor> tensor, SerializedPyObject* out) {
602   std::shared_ptr<Array> array;
603   SequenceBuilder builder;
604   RETURN_NOT_OK(builder.AppendNdarray(static_cast<int32_t>(out->ndarrays.size())));
605   out->ndarrays.push_back(tensor);
606   RETURN_NOT_OK(builder.Finish(&array));
607   out->batch = MakeBatch(array);
608   return Status::OK();
609 }
610 
WriteNdarrayHeader(std::shared_ptr<DataType> dtype,const std::vector<int64_t> & shape,int64_t tensor_num_bytes,io::OutputStream * dst)611 Status WriteNdarrayHeader(std::shared_ptr<DataType> dtype,
612                           const std::vector<int64_t>& shape, int64_t tensor_num_bytes,
613                           io::OutputStream* dst) {
614   auto empty_tensor = std::make_shared<Tensor>(
615       dtype, std::make_shared<Buffer>(nullptr, tensor_num_bytes), shape);
616   SerializedPyObject serialized_tensor;
617   RETURN_NOT_OK(SerializeNdarray(empty_tensor, &serialized_tensor));
618   return serialized_tensor.WriteTo(dst);
619 }
620 
SerializedPyObject()621 SerializedPyObject::SerializedPyObject()
622     : ipc_options(ipc::IpcWriteOptions::Defaults()) {}
623 
WriteTo(io::OutputStream * dst)624 Status SerializedPyObject::WriteTo(io::OutputStream* dst) {
625   int32_t num_tensors = static_cast<int32_t>(this->tensors.size());
626   int32_t num_sparse_tensors = static_cast<int32_t>(this->sparse_tensors.size());
627   int32_t num_ndarrays = static_cast<int32_t>(this->ndarrays.size());
628   int32_t num_buffers = static_cast<int32_t>(this->buffers.size());
629   RETURN_NOT_OK(
630       dst->Write(reinterpret_cast<const uint8_t*>(&num_tensors), sizeof(int32_t)));
631   RETURN_NOT_OK(
632       dst->Write(reinterpret_cast<const uint8_t*>(&num_sparse_tensors), sizeof(int32_t)));
633   RETURN_NOT_OK(
634       dst->Write(reinterpret_cast<const uint8_t*>(&num_ndarrays), sizeof(int32_t)));
635   RETURN_NOT_OK(
636       dst->Write(reinterpret_cast<const uint8_t*>(&num_buffers), sizeof(int32_t)));
637 
638   // Align stream to 8-byte offset
639   RETURN_NOT_OK(ipc::AlignStream(dst, ipc::kArrowIpcAlignment));
640   RETURN_NOT_OK(ipc::WriteRecordBatchStream({this->batch}, this->ipc_options, dst));
641 
642   // Align stream to 64-byte offset so tensor bodies are 64-byte aligned
643   RETURN_NOT_OK(ipc::AlignStream(dst, ipc::kTensorAlignment));
644 
645   int32_t metadata_length;
646   int64_t body_length;
647   for (const auto& tensor : this->tensors) {
648     RETURN_NOT_OK(ipc::WriteTensor(*tensor, dst, &metadata_length, &body_length));
649     RETURN_NOT_OK(ipc::AlignStream(dst, ipc::kTensorAlignment));
650   }
651 
652   for (const auto& sparse_tensor : this->sparse_tensors) {
653     RETURN_NOT_OK(
654         ipc::WriteSparseTensor(*sparse_tensor, dst, &metadata_length, &body_length));
655     RETURN_NOT_OK(ipc::AlignStream(dst, ipc::kTensorAlignment));
656   }
657 
658   for (const auto& tensor : this->ndarrays) {
659     RETURN_NOT_OK(ipc::WriteTensor(*tensor, dst, &metadata_length, &body_length));
660     RETURN_NOT_OK(ipc::AlignStream(dst, ipc::kTensorAlignment));
661   }
662 
663   for (const auto& buffer : this->buffers) {
664     int64_t size = buffer->size();
665     RETURN_NOT_OK(dst->Write(reinterpret_cast<const uint8_t*>(&size), sizeof(int64_t)));
666     RETURN_NOT_OK(dst->Write(buffer->data(), size));
667   }
668 
669   return Status::OK();
670 }
671 
672 namespace {
673 
CountSparseTensors(const std::vector<std::shared_ptr<SparseTensor>> & sparse_tensors,PyObject ** out)674 Status CountSparseTensors(
675     const std::vector<std::shared_ptr<SparseTensor>>& sparse_tensors, PyObject** out) {
676   OwnedRef num_sparse_tensors(PyDict_New());
677   size_t num_coo = 0;
678   size_t num_csr = 0;
679   size_t num_csc = 0;
680   size_t num_csf = 0;
681   size_t ndim_csf = 0;
682 
683   for (const auto& sparse_tensor : sparse_tensors) {
684     switch (sparse_tensor->format_id()) {
685       case SparseTensorFormat::COO:
686         ++num_coo;
687         break;
688       case SparseTensorFormat::CSR:
689         ++num_csr;
690         break;
691       case SparseTensorFormat::CSC:
692         ++num_csc;
693         break;
694       case SparseTensorFormat::CSF:
695         ++num_csf;
696         ndim_csf += sparse_tensor->ndim();
697         break;
698     }
699   }
700 
701   PyDict_SetItemString(num_sparse_tensors.obj(), "coo", PyLong_FromSize_t(num_coo));
702   PyDict_SetItemString(num_sparse_tensors.obj(), "csr", PyLong_FromSize_t(num_csr));
703   PyDict_SetItemString(num_sparse_tensors.obj(), "csc", PyLong_FromSize_t(num_csc));
704   PyDict_SetItemString(num_sparse_tensors.obj(), "csf", PyLong_FromSize_t(num_csf));
705   PyDict_SetItemString(num_sparse_tensors.obj(), "ndim_csf", PyLong_FromSize_t(ndim_csf));
706   RETURN_IF_PYERROR();
707 
708   *out = num_sparse_tensors.detach();
709   return Status::OK();
710 }
711 
712 }  // namespace
713 
GetComponents(MemoryPool * memory_pool,PyObject ** out)714 Status SerializedPyObject::GetComponents(MemoryPool* memory_pool, PyObject** out) {
715   PyAcquireGIL py_gil;
716 
717   OwnedRef result(PyDict_New());
718   PyObject* buffers = PyList_New(0);
719   PyObject* num_sparse_tensors = nullptr;
720 
721   // TODO(wesm): Not sure how pedantic we need to be about checking the return
722   // values of these functions. There are other places where we do not check
723   // PyDict_SetItem/SetItemString return value, but these failures would be
724   // quite esoteric
725   PyDict_SetItemString(result.obj(), "num_tensors",
726                        PyLong_FromSize_t(this->tensors.size()));
727   RETURN_NOT_OK(CountSparseTensors(this->sparse_tensors, &num_sparse_tensors));
728   PyDict_SetItemString(result.obj(), "num_sparse_tensors", num_sparse_tensors);
729   PyDict_SetItemString(result.obj(), "ndim_csf", num_sparse_tensors);
730   PyDict_SetItemString(result.obj(), "num_ndarrays",
731                        PyLong_FromSize_t(this->ndarrays.size()));
732   PyDict_SetItemString(result.obj(), "num_buffers",
733                        PyLong_FromSize_t(this->buffers.size()));
734   PyDict_SetItemString(result.obj(), "data", buffers);
735   RETURN_IF_PYERROR();
736 
737   Py_DECREF(buffers);
738 
739   auto PushBuffer = [&buffers](const std::shared_ptr<Buffer>& buffer) {
740     PyObject* wrapped_buffer = wrap_buffer(buffer);
741     RETURN_IF_PYERROR();
742     if (PyList_Append(buffers, wrapped_buffer) < 0) {
743       Py_DECREF(wrapped_buffer);
744       RETURN_IF_PYERROR();
745     }
746     Py_DECREF(wrapped_buffer);
747     return Status::OK();
748   };
749 
750   constexpr int64_t kInitialCapacity = 1024;
751 
752   // Write the record batch describing the object structure
753   py_gil.release();
754   ARROW_ASSIGN_OR_RAISE(auto stream,
755                         io::BufferOutputStream::Create(kInitialCapacity, memory_pool));
756   RETURN_NOT_OK(
757       ipc::WriteRecordBatchStream({this->batch}, this->ipc_options, stream.get()));
758   ARROW_ASSIGN_OR_RAISE(auto buffer, stream->Finish());
759   py_gil.acquire();
760 
761   RETURN_NOT_OK(PushBuffer(buffer));
762 
763   // For each tensor, get a metadata buffer and a buffer for the body
764   for (const auto& tensor : this->tensors) {
765     ARROW_ASSIGN_OR_RAISE(std::unique_ptr<ipc::Message> message,
766                           ipc::GetTensorMessage(*tensor, memory_pool));
767     RETURN_NOT_OK(PushBuffer(message->metadata()));
768     RETURN_NOT_OK(PushBuffer(message->body()));
769   }
770 
771   // For each sparse tensor, get a metadata buffer and buffers containing index and data
772   for (const auto& sparse_tensor : this->sparse_tensors) {
773     ipc::IpcPayload payload;
774     RETURN_NOT_OK(ipc::GetSparseTensorPayload(*sparse_tensor, memory_pool, &payload));
775     RETURN_NOT_OK(PushBuffer(payload.metadata));
776     for (const auto& body : payload.body_buffers) {
777       RETURN_NOT_OK(PushBuffer(body));
778     }
779   }
780 
781   // For each ndarray, get a metadata buffer and a buffer for the body
782   for (const auto& ndarray : this->ndarrays) {
783     ARROW_ASSIGN_OR_RAISE(std::unique_ptr<ipc::Message> message,
784                           ipc::GetTensorMessage(*ndarray, memory_pool));
785     RETURN_NOT_OK(PushBuffer(message->metadata()));
786     RETURN_NOT_OK(PushBuffer(message->body()));
787   }
788 
789   for (const auto& buf : this->buffers) {
790     RETURN_NOT_OK(PushBuffer(buf));
791   }
792 
793   *out = result.detach();
794   return Status::OK();
795 }
796 
797 }  // namespace py
798 }  // namespace arrow
799