# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

import abc
import bz2
from datetime import date, datetime
from decimal import Decimal
import gc
import gzip
import io
import itertools
import os
import pickle
import shutil
import signal
import string
import tempfile
import threading
import time
import unittest
import weakref

import pytest

import numpy as np

import pyarrow as pa
from pyarrow.csv import (
    open_csv, read_csv, ReadOptions, ParseOptions, ConvertOptions, ISO8601,
    write_csv, WriteOptions, CSVWriter)
from pyarrow.tests import util


def generate_col_names():
    # 'a', 'b'... 'z', then 'aa', 'ab'...
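    # e.g. islice(generate_col_names(), 28) yields 'a', ..., 'z', 'aa', 'ab'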
    letters = string.ascii_lowercase
    yield from letters
    for first in letters:
        for second in letters:
            yield first + second


def make_random_csv(num_cols=2, num_rows=10, linesep='\r\n', write_names=True):
    arr = np.random.RandomState(42).randint(0, 1000, size=(num_cols, num_rows))
    csv = io.StringIO()
    col_names = list(itertools.islice(generate_col_names(), num_cols))
    if write_names:
        csv.write(",".join(col_names))
        csv.write(linesep)
    for row in arr.T:
        csv.write(",".join(map(str, row)))
        csv.write(linesep)
    csv = csv.getvalue().encode()
    columns = [pa.array(a, type=pa.int64()) for a in arr]
    expected = pa.Table.from_arrays(columns, col_names)
    return csv, expected


def make_empty_csv(column_names):
    csv = io.StringIO()
    csv.write(",".join(column_names))
    csv.write("\n")
    return csv.getvalue().encode()


def check_options_class(cls, **attr_values):
    """
    Check setting and getting attributes of an *Options class.
    """
    opts = cls()

    for name, values in attr_values.items():
        assert getattr(opts, name) == values[0], \
            "incorrect default value for " + name
        for v in values:
            setattr(opts, name, v)
            assert getattr(opts, name) == v, "failed setting value"

    with pytest.raises(AttributeError):
        opts.zzz_non_existent = True

    # Check constructor named arguments
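    # (each attribute's values[1] is non-default: values[0] was asserted
    # to be the default above)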
    non_defaults = {name: values[1] for name, values in attr_values.items()}
    opts = cls(**non_defaults)
    for name, value in non_defaults.items():
        assert getattr(opts, name) == value


# The various options classes need to be picklable for dataset
def check_options_class_pickling(cls, **attr_values):
    opts = cls(**attr_values)
    new_opts = pickle.loads(pickle.dumps(opts,
                                         protocol=pickle.HIGHEST_PROTOCOL))
    for name, value in attr_values.items():
        assert getattr(new_opts, name) == value


def test_read_options():
    cls = ReadOptions
    opts = cls()

    check_options_class(cls, use_threads=[True, False],
                        skip_rows=[0, 3],
                        column_names=[[], ["ab", "cd"]],
                        autogenerate_column_names=[False, True],
                        encoding=['utf8', 'utf16'],
                        skip_rows_after_names=[0, 27])

    check_options_class_pickling(cls, use_threads=True,
                                 skip_rows=3,
                                 column_names=["ab", "cd"],
                                 autogenerate_column_names=False,
                                 encoding='utf16',
                                 skip_rows_after_names=27)

    assert opts.block_size > 0
    opts.block_size = 12345
    assert opts.block_size == 12345

    opts = cls(block_size=1234)
    assert opts.block_size == 1234

    opts.validate()

    match = "ReadOptions: block_size must be at least 1: 0"
    with pytest.raises(pa.ArrowInvalid, match=match):
        opts = cls()
        opts.block_size = 0
        opts.validate()

    match = "ReadOptions: skip_rows cannot be negative: -1"
    with pytest.raises(pa.ArrowInvalid, match=match):
        opts = cls()
        opts.skip_rows = -1
        opts.validate()

    match = "ReadOptions: skip_rows_after_names cannot be negative: -1"
    with pytest.raises(pa.ArrowInvalid, match=match):
        opts = cls()
        opts.skip_rows_after_names = -1
        opts.validate()

    match = "ReadOptions: autogenerate_column_names cannot be true when" \
            " column_names are provided"
    with pytest.raises(pa.ArrowInvalid, match=match):
        opts = cls()
        opts.autogenerate_column_names = True
        opts.column_names = ('a', 'b')
        opts.validate()


def test_parse_options():
    cls = ParseOptions

    check_options_class(cls, delimiter=[',', 'x'],
                        escape_char=[False, 'y'],
                        quote_char=['"', 'z', False],
                        double_quote=[True, False],
                        newlines_in_values=[False, True],
                        ignore_empty_lines=[True, False])

    check_options_class_pickling(cls, delimiter='x',
                                 escape_char='y',
                                 quote_char=False,
                                 double_quote=False,
                                 newlines_in_values=True,
                                 ignore_empty_lines=False)

    cls().validate()
    opts = cls()
    opts.delimiter = "\t"
    opts.validate()

    match = "ParseOptions: delimiter cannot be \\\\r or \\\\n"
    with pytest.raises(pa.ArrowInvalid, match=match):
        opts = cls()
        opts.delimiter = "\n"
        opts.validate()

    with pytest.raises(pa.ArrowInvalid, match=match):
        opts = cls()
        opts.delimiter = "\r"
        opts.validate()

    match = "ParseOptions: quote_char cannot be \\\\r or \\\\n"
    with pytest.raises(pa.ArrowInvalid, match=match):
        opts = cls()
        opts.quote_char = "\n"
        opts.validate()

    with pytest.raises(pa.ArrowInvalid, match=match):
        opts = cls()
        opts.quote_char = "\r"
        opts.validate()

    match = "ParseOptions: escape_char cannot be \\\\r or \\\\n"
    with pytest.raises(pa.ArrowInvalid, match=match):
        opts = cls()
        opts.escape_char = "\n"
        opts.validate()

    with pytest.raises(pa.ArrowInvalid, match=match):
        opts = cls()
        opts.escape_char = "\r"
        opts.validate()


def test_convert_options():
    cls = ConvertOptions
    opts = cls()

    check_options_class(
        cls, check_utf8=[True, False],
        strings_can_be_null=[False, True],
        quoted_strings_can_be_null=[True, False],
        decimal_point=['.', ','],
        include_columns=[[], ['def', 'abc']],
        include_missing_columns=[False, True],
        auto_dict_encode=[False, True],
        timestamp_parsers=[[], [ISO8601, '%y-%m']])

    check_options_class_pickling(
        cls, check_utf8=False,
        strings_can_be_null=True,
        quoted_strings_can_be_null=False,
        decimal_point=',',
        include_columns=['def', 'abc'],
        include_missing_columns=False,
        auto_dict_encode=True,
        timestamp_parsers=[ISO8601, '%y-%m'])

    with pytest.raises(ValueError):
        opts.decimal_point = '..'

    assert opts.auto_dict_max_cardinality > 0
    opts.auto_dict_max_cardinality = 99999
    assert opts.auto_dict_max_cardinality == 99999

    assert opts.column_types == {}
    # Pass column_types as mapping
    opts.column_types = {'b': pa.int16(), 'c': pa.float32()}
    assert opts.column_types == {'b': pa.int16(), 'c': pa.float32()}
    opts.column_types = {'v': 'int16', 'w': 'null'}
    assert opts.column_types == {'v': pa.int16(), 'w': pa.null()}
    # Pass column_types as schema
    schema = pa.schema([('a', pa.int32()), ('b', pa.string())])
    opts.column_types = schema
    assert opts.column_types == {'a': pa.int32(), 'b': pa.string()}
    # Pass column_types as sequence
    opts.column_types = [('x', pa.binary())]
    assert opts.column_types == {'x': pa.binary()}

    with pytest.raises(TypeError, match='DataType expected'):
        opts.column_types = {'a': None}
    with pytest.raises(TypeError):
        opts.column_types = 0

    assert isinstance(opts.null_values, list)
    assert '' in opts.null_values
    assert 'N/A' in opts.null_values
    opts.null_values = ['xxx', 'yyy']
    assert opts.null_values == ['xxx', 'yyy']

    assert isinstance(opts.true_values, list)
    opts.true_values = ['xxx', 'yyy']
    assert opts.true_values == ['xxx', 'yyy']

    assert isinstance(opts.false_values, list)
    opts.false_values = ['xxx', 'yyy']
    assert opts.false_values == ['xxx', 'yyy']

    assert opts.timestamp_parsers == []
    opts.timestamp_parsers = [ISO8601]
    assert opts.timestamp_parsers == [ISO8601]

    opts = cls(column_types={'a': pa.null()},
               null_values=['N', 'nn'], true_values=['T', 'tt'],
               false_values=['F', 'ff'], auto_dict_max_cardinality=999,
               timestamp_parsers=[ISO8601, '%Y-%m-%d'])
    assert opts.column_types == {'a': pa.null()}
    assert opts.null_values == ['N', 'nn']
    assert opts.false_values == ['F', 'ff']
    assert opts.true_values == ['T', 'tt']
    assert opts.auto_dict_max_cardinality == 999
    assert opts.timestamp_parsers == [ISO8601, '%Y-%m-%d']


def test_write_options():
    cls = WriteOptions
    opts = cls()

    check_options_class(
        cls, include_header=[True, False])

    assert opts.batch_size > 0
    opts.batch_size = 12345
    assert opts.batch_size == 12345

    opts = cls(batch_size=9876)
    assert opts.batch_size == 9876

    opts.validate()

    match = "WriteOptions: batch_size must be at least 1: 0"
    with pytest.raises(pa.ArrowInvalid, match=match):
        opts = cls()
        opts.batch_size = 0
        opts.validate()


class BaseTestCSV(abc.ABC):
326    """Common tests which are shared by streaming and non streaming readers"""
327
328    @abc.abstractmethod
329    def read_bytes(self, b, **kwargs):
330        """
331        :param b: bytes to be parsed
332        :param kwargs: arguments passed on to open the csv file
333        :return: b parsed as a single RecordBatch
334        """
335        raise NotImplementedError

    @property
    @abc.abstractmethod
    def use_threads(self):
        """Whether this test is multi-threaded"""
        raise NotImplementedError

    @staticmethod
    def check_names(table, names):
        assert table.num_columns == len(names)
        assert table.column_names == names

    def test_header_skip_rows(self):
        rows = b"ab,cd\nef,gh\nij,kl\nmn,op\n"

        opts = ReadOptions()
        opts.skip_rows = 1
        table = self.read_bytes(rows, read_options=opts)
        self.check_names(table, ["ef", "gh"])
        assert table.to_pydict() == {
            "ef": ["ij", "mn"],
            "gh": ["kl", "op"],
        }

        opts.skip_rows = 3
        table = self.read_bytes(rows, read_options=opts)
        self.check_names(table, ["mn", "op"])
        assert table.to_pydict() == {
            "mn": [],
            "op": [],
        }

        opts.skip_rows = 4
        with pytest.raises(pa.ArrowInvalid):
            # Not enough rows
            table = self.read_bytes(rows, read_options=opts)

        # Can skip rows with a different number of columns
        rows = b"abcd\n,,,,,\nij,kl\nmn,op\n"
        opts.skip_rows = 2
        table = self.read_bytes(rows, read_options=opts)
        self.check_names(table, ["ij", "kl"])
        assert table.to_pydict() == {
            "ij": ["mn"],
            "kl": ["op"],
        }

        # Can skip all rows exactly when columns are given
        opts.skip_rows = 4
        opts.column_names = ['ij', 'kl']
        table = self.read_bytes(rows, read_options=opts)
        self.check_names(table, ["ij", "kl"])
        assert table.to_pydict() == {
            "ij": [],
            "kl": [],
        }

    def test_skip_rows_after_names(self):
        rows = b"ab,cd\nef,gh\nij,kl\nmn,op\n"

        opts = ReadOptions()
        opts.skip_rows_after_names = 1
        table = self.read_bytes(rows, read_options=opts)
        self.check_names(table, ["ab", "cd"])
        assert table.to_pydict() == {
            "ab": ["ij", "mn"],
            "cd": ["kl", "op"],
        }

        # Can skip exact number of rows
        opts.skip_rows_after_names = 3
        table = self.read_bytes(rows, read_options=opts)
        self.check_names(table, ["ab", "cd"])
        assert table.to_pydict() == {
            "ab": [],
            "cd": [],
        }

        # Can skip beyond all rows
        opts.skip_rows_after_names = 4
        table = self.read_bytes(rows, read_options=opts)
        self.check_names(table, ["ab", "cd"])
        assert table.to_pydict() == {
            "ab": [],
            "cd": [],
        }

        # Can skip rows with a different number of columns
        rows = b"abcd\n,,,,,\nij,kl\nmn,op\n"
        opts.skip_rows_after_names = 2
        opts.column_names = ["f0", "f1"]
        table = self.read_bytes(rows, read_options=opts)
        self.check_names(table, ["f0", "f1"])
        assert table.to_pydict() == {
            "f0": ["ij", "mn"],
            "f1": ["kl", "op"],
        }
        opts = ReadOptions()

        # Can skip rows with new lines in the value
        rows = b'ab,cd\n"e\nf","g\n\nh"\n"ij","k\nl"\nmn,op'
        opts.skip_rows_after_names = 2
        parse_opts = ParseOptions()
        parse_opts.newlines_in_values = True
        table = self.read_bytes(rows, read_options=opts,
                                parse_options=parse_opts)
        self.check_names(table, ["ab", "cd"])
        assert table.to_pydict() == {
            "ab": ["mn"],
            "cd": ["op"],
        }

        # Can skip rows when block ends in middle of quoted value
        opts.skip_rows_after_names = 2
        opts.block_size = 26
        table = self.read_bytes(rows, read_options=opts,
                                parse_options=parse_opts)
        self.check_names(table, ["ab", "cd"])
        assert table.to_pydict() == {
            "ab": ["mn"],
            "cd": ["op"],
        }
        opts = ReadOptions()

        # Can skip rows that are beyond the first block without lexer
        rows, expected = make_random_csv(num_cols=5, num_rows=1000)
        opts.skip_rows_after_names = 900
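        # A block size of ~1/11th of the data forces the 900 skipped rows
        # to span multiple blocks.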
        opts.block_size = len(rows) / 11
        table = self.read_bytes(rows, read_options=opts)
        assert table.schema == expected.schema
        assert table.num_rows == 100
        table_dict = table.to_pydict()
        for name, values in expected.to_pydict().items():
            assert values[900:] == table_dict[name]

        # Can skip rows that are beyond the first block with lexer
        table = self.read_bytes(rows, read_options=opts,
                                parse_options=parse_opts)
        assert table.schema == expected.schema
        assert table.num_rows == 100
        table_dict = table.to_pydict()
        for name, values in expected.to_pydict().items():
            assert values[900:] == table_dict[name]

        # Skip rows and skip rows after names
        rows, expected = make_random_csv(num_cols=5, num_rows=200,
                                         write_names=False)
        opts = ReadOptions()
        opts.skip_rows = 37
        opts.skip_rows_after_names = 41
        opts.column_names = expected.schema.names
        table = self.read_bytes(rows, read_options=opts,
                                parse_options=parse_opts)
        assert table.schema == expected.schema
        assert (table.num_rows ==
                expected.num_rows - opts.skip_rows -
                opts.skip_rows_after_names)
        table_dict = table.to_pydict()
        for name, values in expected.to_pydict().items():
            assert (values[opts.skip_rows + opts.skip_rows_after_names:] ==
                    table_dict[name])

    def test_row_number_offset_in_errors(self):
        # Row numbers are only correctly counted in serial reads
        def format_msg(msg_format, row, *args):
            if self.use_threads:
                row_info = ""
            else:
                row_info = "Row #{}: ".format(row)
            return msg_format.format(row_info, *args)

        csv, _ = make_random_csv(4, 100, write_names=True)

        read_options = ReadOptions()
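        # A small block size forces multi-block reads, so that the bad rows
        # appended below land beyond the first block.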
        read_options.block_size = len(csv) / 3
        convert_options = ConvertOptions()
        convert_options.column_types = {"a": pa.int32()}

        # Test without skip_rows and column names in the csv
        csv_bad_columns = csv + b"1,2\r\n"
        message_columns = format_msg("{}Expected 4 columns, got 2", 102)
        with pytest.raises(pa.ArrowInvalid, match=message_columns):
            self.read_bytes(csv_bad_columns,
                            read_options=read_options,
                            convert_options=convert_options)

        csv_bad_type = csv + b"a,b,c,d\r\n"
        message_value = format_msg(
            "In CSV column #0: {}"
            "CSV conversion error to int32: invalid value 'a'",
            102, csv)
        with pytest.raises(pa.ArrowInvalid, match=message_value):
            self.read_bytes(csv_bad_type,
                            read_options=read_options,
                            convert_options=convert_options)

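        # Build a row longer than the excerpt quoted in error messages
        # (truncated to 96 bytes below).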
        long_row = (b"this is a long row" * 15) + b",3\r\n"
        csv_bad_columns_long = csv + long_row
        message_long = format_msg("{}Expected 4 columns, got 2: {} ...", 102,
                                  long_row[0:96].decode("utf-8"))
        with pytest.raises(pa.ArrowInvalid, match=message_long):
            self.read_bytes(csv_bad_columns_long,
                            read_options=read_options,
                            convert_options=convert_options)

        # Test skipping rows after the names
        read_options.skip_rows_after_names = 47

        with pytest.raises(pa.ArrowInvalid, match=message_columns):
            self.read_bytes(csv_bad_columns,
                            read_options=read_options,
                            convert_options=convert_options)

        with pytest.raises(pa.ArrowInvalid, match=message_value):
            self.read_bytes(csv_bad_type,
                            read_options=read_options,
                            convert_options=convert_options)

        with pytest.raises(pa.ArrowInvalid, match=message_long):
            self.read_bytes(csv_bad_columns_long,
                            read_options=read_options,
                            convert_options=convert_options)

        read_options.skip_rows_after_names = 0

        # Test without skip_rows and column names not in the csv
        csv, _ = make_random_csv(4, 100, write_names=False)
        read_options.column_names = ["a", "b", "c", "d"]
        csv_bad_columns = csv + b"1,2\r\n"
        message_columns = format_msg("{}Expected 4 columns, got 2", 101)
        with pytest.raises(pa.ArrowInvalid, match=message_columns):
            self.read_bytes(csv_bad_columns,
                            read_options=read_options,
                            convert_options=convert_options)

        csv_bad_columns_long = csv + long_row
        message_long = format_msg("{}Expected 4 columns, got 2: {} ...", 101,
                                  long_row[0:96].decode("utf-8"))
        with pytest.raises(pa.ArrowInvalid, match=message_long):
            self.read_bytes(csv_bad_columns_long,
                            read_options=read_options,
                            convert_options=convert_options)

        csv_bad_type = csv + b"a,b,c,d\r\n"
        message_value = format_msg(
            "In CSV column #0: {}"
            "CSV conversion error to int32: invalid value 'a'",
            101)
        message_value = message_value.format(len(csv))
        with pytest.raises(pa.ArrowInvalid, match=message_value):
            self.read_bytes(csv_bad_type,
                            read_options=read_options,
                            convert_options=convert_options)

        # Test with skip_rows and column names not in the csv
        read_options.skip_rows = 23
        with pytest.raises(pa.ArrowInvalid, match=message_columns):
            self.read_bytes(csv_bad_columns,
                            read_options=read_options,
                            convert_options=convert_options)

        with pytest.raises(pa.ArrowInvalid, match=message_value):
            self.read_bytes(csv_bad_type,
                            read_options=read_options,
                            convert_options=convert_options)


class BaseCSVTableRead(BaseTestCSV):

    def read_csv(self, csv, *args, validate_full=True, **kwargs):
        """
        Read the CSV bytes into memory using pyarrow's read_csv.

        :param csv: the CSV bytes
        :param args: positional arguments forwarded to read_csv
        :param validate_full: whether to fully validate the resulting table
        :param kwargs: keyword arguments forwarded to read_csv
        """
        assert isinstance(self.use_threads, bool)  # sanity check
        read_options = kwargs.setdefault('read_options', ReadOptions())
        read_options.use_threads = self.use_threads
        table = read_csv(csv, *args, **kwargs)
        table.validate(full=validate_full)
        return table

    def read_bytes(self, b, **kwargs):
        return self.read_csv(pa.py_buffer(b), **kwargs)

    def test_file_object(self):
        data = b"a,b\n1,2\n"
        expected_data = {'a': [1], 'b': [2]}
        bio = io.BytesIO(data)
        table = self.read_csv(bio)
        assert table.to_pydict() == expected_data
        # Text files not allowed
        sio = io.StringIO(data.decode())
        with pytest.raises(TypeError):
            self.read_csv(sio)

    def test_header(self):
        rows = b"abc,def,gh\n"
        table = self.read_bytes(rows)
        assert isinstance(table, pa.Table)
        self.check_names(table, ["abc", "def", "gh"])
        assert table.num_rows == 0

    def test_bom(self):
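        # The file starts with the UTF-8 byte order mark (BOM), which the
        # reader should skip.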
        rows = b"\xef\xbb\xbfa,b\n1,2\n"
        expected_data = {'a': [1], 'b': [2]}
        table = self.read_bytes(rows)
        assert table.to_pydict() == expected_data

    def test_one_chunk(self):
        # ARROW-7661: lack of newline at end of file should not produce
        # an additional chunk.
        rows = [b"a,b", b"1,2", b"3,4", b"56,78"]
        for line_ending in [b'\n', b'\r', b'\r\n']:
            for file_ending in [b'', line_ending]:
                data = line_ending.join(rows) + file_ending
                table = self.read_bytes(data)
                assert len(table.to_batches()) == 1
                assert table.to_pydict() == {
                    "a": [1, 3, 56],
                    "b": [2, 4, 78],
                }

    def test_header_column_names(self):
        rows = b"ab,cd\nef,gh\nij,kl\nmn,op\n"

        opts = ReadOptions()
        opts.column_names = ["x", "y"]
        table = self.read_bytes(rows, read_options=opts)
        self.check_names(table, ["x", "y"])
        assert table.to_pydict() == {
            "x": ["ab", "ef", "ij", "mn"],
            "y": ["cd", "gh", "kl", "op"],
        }

        opts.skip_rows = 3
        table = self.read_bytes(rows, read_options=opts)
        self.check_names(table, ["x", "y"])
        assert table.to_pydict() == {
            "x": ["mn"],
            "y": ["op"],
        }

        opts.skip_rows = 4
        table = self.read_bytes(rows, read_options=opts)
        self.check_names(table, ["x", "y"])
        assert table.to_pydict() == {
            "x": [],
            "y": [],
        }

        opts.skip_rows = 5
        with pytest.raises(pa.ArrowInvalid):
            # Not enough rows
            table = self.read_bytes(rows, read_options=opts)

        # Unexpected number of columns
        opts.skip_rows = 0
        opts.column_names = ["x", "y", "z"]
        with pytest.raises(pa.ArrowInvalid,
                           match="Expected 3 columns, got 2"):
            table = self.read_bytes(rows, read_options=opts)

        # Can skip rows with a different number of columns
        rows = b"abcd\n,,,,,\nij,kl\nmn,op\n"
        opts.skip_rows = 2
        opts.column_names = ["x", "y"]
        table = self.read_bytes(rows, read_options=opts)
        self.check_names(table, ["x", "y"])
        assert table.to_pydict() == {
            "x": ["ij", "mn"],
            "y": ["kl", "op"],
        }

    def test_header_autogenerate_column_names(self):
        rows = b"ab,cd\nef,gh\nij,kl\nmn,op\n"

        opts = ReadOptions()
        opts.autogenerate_column_names = True
        table = self.read_bytes(rows, read_options=opts)
        self.check_names(table, ["f0", "f1"])
        assert table.to_pydict() == {
            "f0": ["ab", "ef", "ij", "mn"],
            "f1": ["cd", "gh", "kl", "op"],
        }

        opts.skip_rows = 3
        table = self.read_bytes(rows, read_options=opts)
        self.check_names(table, ["f0", "f1"])
        assert table.to_pydict() == {
            "f0": ["mn"],
            "f1": ["op"],
        }

        # Not enough rows, impossible to infer number of columns
        opts.skip_rows = 4
        with pytest.raises(pa.ArrowInvalid):
            table = self.read_bytes(rows, read_options=opts)

    def test_include_columns(self):
        rows = b"ab,cd\nef,gh\nij,kl\nmn,op\n"

        convert_options = ConvertOptions()
        convert_options.include_columns = ['ab']
        table = self.read_bytes(rows, convert_options=convert_options)
        self.check_names(table, ["ab"])
        assert table.to_pydict() == {
            "ab": ["ef", "ij", "mn"],
        }

        # Order of include_columns is respected, regardless of CSV order
        convert_options.include_columns = ['cd', 'ab']
        table = self.read_bytes(rows, convert_options=convert_options)
        schema = pa.schema([('cd', pa.string()),
                            ('ab', pa.string())])
        assert table.schema == schema
        assert table.to_pydict() == {
            "cd": ["gh", "kl", "op"],
            "ab": ["ef", "ij", "mn"],
        }

        # Include a column not in the CSV file => raises by default
        convert_options.include_columns = ['xx', 'ab', 'yy']
        with pytest.raises(KeyError,
                           match="Column 'xx' in include_columns "
                                 "does not exist in CSV file"):
            self.read_bytes(rows, convert_options=convert_options)

    def test_include_missing_columns(self):
        rows = b"ab,cd\nef,gh\nij,kl\nmn,op\n"

        read_options = ReadOptions()
        convert_options = ConvertOptions()
        convert_options.include_columns = ['xx', 'ab', 'yy']
        convert_options.include_missing_columns = True
        table = self.read_bytes(rows, read_options=read_options,
                                convert_options=convert_options)
        schema = pa.schema([('xx', pa.null()),
                            ('ab', pa.string()),
                            ('yy', pa.null())])
        assert table.schema == schema
        assert table.to_pydict() == {
            "xx": [None, None, None],
            "ab": ["ef", "ij", "mn"],
            "yy": [None, None, None],
        }

        # Combining with `column_names`
        read_options.column_names = ["xx", "yy"]
        convert_options.include_columns = ["yy", "cd"]
        table = self.read_bytes(rows, read_options=read_options,
                                convert_options=convert_options)
        schema = pa.schema([('yy', pa.string()),
                            ('cd', pa.null())])
        assert table.schema == schema
        assert table.to_pydict() == {
            "yy": ["cd", "gh", "kl", "op"],
            "cd": [None, None, None, None],
        }

        # And with `column_types` as well
        convert_options.column_types = {"yy": pa.binary(),
                                        "cd": pa.int32()}
        table = self.read_bytes(rows, read_options=read_options,
                                convert_options=convert_options)
        schema = pa.schema([('yy', pa.binary()),
                            ('cd', pa.int32())])
        assert table.schema == schema
        assert table.to_pydict() == {
            "yy": [b"cd", b"gh", b"kl", b"op"],
            "cd": [None, None, None, None],
        }

    def test_simple_ints(self):
        # Infer integer columns
        rows = b"a,b,c\n1,2,3\n4,5,6\n"
        table = self.read_bytes(rows)
        schema = pa.schema([('a', pa.int64()),
                            ('b', pa.int64()),
                            ('c', pa.int64())])
        assert table.schema == schema
        assert table.to_pydict() == {
            'a': [1, 4],
            'b': [2, 5],
            'c': [3, 6],
        }

    def test_simple_varied(self):
        # Infer various kinds of data
        rows = b"a,b,c,d\n1,2,3,0\n4.0,-5,foo,True\n"
        table = self.read_bytes(rows)
        schema = pa.schema([('a', pa.float64()),
                            ('b', pa.int64()),
                            ('c', pa.string()),
                            ('d', pa.bool_())])
        assert table.schema == schema
        assert table.to_pydict() == {
            'a': [1.0, 4.0],
            'b': [2, -5],
            'c': ["3", "foo"],
            'd': [False, True],
        }

    def test_simple_nulls(self):
        # Infer various kinds of data, with nulls
        rows = (b"a,b,c,d,e,f\n"
                b"1,2,,,3,N/A\n"
                b"nan,-5,foo,,nan,TRUE\n"
                b"4.5,#N/A,nan,,\xff,false\n")
        table = self.read_bytes(rows)
        schema = pa.schema([('a', pa.float64()),
                            ('b', pa.int64()),
                            ('c', pa.string()),
                            ('d', pa.null()),
                            ('e', pa.binary()),
                            ('f', pa.bool_())])
        assert table.schema == schema
        assert table.to_pydict() == {
            'a': [1.0, None, 4.5],
            'b': [2, -5, None],
            'c': ["", "foo", "nan"],
            'd': [None, None, None],
            'e': [b"3", b"nan", b"\xff"],
            'f': [None, True, False],
        }

    def test_decimal_point(self):
        # Infer floats with a custom decimal point
        parse_options = ParseOptions(delimiter=';')
        rows = b"a;b\n1.25;2,5\nNA;-3\n-4;NA"

        table = self.read_bytes(rows, parse_options=parse_options)
        schema = pa.schema([('a', pa.float64()),
                            ('b', pa.string())])
        assert table.schema == schema
        assert table.to_pydict() == {
            'a': [1.25, None, -4.0],
            'b': ["2,5", "-3", "NA"],
        }

        convert_options = ConvertOptions(decimal_point=',')
        table = self.read_bytes(rows, parse_options=parse_options,
                                convert_options=convert_options)
        schema = pa.schema([('a', pa.string()),
                            ('b', pa.float64())])
        assert table.schema == schema
        assert table.to_pydict() == {
            'a': ["1.25", "NA", "-4"],
            'b': [2.5, -3.0, None],
        }

    def test_simple_timestamps(self):
        # Infer a timestamp column
        rows = (b"a,b,c\n"
                b"1970,1970-01-01 00:00:00,1970-01-01 00:00:00.123\n"
                b"1989,1989-07-14 01:00:00,1989-07-14 01:00:00.123456\n")
        table = self.read_bytes(rows)
        schema = pa.schema([('a', pa.int64()),
                            ('b', pa.timestamp('s')),
                            ('c', pa.timestamp('ns'))])
        assert table.schema == schema
        assert table.to_pydict() == {
            'a': [1970, 1989],
            'b': [datetime(1970, 1, 1), datetime(1989, 7, 14, 1)],
            'c': [datetime(1970, 1, 1, 0, 0, 0, 123000),
                  datetime(1989, 7, 14, 1, 0, 0, 123456)],
        }

    def test_timestamp_parsers(self):
        # Infer timestamps with custom parsers
        rows = b"a,b\n1970/01/01,1980-01-01 00\n1970/01/02,1980-01-02 00\n"
        opts = ConvertOptions()

        table = self.read_bytes(rows, convert_options=opts)
        schema = pa.schema([('a', pa.string()),
                            ('b', pa.timestamp('s'))])
        assert table.schema == schema
        assert table.to_pydict() == {
            'a': ['1970/01/01', '1970/01/02'],
            'b': [datetime(1980, 1, 1), datetime(1980, 1, 2)],
        }

        opts.timestamp_parsers = ['%Y/%m/%d']
        table = self.read_bytes(rows, convert_options=opts)
        schema = pa.schema([('a', pa.timestamp('s')),
                            ('b', pa.string())])
        assert table.schema == schema
        assert table.to_pydict() == {
            'a': [datetime(1970, 1, 1), datetime(1970, 1, 2)],
            'b': ['1980-01-01 00', '1980-01-02 00'],
        }

        opts.timestamp_parsers = ['%Y/%m/%d', ISO8601]
        table = self.read_bytes(rows, convert_options=opts)
        schema = pa.schema([('a', pa.timestamp('s')),
                            ('b', pa.timestamp('s'))])
        assert table.schema == schema
        assert table.to_pydict() == {
            'a': [datetime(1970, 1, 1), datetime(1970, 1, 2)],
            'b': [datetime(1980, 1, 1), datetime(1980, 1, 2)],
        }

    def test_dates(self):
        # Dates are inferred as date32 by default
        rows = b"a,b\n1970-01-01,1970-01-02\n1971-01-01,1971-01-02\n"
        table = self.read_bytes(rows)
        schema = pa.schema([('a', pa.date32()),
                            ('b', pa.date32())])
        assert table.schema == schema
        assert table.to_pydict() == {
            'a': [date(1970, 1, 1), date(1971, 1, 1)],
            'b': [date(1970, 1, 2), date(1971, 1, 2)],
        }

        # Can ask for date types explicitly
        opts = ConvertOptions()
        opts.column_types = {'a': pa.date32(), 'b': pa.date64()}
        table = self.read_bytes(rows, convert_options=opts)
        schema = pa.schema([('a', pa.date32()),
                            ('b', pa.date64())])
        assert table.schema == schema
        assert table.to_pydict() == {
            'a': [date(1970, 1, 1), date(1971, 1, 1)],
            'b': [date(1970, 1, 2), date(1971, 1, 2)],
        }

        # Can ask for timestamp types explicitly
        opts = ConvertOptions()
        opts.column_types = {'a': pa.timestamp('s'), 'b': pa.timestamp('ms')}
        table = self.read_bytes(rows, convert_options=opts)
        schema = pa.schema([('a', pa.timestamp('s')),
                            ('b', pa.timestamp('ms'))])
        assert table.schema == schema
        assert table.to_pydict() == {
            'a': [datetime(1970, 1, 1), datetime(1971, 1, 1)],
            'b': [datetime(1970, 1, 2), datetime(1971, 1, 2)],
        }

    def test_times(self):
        # Times are inferred as time32[s] by default
        from datetime import time

        rows = b"a,b\n12:34:56,12:34:56.789\n23:59:59,23:59:59.999\n"
        table = self.read_bytes(rows)
        # Column 'b' has subseconds, so cannot be inferred as time32[s]
        schema = pa.schema([('a', pa.time32('s')),
                            ('b', pa.string())])
        assert table.schema == schema
        assert table.to_pydict() == {
            'a': [time(12, 34, 56), time(23, 59, 59)],
            'b': ["12:34:56.789", "23:59:59.999"],
        }

        # Can ask for time types explicitly
        opts = ConvertOptions()
        opts.column_types = {'a': pa.time64('us'), 'b': pa.time32('ms')}
        table = self.read_bytes(rows, convert_options=opts)
        schema = pa.schema([('a', pa.time64('us')),
                            ('b', pa.time32('ms'))])
        assert table.schema == schema
        assert table.to_pydict() == {
            'a': [time(12, 34, 56), time(23, 59, 59)],
            'b': [time(12, 34, 56, 789000), time(23, 59, 59, 999000)],
        }

    def test_auto_dict_encode(self):
        opts = ConvertOptions(auto_dict_encode=True)
        rows = "a,b\nab,1\ncdé,2\ncdé,3\nab,4".encode()
        table = self.read_bytes(rows, convert_options=opts)
        schema = pa.schema([('a', pa.dictionary(pa.int32(), pa.string())),
                            ('b', pa.int64())])
        expected = {
            'a': ["ab", "cdé", "cdé", "ab"],
            'b': [1, 2, 3, 4],
        }
        assert table.schema == schema
        assert table.to_pydict() == expected

        opts.auto_dict_max_cardinality = 2
        table = self.read_bytes(rows, convert_options=opts)
        assert table.schema == schema
        assert table.to_pydict() == expected

        # Cardinality above max => plain-encoded
        opts.auto_dict_max_cardinality = 1
        table = self.read_bytes(rows, convert_options=opts)
        assert table.schema == pa.schema([('a', pa.string()),
                                          ('b', pa.int64())])
        assert table.to_pydict() == expected

        # With invalid UTF8, not checked
        opts.auto_dict_max_cardinality = 50
        opts.check_utf8 = False
        rows = b"a,b\nab,1\ncd\xff,2\nab,3"
        table = self.read_bytes(rows, convert_options=opts,
                                validate_full=False)
        assert table.schema == schema
        dict_values = table['a'].chunk(0).dictionary
        assert len(dict_values) == 2
        assert dict_values[0].as_py() == "ab"
        assert dict_values[1].as_buffer() == b"cd\xff"

        # With invalid UTF8, checked
        opts.check_utf8 = True
        table = self.read_bytes(rows, convert_options=opts)
        schema = pa.schema([('a', pa.dictionary(pa.int32(), pa.binary())),
                            ('b', pa.int64())])
        expected = {
            'a': [b"ab", b"cd\xff", b"ab"],
            'b': [1, 2, 3],
        }
        assert table.schema == schema
        assert table.to_pydict() == expected

    def test_custom_nulls(self):
        # Infer nulls with custom values
        opts = ConvertOptions(null_values=['Xxx', 'Zzz'])
        rows = b"""a,b,c,d\nZzz,"Xxx",1,2\nXxx,#N/A,,Zzz\n"""
        table = self.read_bytes(rows, convert_options=opts)
        schema = pa.schema([('a', pa.null()),
                            ('b', pa.string()),
                            ('c', pa.string()),
                            ('d', pa.int64())])
        assert table.schema == schema
        assert table.to_pydict() == {
            'a': [None, None],
            'b': ["Xxx", "#N/A"],
            'c': ["1", ""],
            'd': [2, None],
        }

        opts = ConvertOptions(null_values=['Xxx', 'Zzz'],
                              strings_can_be_null=True)
        table = self.read_bytes(rows, convert_options=opts)
        assert table.to_pydict() == {
            'a': [None, None],
            'b': [None, "#N/A"],
            'c': ["1", ""],
            'd': [2, None],
        }
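        # With quoted_strings_can_be_null=False, the quoted "Xxx" above is
        # kept as a string; only unquoted null_values matches become null.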
        opts.quoted_strings_can_be_null = False
        table = self.read_bytes(rows, convert_options=opts)
        assert table.to_pydict() == {
            'a': [None, None],
            'b': ["Xxx", "#N/A"],
            'c': ["1", ""],
            'd': [2, None],
        }

        opts = ConvertOptions(null_values=[])
        rows = b"a,b\n#N/A,\n"
        table = self.read_bytes(rows, convert_options=opts)
        schema = pa.schema([('a', pa.string()),
                            ('b', pa.string())])
        assert table.schema == schema
        assert table.to_pydict() == {
            'a': ["#N/A"],
            'b': [""],
        }

    def test_custom_bools(self):
        # Infer booleans with custom values
        opts = ConvertOptions(true_values=['T', 'yes'],
                              false_values=['F', 'no'])
        rows = (b"a,b,c\n"
                b"True,T,t\n"
                b"False,F,f\n"
                b"True,yes,yes\n"
                b"False,no,no\n"
                b"N/A,N/A,N/A\n")
        table = self.read_bytes(rows, convert_options=opts)
        schema = pa.schema([('a', pa.string()),
                            ('b', pa.bool_()),
                            ('c', pa.string())])
        assert table.schema == schema
        assert table.to_pydict() == {
            'a': ["True", "False", "True", "False", "N/A"],
            'b': [True, False, True, False, None],
            'c': ["t", "f", "yes", "no", "N/A"],
        }

    def test_column_types(self):
        # Ask for specific column types in ConvertOptions
        opts = ConvertOptions(column_types={'b': 'float32',
                                            'c': 'string',
                                            'd': 'boolean',
                                            'e': pa.decimal128(11, 2),
                                            'zz': 'null'})
        rows = b"a,b,c,d,e\n1,2,3,true,1.0\n4,-5,6,false,0\n"
        table = self.read_bytes(rows, convert_options=opts)
        schema = pa.schema([('a', pa.int64()),
                            ('b', pa.float32()),
                            ('c', pa.string()),
                            ('d', pa.bool_()),
                            ('e', pa.decimal128(11, 2))])
        expected = {
            'a': [1, 4],
            'b': [2.0, -5.0],
            'c': ["3", "6"],
            'd': [True, False],
            'e': [Decimal("1.00"), Decimal("0.00")]
        }
        assert table.schema == schema
        assert table.to_pydict() == expected
        # Pass column_types as schema
        opts = ConvertOptions(
            column_types=pa.schema([('b', pa.float32()),
                                    ('c', pa.string()),
                                    ('d', pa.bool_()),
                                    ('e', pa.decimal128(11, 2)),
                                    ('zz', pa.bool_())]))
        table = self.read_bytes(rows, convert_options=opts)
        assert table.schema == schema
        assert table.to_pydict() == expected
        # One of the columns in column_types fails converting
        rows = b"a,b,c,d,e\n1,XXX,3,true,5\n4,-5,6,false,7\n"
        with pytest.raises(pa.ArrowInvalid) as exc:
            self.read_bytes(rows, convert_options=opts)
        err = str(exc.value)
        assert "In CSV column #1: " in err
        assert "CSV conversion error to float: invalid value 'XXX'" in err

    def test_column_types_dict(self):
        # Ask for dict-encoded column types in ConvertOptions
        column_types = [
            ('a', pa.dictionary(pa.int32(), pa.utf8())),
            ('b', pa.dictionary(pa.int32(), pa.int64())),
            ('c', pa.dictionary(pa.int32(), pa.decimal128(11, 2))),
            ('d', pa.dictionary(pa.int32(), pa.large_utf8()))]

        opts = ConvertOptions(column_types=dict(column_types))
        rows = (b"a,b,c,d\n"
                b"abc,123456,1.0,zz\n"
                b"defg,123456,0.5,xx\n"
                b"abc,N/A,1.0,xx\n")
        table = self.read_bytes(rows, convert_options=opts)

        schema = pa.schema(column_types)
        expected = {
            'a': ["abc", "defg", "abc"],
            'b': [123456, 123456, None],
            'c': [Decimal("1.00"), Decimal("0.50"), Decimal("1.00")],
            'd': ["zz", "xx", "xx"],
        }
        assert table.schema == schema
        assert table.to_pydict() == expected

        # Unsupported index type
        column_types[0] = ('a', pa.dictionary(pa.int8(), pa.utf8()))

        opts = ConvertOptions(column_types=dict(column_types))
        with pytest.raises(NotImplementedError):
            table = self.read_bytes(rows, convert_options=opts)

    def test_column_types_with_column_names(self):
        # When both `column_names` and `column_types` are given, names
        # in `column_types` should refer to names in `column_names`
        rows = b"a,b\nc,d\ne,f\n"
        read_options = ReadOptions(column_names=['x', 'y'])
        convert_options = ConvertOptions(column_types={'x': pa.binary()})
        table = self.read_bytes(rows, read_options=read_options,
                                convert_options=convert_options)
        schema = pa.schema([('x', pa.binary()),
                            ('y', pa.string())])
        assert table.schema == schema
        assert table.to_pydict() == {
            'x': [b'a', b'c', b'e'],
            'y': ['b', 'd', 'f'],
        }

    def test_no_ending_newline(self):
        # No \n after last line
        rows = b"a,b,c\n1,2,3\n4,5,6"
        table = self.read_bytes(rows)
        assert table.to_pydict() == {
            'a': [1, 4],
            'b': [2, 5],
            'c': [3, 6],
        }

    def test_trivial(self):
        # A bit pointless, but at least it shouldn't crash
        rows = b",\n\n"
        table = self.read_bytes(rows)
        assert table.to_pydict() == {'': []}

    def test_empty_lines(self):
        rows = b"a,b\n\r1,2\r\n\r\n3,4\r\n"
        table = self.read_bytes(rows)
        assert table.to_pydict() == {
            'a': [1, 3],
            'b': [2, 4],
        }
        parse_options = ParseOptions(ignore_empty_lines=False)
        table = self.read_bytes(rows, parse_options=parse_options)
        assert table.to_pydict() == {
            'a': [None, 1, None, 3],
            'b': [None, 2, None, 4],
        }
        read_options = ReadOptions(skip_rows=2)
        table = self.read_bytes(rows, parse_options=parse_options,
                                read_options=read_options)
        assert table.to_pydict() == {
            '1': [None, 3],
            '2': [None, 4],
        }

    def test_invalid_csv(self):
        # Various CSV errors
        rows = b"a,b,c\n1,2\n4,5,6\n"
        with pytest.raises(pa.ArrowInvalid, match="Expected 3 columns, got 2"):
            self.read_bytes(rows)
        rows = b"a,b,c\n1,2,3\n4"
        with pytest.raises(pa.ArrowInvalid, match="Expected 3 columns, got 1"):
            self.read_bytes(rows)
        for rows in [b"", b"\n", b"\r\n", b"\r", b"\n\n"]:
            with pytest.raises(pa.ArrowInvalid, match="Empty CSV file"):
                self.read_bytes(rows)

    def test_options_delimiter(self):
        rows = b"a;b,c\nde,fg;eh\n"
        table = self.read_bytes(rows)
        assert table.to_pydict() == {
            'a;b': ['de'],
            'c': ['fg;eh'],
        }
        opts = ParseOptions(delimiter=';')
        table = self.read_bytes(rows, parse_options=opts)
        assert table.to_pydict() == {
            'a': ['de,fg'],
            'b,c': ['eh'],
        }

    def test_small_random_csv(self):
        csv, expected = make_random_csv(num_cols=2, num_rows=10)
        table = self.read_bytes(csv)
        assert table.schema == expected.schema
        assert table.equals(expected)
        assert table.to_pydict() == expected.to_pydict()

    def test_stress_block_sizes(self):
        # Test a number of small block sizes to stress block stitching
        csv_base, expected = make_random_csv(num_cols=2, num_rows=500)
        block_sizes = [11, 12, 13, 17, 37, 111]
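        # Exercise both trailing-newline and no-trailing-newline variants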
        csvs = [csv_base, csv_base.rstrip(b'\r\n')]
        for csv in csvs:
            for block_size in block_sizes:
                read_options = ReadOptions(block_size=block_size)
                table = self.read_bytes(csv, read_options=read_options)
                assert table.schema == expected.schema
                if not table.equals(expected):
                    # Better error output
                    assert table.to_pydict() == expected.to_pydict()

    def test_stress_convert_options_blowup(self):
        # ARROW-6481: a convert_options with a very large number of columns
        # should not blow up memory usage or CPU time.
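        # time.thread_time is not available on all platforms (it requires
        # Python >= 3.7); fall back to wall-clock time if needed.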
        try:
            clock = time.thread_time
        except AttributeError:
            clock = time.time
        num_columns = 10000
        col_names = ["K{}".format(i) for i in range(num_columns)]
        csv = make_empty_csv(col_names)
        t1 = clock()
        convert_options = ConvertOptions(
            column_types={k: pa.string() for k in col_names[::2]})
        table = self.read_bytes(csv, convert_options=convert_options)
        dt = clock() - t1
        # Check that processing time didn't blow up.
        # This is a conservative check (it takes less than 300 ms
        # in debug mode on my local machine).
        assert dt <= 10.0
        # Check result
        assert table.num_columns == num_columns
        assert table.num_rows == 0
        assert table.column_names == col_names

    def test_cancellation(self):
        if (threading.current_thread().ident !=
                threading.main_thread().ident):
            pytest.skip("test only works from main Python thread")
        # Skips test if not available
        raise_signal = util.get_raise_signal()

        # Make the interruptible workload large enough to not finish
        # before the interrupt comes, even in release mode on fast machines.
        last_duration = 0.0
        workload_size = 100_000

        while last_duration < 1.0:
            print("workload size:", workload_size)
            large_csv = b"a,b,c\n" + b"1,2,3\n" * workload_size
            t1 = time.time()
            self.read_bytes(large_csv)
            last_duration = time.time() - t1
            workload_size = workload_size * 3

        def signal_from_thread():
            time.sleep(0.2)
            raise_signal(signal.SIGINT)

        t1 = time.time()
        try:
            try:
                t = threading.Thread(target=signal_from_thread)
                with pytest.raises(KeyboardInterrupt) as exc_info:
                    t.start()
                    self.read_bytes(large_csv)
            finally:
                t.join()
        except KeyboardInterrupt:
            # In case KeyboardInterrupt didn't interrupt `self.read_bytes`
            # above, at least prevent it from stopping the test suite
            pytest.fail("KeyboardInterrupt didn't interrupt CSV reading")
        dt = time.time() - t1
        # The interruption should have arrived in a timely manner
        assert dt <= 1.0
        e = exc_info.value.__context__
        assert isinstance(e, pa.ArrowCancelled)
        assert e.signum == signal.SIGINT

    def test_cancellation_disabled(self):
        # ARROW-12622: reader would segfault when the cancelling signal
        # handler was not enabled (e.g. if disabled, or if not on the
        # main thread)
        t = threading.Thread(
            target=lambda: self.read_bytes(b"f64\n0.1"))
        t.start()
        t.join()


class TestSerialCSVTableRead(BaseCSVTableRead):
    @property
    def use_threads(self):
        return False


class TestThreadedCSVTableRead(BaseCSVTableRead):
    @property
    def use_threads(self):
        return True


class BaseStreamingCSVRead(BaseTestCSV):

    def open_csv(self, csv, *args, **kwargs):
        """
        Open the CSV bytes for streaming reading using pyarrow's open_csv.

        :param csv: the CSV bytes
        :param args: positional arguments forwarded to open_csv
        :param kwargs: keyword arguments forwarded to open_csv
        """
1391        read_options = kwargs.setdefault('read_options', ReadOptions())
1392        read_options.use_threads = self.use_threads
1393        return open_csv(csv, *args, **kwargs)
1394
1395    def open_bytes(self, b, **kwargs):
1396        return self.open_csv(pa.py_buffer(b), **kwargs)
1397
1398    def check_reader(self, reader, expected_schema, expected_data):
        assert reader.schema == expected_schema
        batches = list(reader)
        assert len(batches) == len(expected_data)
        for batch, expected_batch in zip(batches, expected_data):
            batch.validate(full=True)
            assert batch.schema == expected_schema
            assert batch.to_pydict() == expected_batch

    def read_bytes(self, b, **kwargs):
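        # Open the bytes as a streaming reader and materialize everything
        # into a single Table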
        return self.open_bytes(b, **kwargs).read_all()

    def test_file_object(self):
        data = b"a,b\n1,2\n3,4\n"
        expected_data = {'a': [1, 3], 'b': [2, 4]}
        bio = io.BytesIO(data)
        reader = self.open_csv(bio)
        expected_schema = pa.schema([('a', pa.int64()),
                                     ('b', pa.int64())])
        self.check_reader(reader, expected_schema, [expected_data])

    def test_header(self):
        rows = b"abc,def,gh\n"
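        # A header-only CSV yields null-typed columns and no batches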
        reader = self.open_bytes(rows)
        expected_schema = pa.schema([('abc', pa.null()),
                                     ('def', pa.null()),
                                     ('gh', pa.null())])
        self.check_reader(reader, expected_schema, [])

    def test_inference(self):
        # Inference is done on first block
        rows = b"a,b\n123,456\nabc,de\xff\ngh,ij\n"
        expected_schema = pa.schema([('a', pa.string()),
                                     ('b', pa.binary())])

        read_options = ReadOptions()
        read_options.block_size = len(rows)
        reader = self.open_bytes(rows, read_options=read_options)
        self.check_reader(reader, expected_schema,
                          [{'a': ['123', 'abc', 'gh'],
                            'b': [b'456', b'de\xff', b'ij']}])

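        # One byte less than the whole file forces the last row into a
        # second block, so the reader emits two batches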
        read_options.block_size = len(rows) - 1
        reader = self.open_bytes(rows, read_options=read_options)
        self.check_reader(reader, expected_schema,
                          [{'a': ['123', 'abc'],
                            'b': [b'456', b'de\xff']},
                           {'a': ['gh'],
                            'b': [b'ij']}])

    def test_inference_failure(self):
        # Inference on first block, then conversion failure on second block
        rows = b"a,b\n123,456\nabc,de\xff\ngh,ij\n"
        read_options = ReadOptions()
        read_options.block_size = len(rows) - 7
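        # The first block now ends before the 'abc' row is complete, so
        # type inference sees only the first data row and infers int64;
        # converting 'abc' in the second block then fails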
        reader = self.open_bytes(rows, read_options=read_options)
        expected_schema = pa.schema([('a', pa.int64()),
                                     ('b', pa.int64())])
        assert reader.schema == expected_schema
        assert reader.read_next_batch().to_pydict() == {
            'a': [123], 'b': [456]
        }
        # Second block
        with pytest.raises(ValueError,
                           match="CSV conversion error to int64"):
            reader.read_next_batch()
        # EOF
        with pytest.raises(StopIteration):
            reader.read_next_batch()

    def test_invalid_csv(self):
        # CSV errors on first block
        rows = b"a,b\n1,2,3\n4,5\n6,7\n"
        read_options = ReadOptions()
        read_options.block_size = 10
        with pytest.raises(pa.ArrowInvalid,
                           match="Expected 2 columns, got 3"):
            reader = self.open_bytes(
                rows, read_options=read_options)

        # CSV errors on second block
        rows = b"a,b\n1,2\n3,4,5\n6,7\n"
        read_options.block_size = 8
        reader = self.open_bytes(rows, read_options=read_options)
        assert reader.read_next_batch().to_pydict() == {'a': [1], 'b': [2]}
        with pytest.raises(pa.ArrowInvalid,
                           match="Expected 2 columns, got 3"):
            reader.read_next_batch()
        # Cannot continue after a parse error
        with pytest.raises(StopIteration):
            reader.read_next_batch()

    def test_options_delimiter(self):
        rows = b"a;b,c\nde,fg;eh\n"
        reader = self.open_bytes(rows)
        expected_schema = pa.schema([('a;b', pa.string()),
                                     ('c', pa.string())])
        self.check_reader(reader, expected_schema,
                          [{'a;b': ['de'],
                            'c': ['fg;eh']}])

        opts = ParseOptions(delimiter=';')
        reader = self.open_bytes(rows, parse_options=opts)
        expected_schema = pa.schema([('a', pa.string()),
                                     ('b,c', pa.string())])
        self.check_reader(reader, expected_schema,
                          [{'a': ['de,fg'],
                            'b,c': ['eh']}])

    def test_no_ending_newline(self):
        # No \n after last line
        rows = b"a,b,c\n1,2,3\n4,5,6"
        reader = self.open_bytes(rows)
        expected_schema = pa.schema([('a', pa.int64()),
                                     ('b', pa.int64()),
                                     ('c', pa.int64())])
        self.check_reader(reader, expected_schema,
                          [{'a': [1, 4],
                            'b': [2, 5],
                            'c': [3, 6]}])

    def test_empty_file(self):
        with pytest.raises(ValueError, match="Empty CSV file"):
            self.open_bytes(b"")

    def test_column_options(self):
        # With column_names
        rows = b"1,2,3\n4,5,6"
        read_options = ReadOptions()
        read_options.column_names = ['d', 'e', 'f']
        reader = self.open_bytes(rows, read_options=read_options)
        expected_schema = pa.schema([('d', pa.int64()),
                                     ('e', pa.int64()),
                                     ('f', pa.int64())])
        self.check_reader(reader, expected_schema,
                          [{'d': [1, 4],
                            'e': [2, 5],
                            'f': [3, 6]}])

        # With include_columns
        convert_options = ConvertOptions()
        convert_options.include_columns = ['f', 'e']
        reader = self.open_bytes(rows, read_options=read_options,
                                 convert_options=convert_options)
        expected_schema = pa.schema([('f', pa.int64()),
                                     ('e', pa.int64())])
        self.check_reader(reader, expected_schema,
                          [{'e': [2, 5],
                            'f': [3, 6]}])

        # With column_types
        convert_options.column_types = {'e': pa.string()}
        reader = self.open_bytes(rows, read_options=read_options,
                                 convert_options=convert_options)
        expected_schema = pa.schema([('f', pa.int64()),
                                     ('e', pa.string())])
        self.check_reader(reader, expected_schema,
                          [{'e': ["2", "5"],
                            'f': [3, 6]}])

        # Missing columns in include_columns
        convert_options.include_columns = ['g', 'f', 'e']
        with pytest.raises(
                KeyError,
                match="Column 'g' in include_columns does not exist"):
            reader = self.open_bytes(rows, read_options=read_options,
                                     convert_options=convert_options)

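        # With include_missing_columns=True, the missing column 'g' is
        # materialized as a null column instead of raising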
        convert_options.include_missing_columns = True
        reader = self.open_bytes(rows, read_options=read_options,
                                 convert_options=convert_options)
        expected_schema = pa.schema([('g', pa.null()),
                                     ('f', pa.int64()),
                                     ('e', pa.string())])
        self.check_reader(reader, expected_schema,
                          [{'g': [None, None],
                            'e': ["2", "5"],
                            'f': [3, 6]}])

        convert_options.column_types = {'e': pa.string(), 'g': pa.float64()}
        reader = self.open_bytes(rows, read_options=read_options,
                                 convert_options=convert_options)
        expected_schema = pa.schema([('g', pa.float64()),
                                     ('f', pa.int64()),
                                     ('e', pa.string())])
        self.check_reader(reader, expected_schema,
                          [{'g': [None, None],
                            'e': ["2", "5"],
                            'f': [3, 6]}])

    def test_encoding(self):
        # latin-1 (invalid utf-8)
        rows = b"a,b\nun,\xe9l\xe9phant"
        read_options = ReadOptions()
        reader = self.open_bytes(rows, read_options=read_options)
        expected_schema = pa.schema([('a', pa.string()),
                                     ('b', pa.binary())])
        self.check_reader(reader, expected_schema,
                          [{'a': ["un"],
                            'b': [b"\xe9l\xe9phant"]}])

        read_options.encoding = 'latin1'
        reader = self.open_bytes(rows, read_options=read_options)
        expected_schema = pa.schema([('a', pa.string()),
                                     ('b', pa.string())])
        self.check_reader(reader, expected_schema,
                          [{'a': ["un"],
                            'b': ["éléphant"]}])

        # utf-16
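        # (the leading b'\xff\xfe' below is the UTF-16 little-endian BOM)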
        rows = (b'\xff\xfea\x00,\x00b\x00\n\x00u\x00n\x00,'
                b'\x00\xe9\x00l\x00\xe9\x00p\x00h\x00a\x00n\x00t\x00')
        read_options.encoding = 'utf16'
        reader = self.open_bytes(rows, read_options=read_options)
        expected_schema = pa.schema([('a', pa.string()),
                                     ('b', pa.string())])
        self.check_reader(reader, expected_schema,
                          [{'a': ["un"],
                            'b': ["éléphant"]}])

    def test_small_random_csv(self):
        csv, expected = make_random_csv(num_cols=2, num_rows=10)
        reader = self.open_bytes(csv)
        table = reader.read_all()
        assert table.schema == expected.schema
        assert table.equals(expected)
        assert table.to_pydict() == expected.to_pydict()

    def test_stress_block_sizes(self):
        # Test a number of small block sizes to stress block stitching
        csv_base, expected = make_random_csv(num_cols=2, num_rows=500)
        block_sizes = [19, 21, 23, 26, 37, 111]
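        # Also exercise an input without the trailing newline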
        csvs = [csv_base, csv_base.rstrip(b'\r\n')]
        for csv in csvs:
            for block_size in block_sizes:
                # Need at least two lines for type inference
                assert csv[:block_size].count(b'\n') >= 2
                read_options = ReadOptions(block_size=block_size)
                reader = self.open_bytes(
                    csv, read_options=read_options)
                table = reader.read_all()
                assert table.schema == expected.schema
                if not table.equals(expected):
                    # Compare as Python dicts for a more readable
                    # error message
                    assert table.to_pydict() == expected.to_pydict()

    def test_batch_lifetime(self):
        gc.collect()
        old_allocated = pa.total_allocated_bytes()

        # Memory usage should not grow with the CSV file size
        def check_one_batch(reader, expected):
            batch = reader.read_next_batch()
            assert batch.to_pydict() == expected

        rows = b"10,11\n12,13\n14,15\n16,17\n"
        read_options = ReadOptions()
        read_options.column_names = ['a', 'b']
        read_options.block_size = 6
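        # Each 6-byte row fills exactly one block, so every batch holds
        # a single row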
        reader = self.open_bytes(rows, read_options=read_options)
        check_one_batch(reader, {'a': [10], 'b': [11]})
        allocated_after_first_batch = pa.total_allocated_bytes()
        check_one_batch(reader, {'a': [12], 'b': [13]})
        assert pa.total_allocated_bytes() <= allocated_after_first_batch
        check_one_batch(reader, {'a': [14], 'b': [15]})
        assert pa.total_allocated_bytes() <= allocated_after_first_batch
        check_one_batch(reader, {'a': [16], 'b': [17]})
        assert pa.total_allocated_bytes() <= allocated_after_first_batch
        with pytest.raises(StopIteration):
            reader.read_next_batch()
        assert pa.total_allocated_bytes() == old_allocated
        reader = None
        assert pa.total_allocated_bytes() == old_allocated

    def test_header_skip_rows(self):
        super().test_header_skip_rows()

        rows = b"ab,cd\nef,gh\nij,kl\nmn,op\n"

        # Skipping all rows immediately results in end of iteration
        opts = ReadOptions()
        opts.skip_rows = 4
        opts.column_names = ['ab', 'cd']
        reader = self.open_bytes(rows, read_options=opts)
        with pytest.raises(StopIteration):
            reader.read_next_batch()

    def test_skip_rows_after_names(self):
        super().test_skip_rows_after_names()

        rows = b"ab,cd\nef,gh\nij,kl\nmn,op\n"

        # Skipping all rows immediately results in end of iteration
        opts = ReadOptions()
        opts.skip_rows_after_names = 3
        reader = self.open_bytes(rows, read_options=opts)
        with pytest.raises(StopIteration):
            reader.read_next_batch()

        # Skipping beyond all rows immediately results in end of iteration
        opts.skip_rows_after_names = 99999
        reader = self.open_bytes(rows, read_options=opts)
        with pytest.raises(StopIteration):
            reader.read_next_batch()


class TestSerialStreamingCSVRead(BaseStreamingCSVRead, unittest.TestCase):
    @property
    def use_threads(self):
        return False


class TestThreadedStreamingCSVRead(BaseStreamingCSVRead, unittest.TestCase):
    @property
    def use_threads(self):
        return True


class BaseTestCompressedCSVRead:

    def setUp(self):
        self.tmpdir = tempfile.mkdtemp(prefix='arrow-csv-test-')

    def tearDown(self):
        shutil.rmtree(self.tmpdir)

    def read_csv(self, csv_path):
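        # read_csv is expected to transparently decompress based on the
        # file extension; skip the test if this Arrow build lacks the codec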
        try:
            return read_csv(csv_path)
        except pa.ArrowNotImplementedError as e:
            pytest.skip(str(e))

    def test_random_csv(self):
        csv, expected = make_random_csv(num_cols=2, num_rows=100)
        csv_path = os.path.join(self.tmpdir, self.csv_filename)
        self.write_file(csv_path, csv)
        table = self.read_csv(csv_path)
        table.validate(full=True)
        assert table.schema == expected.schema
        assert table.equals(expected)
        assert table.to_pydict() == expected.to_pydict()


class TestGZipCSVRead(BaseTestCompressedCSVRead, unittest.TestCase):
    csv_filename = "compressed.csv.gz"

    def write_file(self, path, contents):
        with gzip.open(path, 'wb', 3) as f:
            f.write(contents)

    def test_concatenated(self):
        # ARROW-5974
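        # A gzip file may contain multiple concatenated members; all of
        # them should be decompressed, not just the first one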
        csv_path = os.path.join(self.tmpdir, self.csv_filename)
        with gzip.open(csv_path, 'wb', 3) as f:
            f.write(b"ab,cd\nef,gh\n")
        with gzip.open(csv_path, 'ab', 3) as f:
            f.write(b"ij,kl\nmn,op\n")
        table = self.read_csv(csv_path)
        assert table.to_pydict() == {
            'ab': ['ef', 'ij', 'mn'],
            'cd': ['gh', 'kl', 'op'],
        }


class TestBZ2CSVRead(BaseTestCompressedCSVRead, unittest.TestCase):
    csv_filename = "compressed.csv.bz2"

    def write_file(self, path, contents):
        with bz2.BZ2File(path, 'w') as f:
            f.write(contents)


def test_read_csv_does_not_close_passed_file_handles():
    # ARROW-4823
    buf = io.BytesIO(b"a,b,c\n1,2,3\n4,5,6")
    read_csv(buf)
    assert not buf.closed


def test_write_read_round_trip():
    t = pa.Table.from_arrays([[1, 2, 3], ["a", "b", "c"]], ["c1", "c2"])
    record_batch = t.to_batches(max_chunksize=4)[0]
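    # Exercise both Table and RecordBatch inputs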
    for data in [t, record_batch]:
        # Test with header
        buf = io.BytesIO()
        write_csv(data, buf, WriteOptions(include_header=True))
        buf.seek(0)
        assert t == read_csv(buf)

        # Test without header
        buf = io.BytesIO()
        write_csv(data, buf, WriteOptions(include_header=False))
        buf.seek(0)

        read_options = ReadOptions(column_names=t.column_names)
        assert t == read_csv(buf, read_options=read_options)

    # Test with writer
    for read_options, write_options in [
        (None, WriteOptions(include_header=True)),
        (ReadOptions(column_names=t.column_names),
         WriteOptions(include_header=False)),
    ]:
        buf = io.BytesIO()
        with CSVWriter(buf, t.schema, write_options=write_options) as writer:
            writer.write_table(t)
        buf.seek(0)
        assert t == read_csv(buf, read_options=read_options)

        buf = io.BytesIO()
        with CSVWriter(buf, t.schema, write_options=write_options) as writer:
            for batch in t.to_batches(max_chunksize=1):
                writer.write_batch(batch)
        buf.seek(0)
        assert t == read_csv(buf, read_options=read_options)


def test_read_csv_reference_cycle():
    # ARROW-13187
    def inner():
        buf = io.BytesIO(b"a,b,c\n1,2,3\n4,5,6")
        table = read_csv(buf)
        return weakref.ref(table)

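    # With the GC disabled, the Table must be freed by reference counting
    # alone; a still-live weakref would indicate a reference cycle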
    with util.disabled_gc():
        wr = inner()
        assert wr() is None