# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.


import pytest

import collections
import datetime
import os
import pathlib
import pickle
import subprocess
import string
import sys

import pyarrow as pa
import numpy as np

import pyarrow.tests.util as test_util

try:
    import torch
except ImportError:
    torch = None
    # Blacklist the module in case `import torch` is costly before
    # failing (ARROW-2071)
    sys.modules['torch'] = None

try:
    from scipy.sparse import coo_matrix, csr_matrix, csc_matrix
except ImportError:
    coo_matrix = None
    csr_matrix = None
    csc_matrix = None

try:
    import sparse
except ImportError:
    sparse = None


# Ignore all serialization deprecation warnings in this file; we test that
# the warnings are actually raised in test_serialization_deprecated.py.
pytestmark = pytest.mark.filterwarnings("ignore:'pyarrow:FutureWarning")


def assert_equal(obj1, obj2):
    if torch is not None and torch.is_tensor(obj1) and torch.is_tensor(obj2):
        if obj1.is_sparse:
            obj1 = obj1.to_dense()
        if obj2.is_sparse:
            obj2 = obj2.to_dense()
        assert torch.equal(obj1, obj2)
        return
    module_numpy = (type(obj1).__module__ == np.__name__ or
                    type(obj2).__module__ == np.__name__)
    if module_numpy:
        empty_shape = ((hasattr(obj1, "shape") and obj1.shape == ()) or
                       (hasattr(obj2, "shape") and obj2.shape == ()))
        if empty_shape:
            # This is a special case because currently np.testing.assert_equal
            # fails because we do not properly handle different numerical
            # types.
            assert obj1 == obj2, ("Objects {} and {} are "
                                  "different.".format(obj1, obj2))
        else:
            np.testing.assert_equal(obj1, obj2)
    elif hasattr(obj1, "__dict__") and hasattr(obj2, "__dict__"):
        special_keys = ["_pytype_"]
        assert (set(list(obj1.__dict__.keys()) + special_keys) ==
                set(list(obj2.__dict__.keys()) + special_keys)), \
            "Objects {} and {} are different.".format(obj1, obj2)
        if obj1.__dict__ == {}:
            print("WARNING: Empty dict in ", obj1)
        for key in obj1.__dict__.keys():
            if key not in special_keys:
                assert_equal(obj1.__dict__[key], obj2.__dict__[key])
    elif type(obj1) is dict or type(obj2) is dict:
        assert_equal(obj1.keys(), obj2.keys())
        for key in obj1.keys():
            assert_equal(obj1[key], obj2[key])
    elif type(obj1) is list or type(obj2) is list:
        assert len(obj1) == len(obj2), ("Objects {} and {} are lists with "
                                        "different lengths."
                                        .format(obj1, obj2))
        for i in range(len(obj1)):
            assert_equal(obj1[i], obj2[i])
    elif type(obj1) is tuple or type(obj2) is tuple:
        assert len(obj1) == len(obj2), ("Objects {} and {} are tuples with "
                                        "different lengths."
                                        .format(obj1, obj2))
        for i in range(len(obj1)):
            assert_equal(obj1[i], obj2[i])
    elif (pa.lib.is_named_tuple(type(obj1)) or
          pa.lib.is_named_tuple(type(obj2))):
        assert len(obj1) == len(obj2), ("Objects {} and {} are named tuples "
                                        "with different lengths."
                                        .format(obj1, obj2))
        for i in range(len(obj1)):
            assert_equal(obj1[i], obj2[i])
    elif isinstance(obj1, pa.Array) and isinstance(obj2, pa.Array):
        assert obj1.equals(obj2)
    elif isinstance(obj1, pa.Tensor) and isinstance(obj2, pa.Tensor):
        assert obj1.equals(obj2)
    elif isinstance(obj1, pa.SparseCOOTensor) and \
            isinstance(obj2, pa.SparseCOOTensor):
        assert obj1.equals(obj2)
    elif isinstance(obj1, pa.SparseCSRMatrix) and \
            isinstance(obj2, pa.SparseCSRMatrix):
        assert obj1.equals(obj2)
    elif isinstance(obj1, pa.SparseCSCMatrix) and \
            isinstance(obj2, pa.SparseCSCMatrix):
        assert obj1.equals(obj2)
    elif isinstance(obj1, pa.SparseCSFTensor) and \
            isinstance(obj2, pa.SparseCSFTensor):
        assert obj1.equals(obj2)
    elif isinstance(obj1, pa.RecordBatch) and isinstance(obj2, pa.RecordBatch):
        assert obj1.equals(obj2)
    elif isinstance(obj1, pa.Table) and isinstance(obj2, pa.Table):
        assert obj1.equals(obj2)
    else:
        assert type(obj1) == type(obj2) and obj1 == obj2, \
            "Objects {} and {} are different.".format(obj1, obj2)


PRIMITIVE_OBJECTS = [
    0, 0.0, 0.9, 1 << 62, 1 << 999,
    [1 << 100, [1 << 100]], "a", string.printable, "\u262F",
    "hello world", "hello world", "\xff\xfe\x9c\x001\x000\x00",
    None, True, False, [], (), {}, {(1, 2): 1}, {(): 2},
    [1, "hello", 3.0], "\u262F", 42.0, (1.0, "hi"),
    [1, 2, 3, None], [(None,), 3, 1.0], ["h", "e", "l", "l", "o", None],
    (None, None), ("hello", None), (True, False),
    {True: "hello", False: "world"}, {"hello": "world", 1: 42, 2.5: 45},
    {"hello": {2, 3}, "world": {42.0}, "this": None},
    np.int8(3), np.int32(4), np.int64(5),
    np.uint8(3), np.uint32(4), np.uint64(5),
    np.float16(1.9), np.float32(1.9),
    np.float64(1.9), np.zeros([8, 20]),
    np.random.normal(size=[17, 10]), np.array(["hi", 3]),
    np.array(["hi", 3], dtype=object),
    np.random.normal(size=[15, 13]).T
]


index_types = ('i1', 'i2', 'i4', 'i8', 'u1', 'u2', 'u4', 'u8')
tensor_types = ('i1', 'i2', 'i4', 'i8', 'u1', 'u2', 'u4', 'u8',
                'f2', 'f4', 'f8')

PRIMITIVE_OBJECTS += [0, np.array([["hi", "hi"], [1.3, 1]])]


COMPLEX_OBJECTS = [
    [[[[[[[[[[[[]]]]]]]]]]]],
    {"obj{}".format(i): np.random.normal(size=[4, 4]) for i in range(5)},
    # {(): {(): {(): {(): {(): {(): {(): {(): {(): {(): {
    #       (): {(): {}}}}}}}}}}}}},
    ((((((((((),),),),),),),),),),
    {"a": {"b": {"c": {"d": {}}}}},
]


class Foo:
    def __init__(self, value=0):
        self.value = value

    def __hash__(self):
        return hash(self.value)

    def __eq__(self, other):
        return other.value == self.value


class Bar:
    def __init__(self):
        for i, val in enumerate(COMPLEX_OBJECTS):
            setattr(self, "field{}".format(i), val)


class Baz:
    def __init__(self):
        self.foo = Foo()
        self.bar = Bar()

    def method(self, arg):
        pass


class Qux:
    def __init__(self):
        self.objs = [Foo(1), Foo(42)]


class SubQux(Qux):
    def __init__(self):
        Qux.__init__(self)


class SubQuxPickle(Qux):
    def __init__(self):
        Qux.__init__(self)


class CustomError(Exception):
    pass


Point = collections.namedtuple("Point", ["x", "y"])
NamedTupleExample = collections.namedtuple(
    "Example", "field1, field2, field3, field4, field5")


CUSTOM_OBJECTS = [Exception("Test object."), CustomError(), Point(11, y=22),
                  Foo(), Bar(), Baz(), Qux(), SubQux(), SubQuxPickle(),
                  NamedTupleExample(1, 1.0, "hi", np.zeros([3, 5]), [1, 2, 3]),
                  collections.OrderedDict([("hello", 1), ("world", 2)]),
                  collections.deque([1, 2, 3, "a", "b", "c", 3.5]),
                  collections.Counter([1, 1, 1, 2, 2, 3, "a", "b"])]


def make_serialization_context():
    with pytest.warns(FutureWarning):
        context = pa.default_serialization_context()

    context.register_type(Foo, "Foo")
    context.register_type(Bar, "Bar")
    context.register_type(Baz, "Baz")
    context.register_type(Qux, "Qux")
    context.register_type(SubQux, "SubQux")
    context.register_type(SubQuxPickle, "SubQuxPickle", pickle=True)
    context.register_type(Exception, "Exception")
    context.register_type(CustomError, "CustomError")
    context.register_type(Point, "Point")
    context.register_type(NamedTupleExample, "NamedTupleExample")

    return context


global_serialization_context = make_serialization_context()


def serialization_roundtrip(value, scratch_buffer,
                            context=global_serialization_context):
    writer = pa.FixedSizeBufferWriter(scratch_buffer)
    pa.serialize_to(value, writer, context=context)

    reader = pa.BufferReader(scratch_buffer)
    result = pa.deserialize_from(reader, None, context=context)
    assert_equal(value, result)

    _check_component_roundtrip(value, context=context)


def _check_component_roundtrip(value, context=global_serialization_context):
    # Test to/from components
    serialized = pa.serialize(value, context=context)
    components = serialized.to_components()
    from_comp = pa.SerializedPyObject.from_components(components)
    recons = from_comp.deserialize(context=context)
    assert_equal(value, recons)


@pytest.fixture(scope='session')
def large_buffer(size=32*1024*1024):
    yield pa.allocate_buffer(size)


def large_memory_map(tmpdir_factory, size=100*1024*1024):
    path = (tmpdir_factory.mktemp('data')
            .join('pyarrow-serialization-tmp-file').strpath)

    # Create a large memory mapped file
    with open(path, 'wb') as f:
        f.write(np.random.randint(0, 256, size=size)
                .astype('u1')
                .tobytes()
                [:size])
    return path


def test_clone():
    context = pa.SerializationContext()

    class Foo:
        pass

    def custom_serializer(obj):
        return 0

    def custom_deserializer(serialized_obj):
        return (serialized_obj, 'a')

    context.register_type(Foo, 'Foo', custom_serializer=custom_serializer,
                          custom_deserializer=custom_deserializer)

    new_context = context.clone()

    f = Foo()
    serialized = pa.serialize(f, context=context)
    deserialized = serialized.deserialize(context=context)
    assert deserialized == (0, 'a')

    serialized = pa.serialize(f, context=new_context)
    deserialized = serialized.deserialize(context=new_context)
    assert deserialized == (0, 'a')


def test_primitive_serialization_notbroken(large_buffer):
    serialization_roundtrip({(1, 2): 2}, large_buffer)


def test_primitive_serialization_broken(large_buffer):
    serialization_roundtrip({(): 2}, large_buffer)


def test_primitive_serialization(large_buffer):
    for obj in PRIMITIVE_OBJECTS:
        serialization_roundtrip(obj, large_buffer)


def test_integer_limits(large_buffer):
    # Check that Numpy scalars can be represented up to their limit values
    # (except np.uint64 which is limited to 2**63 - 1)
    for dt in [np.int8, np.int16, np.int32, np.int64,
               np.uint8, np.uint16, np.uint32, np.uint64]:
        scal = dt(np.iinfo(dt).min)
        serialization_roundtrip(scal, large_buffer)
        if dt is not np.uint64:
            scal = dt(np.iinfo(dt).max)
            serialization_roundtrip(scal, large_buffer)
        else:
            scal = dt(2**63 - 1)
            serialization_roundtrip(scal, large_buffer)
            for v in (2**63, 2**64 - 1):
                scal = dt(v)
                with pytest.raises(pa.ArrowInvalid):
                    pa.serialize(scal)


def test_serialize_to_buffer():
    for nthreads in [1, 4]:
        for value in COMPLEX_OBJECTS:
            buf = pa.serialize(value).to_buffer(nthreads=nthreads)
            result = pa.deserialize(buf)
            assert_equal(value, result)


def test_complex_serialization(large_buffer):
    for obj in COMPLEX_OBJECTS:
        serialization_roundtrip(obj, large_buffer)


def test_custom_serialization(large_buffer):
    for obj in CUSTOM_OBJECTS:
        serialization_roundtrip(obj, large_buffer)


def test_default_dict_serialization(large_buffer):
    pytest.importorskip("cloudpickle")

    obj = collections.defaultdict(lambda: 0, [("hello", 1), ("world", 2)])
    serialization_roundtrip(obj, large_buffer)


def test_numpy_serialization(large_buffer):
    for t in ["bool", "int8", "uint8", "int16", "uint16", "int32",
              "uint32", "float16", "float32", "float64", "<U1", "<U2", "<U3",
              "<U4", "|S1", "|S2", "|S3", "|S4", "|O",
              np.dtype([('a', 'int64'), ('b', 'float')]),
              np.dtype([('x', 'uint32'), ('y', '<U8')])]:
        obj = np.random.randint(0, 10, size=(100, 100)).astype(t)
        serialization_roundtrip(obj, large_buffer)
        obj = obj[1:99, 10:90]
        serialization_roundtrip(obj, large_buffer)


def test_datetime_serialization(large_buffer):
    data = [
        # Principia Mathematica published
        datetime.datetime(year=1687, month=7, day=5),

        # Some random date
        datetime.datetime(year=1911, month=6, day=3, hour=4,
                          minute=55, second=44),
        # End of WWI
        datetime.datetime(year=1918, month=11, day=11),

        # Beginning of UNIX time
        datetime.datetime(year=1970, month=1, day=1),

        # The Berlin wall falls
        datetime.datetime(year=1989, month=11, day=9),

        # Another random date
        datetime.datetime(year=2011, month=6, day=3, hour=4,
                          minute=0, second=3),
        # Another random date
        datetime.datetime(year=1970, month=1, day=3, hour=4,
                          minute=0, second=0)
    ]
    for d in data:
        serialization_roundtrip(d, large_buffer)


def test_torch_serialization(large_buffer):
    pytest.importorskip("torch")

    serialization_context = pa.default_serialization_context()
    pa.register_torch_serialization_handlers(serialization_context)

    # Dense tensors:

    # These are the only types that are supported for the
    # PyTorch to NumPy conversion
    for t in ["float32", "float64",
              "uint8", "int16", "int32", "int64"]:
        obj = torch.from_numpy(np.random.randn(1000).astype(t))
        serialization_roundtrip(obj, large_buffer,
                                context=serialization_context)

    tensor_requiring_grad = torch.randn(10, 10, requires_grad=True)
    serialization_roundtrip(tensor_requiring_grad, large_buffer,
                            context=serialization_context)

    # Sparse tensors:

    # These are the only types that are supported for the
    # PyTorch to NumPy conversion
    for t in ["float32", "float64",
              "uint8", "int16", "int32", "int64"]:
        i = torch.LongTensor([[0, 2], [1, 0], [1, 2]])
        v = torch.from_numpy(np.array([3, 4, 5]).astype(t))
        obj = torch.sparse_coo_tensor(i.t(), v, torch.Size([2, 3]))
        serialization_roundtrip(obj, large_buffer,
                                context=serialization_context)


@pytest.mark.skipif(not torch or not torch.cuda.is_available(),
                    reason="requires pytorch with CUDA")
def test_torch_cuda():
    # ARROW-2920: This used to segfault if torch is not imported
    # before pyarrow
    # Note that this test will only catch the issue if it is run
    # with a pyarrow that has been built in the manylinux1 environment
    torch.nn.Conv2d(64, 2, kernel_size=3, stride=1,
                    padding=1, bias=False).cuda()


def test_numpy_immutable(large_buffer):
    obj = np.zeros([10])

    writer = pa.FixedSizeBufferWriter(large_buffer)
    pa.serialize_to(obj, writer, global_serialization_context)

    reader = pa.BufferReader(large_buffer)
    result = pa.deserialize_from(reader, None, global_serialization_context)
    with pytest.raises(ValueError):
        result[0] = 1.0


def test_numpy_base_object(tmpdir):
    # ARROW-2040: deserialized Numpy array should keep a reference to the
    # owner of its memory
    path = os.path.join(str(tmpdir), 'zzz.bin')
    data = np.arange(12, dtype=np.int32)

    with open(path, 'wb') as f:
        f.write(pa.serialize(data).to_buffer())

    serialized = pa.read_serialized(pa.OSFile(path))
    result = serialized.deserialize()
    assert_equal(result, data)
    serialized = None
    assert_equal(result, data)
    assert result.base is not None


# see https://issues.apache.org/jira/browse/ARROW-1695
def test_serialization_callback_numpy():

    class DummyClass:
        pass

    def serialize_dummy_class(obj):
        x = np.zeros(4)
        return x

    def deserialize_dummy_class(serialized_obj):
        return serialized_obj

    context = pa.default_serialization_context()
    context.register_type(DummyClass, "DummyClass",
                          custom_serializer=serialize_dummy_class,
                          custom_deserializer=deserialize_dummy_class)

    pa.serialize(DummyClass(), context=context)


def test_numpy_subclass_serialization():
    # Check that we can properly serialize subclasses of np.ndarray.
    class CustomNDArray(np.ndarray):
        def __new__(cls, input_array):
            array = np.asarray(input_array).view(cls)
            return array

    def serializer(obj):
        return {'numpy': obj.view(np.ndarray)}

    def deserializer(data):
        array = data['numpy'].view(CustomNDArray)
        return array

    context = pa.default_serialization_context()

    context.register_type(CustomNDArray, 'CustomNDArray',
                          custom_serializer=serializer,
                          custom_deserializer=deserializer)

    x = CustomNDArray(np.zeros(3))
    serialized = pa.serialize(x, context=context).to_buffer()
    new_x = pa.deserialize(serialized, context=context)
    assert type(new_x) == CustomNDArray
    assert np.alltrue(new_x.view(np.ndarray) == np.zeros(3))


@pytest.mark.parametrize('tensor_type', tensor_types)
@pytest.mark.parametrize('index_type', index_types)
def test_sparse_coo_tensor_serialization(index_type, tensor_type):
    tensor_dtype = np.dtype(tensor_type)
    index_dtype = np.dtype(index_type)
    data = np.array([[1, 2, 3, 4, 5, 6]]).T.astype(tensor_dtype)
    coords = np.array([
        [0, 0, 2, 3, 1, 3],
        [0, 2, 0, 4, 5, 5],
    ]).T.astype(index_dtype)
    shape = (4, 6)
    dim_names = ('x', 'y')

    sparse_tensor = pa.SparseCOOTensor.from_numpy(data, coords,
                                                  shape, dim_names)

    context = pa.default_serialization_context()
    serialized = pa.serialize(sparse_tensor, context=context).to_buffer()
    result = pa.deserialize(serialized)
    assert_equal(result, sparse_tensor)
    assert isinstance(result, pa.SparseCOOTensor)

    data_result, coords_result = result.to_numpy()
    assert np.array_equal(data_result, data)
    assert np.array_equal(coords_result, coords)
    assert result.dim_names == dim_names


@pytest.mark.parametrize('tensor_type', tensor_types)
@pytest.mark.parametrize('index_type', index_types)
def test_sparse_coo_tensor_components_serialization(large_buffer,
                                                    index_type, tensor_type):
    tensor_dtype = np.dtype(tensor_type)
    index_dtype = np.dtype(index_type)
    data = np.array([[1, 2, 3, 4, 5, 6]]).T.astype(tensor_dtype)
    coords = np.array([
        [0, 0, 2, 3, 1, 3],
        [0, 2, 0, 4, 5, 5],
    ]).T.astype(index_dtype)
    shape = (4, 6)
    dim_names = ('x', 'y')

    sparse_tensor = pa.SparseCOOTensor.from_numpy(data, coords,
                                                  shape, dim_names)
    serialization_roundtrip(sparse_tensor, large_buffer)


@pytest.mark.skipif(not coo_matrix, reason="requires scipy")
def test_scipy_sparse_coo_tensor_serialization():
    data = np.array([1, 2, 3, 4, 5, 6])
    row = np.array([0, 0, 2, 3, 1, 3])
    col = np.array([0, 2, 0, 4, 5, 5])
    shape = (4, 6)

    sparse_array = coo_matrix((data, (row, col)), shape=shape)
    serialized = pa.serialize(sparse_array)
    result = serialized.deserialize()

    assert np.array_equal(sparse_array.toarray(), result.toarray())


@pytest.mark.skipif(not sparse, reason="requires pydata/sparse")
def test_pydata_sparse_sparse_coo_tensor_serialization():
    data = np.array([1, 2, 3, 4, 5, 6])
    coords = np.array([
        [0, 0, 2, 3, 1, 3],
        [0, 2, 0, 4, 5, 5],
    ])
    shape = (4, 6)

    sparse_array = sparse.COO(data=data, coords=coords, shape=shape)
    serialized = pa.serialize(sparse_array)
    result = serialized.deserialize()

    assert np.array_equal(sparse_array.todense(), result.todense())


@pytest.mark.parametrize('tensor_type', tensor_types)
@pytest.mark.parametrize('index_type', index_types)
def test_sparse_csr_matrix_serialization(index_type, tensor_type):
    tensor_dtype = np.dtype(tensor_type)
    index_dtype = np.dtype(index_type)
    data = np.array([[8, 2, 5, 3, 4, 6]]).T.astype(tensor_dtype)
    indptr = np.array([0, 2, 3, 4, 6]).astype(index_dtype)
    indices = np.array([0, 2, 5, 0, 4, 5]).astype(index_dtype)
    shape = (4, 6)
    dim_names = ('x', 'y')

    sparse_tensor = pa.SparseCSRMatrix.from_numpy(data, indptr, indices,
                                                  shape, dim_names)

    context = pa.default_serialization_context()
    serialized = pa.serialize(sparse_tensor, context=context).to_buffer()
    result = pa.deserialize(serialized)
    assert_equal(result, sparse_tensor)
    assert isinstance(result, pa.SparseCSRMatrix)

    data_result, indptr_result, indices_result = result.to_numpy()
    assert np.array_equal(data_result, data)
    assert np.array_equal(indptr_result, indptr)
    assert np.array_equal(indices_result, indices)
    assert result.dim_names == dim_names


@pytest.mark.parametrize('tensor_type', tensor_types)
@pytest.mark.parametrize('index_type', index_types)
def test_sparse_csr_matrix_components_serialization(large_buffer,
                                                    index_type, tensor_type):
    tensor_dtype = np.dtype(tensor_type)
    index_dtype = np.dtype(index_type)
    data = np.array([8, 2, 5, 3, 4, 6]).astype(tensor_dtype)
    indptr = np.array([0, 2, 3, 4, 6]).astype(index_dtype)
    indices = np.array([0, 2, 5, 0, 4, 5]).astype(index_dtype)
    shape = (4, 6)
    dim_names = ('x', 'y')

    sparse_tensor = pa.SparseCSRMatrix.from_numpy(data, indptr, indices,
                                                  shape, dim_names)
    serialization_roundtrip(sparse_tensor, large_buffer)


@pytest.mark.skipif(not csr_matrix, reason="requires scipy")
def test_scipy_sparse_csr_matrix_serialization():
    data = np.array([8, 2, 5, 3, 4, 6])
    indptr = np.array([0, 2, 3, 4, 6])
    indices = np.array([0, 2, 5, 0, 4, 5])
    shape = (4, 6)

    sparse_array = csr_matrix((data, indices, indptr), shape=shape)
    serialized = pa.serialize(sparse_array)
    result = serialized.deserialize()

    assert np.array_equal(sparse_array.toarray(), result.toarray())


@pytest.mark.parametrize('tensor_type', tensor_types)
@pytest.mark.parametrize('index_type', index_types)
def test_sparse_csc_matrix_serialization(index_type, tensor_type):
    tensor_dtype = np.dtype(tensor_type)
    index_dtype = np.dtype(index_type)
    data = np.array([[8, 2, 5, 3, 4, 6]]).T.astype(tensor_dtype)
    indptr = np.array([0, 2, 3, 4, 6]).astype(index_dtype)
    indices = np.array([0, 2, 5, 0, 4, 5]).astype(index_dtype)
    shape = (6, 4)
    dim_names = ('x', 'y')

    sparse_tensor = pa.SparseCSCMatrix.from_numpy(data, indptr, indices,
                                                  shape, dim_names)

    context = pa.default_serialization_context()
    serialized = pa.serialize(sparse_tensor, context=context).to_buffer()
    result = pa.deserialize(serialized)
    assert_equal(result, sparse_tensor)
    assert isinstance(result, pa.SparseCSCMatrix)

    data_result, indptr_result, indices_result = result.to_numpy()
    assert np.array_equal(data_result, data)
    assert np.array_equal(indptr_result, indptr)
    assert np.array_equal(indices_result, indices)
    assert result.dim_names == dim_names


@pytest.mark.parametrize('tensor_type', tensor_types)
@pytest.mark.parametrize('index_type', index_types)
def test_sparse_csc_matrix_components_serialization(large_buffer,
                                                    index_type, tensor_type):
    tensor_dtype = np.dtype(tensor_type)
    index_dtype = np.dtype(index_type)
    data = np.array([8, 2, 5, 3, 4, 6]).astype(tensor_dtype)
    indptr = np.array([0, 2, 3, 6]).astype(index_dtype)
    indices = np.array([0, 2, 2, 0, 1, 2]).astype(index_dtype)
    shape = (3, 3)
    dim_names = ('x', 'y')

    sparse_tensor = pa.SparseCSCMatrix.from_numpy(data, indptr, indices,
                                                  shape, dim_names)
    serialization_roundtrip(sparse_tensor, large_buffer)


@pytest.mark.skipif(not csc_matrix, reason="requires scipy")
def test_scipy_sparse_csc_matrix_serialization():
    data = np.array([8, 2, 5, 3, 4, 6])
    indptr = np.array([0, 2, 3, 4, 6])
    indices = np.array([0, 2, 5, 0, 4, 5])
    shape = (6, 4)

    sparse_array = csc_matrix((data, indices, indptr), shape=shape)
    serialized = pa.serialize(sparse_array)
    result = serialized.deserialize()

    assert np.array_equal(sparse_array.toarray(), result.toarray())


@pytest.mark.parametrize('tensor_type', tensor_types)
@pytest.mark.parametrize('index_type', index_types)
def test_sparse_csf_tensor_serialization(index_type, tensor_type):
    tensor_dtype = np.dtype(tensor_type)
    index_dtype = np.dtype(index_type)
    data = np.array([[1, 2, 3, 4, 5, 6, 7, 8]]).T.astype(tensor_dtype)
    indptr = [
        np.array([0, 2, 3]),
        np.array([0, 1, 3, 4]),
        np.array([0, 2, 4, 5, 8]),
    ]
    indices = [
        np.array([0, 1]),
        np.array([0, 1, 1]),
        np.array([0, 0, 1, 1]),
        np.array([1, 2, 0, 2, 0, 0, 1, 2]),
    ]
    indptr = [x.astype(index_dtype) for x in indptr]
    indices = [x.astype(index_dtype) for x in indices]
    shape = (2, 3, 4, 5)
    axis_order = (0, 1, 2, 3)
    dim_names = ("a", "b", "c", "d")

    for ndim in [2, 3, 4]:
        sparse_tensor = pa.SparseCSFTensor.from_numpy(data, indptr[:ndim - 1],
                                                      indices[:ndim],
                                                      shape[:ndim],
                                                      axis_order[:ndim],
                                                      dim_names[:ndim])

        context = pa.default_serialization_context()
        serialized = pa.serialize(sparse_tensor, context=context).to_buffer()
        result = pa.deserialize(serialized)
        assert_equal(result, sparse_tensor)
        assert isinstance(result, pa.SparseCSFTensor)


@pytest.mark.parametrize('tensor_type', tensor_types)
@pytest.mark.parametrize('index_type', index_types)
def test_sparse_csf_tensor_components_serialization(large_buffer,
                                                    index_type, tensor_type):
    tensor_dtype = np.dtype(tensor_type)
    index_dtype = np.dtype(index_type)
    data = np.array([[1, 2, 3, 4, 5, 6, 7, 8]]).T.astype(tensor_dtype)
    indptr = [
        np.array([0, 2, 3]),
        np.array([0, 1, 3, 4]),
        np.array([0, 2, 4, 5, 8]),
    ]
    indices = [
        np.array([0, 1]),
        np.array([0, 1, 1]),
        np.array([0, 0, 1, 1]),
        np.array([1, 2, 0, 2, 0, 0, 1, 2]),
    ]
    indptr = [x.astype(index_dtype) for x in indptr]
    indices = [x.astype(index_dtype) for x in indices]
    shape = (2, 3, 4, 5)
    axis_order = (0, 1, 2, 3)
    dim_names = ("a", "b", "c", "d")

    for ndim in [2, 3, 4]:
        sparse_tensor = pa.SparseCSFTensor.from_numpy(data, indptr[:ndim - 1],
                                                      indices[:ndim],
                                                      shape[:ndim],
                                                      axis_order[:ndim],
                                                      dim_names[:ndim])

        serialization_roundtrip(sparse_tensor, large_buffer)


@pytest.mark.filterwarnings(
    "ignore:the matrix subclass:PendingDeprecationWarning")
def test_numpy_matrix_serialization(tmpdir):
    class CustomType:
        def __init__(self, val):
            self.val = val

    rec_type = np.dtype([('x', 'int64'), ('y', 'double'), ('z', '<U4')])

    path = os.path.join(str(tmpdir),
                        'pyarrow_npmatrix_serialization_test.bin')
    array = np.random.randint(low=-1, high=1, size=(2, 2))

    for data_type in [str, int, float, rec_type, CustomType]:
        matrix = np.matrix(array.astype(data_type))

        with open(path, 'wb') as f:
            f.write(pa.serialize(matrix).to_buffer())

        serialized = pa.read_serialized(pa.OSFile(path))
        result = serialized.deserialize()
        assert_equal(result, matrix)
        assert_equal(result.dtype, matrix.dtype)
        serialized = None
        assert_equal(result, matrix)
        assert result.base is not None


def test_pyarrow_objects_serialization(large_buffer):
    # NOTE: We have to put these objects inside,
    # or it will affect 'test_total_bytes_allocated'.
    pyarrow_objects = [
        pa.array([1, 2, 3, 4]), pa.array(['1', 'never 😱', '',
                                          "233 * 😀"]),
        pa.array([1, None, 2, 3]),
        pa.Tensor.from_numpy(np.random.rand(2, 3, 4)),
        pa.RecordBatch.from_arrays(
            [pa.array([1, None, 2, 3]),
             pa.array(['1', 'never 😱', '', "233 * 😀"])],
            ['a', 'b']),
        pa.Table.from_arrays([pa.array([1, None, 2, 3]),
                              pa.array(['1', 'never 😱', '',
                                        "233 * 😀"])],
                             ['a', 'b'])
    ]
    for obj in pyarrow_objects:
        serialization_roundtrip(obj, large_buffer)


def test_buffer_serialization():

    class BufferClass:
        pass

    def serialize_buffer_class(obj):
        return pa.py_buffer(b"hello")

    def deserialize_buffer_class(serialized_obj):
        return serialized_obj

    context = pa.default_serialization_context()
    context.register_type(
        BufferClass, "BufferClass",
        custom_serializer=serialize_buffer_class,
        custom_deserializer=deserialize_buffer_class)

    b = pa.serialize(BufferClass(), context=context).to_buffer()
    assert pa.deserialize(b, context=context).to_pybytes() == b"hello"


@pytest.mark.skip(reason="extensive memory requirements")
def test_arrow_limits():
    def huge_memory_map(temp_dir):
        return large_memory_map(temp_dir, 100 * 1024 * 1024 * 1024)

    with pa.memory_map(huge_memory_map, mode="r+") as mmap:
        # Test that objects that are too large for Arrow throw a Python
        # exception. These tests give out of memory errors on Travis and
        # need to be run on a machine with lots of RAM.
        x = 2 ** 29 * [1.0]
        serialization_roundtrip(x, mmap)
        del x
        x = 2 ** 29 * ["s"]
        serialization_roundtrip(x, mmap)
        del x
        x = 2 ** 29 * [["1"], 2, 3, [{"s": 4}]]
        serialization_roundtrip(x, mmap)
        del x
        x = 2 ** 29 * [{"s": 1}] + 2 ** 29 * [1.0]
        serialization_roundtrip(x, mmap)
        del x
        x = np.zeros(2 ** 25)
        serialization_roundtrip(x, mmap)
        del x
        x = [np.zeros(2 ** 18) for _ in range(2 ** 7)]
        serialization_roundtrip(x, mmap)
        del x


def test_serialization_callback_error():

    class TempClass:
        pass

    # Pass a SerializationContext into serialize, but TempClass
    # is not registered
    serialization_context = pa.SerializationContext()
    val = TempClass()
    with pytest.raises(pa.SerializationCallbackError) as err:
        serialized_object = pa.serialize(val, serialization_context)
    assert err.value.example_object == val

    serialization_context.register_type(TempClass, "TempClass")
    serialized_object = pa.serialize(TempClass(), serialization_context)
    deserialization_context = pa.SerializationContext()

    # Pass a SerializationContext into deserialize, but TempClass
    # is not registered
    with pytest.raises(pa.DeserializationCallbackError) as err:
        serialized_object.deserialize(deserialization_context)
    assert err.value.type_id == "TempClass"

    class TempClass2:
        pass

    # Make sure that we receive an error when we use an inappropriate value
    # for the type_id argument.
    with pytest.raises(TypeError):
        serialization_context.register_type(TempClass2, 1)


def test_fallback_to_subclasses():

    class SubFoo(Foo):
        def __init__(self):
            Foo.__init__(self)

    # Should be able to serialize/deserialize an instance
    # if a base class has been registered
    serialization_context = pa.SerializationContext()
    serialization_context.register_type(Foo, "Foo")

    subfoo = SubFoo()
    # should fall back to the Foo serializer
    serialized_object = pa.serialize(subfoo, serialization_context)

    reconstructed_object = serialized_object.deserialize(
        serialization_context
    )
    assert type(reconstructed_object) == Foo


class Serializable:
    pass


def serialize_serializable(obj):
    return {"type": type(obj), "data": obj.__dict__}


def deserialize_serializable(obj):
    val = obj["type"].__new__(obj["type"])
    val.__dict__.update(obj["data"])
    return val


class SerializableClass(Serializable):
    def __init__(self):
        self.value = 3


def test_serialize_subclasses():

    # This test shows how subclasses can be handled in an idiomatic way
    # by having only a serializer for the base class.

    # This technique should however be used with care, since pickling
    # type(obj) with cloudpickle will include the full class definition
    # in the serialized representation.
    # This means the class definition is part of every instance of the
    # object, which in general is not desirable; registering all subclasses
    # with register_type will result in faster and more memory
    # efficient serialization.

    context = pa.default_serialization_context()
    context.register_type(
        Serializable, "Serializable",
        custom_serializer=serialize_serializable,
        custom_deserializer=deserialize_serializable)

    a = SerializableClass()
    serialized = pa.serialize(a, context=context)

    deserialized = serialized.deserialize(context=context)
    assert type(deserialized).__name__ == SerializableClass.__name__
    assert deserialized.value == 3


def test_serialize_to_components_invalid_cases():
    buf = pa.py_buffer(b'hello')

    components = {
        'num_tensors': 0,
        'num_sparse_tensors': {
            'coo': 0, 'csr': 0, 'csc': 0, 'csf': 0, 'ndim_csf': 0
        },
        'num_ndarrays': 0,
        'num_buffers': 1,
        'data': [buf]
    }

    with pytest.raises(pa.ArrowInvalid):
        pa.deserialize_components(components)

    components = {
        'num_tensors': 0,
        'num_sparse_tensors': {
            'coo': 0, 'csr': 0, 'csc': 0, 'csf': 0, 'ndim_csf': 0
        },
        'num_ndarrays': 1,
        'num_buffers': 0,
        'data': [buf, buf]
    }

    with pytest.raises(pa.ArrowInvalid):
        pa.deserialize_components(components)


def test_deserialize_components_in_different_process():
    arr = pa.array([1, 2, 5, 6], type=pa.int8())
    ser = pa.serialize(arr)
    data = pickle.dumps(ser.to_components(), protocol=-1)

    code = """if 1:
        import pickle

        import pyarrow as pa

        data = {!r}
        components = pickle.loads(data)
        arr = pa.deserialize_components(components)

        assert arr.to_pylist() == [1, 2, 5, 6], arr
        """.format(data)

    subprocess_env = test_util.get_modified_env_with_pythonpath()
    print("** sys.path =", sys.path)
    print("** setting PYTHONPATH to:", subprocess_env['PYTHONPATH'])
    subprocess.check_call([sys.executable, "-c", code], env=subprocess_env)


def test_serialize_read_concatenated_records():
    # ARROW-1996 -- see stream alignment work in ARROW-2840, ARROW-3212
    f = pa.BufferOutputStream()
    pa.serialize_to(12, f)
    pa.serialize_to(23, f)
    buf = f.getvalue()

    f = pa.BufferReader(buf)
    pa.read_serialized(f).deserialize()
    pa.read_serialized(f).deserialize()


def deserialize_regex(serialized, q):
    import pyarrow as pa
    q.put(pa.deserialize(serialized))


def test_deserialize_in_different_process():
    from multiprocessing import Process, Queue
    import re

    regex = re.compile(r"\d+\.\d*")

    serialization_context = pa.SerializationContext()
    serialization_context.register_type(type(regex), "Regex", pickle=True)

    serialized = pa.serialize(regex, serialization_context)
    serialized_bytes = serialized.to_buffer().to_pybytes()

    q = Queue()
    p = Process(target=deserialize_regex, args=(serialized_bytes, q))
    p.start()
    assert q.get().pattern == regex.pattern
    p.join()


def test_deserialize_buffer_in_different_process():
    import tempfile

    f = tempfile.NamedTemporaryFile(delete=False)
    b = pa.serialize(pa.py_buffer(b'hello')).to_buffer()
    f.write(b.to_pybytes())
    f.close()

    test_util.invoke_script('deserialize_buffer.py', f.name)


def test_set_pickle():
    # Use a custom type to trigger pickling.
    class Foo:
        pass

    context = pa.SerializationContext()
    context.register_type(Foo, 'Foo', pickle=True)

    test_object = Foo()

    # Define a custom serializer and deserializer to use in place of pickle.

    def dumps1(obj):
        return b'custom'

    def loads1(serialized_obj):
        return serialized_obj + b' serialization 1'

    # Test that setting a custom pickler changes the behavior.
    context.set_pickle(dumps1, loads1)
    serialized = pa.serialize(test_object, context=context).to_buffer()
    deserialized = pa.deserialize(serialized.to_pybytes(), context=context)
    assert deserialized == b'custom serialization 1'

    # Define another custom serializer and deserializer.

    def dumps2(obj):
        return b'custom'

    def loads2(serialized_obj):
        return serialized_obj + b' serialization 2'

    # Test that setting another custom pickler changes the behavior again.
    context.set_pickle(dumps2, loads2)
    serialized = pa.serialize(test_object, context=context).to_buffer()
    deserialized = pa.deserialize(serialized.to_pybytes(), context=context)
    assert deserialized == b'custom serialization 2'


def test_path_objects(tmpdir):
    # Test compatibility with PEP 519 path-like objects
    p = pathlib.Path(tmpdir) / 'zzz.bin'
    obj = 1234
    pa.serialize_to(obj, p)
    res = pa.deserialize_from(p, None)
    assert res == obj


def test_tensor_alignment():
    # Deserialized numpy arrays should be 64-byte aligned.
    x = np.random.normal(size=(10, 20, 30))
    y = pa.deserialize(pa.serialize(x).to_buffer())
    assert y.ctypes.data % 64 == 0

    xs = [np.random.normal(size=i) for i in range(100)]
    ys = pa.deserialize(pa.serialize(xs).to_buffer())
    for y in ys:
        assert y.ctypes.data % 64 == 0

    xs = [np.random.normal(size=i * (1,)) for i in range(20)]
    ys = pa.deserialize(pa.serialize(xs).to_buffer())
    for y in ys:
        assert y.ctypes.data % 64 == 0

    xs = [np.random.normal(size=i * (5,)) for i in range(1, 8)]
    xs = [xs[i][(i + 1) * (slice(1, 3),)] for i in range(len(xs))]
    ys = pa.deserialize(pa.serialize(xs).to_buffer())
    for y in ys:
        assert y.ctypes.data % 64 == 0


def test_empty_tensor():
    # ARROW-8122, serialize and deserialize empty tensors
    x = np.array([], dtype=np.float64)
    y = pa.deserialize(pa.serialize(x).to_buffer())
    np.testing.assert_array_equal(x, y)

    x = np.array([[], [], []], dtype=np.float64)
    y = pa.deserialize(pa.serialize(x).to_buffer())
    np.testing.assert_array_equal(x, y)

    x = np.array([[], [], []], dtype=np.float64).T
    y = pa.deserialize(pa.serialize(x).to_buffer())
    np.testing.assert_array_equal(x, y)


def test_serialization_determinism():
    for obj in COMPLEX_OBJECTS:
        buf1 = pa.serialize(obj).to_buffer()
        buf2 = pa.serialize(obj).to_buffer()
        assert buf1.to_pybytes() == buf2.to_pybytes()


def test_serialize_recursive_objects():
    class ClassA:
        pass

    # Make a list that contains itself.
    lst = []
    lst.append(lst)

    # Make an object that contains itself as a field.
    a1 = ClassA()
    a1.field = a1

    # Make two objects that contain each other as fields.
    a2 = ClassA()
    a3 = ClassA()
    a2.field = a3
    a3.field = a2

    # Make a dictionary that contains itself.
    d1 = {}
    d1["key"] = d1

    # Make a numpy array that contains itself.
    arr = np.array([None], dtype=object)
    arr[0] = arr

    # Create a list of recursive objects.
    recursive_objects = [lst, a1, a2, a3, d1, arr]

    # Check that exceptions are thrown when we serialize the recursive
    # objects.
    for obj in recursive_objects:
        with pytest.raises(Exception):
            pa.serialize(obj).deserialize()