1# Licensed to the Apache Software Foundation (ASF) under one 2# or more contributor license agreements. See the NOTICE file 3# distributed with this work for additional information 4# regarding copyright ownership. The ASF licenses this file 5# to you under the Apache License, Version 2.0 (the 6# "License"); you may not use this file except in compliance 7# with the License. You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, 12# software distributed under the License is distributed on an 13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 14# KIND, either express or implied. See the License for the 15# specific language governing permissions and limitations 16# under the License. 17 18import abc 19import bz2 20from datetime import date, datetime 21from decimal import Decimal 22import gc 23import gzip 24import io 25import itertools 26import os 27import pickle 28import shutil 29import signal 30import string 31import tempfile 32import threading 33import time 34import unittest 35import weakref 36 37import pytest 38 39import numpy as np 40 41import pyarrow as pa 42from pyarrow.csv import ( 43 open_csv, read_csv, ReadOptions, ParseOptions, ConvertOptions, ISO8601, 44 write_csv, WriteOptions, CSVWriter) 45from pyarrow.tests import util 46 47 48def generate_col_names(): 49 # 'a', 'b'... 'z', then 'aa', 'ab'... 50 letters = string.ascii_lowercase 51 yield from letters 52 for first in letters: 53 for second in letters: 54 yield first + second 55 56 57def make_random_csv(num_cols=2, num_rows=10, linesep='\r\n', write_names=True): 58 arr = np.random.RandomState(42).randint(0, 1000, size=(num_cols, num_rows)) 59 csv = io.StringIO() 60 col_names = list(itertools.islice(generate_col_names(), num_cols)) 61 if write_names: 62 csv.write(",".join(col_names)) 63 csv.write(linesep) 64 for row in arr.T: 65 csv.write(",".join(map(str, row))) 66 csv.write(linesep) 67 csv = csv.getvalue().encode() 68 columns = [pa.array(a, type=pa.int64()) for a in arr] 69 expected = pa.Table.from_arrays(columns, col_names) 70 return csv, expected 71 72 73def make_empty_csv(column_names): 74 csv = io.StringIO() 75 csv.write(",".join(column_names)) 76 csv.write("\n") 77 return csv.getvalue().encode() 78 79 80def check_options_class(cls, **attr_values): 81 """ 82 Check setting and getting attributes of an *Options class. 83 """ 84 opts = cls() 85 86 for name, values in attr_values.items(): 87 assert getattr(opts, name) == values[0], \ 88 "incorrect default value for " + name 89 for v in values: 90 setattr(opts, name, v) 91 assert getattr(opts, name) == v, "failed setting value" 92 93 with pytest.raises(AttributeError): 94 opts.zzz_non_existent = True 95 96 # Check constructor named arguments 97 non_defaults = {name: values[1] for name, values in attr_values.items()} 98 opts = cls(**non_defaults) 99 for name, value in non_defaults.items(): 100 assert getattr(opts, name) == value 101 102 103# The various options classes need to be picklable for dataset 104def check_options_class_pickling(cls, **attr_values): 105 opts = cls(**attr_values) 106 new_opts = pickle.loads(pickle.dumps(opts, 107 protocol=pickle.HIGHEST_PROTOCOL)) 108 for name, value in attr_values.items(): 109 assert getattr(new_opts, name) == value 110 111 112def test_read_options(): 113 cls = ReadOptions 114 opts = cls() 115 116 check_options_class(cls, use_threads=[True, False], 117 skip_rows=[0, 3], 118 column_names=[[], ["ab", "cd"]], 119 autogenerate_column_names=[False, True], 120 encoding=['utf8', 'utf16'], 121 skip_rows_after_names=[0, 27]) 122 123 check_options_class_pickling(cls, use_threads=True, 124 skip_rows=3, 125 column_names=["ab", "cd"], 126 autogenerate_column_names=False, 127 encoding='utf16', 128 skip_rows_after_names=27) 129 130 assert opts.block_size > 0 131 opts.block_size = 12345 132 assert opts.block_size == 12345 133 134 opts = cls(block_size=1234) 135 assert opts.block_size == 1234 136 137 opts.validate() 138 139 match = "ReadOptions: block_size must be at least 1: 0" 140 with pytest.raises(pa.ArrowInvalid, match=match): 141 opts = cls() 142 opts.block_size = 0 143 opts.validate() 144 145 match = "ReadOptions: skip_rows cannot be negative: -1" 146 with pytest.raises(pa.ArrowInvalid, match=match): 147 opts = cls() 148 opts.skip_rows = -1 149 opts.validate() 150 151 match = "ReadOptions: skip_rows_after_names cannot be negative: -1" 152 with pytest.raises(pa.ArrowInvalid, match=match): 153 opts = cls() 154 opts.skip_rows_after_names = -1 155 opts.validate() 156 157 match = "ReadOptions: autogenerate_column_names cannot be true when" \ 158 " column_names are provided" 159 with pytest.raises(pa.ArrowInvalid, match=match): 160 opts = cls() 161 opts.autogenerate_column_names = True 162 opts.column_names = ('a', 'b') 163 opts.validate() 164 165 166def test_parse_options(): 167 cls = ParseOptions 168 169 check_options_class(cls, delimiter=[',', 'x'], 170 escape_char=[False, 'y'], 171 quote_char=['"', 'z', False], 172 double_quote=[True, False], 173 newlines_in_values=[False, True], 174 ignore_empty_lines=[True, False]) 175 176 check_options_class_pickling(cls, delimiter='x', 177 escape_char='y', 178 quote_char=False, 179 double_quote=False, 180 newlines_in_values=True, 181 ignore_empty_lines=False) 182 183 cls().validate() 184 opts = cls() 185 opts.delimiter = "\t" 186 opts.validate() 187 188 match = "ParseOptions: delimiter cannot be \\\\r or \\\\n" 189 with pytest.raises(pa.ArrowInvalid, match=match): 190 opts = cls() 191 opts.delimiter = "\n" 192 opts.validate() 193 194 with pytest.raises(pa.ArrowInvalid, match=match): 195 opts = cls() 196 opts.delimiter = "\r" 197 opts.validate() 198 199 match = "ParseOptions: quote_char cannot be \\\\r or \\\\n" 200 with pytest.raises(pa.ArrowInvalid, match=match): 201 opts = cls() 202 opts.quote_char = "\n" 203 opts.validate() 204 205 with pytest.raises(pa.ArrowInvalid, match=match): 206 opts = cls() 207 opts.quote_char = "\r" 208 opts.validate() 209 210 match = "ParseOptions: escape_char cannot be \\\\r or \\\\n" 211 with pytest.raises(pa.ArrowInvalid, match=match): 212 opts = cls() 213 opts.escape_char = "\n" 214 opts.validate() 215 216 with pytest.raises(pa.ArrowInvalid, match=match): 217 opts = cls() 218 opts.escape_char = "\r" 219 opts.validate() 220 221 222def test_convert_options(): 223 cls = ConvertOptions 224 opts = cls() 225 226 check_options_class( 227 cls, check_utf8=[True, False], 228 strings_can_be_null=[False, True], 229 quoted_strings_can_be_null=[True, False], 230 decimal_point=['.', ','], 231 include_columns=[[], ['def', 'abc']], 232 include_missing_columns=[False, True], 233 auto_dict_encode=[False, True], 234 timestamp_parsers=[[], [ISO8601, '%y-%m']]) 235 236 check_options_class_pickling( 237 cls, check_utf8=False, 238 strings_can_be_null=True, 239 quoted_strings_can_be_null=False, 240 decimal_point=',', 241 include_columns=['def', 'abc'], 242 include_missing_columns=False, 243 auto_dict_encode=True, 244 timestamp_parsers=[ISO8601, '%y-%m']) 245 246 with pytest.raises(ValueError): 247 opts.decimal_point = '..' 248 249 assert opts.auto_dict_max_cardinality > 0 250 opts.auto_dict_max_cardinality = 99999 251 assert opts.auto_dict_max_cardinality == 99999 252 253 assert opts.column_types == {} 254 # Pass column_types as mapping 255 opts.column_types = {'b': pa.int16(), 'c': pa.float32()} 256 assert opts.column_types == {'b': pa.int16(), 'c': pa.float32()} 257 opts.column_types = {'v': 'int16', 'w': 'null'} 258 assert opts.column_types == {'v': pa.int16(), 'w': pa.null()} 259 # Pass column_types as schema 260 schema = pa.schema([('a', pa.int32()), ('b', pa.string())]) 261 opts.column_types = schema 262 assert opts.column_types == {'a': pa.int32(), 'b': pa.string()} 263 # Pass column_types as sequence 264 opts.column_types = [('x', pa.binary())] 265 assert opts.column_types == {'x': pa.binary()} 266 267 with pytest.raises(TypeError, match='DataType expected'): 268 opts.column_types = {'a': None} 269 with pytest.raises(TypeError): 270 opts.column_types = 0 271 272 assert isinstance(opts.null_values, list) 273 assert '' in opts.null_values 274 assert 'N/A' in opts.null_values 275 opts.null_values = ['xxx', 'yyy'] 276 assert opts.null_values == ['xxx', 'yyy'] 277 278 assert isinstance(opts.true_values, list) 279 opts.true_values = ['xxx', 'yyy'] 280 assert opts.true_values == ['xxx', 'yyy'] 281 282 assert isinstance(opts.false_values, list) 283 opts.false_values = ['xxx', 'yyy'] 284 assert opts.false_values == ['xxx', 'yyy'] 285 286 assert opts.timestamp_parsers == [] 287 opts.timestamp_parsers = [ISO8601] 288 assert opts.timestamp_parsers == [ISO8601] 289 290 opts = cls(column_types={'a': pa.null()}, 291 null_values=['N', 'nn'], true_values=['T', 'tt'], 292 false_values=['F', 'ff'], auto_dict_max_cardinality=999, 293 timestamp_parsers=[ISO8601, '%Y-%m-%d']) 294 assert opts.column_types == {'a': pa.null()} 295 assert opts.null_values == ['N', 'nn'] 296 assert opts.false_values == ['F', 'ff'] 297 assert opts.true_values == ['T', 'tt'] 298 assert opts.auto_dict_max_cardinality == 999 299 assert opts.timestamp_parsers == [ISO8601, '%Y-%m-%d'] 300 301 302def test_write_options(): 303 cls = WriteOptions 304 opts = cls() 305 306 check_options_class( 307 cls, include_header=[True, False]) 308 309 assert opts.batch_size > 0 310 opts.batch_size = 12345 311 assert opts.batch_size == 12345 312 313 opts = cls(batch_size=9876) 314 assert opts.batch_size == 9876 315 316 opts.validate() 317 318 match = "WriteOptions: batch_size must be at least 1: 0" 319 with pytest.raises(pa.ArrowInvalid, match=match): 320 opts = cls() 321 opts.batch_size = 0 322 opts.validate() 323 324 325class BaseTestCSV(abc.ABC): 326 """Common tests which are shared by streaming and non streaming readers""" 327 328 @abc.abstractmethod 329 def read_bytes(self, b, **kwargs): 330 """ 331 :param b: bytes to be parsed 332 :param kwargs: arguments passed on to open the csv file 333 :return: b parsed as a single RecordBatch 334 """ 335 raise NotImplementedError 336 337 @property 338 @abc.abstractmethod 339 def use_threads(self): 340 """Whether this test is multi-threaded""" 341 raise NotImplementedError 342 343 @staticmethod 344 def check_names(table, names): 345 assert table.num_columns == len(names) 346 assert table.column_names == names 347 348 def test_header_skip_rows(self): 349 rows = b"ab,cd\nef,gh\nij,kl\nmn,op\n" 350 351 opts = ReadOptions() 352 opts.skip_rows = 1 353 table = self.read_bytes(rows, read_options=opts) 354 self.check_names(table, ["ef", "gh"]) 355 assert table.to_pydict() == { 356 "ef": ["ij", "mn"], 357 "gh": ["kl", "op"], 358 } 359 360 opts.skip_rows = 3 361 table = self.read_bytes(rows, read_options=opts) 362 self.check_names(table, ["mn", "op"]) 363 assert table.to_pydict() == { 364 "mn": [], 365 "op": [], 366 } 367 368 opts.skip_rows = 4 369 with pytest.raises(pa.ArrowInvalid): 370 # Not enough rows 371 table = self.read_bytes(rows, read_options=opts) 372 373 # Can skip rows with a different number of columns 374 rows = b"abcd\n,,,,,\nij,kl\nmn,op\n" 375 opts.skip_rows = 2 376 table = self.read_bytes(rows, read_options=opts) 377 self.check_names(table, ["ij", "kl"]) 378 assert table.to_pydict() == { 379 "ij": ["mn"], 380 "kl": ["op"], 381 } 382 383 # Can skip all rows exactly when columns are given 384 opts.skip_rows = 4 385 opts.column_names = ['ij', 'kl'] 386 table = self.read_bytes(rows, read_options=opts) 387 self.check_names(table, ["ij", "kl"]) 388 assert table.to_pydict() == { 389 "ij": [], 390 "kl": [], 391 } 392 393 def test_skip_rows_after_names(self): 394 rows = b"ab,cd\nef,gh\nij,kl\nmn,op\n" 395 396 opts = ReadOptions() 397 opts.skip_rows_after_names = 1 398 table = self.read_bytes(rows, read_options=opts) 399 self.check_names(table, ["ab", "cd"]) 400 assert table.to_pydict() == { 401 "ab": ["ij", "mn"], 402 "cd": ["kl", "op"], 403 } 404 405 # Can skip exact number of rows 406 opts.skip_rows_after_names = 3 407 table = self.read_bytes(rows, read_options=opts) 408 self.check_names(table, ["ab", "cd"]) 409 assert table.to_pydict() == { 410 "ab": [], 411 "cd": [], 412 } 413 414 # Can skip beyond all rows 415 opts.skip_rows_after_names = 4 416 table = self.read_bytes(rows, read_options=opts) 417 self.check_names(table, ["ab", "cd"]) 418 assert table.to_pydict() == { 419 "ab": [], 420 "cd": [], 421 } 422 423 # Can skip rows with a different number of columns 424 rows = b"abcd\n,,,,,\nij,kl\nmn,op\n" 425 opts.skip_rows_after_names = 2 426 opts.column_names = ["f0", "f1"] 427 table = self.read_bytes(rows, read_options=opts) 428 self.check_names(table, ["f0", "f1"]) 429 assert table.to_pydict() == { 430 "f0": ["ij", "mn"], 431 "f1": ["kl", "op"], 432 } 433 opts = ReadOptions() 434 435 # Can skip rows with new lines in the value 436 rows = b'ab,cd\n"e\nf","g\n\nh"\n"ij","k\nl"\nmn,op' 437 opts.skip_rows_after_names = 2 438 parse_opts = ParseOptions() 439 parse_opts.newlines_in_values = True 440 table = self.read_bytes(rows, read_options=opts, 441 parse_options=parse_opts) 442 self.check_names(table, ["ab", "cd"]) 443 assert table.to_pydict() == { 444 "ab": ["mn"], 445 "cd": ["op"], 446 } 447 448 # Can skip rows when block ends in middle of quoted value 449 opts.skip_rows_after_names = 2 450 opts.block_size = 26 451 table = self.read_bytes(rows, read_options=opts, 452 parse_options=parse_opts) 453 self.check_names(table, ["ab", "cd"]) 454 assert table.to_pydict() == { 455 "ab": ["mn"], 456 "cd": ["op"], 457 } 458 opts = ReadOptions() 459 460 # Can skip rows that are beyond the first block without lexer 461 rows, expected = make_random_csv(num_cols=5, num_rows=1000) 462 opts.skip_rows_after_names = 900 463 opts.block_size = len(rows) / 11 464 table = self.read_bytes(rows, read_options=opts) 465 assert table.schema == expected.schema 466 assert table.num_rows == 100 467 table_dict = table.to_pydict() 468 for name, values in expected.to_pydict().items(): 469 assert values[900:] == table_dict[name] 470 471 # Can skip rows that are beyond the first block with lexer 472 table = self.read_bytes(rows, read_options=opts, 473 parse_options=parse_opts) 474 assert table.schema == expected.schema 475 assert table.num_rows == 100 476 table_dict = table.to_pydict() 477 for name, values in expected.to_pydict().items(): 478 assert values[900:] == table_dict[name] 479 480 # Skip rows and skip rows after names 481 rows, expected = make_random_csv(num_cols=5, num_rows=200, 482 write_names=False) 483 opts = ReadOptions() 484 opts.skip_rows = 37 485 opts.skip_rows_after_names = 41 486 opts.column_names = expected.schema.names 487 table = self.read_bytes(rows, read_options=opts, 488 parse_options=parse_opts) 489 assert table.schema == expected.schema 490 assert (table.num_rows == 491 expected.num_rows - opts.skip_rows - 492 opts.skip_rows_after_names) 493 table_dict = table.to_pydict() 494 for name, values in expected.to_pydict().items(): 495 assert (values[opts.skip_rows + opts.skip_rows_after_names:] == 496 table_dict[name]) 497 498 def test_row_number_offset_in_errors(self): 499 # Row numbers are only correctly counted in serial reads 500 def format_msg(msg_format, row, *args): 501 if self.use_threads: 502 row_info = "" 503 else: 504 row_info = "Row #{}: ".format(row) 505 return msg_format.format(row_info, *args) 506 507 csv, _ = make_random_csv(4, 100, write_names=True) 508 509 read_options = ReadOptions() 510 read_options.block_size = len(csv) / 3 511 convert_options = ConvertOptions() 512 convert_options.column_types = {"a": pa.int32()} 513 514 # Test without skip_rows and column names in the csv 515 csv_bad_columns = csv + b"1,2\r\n" 516 message_columns = format_msg("{}Expected 4 columns, got 2", 102) 517 with pytest.raises(pa.ArrowInvalid, match=message_columns): 518 self.read_bytes(csv_bad_columns, 519 read_options=read_options, 520 convert_options=convert_options) 521 522 csv_bad_type = csv + b"a,b,c,d\r\n" 523 message_value = format_msg( 524 "In CSV column #0: {}" 525 "CSV conversion error to int32: invalid value 'a'", 526 102, csv) 527 with pytest.raises(pa.ArrowInvalid, match=message_value): 528 self.read_bytes(csv_bad_type, 529 read_options=read_options, 530 convert_options=convert_options) 531 532 long_row = (b"this is a long row" * 15) + b",3\r\n" 533 csv_bad_columns_long = csv + long_row 534 message_long = format_msg("{}Expected 4 columns, got 2: {} ...", 102, 535 long_row[0:96].decode("utf-8")) 536 with pytest.raises(pa.ArrowInvalid, match=message_long): 537 self.read_bytes(csv_bad_columns_long, 538 read_options=read_options, 539 convert_options=convert_options) 540 541 # Test skipping rows after the names 542 read_options.skip_rows_after_names = 47 543 544 with pytest.raises(pa.ArrowInvalid, match=message_columns): 545 self.read_bytes(csv_bad_columns, 546 read_options=read_options, 547 convert_options=convert_options) 548 549 with pytest.raises(pa.ArrowInvalid, match=message_value): 550 self.read_bytes(csv_bad_type, 551 read_options=read_options, 552 convert_options=convert_options) 553 554 with pytest.raises(pa.ArrowInvalid, match=message_long): 555 self.read_bytes(csv_bad_columns_long, 556 read_options=read_options, 557 convert_options=convert_options) 558 559 read_options.skip_rows_after_names = 0 560 561 # Test without skip_rows and column names not in the csv 562 csv, _ = make_random_csv(4, 100, write_names=False) 563 read_options.column_names = ["a", "b", "c", "d"] 564 csv_bad_columns = csv + b"1,2\r\n" 565 message_columns = format_msg("{}Expected 4 columns, got 2", 101) 566 with pytest.raises(pa.ArrowInvalid, match=message_columns): 567 self.read_bytes(csv_bad_columns, 568 read_options=read_options, 569 convert_options=convert_options) 570 571 csv_bad_columns_long = csv + long_row 572 message_long = format_msg("{}Expected 4 columns, got 2: {} ...", 101, 573 long_row[0:96].decode("utf-8")) 574 with pytest.raises(pa.ArrowInvalid, match=message_long): 575 self.read_bytes(csv_bad_columns_long, 576 read_options=read_options, 577 convert_options=convert_options) 578 579 csv_bad_type = csv + b"a,b,c,d\r\n" 580 message_value = format_msg( 581 "In CSV column #0: {}" 582 "CSV conversion error to int32: invalid value 'a'", 583 101) 584 message_value = message_value.format(len(csv)) 585 with pytest.raises(pa.ArrowInvalid, match=message_value): 586 self.read_bytes(csv_bad_type, 587 read_options=read_options, 588 convert_options=convert_options) 589 590 # Test with skip_rows and column names not in the csv 591 read_options.skip_rows = 23 592 with pytest.raises(pa.ArrowInvalid, match=message_columns): 593 self.read_bytes(csv_bad_columns, 594 read_options=read_options, 595 convert_options=convert_options) 596 597 with pytest.raises(pa.ArrowInvalid, match=message_value): 598 self.read_bytes(csv_bad_type, 599 read_options=read_options, 600 convert_options=convert_options) 601 602 603class BaseCSVTableRead(BaseTestCSV): 604 605 def read_csv(self, csv, *args, validate_full=True, **kwargs): 606 """ 607 Reads the CSV file into memory using pyarrow's read_csv 608 csv The CSV bytes 609 args Positional arguments to be forwarded to pyarrow's read_csv 610 validate_full Whether or not to fully validate the resulting table 611 kwargs Keyword arguments to be forwarded to pyarrow's read_csv 612 """ 613 assert isinstance(self.use_threads, bool) # sanity check 614 read_options = kwargs.setdefault('read_options', ReadOptions()) 615 read_options.use_threads = self.use_threads 616 table = read_csv(csv, *args, **kwargs) 617 table.validate(full=validate_full) 618 return table 619 620 def read_bytes(self, b, **kwargs): 621 return self.read_csv(pa.py_buffer(b), **kwargs) 622 623 def test_file_object(self): 624 data = b"a,b\n1,2\n" 625 expected_data = {'a': [1], 'b': [2]} 626 bio = io.BytesIO(data) 627 table = self.read_csv(bio) 628 assert table.to_pydict() == expected_data 629 # Text files not allowed 630 sio = io.StringIO(data.decode()) 631 with pytest.raises(TypeError): 632 self.read_csv(sio) 633 634 def test_header(self): 635 rows = b"abc,def,gh\n" 636 table = self.read_bytes(rows) 637 assert isinstance(table, pa.Table) 638 self.check_names(table, ["abc", "def", "gh"]) 639 assert table.num_rows == 0 640 641 def test_bom(self): 642 rows = b"\xef\xbb\xbfa,b\n1,2\n" 643 expected_data = {'a': [1], 'b': [2]} 644 table = self.read_bytes(rows) 645 assert table.to_pydict() == expected_data 646 647 def test_one_chunk(self): 648 # ARROW-7661: lack of newline at end of file should not produce 649 # an additional chunk. 650 rows = [b"a,b", b"1,2", b"3,4", b"56,78"] 651 for line_ending in [b'\n', b'\r', b'\r\n']: 652 for file_ending in [b'', line_ending]: 653 data = line_ending.join(rows) + file_ending 654 table = self.read_bytes(data) 655 assert len(table.to_batches()) == 1 656 assert table.to_pydict() == { 657 "a": [1, 3, 56], 658 "b": [2, 4, 78], 659 } 660 661 def test_header_column_names(self): 662 rows = b"ab,cd\nef,gh\nij,kl\nmn,op\n" 663 664 opts = ReadOptions() 665 opts.column_names = ["x", "y"] 666 table = self.read_bytes(rows, read_options=opts) 667 self.check_names(table, ["x", "y"]) 668 assert table.to_pydict() == { 669 "x": ["ab", "ef", "ij", "mn"], 670 "y": ["cd", "gh", "kl", "op"], 671 } 672 673 opts.skip_rows = 3 674 table = self.read_bytes(rows, read_options=opts) 675 self.check_names(table, ["x", "y"]) 676 assert table.to_pydict() == { 677 "x": ["mn"], 678 "y": ["op"], 679 } 680 681 opts.skip_rows = 4 682 table = self.read_bytes(rows, read_options=opts) 683 self.check_names(table, ["x", "y"]) 684 assert table.to_pydict() == { 685 "x": [], 686 "y": [], 687 } 688 689 opts.skip_rows = 5 690 with pytest.raises(pa.ArrowInvalid): 691 # Not enough rows 692 table = self.read_bytes(rows, read_options=opts) 693 694 # Unexpected number of columns 695 opts.skip_rows = 0 696 opts.column_names = ["x", "y", "z"] 697 with pytest.raises(pa.ArrowInvalid, 698 match="Expected 3 columns, got 2"): 699 table = self.read_bytes(rows, read_options=opts) 700 701 # Can skip rows with a different number of columns 702 rows = b"abcd\n,,,,,\nij,kl\nmn,op\n" 703 opts.skip_rows = 2 704 opts.column_names = ["x", "y"] 705 table = self.read_bytes(rows, read_options=opts) 706 self.check_names(table, ["x", "y"]) 707 assert table.to_pydict() == { 708 "x": ["ij", "mn"], 709 "y": ["kl", "op"], 710 } 711 712 def test_header_autogenerate_column_names(self): 713 rows = b"ab,cd\nef,gh\nij,kl\nmn,op\n" 714 715 opts = ReadOptions() 716 opts.autogenerate_column_names = True 717 table = self.read_bytes(rows, read_options=opts) 718 self.check_names(table, ["f0", "f1"]) 719 assert table.to_pydict() == { 720 "f0": ["ab", "ef", "ij", "mn"], 721 "f1": ["cd", "gh", "kl", "op"], 722 } 723 724 opts.skip_rows = 3 725 table = self.read_bytes(rows, read_options=opts) 726 self.check_names(table, ["f0", "f1"]) 727 assert table.to_pydict() == { 728 "f0": ["mn"], 729 "f1": ["op"], 730 } 731 732 # Not enough rows, impossible to infer number of columns 733 opts.skip_rows = 4 734 with pytest.raises(pa.ArrowInvalid): 735 table = self.read_bytes(rows, read_options=opts) 736 737 def test_include_columns(self): 738 rows = b"ab,cd\nef,gh\nij,kl\nmn,op\n" 739 740 convert_options = ConvertOptions() 741 convert_options.include_columns = ['ab'] 742 table = self.read_bytes(rows, convert_options=convert_options) 743 self.check_names(table, ["ab"]) 744 assert table.to_pydict() == { 745 "ab": ["ef", "ij", "mn"], 746 } 747 748 # Order of include_columns is respected, regardless of CSV order 749 convert_options.include_columns = ['cd', 'ab'] 750 table = self.read_bytes(rows, convert_options=convert_options) 751 schema = pa.schema([('cd', pa.string()), 752 ('ab', pa.string())]) 753 assert table.schema == schema 754 assert table.to_pydict() == { 755 "cd": ["gh", "kl", "op"], 756 "ab": ["ef", "ij", "mn"], 757 } 758 759 # Include a column not in the CSV file => raises by default 760 convert_options.include_columns = ['xx', 'ab', 'yy'] 761 with pytest.raises(KeyError, 762 match="Column 'xx' in include_columns " 763 "does not exist in CSV file"): 764 self.read_bytes(rows, convert_options=convert_options) 765 766 def test_include_missing_columns(self): 767 rows = b"ab,cd\nef,gh\nij,kl\nmn,op\n" 768 769 read_options = ReadOptions() 770 convert_options = ConvertOptions() 771 convert_options.include_columns = ['xx', 'ab', 'yy'] 772 convert_options.include_missing_columns = True 773 table = self.read_bytes(rows, read_options=read_options, 774 convert_options=convert_options) 775 schema = pa.schema([('xx', pa.null()), 776 ('ab', pa.string()), 777 ('yy', pa.null())]) 778 assert table.schema == schema 779 assert table.to_pydict() == { 780 "xx": [None, None, None], 781 "ab": ["ef", "ij", "mn"], 782 "yy": [None, None, None], 783 } 784 785 # Combining with `column_names` 786 read_options.column_names = ["xx", "yy"] 787 convert_options.include_columns = ["yy", "cd"] 788 table = self.read_bytes(rows, read_options=read_options, 789 convert_options=convert_options) 790 schema = pa.schema([('yy', pa.string()), 791 ('cd', pa.null())]) 792 assert table.schema == schema 793 assert table.to_pydict() == { 794 "yy": ["cd", "gh", "kl", "op"], 795 "cd": [None, None, None, None], 796 } 797 798 # And with `column_types` as well 799 convert_options.column_types = {"yy": pa.binary(), 800 "cd": pa.int32()} 801 table = self.read_bytes(rows, read_options=read_options, 802 convert_options=convert_options) 803 schema = pa.schema([('yy', pa.binary()), 804 ('cd', pa.int32())]) 805 assert table.schema == schema 806 assert table.to_pydict() == { 807 "yy": [b"cd", b"gh", b"kl", b"op"], 808 "cd": [None, None, None, None], 809 } 810 811 def test_simple_ints(self): 812 # Infer integer columns 813 rows = b"a,b,c\n1,2,3\n4,5,6\n" 814 table = self.read_bytes(rows) 815 schema = pa.schema([('a', pa.int64()), 816 ('b', pa.int64()), 817 ('c', pa.int64())]) 818 assert table.schema == schema 819 assert table.to_pydict() == { 820 'a': [1, 4], 821 'b': [2, 5], 822 'c': [3, 6], 823 } 824 825 def test_simple_varied(self): 826 # Infer various kinds of data 827 rows = b"a,b,c,d\n1,2,3,0\n4.0,-5,foo,True\n" 828 table = self.read_bytes(rows) 829 schema = pa.schema([('a', pa.float64()), 830 ('b', pa.int64()), 831 ('c', pa.string()), 832 ('d', pa.bool_())]) 833 assert table.schema == schema 834 assert table.to_pydict() == { 835 'a': [1.0, 4.0], 836 'b': [2, -5], 837 'c': ["3", "foo"], 838 'd': [False, True], 839 } 840 841 def test_simple_nulls(self): 842 # Infer various kinds of data, with nulls 843 rows = (b"a,b,c,d,e,f\n" 844 b"1,2,,,3,N/A\n" 845 b"nan,-5,foo,,nan,TRUE\n" 846 b"4.5,#N/A,nan,,\xff,false\n") 847 table = self.read_bytes(rows) 848 schema = pa.schema([('a', pa.float64()), 849 ('b', pa.int64()), 850 ('c', pa.string()), 851 ('d', pa.null()), 852 ('e', pa.binary()), 853 ('f', pa.bool_())]) 854 assert table.schema == schema 855 assert table.to_pydict() == { 856 'a': [1.0, None, 4.5], 857 'b': [2, -5, None], 858 'c': ["", "foo", "nan"], 859 'd': [None, None, None], 860 'e': [b"3", b"nan", b"\xff"], 861 'f': [None, True, False], 862 } 863 864 def test_decimal_point(self): 865 # Infer floats with a custom decimal point 866 parse_options = ParseOptions(delimiter=';') 867 rows = b"a;b\n1.25;2,5\nNA;-3\n-4;NA" 868 869 table = self.read_bytes(rows, parse_options=parse_options) 870 schema = pa.schema([('a', pa.float64()), 871 ('b', pa.string())]) 872 assert table.schema == schema 873 assert table.to_pydict() == { 874 'a': [1.25, None, -4.0], 875 'b': ["2,5", "-3", "NA"], 876 } 877 878 convert_options = ConvertOptions(decimal_point=',') 879 table = self.read_bytes(rows, parse_options=parse_options, 880 convert_options=convert_options) 881 schema = pa.schema([('a', pa.string()), 882 ('b', pa.float64())]) 883 assert table.schema == schema 884 assert table.to_pydict() == { 885 'a': ["1.25", "NA", "-4"], 886 'b': [2.5, -3.0, None], 887 } 888 889 def test_simple_timestamps(self): 890 # Infer a timestamp column 891 rows = (b"a,b,c\n" 892 b"1970,1970-01-01 00:00:00,1970-01-01 00:00:00.123\n" 893 b"1989,1989-07-14 01:00:00,1989-07-14 01:00:00.123456\n") 894 table = self.read_bytes(rows) 895 schema = pa.schema([('a', pa.int64()), 896 ('b', pa.timestamp('s')), 897 ('c', pa.timestamp('ns'))]) 898 assert table.schema == schema 899 assert table.to_pydict() == { 900 'a': [1970, 1989], 901 'b': [datetime(1970, 1, 1), datetime(1989, 7, 14, 1)], 902 'c': [datetime(1970, 1, 1, 0, 0, 0, 123000), 903 datetime(1989, 7, 14, 1, 0, 0, 123456)], 904 } 905 906 def test_timestamp_parsers(self): 907 # Infer timestamps with custom parsers 908 rows = b"a,b\n1970/01/01,1980-01-01 00\n1970/01/02,1980-01-02 00\n" 909 opts = ConvertOptions() 910 911 table = self.read_bytes(rows, convert_options=opts) 912 schema = pa.schema([('a', pa.string()), 913 ('b', pa.timestamp('s'))]) 914 assert table.schema == schema 915 assert table.to_pydict() == { 916 'a': ['1970/01/01', '1970/01/02'], 917 'b': [datetime(1980, 1, 1), datetime(1980, 1, 2)], 918 } 919 920 opts.timestamp_parsers = ['%Y/%m/%d'] 921 table = self.read_bytes(rows, convert_options=opts) 922 schema = pa.schema([('a', pa.timestamp('s')), 923 ('b', pa.string())]) 924 assert table.schema == schema 925 assert table.to_pydict() == { 926 'a': [datetime(1970, 1, 1), datetime(1970, 1, 2)], 927 'b': ['1980-01-01 00', '1980-01-02 00'], 928 } 929 930 opts.timestamp_parsers = ['%Y/%m/%d', ISO8601] 931 table = self.read_bytes(rows, convert_options=opts) 932 schema = pa.schema([('a', pa.timestamp('s')), 933 ('b', pa.timestamp('s'))]) 934 assert table.schema == schema 935 assert table.to_pydict() == { 936 'a': [datetime(1970, 1, 1), datetime(1970, 1, 2)], 937 'b': [datetime(1980, 1, 1), datetime(1980, 1, 2)], 938 } 939 940 def test_dates(self): 941 # Dates are inferred as date32 by default 942 rows = b"a,b\n1970-01-01,1970-01-02\n1971-01-01,1971-01-02\n" 943 table = self.read_bytes(rows) 944 schema = pa.schema([('a', pa.date32()), 945 ('b', pa.date32())]) 946 assert table.schema == schema 947 assert table.to_pydict() == { 948 'a': [date(1970, 1, 1), date(1971, 1, 1)], 949 'b': [date(1970, 1, 2), date(1971, 1, 2)], 950 } 951 952 # Can ask for date types explicitly 953 opts = ConvertOptions() 954 opts.column_types = {'a': pa.date32(), 'b': pa.date64()} 955 table = self.read_bytes(rows, convert_options=opts) 956 schema = pa.schema([('a', pa.date32()), 957 ('b', pa.date64())]) 958 assert table.schema == schema 959 assert table.to_pydict() == { 960 'a': [date(1970, 1, 1), date(1971, 1, 1)], 961 'b': [date(1970, 1, 2), date(1971, 1, 2)], 962 } 963 964 # Can ask for timestamp types explicitly 965 opts = ConvertOptions() 966 opts.column_types = {'a': pa.timestamp('s'), 'b': pa.timestamp('ms')} 967 table = self.read_bytes(rows, convert_options=opts) 968 schema = pa.schema([('a', pa.timestamp('s')), 969 ('b', pa.timestamp('ms'))]) 970 assert table.schema == schema 971 assert table.to_pydict() == { 972 'a': [datetime(1970, 1, 1), datetime(1971, 1, 1)], 973 'b': [datetime(1970, 1, 2), datetime(1971, 1, 2)], 974 } 975 976 def test_times(self): 977 # Times are inferred as time32[s] by default 978 from datetime import time 979 980 rows = b"a,b\n12:34:56,12:34:56.789\n23:59:59,23:59:59.999\n" 981 table = self.read_bytes(rows) 982 # Column 'b' has subseconds, so cannot be inferred as time32[s] 983 schema = pa.schema([('a', pa.time32('s')), 984 ('b', pa.string())]) 985 assert table.schema == schema 986 assert table.to_pydict() == { 987 'a': [time(12, 34, 56), time(23, 59, 59)], 988 'b': ["12:34:56.789", "23:59:59.999"], 989 } 990 991 # Can ask for time types explicitly 992 opts = ConvertOptions() 993 opts.column_types = {'a': pa.time64('us'), 'b': pa.time32('ms')} 994 table = self.read_bytes(rows, convert_options=opts) 995 schema = pa.schema([('a', pa.time64('us')), 996 ('b', pa.time32('ms'))]) 997 assert table.schema == schema 998 assert table.to_pydict() == { 999 'a': [time(12, 34, 56), time(23, 59, 59)], 1000 'b': [time(12, 34, 56, 789000), time(23, 59, 59, 999000)], 1001 } 1002 1003 def test_auto_dict_encode(self): 1004 opts = ConvertOptions(auto_dict_encode=True) 1005 rows = "a,b\nab,1\ncdé,2\ncdé,3\nab,4".encode() 1006 table = self.read_bytes(rows, convert_options=opts) 1007 schema = pa.schema([('a', pa.dictionary(pa.int32(), pa.string())), 1008 ('b', pa.int64())]) 1009 expected = { 1010 'a': ["ab", "cdé", "cdé", "ab"], 1011 'b': [1, 2, 3, 4], 1012 } 1013 assert table.schema == schema 1014 assert table.to_pydict() == expected 1015 1016 opts.auto_dict_max_cardinality = 2 1017 table = self.read_bytes(rows, convert_options=opts) 1018 assert table.schema == schema 1019 assert table.to_pydict() == expected 1020 1021 # Cardinality above max => plain-encoded 1022 opts.auto_dict_max_cardinality = 1 1023 table = self.read_bytes(rows, convert_options=opts) 1024 assert table.schema == pa.schema([('a', pa.string()), 1025 ('b', pa.int64())]) 1026 assert table.to_pydict() == expected 1027 1028 # With invalid UTF8, not checked 1029 opts.auto_dict_max_cardinality = 50 1030 opts.check_utf8 = False 1031 rows = b"a,b\nab,1\ncd\xff,2\nab,3" 1032 table = self.read_bytes(rows, convert_options=opts, 1033 validate_full=False) 1034 assert table.schema == schema 1035 dict_values = table['a'].chunk(0).dictionary 1036 assert len(dict_values) == 2 1037 assert dict_values[0].as_py() == "ab" 1038 assert dict_values[1].as_buffer() == b"cd\xff" 1039 1040 # With invalid UTF8, checked 1041 opts.check_utf8 = True 1042 table = self.read_bytes(rows, convert_options=opts) 1043 schema = pa.schema([('a', pa.dictionary(pa.int32(), pa.binary())), 1044 ('b', pa.int64())]) 1045 expected = { 1046 'a': [b"ab", b"cd\xff", b"ab"], 1047 'b': [1, 2, 3], 1048 } 1049 assert table.schema == schema 1050 assert table.to_pydict() == expected 1051 1052 def test_custom_nulls(self): 1053 # Infer nulls with custom values 1054 opts = ConvertOptions(null_values=['Xxx', 'Zzz']) 1055 rows = b"""a,b,c,d\nZzz,"Xxx",1,2\nXxx,#N/A,,Zzz\n""" 1056 table = self.read_bytes(rows, convert_options=opts) 1057 schema = pa.schema([('a', pa.null()), 1058 ('b', pa.string()), 1059 ('c', pa.string()), 1060 ('d', pa.int64())]) 1061 assert table.schema == schema 1062 assert table.to_pydict() == { 1063 'a': [None, None], 1064 'b': ["Xxx", "#N/A"], 1065 'c': ["1", ""], 1066 'd': [2, None], 1067 } 1068 1069 opts = ConvertOptions(null_values=['Xxx', 'Zzz'], 1070 strings_can_be_null=True) 1071 table = self.read_bytes(rows, convert_options=opts) 1072 assert table.to_pydict() == { 1073 'a': [None, None], 1074 'b': [None, "#N/A"], 1075 'c': ["1", ""], 1076 'd': [2, None], 1077 } 1078 opts.quoted_strings_can_be_null = False 1079 table = self.read_bytes(rows, convert_options=opts) 1080 assert table.to_pydict() == { 1081 'a': [None, None], 1082 'b': ["Xxx", "#N/A"], 1083 'c': ["1", ""], 1084 'd': [2, None], 1085 } 1086 1087 opts = ConvertOptions(null_values=[]) 1088 rows = b"a,b\n#N/A,\n" 1089 table = self.read_bytes(rows, convert_options=opts) 1090 schema = pa.schema([('a', pa.string()), 1091 ('b', pa.string())]) 1092 assert table.schema == schema 1093 assert table.to_pydict() == { 1094 'a': ["#N/A"], 1095 'b': [""], 1096 } 1097 1098 def test_custom_bools(self): 1099 # Infer booleans with custom values 1100 opts = ConvertOptions(true_values=['T', 'yes'], 1101 false_values=['F', 'no']) 1102 rows = (b"a,b,c\n" 1103 b"True,T,t\n" 1104 b"False,F,f\n" 1105 b"True,yes,yes\n" 1106 b"False,no,no\n" 1107 b"N/A,N/A,N/A\n") 1108 table = self.read_bytes(rows, convert_options=opts) 1109 schema = pa.schema([('a', pa.string()), 1110 ('b', pa.bool_()), 1111 ('c', pa.string())]) 1112 assert table.schema == schema 1113 assert table.to_pydict() == { 1114 'a': ["True", "False", "True", "False", "N/A"], 1115 'b': [True, False, True, False, None], 1116 'c': ["t", "f", "yes", "no", "N/A"], 1117 } 1118 1119 def test_column_types(self): 1120 # Ask for specific column types in ConvertOptions 1121 opts = ConvertOptions(column_types={'b': 'float32', 1122 'c': 'string', 1123 'd': 'boolean', 1124 'e': pa.decimal128(11, 2), 1125 'zz': 'null'}) 1126 rows = b"a,b,c,d,e\n1,2,3,true,1.0\n4,-5,6,false,0\n" 1127 table = self.read_bytes(rows, convert_options=opts) 1128 schema = pa.schema([('a', pa.int64()), 1129 ('b', pa.float32()), 1130 ('c', pa.string()), 1131 ('d', pa.bool_()), 1132 ('e', pa.decimal128(11, 2))]) 1133 expected = { 1134 'a': [1, 4], 1135 'b': [2.0, -5.0], 1136 'c': ["3", "6"], 1137 'd': [True, False], 1138 'e': [Decimal("1.00"), Decimal("0.00")] 1139 } 1140 assert table.schema == schema 1141 assert table.to_pydict() == expected 1142 # Pass column_types as schema 1143 opts = ConvertOptions( 1144 column_types=pa.schema([('b', pa.float32()), 1145 ('c', pa.string()), 1146 ('d', pa.bool_()), 1147 ('e', pa.decimal128(11, 2)), 1148 ('zz', pa.bool_())])) 1149 table = self.read_bytes(rows, convert_options=opts) 1150 assert table.schema == schema 1151 assert table.to_pydict() == expected 1152 # One of the columns in column_types fails converting 1153 rows = b"a,b,c,d,e\n1,XXX,3,true,5\n4,-5,6,false,7\n" 1154 with pytest.raises(pa.ArrowInvalid) as exc: 1155 self.read_bytes(rows, convert_options=opts) 1156 err = str(exc.value) 1157 assert "In CSV column #1: " in err 1158 assert "CSV conversion error to float: invalid value 'XXX'" in err 1159 1160 def test_column_types_dict(self): 1161 # Ask for dict-encoded column types in ConvertOptions 1162 column_types = [ 1163 ('a', pa.dictionary(pa.int32(), pa.utf8())), 1164 ('b', pa.dictionary(pa.int32(), pa.int64())), 1165 ('c', pa.dictionary(pa.int32(), pa.decimal128(11, 2))), 1166 ('d', pa.dictionary(pa.int32(), pa.large_utf8()))] 1167 1168 opts = ConvertOptions(column_types=dict(column_types)) 1169 rows = (b"a,b,c,d\n" 1170 b"abc,123456,1.0,zz\n" 1171 b"defg,123456,0.5,xx\n" 1172 b"abc,N/A,1.0,xx\n") 1173 table = self.read_bytes(rows, convert_options=opts) 1174 1175 schema = pa.schema(column_types) 1176 expected = { 1177 'a': ["abc", "defg", "abc"], 1178 'b': [123456, 123456, None], 1179 'c': [Decimal("1.00"), Decimal("0.50"), Decimal("1.00")], 1180 'd': ["zz", "xx", "xx"], 1181 } 1182 assert table.schema == schema 1183 assert table.to_pydict() == expected 1184 1185 # Unsupported index type 1186 column_types[0] = ('a', pa.dictionary(pa.int8(), pa.utf8())) 1187 1188 opts = ConvertOptions(column_types=dict(column_types)) 1189 with pytest.raises(NotImplementedError): 1190 table = self.read_bytes(rows, convert_options=opts) 1191 1192 def test_column_types_with_column_names(self): 1193 # When both `column_names` and `column_types` are given, names 1194 # in `column_types` should refer to names in `column_names` 1195 rows = b"a,b\nc,d\ne,f\n" 1196 read_options = ReadOptions(column_names=['x', 'y']) 1197 convert_options = ConvertOptions(column_types={'x': pa.binary()}) 1198 table = self.read_bytes(rows, read_options=read_options, 1199 convert_options=convert_options) 1200 schema = pa.schema([('x', pa.binary()), 1201 ('y', pa.string())]) 1202 assert table.schema == schema 1203 assert table.to_pydict() == { 1204 'x': [b'a', b'c', b'e'], 1205 'y': ['b', 'd', 'f'], 1206 } 1207 1208 def test_no_ending_newline(self): 1209 # No \n after last line 1210 rows = b"a,b,c\n1,2,3\n4,5,6" 1211 table = self.read_bytes(rows) 1212 assert table.to_pydict() == { 1213 'a': [1, 4], 1214 'b': [2, 5], 1215 'c': [3, 6], 1216 } 1217 1218 def test_trivial(self): 1219 # A bit pointless, but at least it shouldn't crash 1220 rows = b",\n\n" 1221 table = self.read_bytes(rows) 1222 assert table.to_pydict() == {'': []} 1223 1224 def test_empty_lines(self): 1225 rows = b"a,b\n\r1,2\r\n\r\n3,4\r\n" 1226 table = self.read_bytes(rows) 1227 assert table.to_pydict() == { 1228 'a': [1, 3], 1229 'b': [2, 4], 1230 } 1231 parse_options = ParseOptions(ignore_empty_lines=False) 1232 table = self.read_bytes(rows, parse_options=parse_options) 1233 assert table.to_pydict() == { 1234 'a': [None, 1, None, 3], 1235 'b': [None, 2, None, 4], 1236 } 1237 read_options = ReadOptions(skip_rows=2) 1238 table = self.read_bytes(rows, parse_options=parse_options, 1239 read_options=read_options) 1240 assert table.to_pydict() == { 1241 '1': [None, 3], 1242 '2': [None, 4], 1243 } 1244 1245 def test_invalid_csv(self): 1246 # Various CSV errors 1247 rows = b"a,b,c\n1,2\n4,5,6\n" 1248 with pytest.raises(pa.ArrowInvalid, match="Expected 3 columns, got 2"): 1249 self.read_bytes(rows) 1250 rows = b"a,b,c\n1,2,3\n4" 1251 with pytest.raises(pa.ArrowInvalid, match="Expected 3 columns, got 1"): 1252 self.read_bytes(rows) 1253 for rows in [b"", b"\n", b"\r\n", b"\r", b"\n\n"]: 1254 with pytest.raises(pa.ArrowInvalid, match="Empty CSV file"): 1255 self.read_bytes(rows) 1256 1257 def test_options_delimiter(self): 1258 rows = b"a;b,c\nde,fg;eh\n" 1259 table = self.read_bytes(rows) 1260 assert table.to_pydict() == { 1261 'a;b': ['de'], 1262 'c': ['fg;eh'], 1263 } 1264 opts = ParseOptions(delimiter=';') 1265 table = self.read_bytes(rows, parse_options=opts) 1266 assert table.to_pydict() == { 1267 'a': ['de,fg'], 1268 'b,c': ['eh'], 1269 } 1270 1271 def test_small_random_csv(self): 1272 csv, expected = make_random_csv(num_cols=2, num_rows=10) 1273 table = self.read_bytes(csv) 1274 assert table.schema == expected.schema 1275 assert table.equals(expected) 1276 assert table.to_pydict() == expected.to_pydict() 1277 1278 def test_stress_block_sizes(self): 1279 # Test a number of small block sizes to stress block stitching 1280 csv_base, expected = make_random_csv(num_cols=2, num_rows=500) 1281 block_sizes = [11, 12, 13, 17, 37, 111] 1282 csvs = [csv_base, csv_base.rstrip(b'\r\n')] 1283 for csv in csvs: 1284 for block_size in block_sizes: 1285 read_options = ReadOptions(block_size=block_size) 1286 table = self.read_bytes(csv, read_options=read_options) 1287 assert table.schema == expected.schema 1288 if not table.equals(expected): 1289 # Better error output 1290 assert table.to_pydict() == expected.to_pydict() 1291 1292 def test_stress_convert_options_blowup(self): 1293 # ARROW-6481: A convert_options with a very large number of columns 1294 # should not blow memory and CPU time. 1295 try: 1296 clock = time.thread_time 1297 except AttributeError: 1298 clock = time.time 1299 num_columns = 10000 1300 col_names = ["K{}".format(i) for i in range(num_columns)] 1301 csv = make_empty_csv(col_names) 1302 t1 = clock() 1303 convert_options = ConvertOptions( 1304 column_types={k: pa.string() for k in col_names[::2]}) 1305 table = self.read_bytes(csv, convert_options=convert_options) 1306 dt = clock() - t1 1307 # Check that processing time didn't blow up. 1308 # This is a conservative check (it takes less than 300 ms 1309 # in debug mode on my local machine). 1310 assert dt <= 10.0 1311 # Check result 1312 assert table.num_columns == num_columns 1313 assert table.num_rows == 0 1314 assert table.column_names == col_names 1315 1316 def test_cancellation(self): 1317 if (threading.current_thread().ident != 1318 threading.main_thread().ident): 1319 pytest.skip("test only works from main Python thread") 1320 # Skips test if not available 1321 raise_signal = util.get_raise_signal() 1322 1323 # Make the interruptible workload large enough to not finish 1324 # before the interrupt comes, even in release mode on fast machines. 1325 last_duration = 0.0 1326 workload_size = 100_000 1327 1328 while last_duration < 1.0: 1329 print("workload size:", workload_size) 1330 large_csv = b"a,b,c\n" + b"1,2,3\n" * workload_size 1331 t1 = time.time() 1332 self.read_bytes(large_csv) 1333 last_duration = time.time() - t1 1334 workload_size = workload_size * 3 1335 1336 def signal_from_thread(): 1337 time.sleep(0.2) 1338 raise_signal(signal.SIGINT) 1339 1340 t1 = time.time() 1341 try: 1342 try: 1343 t = threading.Thread(target=signal_from_thread) 1344 with pytest.raises(KeyboardInterrupt) as exc_info: 1345 t.start() 1346 self.read_bytes(large_csv) 1347 finally: 1348 t.join() 1349 except KeyboardInterrupt: 1350 # In case KeyboardInterrupt didn't interrupt `self.read_bytes` 1351 # above, at least prevent it from stopping the test suite 1352 pytest.fail("KeyboardInterrupt didn't interrupt CSV reading") 1353 dt = time.time() - t1 1354 # Interruption should have arrived timely 1355 assert dt <= 1.0 1356 e = exc_info.value.__context__ 1357 assert isinstance(e, pa.ArrowCancelled) 1358 assert e.signum == signal.SIGINT 1359 1360 def test_cancellation_disabled(self): 1361 # ARROW-12622: reader would segfault when the cancelling signal 1362 # handler was not enabled (e.g. if disabled, or if not on the 1363 # main thread) 1364 t = threading.Thread( 1365 target=lambda: self.read_bytes(b"f64\n0.1")) 1366 t.start() 1367 t.join() 1368 1369 1370class TestSerialCSVTableRead(BaseCSVTableRead): 1371 @property 1372 def use_threads(self): 1373 return False 1374 1375 1376class TestThreadedCSVTableRead(BaseCSVTableRead): 1377 @property 1378 def use_threads(self): 1379 return True 1380 1381 1382class BaseStreamingCSVRead(BaseTestCSV): 1383 1384 def open_csv(self, csv, *args, **kwargs): 1385 """ 1386 Reads the CSV file into memory using pyarrow's open_csv 1387 csv The CSV bytes 1388 args Positional arguments to be forwarded to pyarrow's open_csv 1389 kwargs Keyword arguments to be forwarded to pyarrow's open_csv 1390 """ 1391 read_options = kwargs.setdefault('read_options', ReadOptions()) 1392 read_options.use_threads = self.use_threads 1393 return open_csv(csv, *args, **kwargs) 1394 1395 def open_bytes(self, b, **kwargs): 1396 return self.open_csv(pa.py_buffer(b), **kwargs) 1397 1398 def check_reader(self, reader, expected_schema, expected_data): 1399 assert reader.schema == expected_schema 1400 batches = list(reader) 1401 assert len(batches) == len(expected_data) 1402 for batch, expected_batch in zip(batches, expected_data): 1403 batch.validate(full=True) 1404 assert batch.schema == expected_schema 1405 assert batch.to_pydict() == expected_batch 1406 1407 def read_bytes(self, b, **kwargs): 1408 return self.open_bytes(b, **kwargs).read_all() 1409 1410 def test_file_object(self): 1411 data = b"a,b\n1,2\n3,4\n" 1412 expected_data = {'a': [1, 3], 'b': [2, 4]} 1413 bio = io.BytesIO(data) 1414 reader = self.open_csv(bio) 1415 expected_schema = pa.schema([('a', pa.int64()), 1416 ('b', pa.int64())]) 1417 self.check_reader(reader, expected_schema, [expected_data]) 1418 1419 def test_header(self): 1420 rows = b"abc,def,gh\n" 1421 reader = self.open_bytes(rows) 1422 expected_schema = pa.schema([('abc', pa.null()), 1423 ('def', pa.null()), 1424 ('gh', pa.null())]) 1425 self.check_reader(reader, expected_schema, []) 1426 1427 def test_inference(self): 1428 # Inference is done on first block 1429 rows = b"a,b\n123,456\nabc,de\xff\ngh,ij\n" 1430 expected_schema = pa.schema([('a', pa.string()), 1431 ('b', pa.binary())]) 1432 1433 read_options = ReadOptions() 1434 read_options.block_size = len(rows) 1435 reader = self.open_bytes(rows, read_options=read_options) 1436 self.check_reader(reader, expected_schema, 1437 [{'a': ['123', 'abc', 'gh'], 1438 'b': [b'456', b'de\xff', b'ij']}]) 1439 1440 read_options.block_size = len(rows) - 1 1441 reader = self.open_bytes(rows, read_options=read_options) 1442 self.check_reader(reader, expected_schema, 1443 [{'a': ['123', 'abc'], 1444 'b': [b'456', b'de\xff']}, 1445 {'a': ['gh'], 1446 'b': [b'ij']}]) 1447 1448 def test_inference_failure(self): 1449 # Inference on first block, then conversion failure on second block 1450 rows = b"a,b\n123,456\nabc,de\xff\ngh,ij\n" 1451 read_options = ReadOptions() 1452 read_options.block_size = len(rows) - 7 1453 reader = self.open_bytes(rows, read_options=read_options) 1454 expected_schema = pa.schema([('a', pa.int64()), 1455 ('b', pa.int64())]) 1456 assert reader.schema == expected_schema 1457 assert reader.read_next_batch().to_pydict() == { 1458 'a': [123], 'b': [456] 1459 } 1460 # Second block 1461 with pytest.raises(ValueError, 1462 match="CSV conversion error to int64"): 1463 reader.read_next_batch() 1464 # EOF 1465 with pytest.raises(StopIteration): 1466 reader.read_next_batch() 1467 1468 def test_invalid_csv(self): 1469 # CSV errors on first block 1470 rows = b"a,b\n1,2,3\n4,5\n6,7\n" 1471 read_options = ReadOptions() 1472 read_options.block_size = 10 1473 with pytest.raises(pa.ArrowInvalid, 1474 match="Expected 2 columns, got 3"): 1475 reader = self.open_bytes( 1476 rows, read_options=read_options) 1477 1478 # CSV errors on second block 1479 rows = b"a,b\n1,2\n3,4,5\n6,7\n" 1480 read_options.block_size = 8 1481 reader = self.open_bytes(rows, read_options=read_options) 1482 assert reader.read_next_batch().to_pydict() == {'a': [1], 'b': [2]} 1483 with pytest.raises(pa.ArrowInvalid, 1484 match="Expected 2 columns, got 3"): 1485 reader.read_next_batch() 1486 # Cannot continue after a parse error 1487 with pytest.raises(StopIteration): 1488 reader.read_next_batch() 1489 1490 def test_options_delimiter(self): 1491 rows = b"a;b,c\nde,fg;eh\n" 1492 reader = self.open_bytes(rows) 1493 expected_schema = pa.schema([('a;b', pa.string()), 1494 ('c', pa.string())]) 1495 self.check_reader(reader, expected_schema, 1496 [{'a;b': ['de'], 1497 'c': ['fg;eh']}]) 1498 1499 opts = ParseOptions(delimiter=';') 1500 reader = self.open_bytes(rows, parse_options=opts) 1501 expected_schema = pa.schema([('a', pa.string()), 1502 ('b,c', pa.string())]) 1503 self.check_reader(reader, expected_schema, 1504 [{'a': ['de,fg'], 1505 'b,c': ['eh']}]) 1506 1507 def test_no_ending_newline(self): 1508 # No \n after last line 1509 rows = b"a,b,c\n1,2,3\n4,5,6" 1510 reader = self.open_bytes(rows) 1511 expected_schema = pa.schema([('a', pa.int64()), 1512 ('b', pa.int64()), 1513 ('c', pa.int64())]) 1514 self.check_reader(reader, expected_schema, 1515 [{'a': [1, 4], 1516 'b': [2, 5], 1517 'c': [3, 6]}]) 1518 1519 def test_empty_file(self): 1520 with pytest.raises(ValueError, match="Empty CSV file"): 1521 self.open_bytes(b"") 1522 1523 def test_column_options(self): 1524 # With column_names 1525 rows = b"1,2,3\n4,5,6" 1526 read_options = ReadOptions() 1527 read_options.column_names = ['d', 'e', 'f'] 1528 reader = self.open_bytes(rows, read_options=read_options) 1529 expected_schema = pa.schema([('d', pa.int64()), 1530 ('e', pa.int64()), 1531 ('f', pa.int64())]) 1532 self.check_reader(reader, expected_schema, 1533 [{'d': [1, 4], 1534 'e': [2, 5], 1535 'f': [3, 6]}]) 1536 1537 # With include_columns 1538 convert_options = ConvertOptions() 1539 convert_options.include_columns = ['f', 'e'] 1540 reader = self.open_bytes(rows, read_options=read_options, 1541 convert_options=convert_options) 1542 expected_schema = pa.schema([('f', pa.int64()), 1543 ('e', pa.int64())]) 1544 self.check_reader(reader, expected_schema, 1545 [{'e': [2, 5], 1546 'f': [3, 6]}]) 1547 1548 # With column_types 1549 convert_options.column_types = {'e': pa.string()} 1550 reader = self.open_bytes(rows, read_options=read_options, 1551 convert_options=convert_options) 1552 expected_schema = pa.schema([('f', pa.int64()), 1553 ('e', pa.string())]) 1554 self.check_reader(reader, expected_schema, 1555 [{'e': ["2", "5"], 1556 'f': [3, 6]}]) 1557 1558 # Missing columns in include_columns 1559 convert_options.include_columns = ['g', 'f', 'e'] 1560 with pytest.raises( 1561 KeyError, 1562 match="Column 'g' in include_columns does not exist"): 1563 reader = self.open_bytes(rows, read_options=read_options, 1564 convert_options=convert_options) 1565 1566 convert_options.include_missing_columns = True 1567 reader = self.open_bytes(rows, read_options=read_options, 1568 convert_options=convert_options) 1569 expected_schema = pa.schema([('g', pa.null()), 1570 ('f', pa.int64()), 1571 ('e', pa.string())]) 1572 self.check_reader(reader, expected_schema, 1573 [{'g': [None, None], 1574 'e': ["2", "5"], 1575 'f': [3, 6]}]) 1576 1577 convert_options.column_types = {'e': pa.string(), 'g': pa.float64()} 1578 reader = self.open_bytes(rows, read_options=read_options, 1579 convert_options=convert_options) 1580 expected_schema = pa.schema([('g', pa.float64()), 1581 ('f', pa.int64()), 1582 ('e', pa.string())]) 1583 self.check_reader(reader, expected_schema, 1584 [{'g': [None, None], 1585 'e': ["2", "5"], 1586 'f': [3, 6]}]) 1587 1588 def test_encoding(self): 1589 # latin-1 (invalid utf-8) 1590 rows = b"a,b\nun,\xe9l\xe9phant" 1591 read_options = ReadOptions() 1592 reader = self.open_bytes(rows, read_options=read_options) 1593 expected_schema = pa.schema([('a', pa.string()), 1594 ('b', pa.binary())]) 1595 self.check_reader(reader, expected_schema, 1596 [{'a': ["un"], 1597 'b': [b"\xe9l\xe9phant"]}]) 1598 1599 read_options.encoding = 'latin1' 1600 reader = self.open_bytes(rows, read_options=read_options) 1601 expected_schema = pa.schema([('a', pa.string()), 1602 ('b', pa.string())]) 1603 self.check_reader(reader, expected_schema, 1604 [{'a': ["un"], 1605 'b': ["éléphant"]}]) 1606 1607 # utf-16 1608 rows = (b'\xff\xfea\x00,\x00b\x00\n\x00u\x00n\x00,' 1609 b'\x00\xe9\x00l\x00\xe9\x00p\x00h\x00a\x00n\x00t\x00') 1610 read_options.encoding = 'utf16' 1611 reader = self.open_bytes(rows, read_options=read_options) 1612 expected_schema = pa.schema([('a', pa.string()), 1613 ('b', pa.string())]) 1614 self.check_reader(reader, expected_schema, 1615 [{'a': ["un"], 1616 'b': ["éléphant"]}]) 1617 1618 def test_small_random_csv(self): 1619 csv, expected = make_random_csv(num_cols=2, num_rows=10) 1620 reader = self.open_bytes(csv) 1621 table = reader.read_all() 1622 assert table.schema == expected.schema 1623 assert table.equals(expected) 1624 assert table.to_pydict() == expected.to_pydict() 1625 1626 def test_stress_block_sizes(self): 1627 # Test a number of small block sizes to stress block stitching 1628 csv_base, expected = make_random_csv(num_cols=2, num_rows=500) 1629 block_sizes = [19, 21, 23, 26, 37, 111] 1630 csvs = [csv_base, csv_base.rstrip(b'\r\n')] 1631 for csv in csvs: 1632 for block_size in block_sizes: 1633 # Need at least two lines for type inference 1634 assert csv[:block_size].count(b'\n') >= 2 1635 read_options = ReadOptions(block_size=block_size) 1636 reader = self.open_bytes( 1637 csv, read_options=read_options) 1638 table = reader.read_all() 1639 assert table.schema == expected.schema 1640 if not table.equals(expected): 1641 # Better error output 1642 assert table.to_pydict() == expected.to_pydict() 1643 1644 def test_batch_lifetime(self): 1645 gc.collect() 1646 old_allocated = pa.total_allocated_bytes() 1647 1648 # Memory occupation should not grow with CSV file size 1649 def check_one_batch(reader, expected): 1650 batch = reader.read_next_batch() 1651 assert batch.to_pydict() == expected 1652 1653 rows = b"10,11\n12,13\n14,15\n16,17\n" 1654 read_options = ReadOptions() 1655 read_options.column_names = ['a', 'b'] 1656 read_options.block_size = 6 1657 reader = self.open_bytes(rows, read_options=read_options) 1658 check_one_batch(reader, {'a': [10], 'b': [11]}) 1659 allocated_after_first_batch = pa.total_allocated_bytes() 1660 check_one_batch(reader, {'a': [12], 'b': [13]}) 1661 assert pa.total_allocated_bytes() <= allocated_after_first_batch 1662 check_one_batch(reader, {'a': [14], 'b': [15]}) 1663 assert pa.total_allocated_bytes() <= allocated_after_first_batch 1664 check_one_batch(reader, {'a': [16], 'b': [17]}) 1665 assert pa.total_allocated_bytes() <= allocated_after_first_batch 1666 with pytest.raises(StopIteration): 1667 reader.read_next_batch() 1668 assert pa.total_allocated_bytes() == old_allocated 1669 reader = None 1670 assert pa.total_allocated_bytes() == old_allocated 1671 1672 def test_header_skip_rows(self): 1673 super().test_header_skip_rows() 1674 1675 rows = b"ab,cd\nef,gh\nij,kl\nmn,op\n" 1676 1677 # Skipping all rows immediately results in end of iteration 1678 opts = ReadOptions() 1679 opts.skip_rows = 4 1680 opts.column_names = ['ab', 'cd'] 1681 reader = self.open_bytes(rows, read_options=opts) 1682 with pytest.raises(StopIteration): 1683 assert reader.read_next_batch() 1684 1685 def test_skip_rows_after_names(self): 1686 super().test_skip_rows_after_names() 1687 1688 rows = b"ab,cd\nef,gh\nij,kl\nmn,op\n" 1689 1690 # Skipping all rows immediately results in end of iteration 1691 opts = ReadOptions() 1692 opts.skip_rows_after_names = 3 1693 reader = self.open_bytes(rows, read_options=opts) 1694 with pytest.raises(StopIteration): 1695 assert reader.read_next_batch() 1696 1697 # Skipping beyond all rows immediately results in end of iteration 1698 opts.skip_rows_after_names = 99999 1699 reader = self.open_bytes(rows, read_options=opts) 1700 with pytest.raises(StopIteration): 1701 assert reader.read_next_batch() 1702 1703 1704class TestSerialStreamingCSVRead(BaseStreamingCSVRead, unittest.TestCase): 1705 @property 1706 def use_threads(self): 1707 return False 1708 1709 1710class TestThreadedStreamingCSVRead(BaseStreamingCSVRead, unittest.TestCase): 1711 @property 1712 def use_threads(self): 1713 return True 1714 1715 1716class BaseTestCompressedCSVRead: 1717 1718 def setUp(self): 1719 self.tmpdir = tempfile.mkdtemp(prefix='arrow-csv-test-') 1720 1721 def tearDown(self): 1722 shutil.rmtree(self.tmpdir) 1723 1724 def read_csv(self, csv_path): 1725 try: 1726 return read_csv(csv_path) 1727 except pa.ArrowNotImplementedError as e: 1728 pytest.skip(str(e)) 1729 1730 def test_random_csv(self): 1731 csv, expected = make_random_csv(num_cols=2, num_rows=100) 1732 csv_path = os.path.join(self.tmpdir, self.csv_filename) 1733 self.write_file(csv_path, csv) 1734 table = self.read_csv(csv_path) 1735 table.validate(full=True) 1736 assert table.schema == expected.schema 1737 assert table.equals(expected) 1738 assert table.to_pydict() == expected.to_pydict() 1739 1740 1741class TestGZipCSVRead(BaseTestCompressedCSVRead, unittest.TestCase): 1742 csv_filename = "compressed.csv.gz" 1743 1744 def write_file(self, path, contents): 1745 with gzip.open(path, 'wb', 3) as f: 1746 f.write(contents) 1747 1748 def test_concatenated(self): 1749 # ARROW-5974 1750 csv_path = os.path.join(self.tmpdir, self.csv_filename) 1751 with gzip.open(csv_path, 'wb', 3) as f: 1752 f.write(b"ab,cd\nef,gh\n") 1753 with gzip.open(csv_path, 'ab', 3) as f: 1754 f.write(b"ij,kl\nmn,op\n") 1755 table = self.read_csv(csv_path) 1756 assert table.to_pydict() == { 1757 'ab': ['ef', 'ij', 'mn'], 1758 'cd': ['gh', 'kl', 'op'], 1759 } 1760 1761 1762class TestBZ2CSVRead(BaseTestCompressedCSVRead, unittest.TestCase): 1763 csv_filename = "compressed.csv.bz2" 1764 1765 def write_file(self, path, contents): 1766 with bz2.BZ2File(path, 'w') as f: 1767 f.write(contents) 1768 1769 1770def test_read_csv_does_not_close_passed_file_handles(): 1771 # ARROW-4823 1772 buf = io.BytesIO(b"a,b,c\n1,2,3\n4,5,6") 1773 read_csv(buf) 1774 assert not buf.closed 1775 1776 1777def test_write_read_round_trip(): 1778 t = pa.Table.from_arrays([[1, 2, 3], ["a", "b", "c"]], ["c1", "c2"]) 1779 record_batch = t.to_batches(max_chunksize=4)[0] 1780 for data in [t, record_batch]: 1781 # Test with header 1782 buf = io.BytesIO() 1783 write_csv(data, buf, WriteOptions(include_header=True)) 1784 buf.seek(0) 1785 assert t == read_csv(buf) 1786 1787 # Test without header 1788 buf = io.BytesIO() 1789 write_csv(data, buf, WriteOptions(include_header=False)) 1790 buf.seek(0) 1791 1792 read_options = ReadOptions(column_names=t.column_names) 1793 assert t == read_csv(buf, read_options=read_options) 1794 1795 # Test with writer 1796 for read_options, write_options in [ 1797 (None, WriteOptions(include_header=True)), 1798 (ReadOptions(column_names=t.column_names), 1799 WriteOptions(include_header=False)), 1800 ]: 1801 buf = io.BytesIO() 1802 with CSVWriter(buf, t.schema, write_options=write_options) as writer: 1803 writer.write_table(t) 1804 buf.seek(0) 1805 assert t == read_csv(buf, read_options=read_options) 1806 1807 buf = io.BytesIO() 1808 with CSVWriter(buf, t.schema, write_options=write_options) as writer: 1809 for batch in t.to_batches(max_chunksize=1): 1810 writer.write_batch(batch) 1811 buf.seek(0) 1812 assert t == read_csv(buf, read_options=read_options) 1813 1814 1815def test_read_csv_reference_cycle(): 1816 # ARROW-13187 1817 def inner(): 1818 buf = io.BytesIO(b"a,b,c\n1,2,3\n4,5,6") 1819 table = read_csv(buf) 1820 return weakref.ref(table) 1821 1822 with util.disabled_gc(): 1823 wr = inner() 1824 assert wr() is None 1825