1# Licensed to the Apache Software Foundation (ASF) under one
2# or more contributor license agreements.  See the NOTICE file
3# distributed with this work for additional information
4# regarding copyright ownership.  The ASF licenses this file
5# to you under the Apache License, Version 2.0 (the
6# "License"); you may not use this file except in compliance
7# with the License.  You may obtain a copy of the License at
8#
9#   http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing,
12# software distributed under the License is distributed on an
13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14# KIND, either express or implied.  See the License for the
15# specific language governing permissions and limitations
16# under the License.
17
18# cython: profile=False
19# distutils: language = c++
20# cython: language_level = 3
21
22from cython.operator cimport dereference as deref
23
24import codecs
25from collections.abc import Mapping
26
27from pyarrow.includes.common cimport *
28from pyarrow.includes.libarrow cimport *
29from pyarrow.lib cimport (check_status, Field, MemoryPool, Schema,
30                          RecordBatchReader, ensure_type,
31                          maybe_unbox_memory_pool, get_input_stream,
32                          get_writer, native_transcoding_input_stream,
33                          pyarrow_unwrap_batch, pyarrow_unwrap_schema,
34                          pyarrow_unwrap_table, pyarrow_wrap_schema,
35                          pyarrow_wrap_table, pyarrow_wrap_data_type,
36                          pyarrow_unwrap_data_type, Table, RecordBatch,
37                          StopToken, _CRecordBatchWriter)
38from pyarrow.lib import frombytes, tobytes, SignalStopHandler
39from pyarrow.util import _stringify_path
40
41
cdef unsigned char _single_char(s) except 0:
    """
    Return the ASCII code of the 1-character string *s*.

    Raises ValueError if the character is NUL or outside the ASCII
    range (the zero return value is reserved as the Cython error
    sentinel, hence ``except 0``).
    """
    code = ord(s)
    # Reject NUL (reserved as the error sentinel) and non-ASCII codes.
    if not 0 < code < 128:
        raise ValueError("Expecting an ASCII character")
    return <unsigned char> code
47
48
cdef class ReadOptions(_Weakrefable):
    """
    Options for reading CSV files.

    Parameters
    ----------
    use_threads : bool, optional (default True)
        Whether to use multiple threads to accelerate reading
    block_size : int, optional
        How many bytes to process at a time from the input stream.
        This will determine multi-threading granularity as well as
        the size of individual record batches or table chunks.
        Minimum valid value for block size is 1
    skip_rows : int, optional (default 0)
        The number of rows to skip before the column names (if any)
        and the CSV data.
    skip_rows_after_names : int, optional (default 0)
        The number of rows to skip after the column names.
        This number can be larger than the number of rows in one
        block, and empty rows are counted.
        The order of application is as follows:
        - `skip_rows` is applied (if non-zero);
        - column names are read (unless `column_names` is set);
        - `skip_rows_after_names` is applied (if non-zero).
    column_names : list, optional
        The column names of the target table.  If empty, fall back on
        `autogenerate_column_names`.
    autogenerate_column_names : bool, optional (default False)
        Whether to autogenerate column names if `column_names` is empty.
        If true, column names will be of the form "f0", "f1"...
        If false, column names will be read from the first CSV row
        after `skip_rows`.
    encoding : str, optional (default 'utf8')
        The character encoding of the CSV data.  Columns that cannot
        decode using this encoding can still be read as Binary.
    """

    # Avoid mistakenly creating attributes
    __slots__ = ()

    # __init__() is not called when unpickling, initialize storage here
    def __cinit__(self, *argw, **kwargs):
        self.options.reset(new CCSVReadOptions(CCSVReadOptions.Defaults()))

    def __init__(self, *, use_threads=None, block_size=None, skip_rows=None,
                 column_names=None, autogenerate_column_names=None,
                 encoding='utf8', skip_rows_after_names=None):
        # Only override the C++ defaults for options the caller passed.
        if use_threads is not None:
            self.use_threads = use_threads
        if block_size is not None:
            self.block_size = block_size
        if skip_rows is not None:
            self.skip_rows = skip_rows
        if column_names is not None:
            self.column_names = column_names
        if autogenerate_column_names is not None:
            self.autogenerate_column_names = autogenerate_column_names
        # Python-specific option
        self.encoding = encoding
        if skip_rows_after_names is not None:
            self.skip_rows_after_names = skip_rows_after_names

    @property
    def use_threads(self):
        """
        Whether to use multiple threads to accelerate reading.
        """
        return deref(self.options).use_threads

    @use_threads.setter
    def use_threads(self, value):
        deref(self.options).use_threads = value

    @property
    def block_size(self):
        """
        How many bytes to process at a time from the input stream.
        This will determine multi-threading granularity as well as
        the size of individual record batches or table chunks.
        """
        return deref(self.options).block_size

    @block_size.setter
    def block_size(self, value):
        deref(self.options).block_size = value

    @property
    def skip_rows(self):
        """
        The number of rows to skip before the column names (if any)
        and the CSV data.
        See `skip_rows_after_names` for interaction description
        """
        return deref(self.options).skip_rows

    @skip_rows.setter
    def skip_rows(self, value):
        deref(self.options).skip_rows = value

    @property
    def column_names(self):
        """
        The column names of the target table.  If empty, fall back on
        `autogenerate_column_names`.
        """
        return [frombytes(s) for s in deref(self.options).column_names]

    @column_names.setter
    def column_names(self, value):
        deref(self.options).column_names.clear()
        for item in value:
            deref(self.options).column_names.push_back(tobytes(item))

    @property
    def autogenerate_column_names(self):
        """
        Whether to autogenerate column names if `column_names` is empty.
        If true, column names will be of the form "f0", "f1"...
        If false, column names will be read from the first CSV row
        after `skip_rows`.
        """
        return deref(self.options).autogenerate_column_names

    @autogenerate_column_names.setter
    def autogenerate_column_names(self, value):
        deref(self.options).autogenerate_column_names = value

    @property
    def skip_rows_after_names(self):
        """
        The number of rows to skip after the column names.
        This number can be larger than the number of rows in one
        block, and empty rows are counted.
        The order of application is as follows:
        - `skip_rows` is applied (if non-zero);
        - column names are read (unless `column_names` is set);
        - `skip_rows_after_names` is applied (if non-zero).
        """
        return deref(self.options).skip_rows_after_names

    @skip_rows_after_names.setter
    def skip_rows_after_names(self, value):
        deref(self.options).skip_rows_after_names = value

    def validate(self):
        """
        Raise if the options are invalid (delegates to the C++ layer).
        """
        check_status(deref(self.options).Validate())

    def equals(self, ReadOptions other):
        """
        Return whether all options (including the Python-only
        `encoding`) are equal to *other*'s.
        """
        return (
            self.use_threads == other.use_threads and
            self.block_size == other.block_size and
            self.skip_rows == other.skip_rows and
            self.column_names == other.column_names and
            self.autogenerate_column_names ==
            other.autogenerate_column_names and
            self.encoding == other.encoding and
            self.skip_rows_after_names == other.skip_rows_after_names
        )

    @staticmethod
    cdef ReadOptions wrap(CCSVReadOptions options):
        # Wrap a C++ options struct into a new Python-level instance.
        out = ReadOptions()
        out.options.reset(new CCSVReadOptions(move(options)))
        out.encoding = 'utf8'  # No way to know this
        return out

    # Pickling support: round-trip through the Python-visible properties.
    def __getstate__(self):
        return (self.use_threads, self.block_size, self.skip_rows,
                self.column_names, self.autogenerate_column_names,
                self.encoding, self.skip_rows_after_names)

    def __setstate__(self, state):
        (self.use_threads, self.block_size, self.skip_rows,
         self.column_names, self.autogenerate_column_names,
         self.encoding, self.skip_rows_after_names) = state

    def __eq__(self, other):
        try:
            return self.equals(other)
        except TypeError:
            # equals() raises TypeError when `other` is not a ReadOptions
            return False
230
231
cdef class ParseOptions(_Weakrefable):
    """
    Options for parsing CSV files.

    Parameters
    ----------
    delimiter : 1-character string, optional (default ',')
        The character delimiting individual cells in the CSV data.
    quote_char : 1-character string or False, optional (default '"')
        The character used optionally for quoting CSV values
        (False if quoting is not allowed).
    double_quote : bool, optional (default True)
        Whether two quotes in a quoted CSV value denote a single quote
        in the data.
    escape_char : 1-character string or False, optional (default False)
        The character used optionally for escaping special characters
        (False if escaping is not allowed).
    newlines_in_values : bool, optional (default False)
        Whether newline characters are allowed in CSV values.
        Setting this to True reduces the performance of multi-threaded
        CSV reading.
    ignore_empty_lines : bool, optional (default True)
        Whether empty lines are ignored in CSV input.
        If False, an empty line is interpreted as containing a single empty
        value (assuming a one-column CSV file).
    """
    # Avoid mistakenly creating attributes
    __slots__ = ()

    # __init__() is not called when unpickling, initialize storage here
    def __cinit__(self, *argw, **kwargs):
        self.options.reset(new CCSVParseOptions(CCSVParseOptions.Defaults()))

    def __init__(self, *, delimiter=None, quote_char=None, double_quote=None,
                 escape_char=None, newlines_in_values=None,
                 ignore_empty_lines=None):
        # Only override the C++ defaults for options the caller passed.
        if delimiter is not None:
            self.delimiter = delimiter
        if quote_char is not None:
            self.quote_char = quote_char
        if double_quote is not None:
            self.double_quote = double_quote
        if escape_char is not None:
            self.escape_char = escape_char
        if newlines_in_values is not None:
            self.newlines_in_values = newlines_in_values
        if ignore_empty_lines is not None:
            self.ignore_empty_lines = ignore_empty_lines

    @property
    def delimiter(self):
        """
        The character delimiting individual cells in the CSV data.
        """
        return chr(deref(self.options).delimiter)

    @delimiter.setter
    def delimiter(self, value):
        # _single_char() validates the value is a single ASCII character.
        deref(self.options).delimiter = _single_char(value)

    @property
    def quote_char(self):
        """
        The character used optionally for quoting CSV values
        (False if quoting is not allowed).
        """
        if deref(self.options).quoting:
            return chr(deref(self.options).quote_char)
        else:
            return False

    @quote_char.setter
    def quote_char(self, value):
        # Passing False disables quoting entirely; any single ASCII
        # character enables it with that character.
        if value is False:
            deref(self.options).quoting = False
        else:
            deref(self.options).quote_char = _single_char(value)
            deref(self.options).quoting = True

    @property
    def double_quote(self):
        """
        Whether two quotes in a quoted CSV value denote a single quote
        in the data.
        """
        return deref(self.options).double_quote

    @double_quote.setter
    def double_quote(self, value):
        deref(self.options).double_quote = value

    @property
    def escape_char(self):
        """
        The character used optionally for escaping special characters
        (False if escaping is not allowed).
        """
        if deref(self.options).escaping:
            return chr(deref(self.options).escape_char)
        else:
            return False

    @escape_char.setter
    def escape_char(self, value):
        # Passing False disables escaping entirely; any single ASCII
        # character enables it with that character.
        if value is False:
            deref(self.options).escaping = False
        else:
            deref(self.options).escape_char = _single_char(value)
            deref(self.options).escaping = True

    @property
    def newlines_in_values(self):
        """
        Whether newline characters are allowed in CSV values.
        Setting this to True reduces the performance of multi-threaded
        CSV reading.
        """
        return deref(self.options).newlines_in_values

    @newlines_in_values.setter
    def newlines_in_values(self, value):
        deref(self.options).newlines_in_values = value

    @property
    def ignore_empty_lines(self):
        """
        Whether empty lines are ignored in CSV input.
        If False, an empty line is interpreted as containing a single empty
        value (assuming a one-column CSV file).
        """
        return deref(self.options).ignore_empty_lines

    @ignore_empty_lines.setter
    def ignore_empty_lines(self, value):
        deref(self.options).ignore_empty_lines = value

    def validate(self):
        """
        Raise if the options are invalid (delegates to the C++ layer).
        """
        check_status(deref(self.options).Validate())

    def equals(self, ParseOptions other):
        """
        Return whether all options are equal to *other*'s.
        """
        return (
            self.delimiter == other.delimiter and
            self.quote_char == other.quote_char and
            self.double_quote == other.double_quote and
            self.escape_char == other.escape_char and
            self.newlines_in_values == other.newlines_in_values and
            self.ignore_empty_lines == other.ignore_empty_lines
        )

    @staticmethod
    cdef ParseOptions wrap(CCSVParseOptions options):
        # Wrap a C++ options struct into a new Python-level instance.
        out = ParseOptions()
        out.options.reset(new CCSVParseOptions(move(options)))
        return out

    # Pickling support: round-trip through the Python-visible properties.
    def __getstate__(self):
        return (self.delimiter, self.quote_char, self.double_quote,
                self.escape_char, self.newlines_in_values,
                self.ignore_empty_lines)

    def __setstate__(self, state):
        (self.delimiter, self.quote_char, self.double_quote,
         self.escape_char, self.newlines_in_values,
         self.ignore_empty_lines) = state

    def __eq__(self, other):
        try:
            return self.equals(other)
        except TypeError:
            # equals() raises TypeError when `other` is not a ParseOptions
            return False
400
401
cdef class _ISO8601(_Weakrefable):
    """
    A special object indicating ISO-8601 parsing.
    """
    __slots__ = ()

    def __str__(self):
        return 'ISO8601'

    def __eq__(self, other):
        # All _ISO8601 instances compare equal: the class is used as a
        # sentinel (see the module-level ISO8601 singleton).
        return isinstance(other, _ISO8601)
413
414
# Singleton sentinel users pass in `timestamp_parsers` to request the
# fast built-in ISO-8601 parser.
ISO8601 = _ISO8601()
416
417
cdef class ConvertOptions(_Weakrefable):
    """
    Options for converting CSV data.

    Parameters
    ----------
    check_utf8 : bool, optional (default True)
        Whether to check UTF8 validity of string columns.
    column_types : pa.Schema or dict, optional
        Explicitly map column names to column types. Passing this argument
        disables type inference on the defined columns.
    null_values : list, optional
        A sequence of strings that denote nulls in the data
        (defaults are appropriate in most cases). Note that by default,
        string columns are not checked for null values. To enable
        null checking for those, specify ``strings_can_be_null=True``.
    true_values : list, optional
        A sequence of strings that denote true booleans in the data
        (defaults are appropriate in most cases).
    false_values : list, optional
        A sequence of strings that denote false booleans in the data
        (defaults are appropriate in most cases).
    decimal_point : 1-character string, optional (default '.')
        The character used as decimal point in floating-point and decimal
        data.
    timestamp_parsers : list, optional
        A sequence of strptime()-compatible format strings, tried in order
        when attempting to infer or convert timestamp values (the special
        value ISO8601 can also be given).  By default, a fast built-in
        ISO-8601 parser is used.
    strings_can_be_null : bool, optional (default False)
        Whether string / binary columns can have null values.
        If true, then strings in null_values are considered null for
        string columns.
        If false, then all strings are valid string values.
    quoted_strings_can_be_null : bool, optional (default True)
        Whether quoted values can be null.
        If true, then strings in "null_values" are also considered null
        when they appear quoted in the CSV file. Otherwise, quoted values
        are never considered null.
    auto_dict_encode : bool, optional (default False)
        Whether to try to automatically dict-encode string / binary data.
        If true, then when type inference detects a string or binary column,
        it is dict-encoded up to `auto_dict_max_cardinality` distinct values
        (per chunk), after which it switches to regular encoding.
        This setting is ignored for non-inferred columns (those in
        `column_types`).
    auto_dict_max_cardinality : int, optional
        The maximum dictionary cardinality for `auto_dict_encode`.
        This value is per chunk.
    include_columns : list, optional
        The names of columns to include in the Table.
        If empty, the Table will include all columns from the CSV file.
        If not empty, only these columns will be included, in this order.
    include_missing_columns : bool, optional (default False)
        If false, columns in `include_columns` but not in the CSV file will
        error out.
        If true, columns in `include_columns` but not in the CSV file will
        produce a column of nulls (whose type is selected using
        `column_types`, or null by default).
        This option is ignored if `include_columns` is empty.
    """
    # Avoid mistakenly creating attributes
    __slots__ = ()

    # __init__() is not called when unpickling, initialize storage here
    def __cinit__(self, *argw, **kwargs):
        self.options.reset(
            new CCSVConvertOptions(CCSVConvertOptions.Defaults()))

    def __init__(self, *, check_utf8=None, column_types=None, null_values=None,
                 true_values=None, false_values=None, decimal_point=None,
                 strings_can_be_null=None, quoted_strings_can_be_null=None,
                 include_columns=None, include_missing_columns=None,
                 auto_dict_encode=None, auto_dict_max_cardinality=None,
                 timestamp_parsers=None):
        # Only override the C++ defaults for options the caller passed.
        if check_utf8 is not None:
            self.check_utf8 = check_utf8
        if column_types is not None:
            self.column_types = column_types
        if null_values is not None:
            self.null_values = null_values
        if true_values is not None:
            self.true_values = true_values
        if false_values is not None:
            self.false_values = false_values
        if decimal_point is not None:
            self.decimal_point = decimal_point
        if strings_can_be_null is not None:
            self.strings_can_be_null = strings_can_be_null
        if quoted_strings_can_be_null is not None:
            self.quoted_strings_can_be_null = quoted_strings_can_be_null
        if include_columns is not None:
            self.include_columns = include_columns
        if include_missing_columns is not None:
            self.include_missing_columns = include_missing_columns
        if auto_dict_encode is not None:
            self.auto_dict_encode = auto_dict_encode
        if auto_dict_max_cardinality is not None:
            self.auto_dict_max_cardinality = auto_dict_max_cardinality
        if timestamp_parsers is not None:
            self.timestamp_parsers = timestamp_parsers

    @property
    def check_utf8(self):
        """
        Whether to check UTF8 validity of string columns.
        """
        return deref(self.options).check_utf8

    @check_utf8.setter
    def check_utf8(self, value):
        deref(self.options).check_utf8 = value

    @property
    def strings_can_be_null(self):
        """
        Whether string / binary columns can have null values.
        """
        return deref(self.options).strings_can_be_null

    @strings_can_be_null.setter
    def strings_can_be_null(self, value):
        deref(self.options).strings_can_be_null = value

    @property
    def quoted_strings_can_be_null(self):
        """
        Whether quoted values can be null.
        """
        return deref(self.options).quoted_strings_can_be_null

    @quoted_strings_can_be_null.setter
    def quoted_strings_can_be_null(self, value):
        deref(self.options).quoted_strings_can_be_null = value

    @property
    def column_types(self):
        """
        Explicitly map column names to column types.
        """
        d = {frombytes(item.first): pyarrow_wrap_data_type(item.second)
             for item in deref(self.options).column_types}
        return d

    @column_types.setter
    def column_types(self, value):
        cdef:
            shared_ptr[CDataType] typ

        # Accept a mapping, an iterable of (name, type) pairs, or an
        # iterable of Fields (e.g. a Schema, which iterates its Fields).
        if isinstance(value, Mapping):
            value = value.items()

        deref(self.options).column_types.clear()
        for item in value:
            if isinstance(item, Field):
                k = item.name
                v = item.type
            else:
                k, v = item
            typ = pyarrow_unwrap_data_type(ensure_type(v))
            assert typ != NULL
            deref(self.options).column_types[tobytes(k)] = typ

    @property
    def null_values(self):
        """
        A sequence of strings that denote nulls in the data.
        """
        return [frombytes(x) for x in deref(self.options).null_values]

    @null_values.setter
    def null_values(self, value):
        deref(self.options).null_values = [tobytes(x) for x in value]

    @property
    def true_values(self):
        """
        A sequence of strings that denote true booleans in the data.
        """
        return [frombytes(x) for x in deref(self.options).true_values]

    @true_values.setter
    def true_values(self, value):
        deref(self.options).true_values = [tobytes(x) for x in value]

    @property
    def false_values(self):
        """
        A sequence of strings that denote false booleans in the data.
        """
        return [frombytes(x) for x in deref(self.options).false_values]

    @false_values.setter
    def false_values(self, value):
        deref(self.options).false_values = [tobytes(x) for x in value]

    @property
    def decimal_point(self):
        """
        The character used as decimal point in floating-point and decimal
        data.
        """
        return chr(deref(self.options).decimal_point)

    @decimal_point.setter
    def decimal_point(self, value):
        # _single_char() validates the value is a single ASCII character.
        deref(self.options).decimal_point = _single_char(value)

    @property
    def auto_dict_encode(self):
        """
        Whether to try to automatically dict-encode string / binary data.
        """
        return deref(self.options).auto_dict_encode

    @auto_dict_encode.setter
    def auto_dict_encode(self, value):
        deref(self.options).auto_dict_encode = value

    @property
    def auto_dict_max_cardinality(self):
        """
        The maximum dictionary cardinality for `auto_dict_encode`.

        This value is per chunk.
        """
        return deref(self.options).auto_dict_max_cardinality

    @auto_dict_max_cardinality.setter
    def auto_dict_max_cardinality(self, value):
        deref(self.options).auto_dict_max_cardinality = value

    @property
    def include_columns(self):
        """
        The names of columns to include in the Table.

        If empty, the Table will include all columns from the CSV file.
        If not empty, only these columns will be included, in this order.
        """
        return [frombytes(s) for s in deref(self.options).include_columns]

    @include_columns.setter
    def include_columns(self, value):
        deref(self.options).include_columns.clear()
        for item in value:
            deref(self.options).include_columns.push_back(tobytes(item))

    @property
    def include_missing_columns(self):
        """
        If false, columns in `include_columns` but not in the CSV file will
        error out.
        If true, columns in `include_columns` but not in the CSV file will
        produce a null column (whose type is selected using `column_types`,
        or null by default).
        This option is ignored if `include_columns` is empty.
        """
        return deref(self.options).include_missing_columns

    @include_missing_columns.setter
    def include_missing_columns(self, value):
        deref(self.options).include_missing_columns = value

    @property
    def timestamp_parsers(self):
        """
        A sequence of strptime()-compatible format strings, tried in order
        when attempting to infer or convert timestamp values (the special
        value ISO8601 can also be given).  By default, a fast built-in
        ISO-8601 parser is used.
        """
        cdef:
            shared_ptr[CTimestampParser] c_parser
            c_string kind

        # Map each C++ parser back to its Python representation:
        # a format string for strptime parsers, the ISO8601 sentinel
        # otherwise.
        parsers = []
        for c_parser in deref(self.options).timestamp_parsers:
            kind = deref(c_parser).kind()
            if kind == b'strptime':
                parsers.append(frombytes(deref(c_parser).format()))
            else:
                assert kind == b'iso8601'
                parsers.append(ISO8601)

        return parsers

    @timestamp_parsers.setter
    def timestamp_parsers(self, value):
        cdef:
            vector[shared_ptr[CTimestampParser]] c_parsers

        for v in value:
            if isinstance(v, str):
                c_parsers.push_back(CTimestampParser.MakeStrptime(tobytes(v)))
            elif v == ISO8601:
                c_parsers.push_back(CTimestampParser.MakeISO8601())
            else:
                raise TypeError("Expected list of str or ISO8601 objects")

        deref(self.options).timestamp_parsers = move(c_parsers)

    @staticmethod
    cdef ConvertOptions wrap(CCSVConvertOptions options):
        # Wrap a C++ options struct into a new Python-level instance.
        out = ConvertOptions()
        out.options.reset(new CCSVConvertOptions(move(options)))
        return out

    def validate(self):
        """
        Raise if the options are invalid (delegates to the C++ layer).
        """
        check_status(deref(self.options).Validate())

    def equals(self, ConvertOptions other):
        """
        Return whether all options are equal to *other*'s.
        """
        return (
            self.check_utf8 == other.check_utf8 and
            self.column_types == other.column_types and
            self.null_values == other.null_values and
            self.true_values == other.true_values and
            self.false_values == other.false_values and
            self.decimal_point == other.decimal_point and
            self.timestamp_parsers == other.timestamp_parsers and
            self.strings_can_be_null == other.strings_can_be_null and
            self.quoted_strings_can_be_null ==
            other.quoted_strings_can_be_null and
            self.auto_dict_encode == other.auto_dict_encode and
            self.auto_dict_max_cardinality ==
            other.auto_dict_max_cardinality and
            self.include_columns == other.include_columns and
            self.include_missing_columns == other.include_missing_columns
        )

    # Pickling support: round-trip through the Python-visible properties.
    def __getstate__(self):
        return (self.check_utf8, self.column_types, self.null_values,
                self.true_values, self.false_values, self.decimal_point,
                self.timestamp_parsers, self.strings_can_be_null,
                self.quoted_strings_can_be_null, self.auto_dict_encode,
                self.auto_dict_max_cardinality, self.include_columns,
                self.include_missing_columns)

    def __setstate__(self, state):
        (self.check_utf8, self.column_types, self.null_values,
         self.true_values, self.false_values, self.decimal_point,
         self.timestamp_parsers, self.strings_can_be_null,
         self.quoted_strings_can_be_null, self.auto_dict_encode,
         self.auto_dict_max_cardinality, self.include_columns,
         self.include_missing_columns) = state

    def __eq__(self, other):
        try:
            return self.equals(other)
        except TypeError:
            # equals() raises TypeError when `other` is not a ConvertOptions
            return False
769
770
cdef _get_reader(input_file, ReadOptions read_options,
                 shared_ptr[CInputStream]* out):
    # Wrap `input_file` (path or file-like object) as a C++ input stream
    # written into *out; memory-mapping is not requested here.
    use_memory_map = False
    get_input_stream(input_file, use_memory_map, out)
    if read_options is not None:
        # Transcode from the user-specified encoding to UTF-8, which is
        # what the C++ CSV reader consumes.
        out[0] = native_transcoding_input_stream(out[0],
                                                 read_options.encoding,
                                                 'utf8')
779
780
cdef _get_read_options(ReadOptions read_options, CCSVReadOptions* out):
    # Copy the C++ read options into *out, falling back to the C++
    # defaults when the caller passed None.
    if read_options is None:
        out[0] = CCSVReadOptions.Defaults()
    else:
        out[0] = deref(read_options.options)
786
787
cdef _get_parse_options(ParseOptions parse_options, CCSVParseOptions* out):
    # Copy the C++ parse options into *out, falling back to the C++
    # defaults when the caller passed None.
    if parse_options is None:
        out[0] = CCSVParseOptions.Defaults()
    else:
        out[0] = deref(parse_options.options)
793
794
cdef _get_convert_options(ConvertOptions convert_options,
                          CCSVConvertOptions* out):
    # Copy the C++ convert options into *out, falling back to the C++
    # defaults when the caller passed None.
    if convert_options is None:
        out[0] = CCSVConvertOptions.Defaults()
    else:
        out[0] = deref(convert_options.options)
801
802
cdef class CSVStreamingReader(RecordBatchReader):
    """An object that reads record batches incrementally from a CSV file.

    Should not be instantiated directly by user code.
    """
    cdef readonly:
        # Schema of the record batches produced by this reader,
        # captured once when the reader is opened.
        Schema schema

    def __init__(self):
        raise TypeError("Do not call {}'s constructor directly, "
                        "use pyarrow.csv.open_csv() instead."
                        .format(self.__class__.__name__))

    # Note about cancellation: we cannot create a SignalStopHandler
    # by default here, as several CSVStreamingReader instances may be
    # created (including by the same thread).  Handling cancellation
    # would require having the user pass the SignalStopHandler.
    # (in addition to solving ARROW-11853)

    cdef _open(self, shared_ptr[CInputStream] stream,
               CCSVReadOptions c_read_options,
               CCSVParseOptions c_parse_options,
               CCSVConvertOptions c_convert_options,
               MemoryPool memory_pool):
        # Create the underlying C++ streaming reader and record its schema.
        cdef:
            shared_ptr[CSchema] c_schema
            CIOContext io_context

        io_context = CIOContext(maybe_unbox_memory_pool(memory_pool))

        # Release the GIL: Make() may read (and block on) the input stream.
        with nogil:
            self.reader = <shared_ptr[CRecordBatchReader]> GetResultValue(
                CCSVStreamingReader.Make(
                    io_context, stream,
                    move(c_read_options), move(c_parse_options),
                    move(c_convert_options)))
            c_schema = self.reader.get().schema()

        self.schema = pyarrow_wrap_schema(c_schema)
842
843
def read_csv(input_file, read_options=None, parse_options=None,
             convert_options=None, MemoryPool memory_pool=None):
    """
    Read a Table from a stream of CSV data.

    Parameters
    ----------
    input_file : string, path or file-like object
        The location of CSV data.  If a string or path, and if it ends
        with a recognized compressed file extension (e.g. ".gz" or ".bz2"),
        the data is automatically decompressed when reading.
    read_options : pyarrow.csv.ReadOptions, optional
        Options for the CSV reader (see pyarrow.csv.ReadOptions constructor
        for defaults)
    parse_options : pyarrow.csv.ParseOptions, optional
        Options for the CSV parser
        (see pyarrow.csv.ParseOptions constructor for defaults)
    convert_options : pyarrow.csv.ConvertOptions, optional
        Options for converting CSV data
        (see pyarrow.csv.ConvertOptions constructor for defaults)
    memory_pool : MemoryPool, optional
        Pool to allocate Table memory from

    Returns
    -------
    :class:`pyarrow.Table`
        Contents of the CSV file as an in-memory table.
    """
    cdef:
        shared_ptr[CInputStream] stream
        CCSVReadOptions c_read_options
        CCSVParseOptions c_parse_options
        CCSVConvertOptions c_convert_options
        CIOContext io_context
        shared_ptr[CCSVReader] reader
        shared_ptr[CTable] table

    # Open the input as a C stream and extract the C-level option
    # structs (falling back to defaults when an argument is None).
    _get_reader(input_file, read_options, &stream)
    _get_read_options(read_options, &c_read_options)
    _get_parse_options(parse_options, &c_parse_options)
    _get_convert_options(convert_options, &c_convert_options)

    # SignalStopHandler allows interrupting a long read (e.g. Ctrl-C);
    # its stop token is propagated to C++ through the IO context.
    with SignalStopHandler() as stop_handler:
        io_context = CIOContext(
            maybe_unbox_memory_pool(memory_pool),
            (<StopToken> stop_handler.stop_token).stop_token)
        reader = GetResultValue(CCSVReader.Make(
            io_context, stream,
            c_read_options, c_parse_options, c_convert_options))

        # Release the GIL for the actual (potentially multithreaded) read.
        with nogil:
            table = GetResultValue(reader.get().Read())

    return pyarrow_wrap_table(table)
898
899
def open_csv(input_file, read_options=None, parse_options=None,
             convert_options=None, MemoryPool memory_pool=None):
    """
    Open a streaming reader of CSV data.

    Reading using this function is always single-threaded.

    Parameters
    ----------
    input_file : string, path or file-like object
        The location of CSV data.  If a string or path, and if it ends
        with a recognized compressed file extension (e.g. ".gz" or ".bz2"),
        the data is automatically decompressed when reading.
    read_options : pyarrow.csv.ReadOptions, optional
        Options for the CSV reader (see pyarrow.csv.ReadOptions constructor
        for defaults)
    parse_options : pyarrow.csv.ParseOptions, optional
        Options for the CSV parser
        (see pyarrow.csv.ParseOptions constructor for defaults)
    convert_options : pyarrow.csv.ConvertOptions, optional
        Options for converting CSV data
        (see pyarrow.csv.ConvertOptions constructor for defaults)
    memory_pool : MemoryPool, optional
        Pool to allocate Table memory from

    Returns
    -------
    :class:`pyarrow.csv.CSVStreamingReader`
    """
    cdef:
        shared_ptr[CInputStream] c_stream
        CCSVReadOptions c_read_opts
        CCSVParseOptions c_parse_opts
        CCSVConvertOptions c_convert_opts
        CSVStreamingReader result

    # Open the input as a C stream and extract the C-level option
    # structs (falling back to defaults when an argument is None).
    _get_reader(input_file, read_options, &c_stream)
    _get_read_options(read_options, &c_read_opts)
    _get_parse_options(parse_options, &c_parse_opts)
    _get_convert_options(convert_options, &c_convert_opts)

    # Bypass __init__ (which deliberately raises) and initialize the
    # reader through the internal _open() entry point.
    result = CSVStreamingReader.__new__(CSVStreamingReader)
    result._open(c_stream, move(c_read_opts), move(c_parse_opts),
                 move(c_convert_opts), memory_pool)
    return result
945
946
cdef class WriteOptions(_Weakrefable):
    """
    Options for writing CSV files.

    Parameters
    ----------
    include_header : bool, optional (default True)
        Whether to write an initial header line with column names
    batch_size : int, optional (default 1024)
        How many rows to process together when converting and writing
        CSV data
    """

    # Prevent attributes from being mistakenly created
    __slots__ = ()

    def __init__(self, *, include_header=None, batch_size=None):
        # Start from the C++ defaults, then apply any explicit overrides.
        self.options.reset(new CCSVWriteOptions(CCSVWriteOptions.Defaults()))
        if include_header is not None:
            self.include_header = include_header
        if batch_size is not None:
            self.batch_size = batch_size

    @property
    def include_header(self):
        """
        Whether to write an initial header line with column names.
        """
        return self.options.get().include_header

    @include_header.setter
    def include_header(self, value):
        self.options.get().include_header = value

    @property
    def batch_size(self):
        """
        How many rows to process together when converting and writing
        CSV data.
        """
        return self.options.get().batch_size

    @batch_size.setter
    def batch_size(self, value):
        self.options.get().batch_size = value

    @staticmethod
    cdef WriteOptions wrap(CCSVWriteOptions options):
        # Build a Python wrapper around an existing C++ options struct.
        cdef WriteOptions instance = WriteOptions()
        instance.options.reset(new CCSVWriteOptions(move(options)))
        return instance

    def validate(self):
        """Raise if the options are invalid (delegates to C++ Validate)."""
        check_status(self.options.get().Validate())
1001
1002
cdef _get_write_options(WriteOptions write_options, CCSVWriteOptions* out):
    """Copy the C-level write options into *out*, defaulting when None."""
    if write_options is not None:
        out[0] = deref(write_options.options)
    else:
        out[0] = CCSVWriteOptions.Defaults()
1008
1009
def write_csv(data, output_file, write_options=None,
              MemoryPool memory_pool=None):
    """
    Write record batch or table to a CSV file.

    Parameters
    ----------
    data : pyarrow.RecordBatch or pyarrow.Table
        The data to write.
    output_file : string, path, pyarrow.NativeFile, or file-like object
        The location where to write the CSV data.
    write_options : pyarrow.csv.WriteOptions
        Options to configure writing the CSV data.
    memory_pool : MemoryPool, optional
        Pool for temporary allocations.
    """
    cdef:
        shared_ptr[COutputStream] c_sink
        CCSVWriteOptions c_options
        CMemoryPool* c_pool
        CRecordBatch* c_batch
        CTable* c_table

    # Resolve options, output stream and the pool used for temporary
    # allocations during conversion.
    _get_write_options(write_options, &c_options)
    get_writer(output_file, &c_sink)
    c_pool = maybe_unbox_memory_pool(memory_pool)
    c_options.io_context = CIOContext(c_pool)

    # Dispatch on the input type; the Python object `data` keeps the
    # underlying C++ object alive while the raw pointer is in use.
    if isinstance(data, RecordBatch):
        c_batch = pyarrow_unwrap_batch(data).get()
        with nogil:
            check_status(WriteCSV(deref(c_batch), c_options, c_sink.get()))
    elif isinstance(data, Table):
        c_table = pyarrow_unwrap_table(data).get()
        with nogil:
            check_status(WriteCSV(deref(c_table), c_options, c_sink.get()))
    else:
        raise TypeError(f"Expected Table or RecordBatch, got '{type(data)}'")
1047
1048
cdef class CSVWriter(_CRecordBatchWriter):
    """
    Writer to create a CSV file.

    Parameters
    ----------
    sink : str, path, pyarrow.OutputStream or file-like object
        The location where to write the CSV data.
    schema : pyarrow.Schema
        The schema of the data to be written.
    write_options : pyarrow.csv.WriteOptions
        Options to configure writing the CSV data.
    memory_pool : MemoryPool, optional
        Pool for temporary allocations.
    """

    def __init__(self, sink, Schema schema, *,
                 WriteOptions write_options=None, MemoryPool memory_pool=None):
        cdef:
            shared_ptr[COutputStream] c_stream
            shared_ptr[CSchema] c_schema = pyarrow_unwrap_schema(schema)
            CCSVWriteOptions c_write_options
            CMemoryPool* c_memory_pool = maybe_unbox_memory_pool(memory_pool)
        # Copy the write options (or defaults) and attach the memory
        # pool used for temporary allocations while writing.
        _get_write_options(write_options, &c_write_options)
        c_write_options.io_context = CIOContext(c_memory_pool)
        get_writer(sink, &c_stream)
        with nogil:
            # Construct the C++ writer; any error is raised by
            # GetResultValue.  self.writer is inherited from
            # _CRecordBatchWriter (the base class drives actual writes).
            self.writer = GetResultValue(MakeCSVWriter(
                c_stream, c_schema, c_write_options))
1078