1# Licensed to the Apache Software Foundation (ASF) under one 2# or more contributor license agreements. See the NOTICE file 3# distributed with this work for additional information 4# regarding copyright ownership. The ASF licenses this file 5# to you under the Apache License, Version 2.0 (the 6# "License"); you may not use this file except in compliance 7# with the License. You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, 12# software distributed under the License is distributed on an 13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 14# KIND, either express or implied. See the License for the 15# specific language governing permissions and limitations 16# under the License. 17 18# cython: profile=False 19# distutils: language = c++ 20# cython: language_level = 3 21 22from cython.operator cimport dereference as deref 23 24import codecs 25from collections.abc import Mapping 26 27from pyarrow.includes.common cimport * 28from pyarrow.includes.libarrow cimport * 29from pyarrow.lib cimport (check_status, Field, MemoryPool, Schema, 30 RecordBatchReader, ensure_type, 31 maybe_unbox_memory_pool, get_input_stream, 32 get_writer, native_transcoding_input_stream, 33 pyarrow_unwrap_batch, pyarrow_unwrap_schema, 34 pyarrow_unwrap_table, pyarrow_wrap_schema, 35 pyarrow_wrap_table, pyarrow_wrap_data_type, 36 pyarrow_unwrap_data_type, Table, RecordBatch, 37 StopToken, _CRecordBatchWriter) 38from pyarrow.lib import frombytes, tobytes, SignalStopHandler 39from pyarrow.util import _stringify_path 40 41 42cdef unsigned char _single_char(s) except 0: 43 val = ord(s) 44 if val == 0 or val > 127: 45 raise ValueError("Expecting an ASCII character") 46 return <unsigned char> val 47 48 49cdef class ReadOptions(_Weakrefable): 50 """ 51 Options for reading CSV files. 52 53 Parameters 54 ---------- 55 use_threads : bool, optional (default True) 56 Whether to use multiple threads to accelerate reading 57 block_size : int, optional 58 How much bytes to process at a time from the input stream. 59 This will determine multi-threading granularity as well as 60 the size of individual record batches or table chunks. 61 Minimum valid value for block size is 1 62 skip_rows : int, optional (default 0) 63 The number of rows to skip before the column names (if any) 64 and the CSV data. 65 skip_rows_after_names : int, optional (default 0) 66 The number of rows to skip after the column names. 67 This number can be larger than the number of rows in one 68 block, and empty rows are counted. 69 The order of application is as follows: 70 - `skip_rows` is applied (if non-zero); 71 - column names aread (unless `column_names` is set); 72 - `skip_rows_after_names` is applied (if non-zero). 73 column_names : list, optional 74 The column names of the target table. If empty, fall back on 75 `autogenerate_column_names`. 76 autogenerate_column_names : bool, optional (default False) 77 Whether to autogenerate column names if `column_names` is empty. 78 If true, column names will be of the form "f0", "f1"... 79 If false, column names will be read from the first CSV row 80 after `skip_rows`. 81 encoding : str, optional (default 'utf8') 82 The character encoding of the CSV data. Columns that cannot 83 decode using this encoding can still be read as Binary. 84 """ 85 86 # Avoid mistakingly creating attributes 87 __slots__ = () 88 89 # __init__() is not called when unpickling, initialize storage here 90 def __cinit__(self, *argw, **kwargs): 91 self.options.reset(new CCSVReadOptions(CCSVReadOptions.Defaults())) 92 93 def __init__(self, *, use_threads=None, block_size=None, skip_rows=None, 94 column_names=None, autogenerate_column_names=None, 95 encoding='utf8', skip_rows_after_names=None): 96 if use_threads is not None: 97 self.use_threads = use_threads 98 if block_size is not None: 99 self.block_size = block_size 100 if skip_rows is not None: 101 self.skip_rows = skip_rows 102 if column_names is not None: 103 self.column_names = column_names 104 if autogenerate_column_names is not None: 105 self.autogenerate_column_names= autogenerate_column_names 106 # Python-specific option 107 self.encoding = encoding 108 if skip_rows_after_names is not None: 109 self.skip_rows_after_names = skip_rows_after_names 110 111 @property 112 def use_threads(self): 113 """ 114 Whether to use multiple threads to accelerate reading. 115 """ 116 return deref(self.options).use_threads 117 118 @use_threads.setter 119 def use_threads(self, value): 120 deref(self.options).use_threads = value 121 122 @property 123 def block_size(self): 124 """ 125 How much bytes to process at a time from the input stream. 126 This will determine multi-threading granularity as well as 127 the size of individual record batches or table chunks. 128 """ 129 return deref(self.options).block_size 130 131 @block_size.setter 132 def block_size(self, value): 133 deref(self.options).block_size = value 134 135 @property 136 def skip_rows(self): 137 """ 138 The number of rows to skip before the column names (if any) 139 and the CSV data. 140 See `skip_rows_after_names` for interaction description 141 """ 142 return deref(self.options).skip_rows 143 144 @skip_rows.setter 145 def skip_rows(self, value): 146 deref(self.options).skip_rows = value 147 148 @property 149 def column_names(self): 150 """ 151 The column names of the target table. If empty, fall back on 152 `autogenerate_column_names`. 153 """ 154 return [frombytes(s) for s in deref(self.options).column_names] 155 156 @column_names.setter 157 def column_names(self, value): 158 deref(self.options).column_names.clear() 159 for item in value: 160 deref(self.options).column_names.push_back(tobytes(item)) 161 162 @property 163 def autogenerate_column_names(self): 164 """ 165 Whether to autogenerate column names if `column_names` is empty. 166 If true, column names will be of the form "f0", "f1"... 167 If false, column names will be read from the first CSV row 168 after `skip_rows`. 169 """ 170 return deref(self.options).autogenerate_column_names 171 172 @autogenerate_column_names.setter 173 def autogenerate_column_names(self, value): 174 deref(self.options).autogenerate_column_names = value 175 176 @property 177 def skip_rows_after_names(self): 178 """ 179 The number of rows to skip after the column names. 180 This number can be larger than the number of rows in one 181 block, and empty rows are counted. 182 The order of application is as follows: 183 - `skip_rows` is applied (if non-zero); 184 - column names aread (unless `column_names` is set); 185 - `skip_rows_after_names` is applied (if non-zero). 186 """ 187 return deref(self.options).skip_rows_after_names 188 189 @skip_rows_after_names.setter 190 def skip_rows_after_names(self, value): 191 deref(self.options).skip_rows_after_names = value 192 193 def validate(self): 194 check_status(deref(self.options).Validate()) 195 196 def equals(self, ReadOptions other): 197 return ( 198 self.use_threads == other.use_threads and 199 self.block_size == other.block_size and 200 self.skip_rows == other.skip_rows and 201 self.column_names == other.column_names and 202 self.autogenerate_column_names == 203 other.autogenerate_column_names and 204 self.encoding == other.encoding and 205 self.skip_rows_after_names == other.skip_rows_after_names 206 ) 207 208 @staticmethod 209 cdef ReadOptions wrap(CCSVReadOptions options): 210 out = ReadOptions() 211 out.options.reset(new CCSVReadOptions(move(options))) 212 out.encoding = 'utf8' # No way to know this 213 return out 214 215 def __getstate__(self): 216 return (self.use_threads, self.block_size, self.skip_rows, 217 self.column_names, self.autogenerate_column_names, 218 self.encoding, self.skip_rows_after_names) 219 220 def __setstate__(self, state): 221 (self.use_threads, self.block_size, self.skip_rows, 222 self.column_names, self.autogenerate_column_names, 223 self.encoding, self.skip_rows_after_names) = state 224 225 def __eq__(self, other): 226 try: 227 return self.equals(other) 228 except TypeError: 229 return False 230 231 232cdef class ParseOptions(_Weakrefable): 233 """ 234 Options for parsing CSV files. 235 236 Parameters 237 ---------- 238 delimiter : 1-character string, optional (default ',') 239 The character delimiting individual cells in the CSV data. 240 quote_char : 1-character string or False, optional (default '"') 241 The character used optionally for quoting CSV values 242 (False if quoting is not allowed). 243 double_quote : bool, optional (default True) 244 Whether two quotes in a quoted CSV value denote a single quote 245 in the data. 246 escape_char : 1-character string or False, optional (default False) 247 The character used optionally for escaping special characters 248 (False if escaping is not allowed). 249 newlines_in_values : bool, optional (default False) 250 Whether newline characters are allowed in CSV values. 251 Setting this to True reduces the performance of multi-threaded 252 CSV reading. 253 ignore_empty_lines : bool, optional (default True) 254 Whether empty lines are ignored in CSV input. 255 If False, an empty line is interpreted as containing a single empty 256 value (assuming a one-column CSV file). 257 """ 258 __slots__ = () 259 260 def __cinit__(self, *argw, **kwargs): 261 self.options.reset(new CCSVParseOptions(CCSVParseOptions.Defaults())) 262 263 def __init__(self, *, delimiter=None, quote_char=None, double_quote=None, 264 escape_char=None, newlines_in_values=None, 265 ignore_empty_lines=None): 266 if delimiter is not None: 267 self.delimiter = delimiter 268 if quote_char is not None: 269 self.quote_char = quote_char 270 if double_quote is not None: 271 self.double_quote = double_quote 272 if escape_char is not None: 273 self.escape_char = escape_char 274 if newlines_in_values is not None: 275 self.newlines_in_values = newlines_in_values 276 if ignore_empty_lines is not None: 277 self.ignore_empty_lines = ignore_empty_lines 278 279 @property 280 def delimiter(self): 281 """ 282 The character delimiting individual cells in the CSV data. 283 """ 284 return chr(deref(self.options).delimiter) 285 286 @delimiter.setter 287 def delimiter(self, value): 288 deref(self.options).delimiter = _single_char(value) 289 290 @property 291 def quote_char(self): 292 """ 293 The character used optionally for quoting CSV values 294 (False if quoting is not allowed). 295 """ 296 if deref(self.options).quoting: 297 return chr(deref(self.options).quote_char) 298 else: 299 return False 300 301 @quote_char.setter 302 def quote_char(self, value): 303 if value is False: 304 deref(self.options).quoting = False 305 else: 306 deref(self.options).quote_char = _single_char(value) 307 deref(self.options).quoting = True 308 309 @property 310 def double_quote(self): 311 """ 312 Whether two quotes in a quoted CSV value denote a single quote 313 in the data. 314 """ 315 return deref(self.options).double_quote 316 317 @double_quote.setter 318 def double_quote(self, value): 319 deref(self.options).double_quote = value 320 321 @property 322 def escape_char(self): 323 """ 324 The character used optionally for escaping special characters 325 (False if escaping is not allowed). 326 """ 327 if deref(self.options).escaping: 328 return chr(deref(self.options).escape_char) 329 else: 330 return False 331 332 @escape_char.setter 333 def escape_char(self, value): 334 if value is False: 335 deref(self.options).escaping = False 336 else: 337 deref(self.options).escape_char = _single_char(value) 338 deref(self.options).escaping = True 339 340 @property 341 def newlines_in_values(self): 342 """ 343 Whether newline characters are allowed in CSV values. 344 Setting this to True reduces the performance of multi-threaded 345 CSV reading. 346 """ 347 return deref(self.options).newlines_in_values 348 349 @newlines_in_values.setter 350 def newlines_in_values(self, value): 351 deref(self.options).newlines_in_values = value 352 353 @property 354 def ignore_empty_lines(self): 355 """ 356 Whether empty lines are ignored in CSV input. 357 If False, an empty line is interpreted as containing a single empty 358 value (assuming a one-column CSV file). 359 """ 360 return deref(self.options).ignore_empty_lines 361 362 @ignore_empty_lines.setter 363 def ignore_empty_lines(self, value): 364 deref(self.options).ignore_empty_lines = value 365 366 def validate(self): 367 check_status(deref(self.options).Validate()) 368 369 def equals(self, ParseOptions other): 370 return ( 371 self.delimiter == other.delimiter and 372 self.quote_char == other.quote_char and 373 self.double_quote == other.double_quote and 374 self.escape_char == other.escape_char and 375 self.newlines_in_values == other.newlines_in_values and 376 self.ignore_empty_lines == other.ignore_empty_lines 377 ) 378 379 @staticmethod 380 cdef ParseOptions wrap(CCSVParseOptions options): 381 out = ParseOptions() 382 out.options.reset(new CCSVParseOptions(move(options))) 383 return out 384 385 def __getstate__(self): 386 return (self.delimiter, self.quote_char, self.double_quote, 387 self.escape_char, self.newlines_in_values, 388 self.ignore_empty_lines) 389 390 def __setstate__(self, state): 391 (self.delimiter, self.quote_char, self.double_quote, 392 self.escape_char, self.newlines_in_values, 393 self.ignore_empty_lines) = state 394 395 def __eq__(self, other): 396 try: 397 return self.equals(other) 398 except TypeError: 399 return False 400 401 402cdef class _ISO8601(_Weakrefable): 403 """ 404 A special object indicating ISO-8601 parsing. 405 """ 406 __slots__ = () 407 408 def __str__(self): 409 return 'ISO8601' 410 411 def __eq__(self, other): 412 return isinstance(other, _ISO8601) 413 414 415ISO8601 = _ISO8601() 416 417 418cdef class ConvertOptions(_Weakrefable): 419 """ 420 Options for converting CSV data. 421 422 Parameters 423 ---------- 424 check_utf8 : bool, optional (default True) 425 Whether to check UTF8 validity of string columns. 426 column_types : pa.Schema or dict, optional 427 Explicitly map column names to column types. Passing this argument 428 disables type inference on the defined columns. 429 null_values : list, optional 430 A sequence of strings that denote nulls in the data 431 (defaults are appropriate in most cases). Note that by default, 432 string columns are not checked for null values. To enable 433 null checking for those, specify ``strings_can_be_null=True``. 434 true_values : list, optional 435 A sequence of strings that denote true booleans in the data 436 (defaults are appropriate in most cases). 437 false_values : list, optional 438 A sequence of strings that denote false booleans in the data 439 (defaults are appropriate in most cases). 440 decimal_point : 1-character string, optional (default '.') 441 The character used as decimal point in floating-point and decimal 442 data. 443 timestamp_parsers : list, optional 444 A sequence of strptime()-compatible format strings, tried in order 445 when attempting to infer or convert timestamp values (the special 446 value ISO8601() can also be given). By default, a fast built-in 447 ISO-8601 parser is used. 448 strings_can_be_null : bool, optional (default False) 449 Whether string / binary columns can have null values. 450 If true, then strings in null_values are considered null for 451 string columns. 452 If false, then all strings are valid string values. 453 quoted_strings_can_be_null : bool, optional (default True) 454 Whether quoted values can be null. 455 If true, then strings in "null_values" are also considered null 456 when they appear quoted in the CSV file. Otherwise, quoted values 457 are never considered null. 458 auto_dict_encode : bool, optional (default False) 459 Whether to try to automatically dict-encode string / binary data. 460 If true, then when type inference detects a string or binary column, 461 it it dict-encoded up to `auto_dict_max_cardinality` distinct values 462 (per chunk), after which it switches to regular encoding. 463 This setting is ignored for non-inferred columns (those in 464 `column_types`). 465 auto_dict_max_cardinality : int, optional 466 The maximum dictionary cardinality for `auto_dict_encode`. 467 This value is per chunk. 468 include_columns : list, optional 469 The names of columns to include in the Table. 470 If empty, the Table will include all columns from the CSV file. 471 If not empty, only these columns will be included, in this order. 472 include_missing_columns : bool, optional (default False) 473 If false, columns in `include_columns` but not in the CSV file will 474 error out. 475 If true, columns in `include_columns` but not in the CSV file will 476 produce a column of nulls (whose type is selected using 477 `column_types`, or null by default). 478 This option is ignored if `include_columns` is empty. 479 """ 480 # Avoid mistakingly creating attributes 481 __slots__ = () 482 483 def __cinit__(self, *argw, **kwargs): 484 self.options.reset( 485 new CCSVConvertOptions(CCSVConvertOptions.Defaults())) 486 487 def __init__(self, *, check_utf8=None, column_types=None, null_values=None, 488 true_values=None, false_values=None, decimal_point=None, 489 strings_can_be_null=None, quoted_strings_can_be_null=None, 490 include_columns=None, include_missing_columns=None, 491 auto_dict_encode=None, auto_dict_max_cardinality=None, 492 timestamp_parsers=None): 493 if check_utf8 is not None: 494 self.check_utf8 = check_utf8 495 if column_types is not None: 496 self.column_types = column_types 497 if null_values is not None: 498 self.null_values = null_values 499 if true_values is not None: 500 self.true_values = true_values 501 if false_values is not None: 502 self.false_values = false_values 503 if decimal_point is not None: 504 self.decimal_point = decimal_point 505 if strings_can_be_null is not None: 506 self.strings_can_be_null = strings_can_be_null 507 if quoted_strings_can_be_null is not None: 508 self.quoted_strings_can_be_null = quoted_strings_can_be_null 509 if include_columns is not None: 510 self.include_columns = include_columns 511 if include_missing_columns is not None: 512 self.include_missing_columns = include_missing_columns 513 if auto_dict_encode is not None: 514 self.auto_dict_encode = auto_dict_encode 515 if auto_dict_max_cardinality is not None: 516 self.auto_dict_max_cardinality = auto_dict_max_cardinality 517 if timestamp_parsers is not None: 518 self.timestamp_parsers = timestamp_parsers 519 520 @property 521 def check_utf8(self): 522 """ 523 Whether to check UTF8 validity of string columns. 524 """ 525 return deref(self.options).check_utf8 526 527 @check_utf8.setter 528 def check_utf8(self, value): 529 deref(self.options).check_utf8 = value 530 531 @property 532 def strings_can_be_null(self): 533 """ 534 Whether string / binary columns can have null values. 535 """ 536 return deref(self.options).strings_can_be_null 537 538 @strings_can_be_null.setter 539 def strings_can_be_null(self, value): 540 deref(self.options).strings_can_be_null = value 541 542 @property 543 def quoted_strings_can_be_null(self): 544 """ 545 Whether quoted values can be null. 546 """ 547 return deref(self.options).quoted_strings_can_be_null 548 549 @quoted_strings_can_be_null.setter 550 def quoted_strings_can_be_null(self, value): 551 deref(self.options).quoted_strings_can_be_null = value 552 553 @property 554 def column_types(self): 555 """ 556 Explicitly map column names to column types. 557 """ 558 d = {frombytes(item.first): pyarrow_wrap_data_type(item.second) 559 for item in deref(self.options).column_types} 560 return d 561 562 @column_types.setter 563 def column_types(self, value): 564 cdef: 565 shared_ptr[CDataType] typ 566 567 if isinstance(value, Mapping): 568 value = value.items() 569 570 deref(self.options).column_types.clear() 571 for item in value: 572 if isinstance(item, Field): 573 k = item.name 574 v = item.type 575 else: 576 k, v = item 577 typ = pyarrow_unwrap_data_type(ensure_type(v)) 578 assert typ != NULL 579 deref(self.options).column_types[tobytes(k)] = typ 580 581 @property 582 def null_values(self): 583 """ 584 A sequence of strings that denote nulls in the data. 585 """ 586 return [frombytes(x) for x in deref(self.options).null_values] 587 588 @null_values.setter 589 def null_values(self, value): 590 deref(self.options).null_values = [tobytes(x) for x in value] 591 592 @property 593 def true_values(self): 594 """ 595 A sequence of strings that denote true booleans in the data. 596 """ 597 return [frombytes(x) for x in deref(self.options).true_values] 598 599 @true_values.setter 600 def true_values(self, value): 601 deref(self.options).true_values = [tobytes(x) for x in value] 602 603 @property 604 def false_values(self): 605 """ 606 A sequence of strings that denote false booleans in the data. 607 """ 608 return [frombytes(x) for x in deref(self.options).false_values] 609 610 @false_values.setter 611 def false_values(self, value): 612 deref(self.options).false_values = [tobytes(x) for x in value] 613 614 @property 615 def decimal_point(self): 616 """ 617 The character used as decimal point in floating-point and decimal 618 data. 619 """ 620 return chr(deref(self.options).decimal_point) 621 622 @decimal_point.setter 623 def decimal_point(self, value): 624 deref(self.options).decimal_point = _single_char(value) 625 626 @property 627 def auto_dict_encode(self): 628 """ 629 Whether to try to automatically dict-encode string / binary data. 630 """ 631 return deref(self.options).auto_dict_encode 632 633 @auto_dict_encode.setter 634 def auto_dict_encode(self, value): 635 deref(self.options).auto_dict_encode = value 636 637 @property 638 def auto_dict_max_cardinality(self): 639 """ 640 The maximum dictionary cardinality for `auto_dict_encode`. 641 642 This value is per chunk. 643 """ 644 return deref(self.options).auto_dict_max_cardinality 645 646 @auto_dict_max_cardinality.setter 647 def auto_dict_max_cardinality(self, value): 648 deref(self.options).auto_dict_max_cardinality = value 649 650 @property 651 def include_columns(self): 652 """ 653 The names of columns to include in the Table. 654 655 If empty, the Table will include all columns from the CSV file. 656 If not empty, only these columns will be included, in this order. 657 """ 658 return [frombytes(s) for s in deref(self.options).include_columns] 659 660 @include_columns.setter 661 def include_columns(self, value): 662 deref(self.options).include_columns.clear() 663 for item in value: 664 deref(self.options).include_columns.push_back(tobytes(item)) 665 666 @property 667 def include_missing_columns(self): 668 """ 669 If false, columns in `include_columns` but not in the CSV file will 670 error out. 671 If true, columns in `include_columns` but not in the CSV file will 672 produce a null column (whose type is selected using `column_types`, 673 or null by default). 674 This option is ignored if `include_columns` is empty. 675 """ 676 return deref(self.options).include_missing_columns 677 678 @include_missing_columns.setter 679 def include_missing_columns(self, value): 680 deref(self.options).include_missing_columns = value 681 682 @property 683 def timestamp_parsers(self): 684 """ 685 A sequence of strptime()-compatible format strings, tried in order 686 when attempting to infer or convert timestamp values (the special 687 value ISO8601() can also be given). By default, a fast built-in 688 ISO-8601 parser is used. 689 """ 690 cdef: 691 shared_ptr[CTimestampParser] c_parser 692 c_string kind 693 694 parsers = [] 695 for c_parser in deref(self.options).timestamp_parsers: 696 kind = deref(c_parser).kind() 697 if kind == b'strptime': 698 parsers.append(frombytes(deref(c_parser).format())) 699 else: 700 assert kind == b'iso8601' 701 parsers.append(ISO8601) 702 703 return parsers 704 705 @timestamp_parsers.setter 706 def timestamp_parsers(self, value): 707 cdef: 708 vector[shared_ptr[CTimestampParser]] c_parsers 709 710 for v in value: 711 if isinstance(v, str): 712 c_parsers.push_back(CTimestampParser.MakeStrptime(tobytes(v))) 713 elif v == ISO8601: 714 c_parsers.push_back(CTimestampParser.MakeISO8601()) 715 else: 716 raise TypeError("Expected list of str or ISO8601 objects") 717 718 deref(self.options).timestamp_parsers = move(c_parsers) 719 720 @staticmethod 721 cdef ConvertOptions wrap(CCSVConvertOptions options): 722 out = ConvertOptions() 723 out.options.reset(new CCSVConvertOptions(move(options))) 724 return out 725 726 def validate(self): 727 check_status(deref(self.options).Validate()) 728 729 def equals(self, ConvertOptions other): 730 return ( 731 self.check_utf8 == other.check_utf8 and 732 self.column_types == other.column_types and 733 self.null_values == other.null_values and 734 self.true_values == other.true_values and 735 self.false_values == other.false_values and 736 self.decimal_point == other.decimal_point and 737 self.timestamp_parsers == other.timestamp_parsers and 738 self.strings_can_be_null == other.strings_can_be_null and 739 self.quoted_strings_can_be_null == 740 other.quoted_strings_can_be_null and 741 self.auto_dict_encode == other.auto_dict_encode and 742 self.auto_dict_max_cardinality == 743 other.auto_dict_max_cardinality and 744 self.include_columns == other.include_columns and 745 self.include_missing_columns == other.include_missing_columns 746 ) 747 748 def __getstate__(self): 749 return (self.check_utf8, self.column_types, self.null_values, 750 self.true_values, self.false_values, self.decimal_point, 751 self.timestamp_parsers, self.strings_can_be_null, 752 self.quoted_strings_can_be_null, self.auto_dict_encode, 753 self.auto_dict_max_cardinality, self.include_columns, 754 self.include_missing_columns) 755 756 def __setstate__(self, state): 757 (self.check_utf8, self.column_types, self.null_values, 758 self.true_values, self.false_values, self.decimal_point, 759 self.timestamp_parsers, self.strings_can_be_null, 760 self.quoted_strings_can_be_null, self.auto_dict_encode, 761 self.auto_dict_max_cardinality, self.include_columns, 762 self.include_missing_columns) = state 763 764 def __eq__(self, other): 765 try: 766 return self.equals(other) 767 except TypeError: 768 return False 769 770 771cdef _get_reader(input_file, ReadOptions read_options, 772 shared_ptr[CInputStream]* out): 773 use_memory_map = False 774 get_input_stream(input_file, use_memory_map, out) 775 if read_options is not None: 776 out[0] = native_transcoding_input_stream(out[0], 777 read_options.encoding, 778 'utf8') 779 780 781cdef _get_read_options(ReadOptions read_options, CCSVReadOptions* out): 782 if read_options is None: 783 out[0] = CCSVReadOptions.Defaults() 784 else: 785 out[0] = deref(read_options.options) 786 787 788cdef _get_parse_options(ParseOptions parse_options, CCSVParseOptions* out): 789 if parse_options is None: 790 out[0] = CCSVParseOptions.Defaults() 791 else: 792 out[0] = deref(parse_options.options) 793 794 795cdef _get_convert_options(ConvertOptions convert_options, 796 CCSVConvertOptions* out): 797 if convert_options is None: 798 out[0] = CCSVConvertOptions.Defaults() 799 else: 800 out[0] = deref(convert_options.options) 801 802 803cdef class CSVStreamingReader(RecordBatchReader): 804 """An object that reads record batches incrementally from a CSV file. 805 806 Should not be instantiated directly by user code. 807 """ 808 cdef readonly: 809 Schema schema 810 811 def __init__(self): 812 raise TypeError("Do not call {}'s constructor directly, " 813 "use pyarrow.csv.open_csv() instead." 814 .format(self.__class__.__name__)) 815 816 # Note about cancellation: we cannot create a SignalStopHandler 817 # by default here, as several CSVStreamingReader instances may be 818 # created (including by the same thread). Handling cancellation 819 # would require having the user pass the SignalStopHandler. 820 # (in addition to solving ARROW-11853) 821 822 cdef _open(self, shared_ptr[CInputStream] stream, 823 CCSVReadOptions c_read_options, 824 CCSVParseOptions c_parse_options, 825 CCSVConvertOptions c_convert_options, 826 MemoryPool memory_pool): 827 cdef: 828 shared_ptr[CSchema] c_schema 829 CIOContext io_context 830 831 io_context = CIOContext(maybe_unbox_memory_pool(memory_pool)) 832 833 with nogil: 834 self.reader = <shared_ptr[CRecordBatchReader]> GetResultValue( 835 CCSVStreamingReader.Make( 836 io_context, stream, 837 move(c_read_options), move(c_parse_options), 838 move(c_convert_options))) 839 c_schema = self.reader.get().schema() 840 841 self.schema = pyarrow_wrap_schema(c_schema) 842 843 844def read_csv(input_file, read_options=None, parse_options=None, 845 convert_options=None, MemoryPool memory_pool=None): 846 """ 847 Read a Table from a stream of CSV data. 848 849 Parameters 850 ---------- 851 input_file : string, path or file-like object 852 The location of CSV data. If a string or path, and if it ends 853 with a recognized compressed file extension (e.g. ".gz" or ".bz2"), 854 the data is automatically decompressed when reading. 855 read_options : pyarrow.csv.ReadOptions, optional 856 Options for the CSV reader (see pyarrow.csv.ReadOptions constructor 857 for defaults) 858 parse_options : pyarrow.csv.ParseOptions, optional 859 Options for the CSV parser 860 (see pyarrow.csv.ParseOptions constructor for defaults) 861 convert_options : pyarrow.csv.ConvertOptions, optional 862 Options for converting CSV data 863 (see pyarrow.csv.ConvertOptions constructor for defaults) 864 memory_pool : MemoryPool, optional 865 Pool to allocate Table memory from 866 867 Returns 868 ------- 869 :class:`pyarrow.Table` 870 Contents of the CSV file as a in-memory table. 871 """ 872 cdef: 873 shared_ptr[CInputStream] stream 874 CCSVReadOptions c_read_options 875 CCSVParseOptions c_parse_options 876 CCSVConvertOptions c_convert_options 877 CIOContext io_context 878 shared_ptr[CCSVReader] reader 879 shared_ptr[CTable] table 880 881 _get_reader(input_file, read_options, &stream) 882 _get_read_options(read_options, &c_read_options) 883 _get_parse_options(parse_options, &c_parse_options) 884 _get_convert_options(convert_options, &c_convert_options) 885 886 with SignalStopHandler() as stop_handler: 887 io_context = CIOContext( 888 maybe_unbox_memory_pool(memory_pool), 889 (<StopToken> stop_handler.stop_token).stop_token) 890 reader = GetResultValue(CCSVReader.Make( 891 io_context, stream, 892 c_read_options, c_parse_options, c_convert_options)) 893 894 with nogil: 895 table = GetResultValue(reader.get().Read()) 896 897 return pyarrow_wrap_table(table) 898 899 900def open_csv(input_file, read_options=None, parse_options=None, 901 convert_options=None, MemoryPool memory_pool=None): 902 """ 903 Open a streaming reader of CSV data. 904 905 Reading using this function is always single-threaded. 906 907 Parameters 908 ---------- 909 input_file : string, path or file-like object 910 The location of CSV data. If a string or path, and if it ends 911 with a recognized compressed file extension (e.g. ".gz" or ".bz2"), 912 the data is automatically decompressed when reading. 913 read_options : pyarrow.csv.ReadOptions, optional 914 Options for the CSV reader (see pyarrow.csv.ReadOptions constructor 915 for defaults) 916 parse_options : pyarrow.csv.ParseOptions, optional 917 Options for the CSV parser 918 (see pyarrow.csv.ParseOptions constructor for defaults) 919 convert_options : pyarrow.csv.ConvertOptions, optional 920 Options for converting CSV data 921 (see pyarrow.csv.ConvertOptions constructor for defaults) 922 memory_pool : MemoryPool, optional 923 Pool to allocate Table memory from 924 925 Returns 926 ------- 927 :class:`pyarrow.csv.CSVStreamingReader` 928 """ 929 cdef: 930 shared_ptr[CInputStream] stream 931 CCSVReadOptions c_read_options 932 CCSVParseOptions c_parse_options 933 CCSVConvertOptions c_convert_options 934 CSVStreamingReader reader 935 936 _get_reader(input_file, read_options, &stream) 937 _get_read_options(read_options, &c_read_options) 938 _get_parse_options(parse_options, &c_parse_options) 939 _get_convert_options(convert_options, &c_convert_options) 940 941 reader = CSVStreamingReader.__new__(CSVStreamingReader) 942 reader._open(stream, move(c_read_options), move(c_parse_options), 943 move(c_convert_options), memory_pool) 944 return reader 945 946 947cdef class WriteOptions(_Weakrefable): 948 """ 949 Options for writing CSV files. 950 951 Parameters 952 ---------- 953 include_header : bool, optional (default True) 954 Whether to write an initial header line with column names 955 batch_size : int, optional (default 1024) 956 How many rows to process together when converting and writing 957 CSV data 958 """ 959 960 # Avoid mistakingly creating attributes 961 __slots__ = () 962 963 def __init__(self, *, include_header=None, batch_size=None): 964 self.options.reset(new CCSVWriteOptions(CCSVWriteOptions.Defaults())) 965 if include_header is not None: 966 self.include_header = include_header 967 if batch_size is not None: 968 self.batch_size = batch_size 969 970 @property 971 def include_header(self): 972 """ 973 Whether to write an initial header line with column names. 974 """ 975 return deref(self.options).include_header 976 977 @include_header.setter 978 def include_header(self, value): 979 deref(self.options).include_header = value 980 981 @property 982 def batch_size(self): 983 """ 984 How many rows to process together when converting and writing 985 CSV data. 986 """ 987 return deref(self.options).batch_size 988 989 @batch_size.setter 990 def batch_size(self, value): 991 deref(self.options).batch_size = value 992 993 @staticmethod 994 cdef WriteOptions wrap(CCSVWriteOptions options): 995 out = WriteOptions() 996 out.options.reset(new CCSVWriteOptions(move(options))) 997 return out 998 999 def validate(self): 1000 check_status(self.options.get().Validate()) 1001 1002 1003cdef _get_write_options(WriteOptions write_options, CCSVWriteOptions* out): 1004 if write_options is None: 1005 out[0] = CCSVWriteOptions.Defaults() 1006 else: 1007 out[0] = deref(write_options.options) 1008 1009 1010def write_csv(data, output_file, write_options=None, 1011 MemoryPool memory_pool=None): 1012 """ 1013 Write record batch or table to a CSV file. 1014 1015 Parameters 1016 ---------- 1017 data : pyarrow.RecordBatch or pyarrow.Table 1018 The data to write. 1019 output_file : string, path, pyarrow.NativeFile, or file-like object 1020 The location where to write the CSV data. 1021 write_options : pyarrow.csv.WriteOptions 1022 Options to configure writing the CSV data. 1023 memory_pool : MemoryPool, optional 1024 Pool for temporary allocations. 1025 """ 1026 cdef: 1027 shared_ptr[COutputStream] stream 1028 CCSVWriteOptions c_write_options 1029 CMemoryPool* c_memory_pool 1030 CRecordBatch* batch 1031 CTable* table 1032 _get_write_options(write_options, &c_write_options) 1033 1034 get_writer(output_file, &stream) 1035 c_memory_pool = maybe_unbox_memory_pool(memory_pool) 1036 c_write_options.io_context = CIOContext(c_memory_pool) 1037 if isinstance(data, RecordBatch): 1038 batch = pyarrow_unwrap_batch(data).get() 1039 with nogil: 1040 check_status(WriteCSV(deref(batch), c_write_options, stream.get())) 1041 elif isinstance(data, Table): 1042 table = pyarrow_unwrap_table(data).get() 1043 with nogil: 1044 check_status(WriteCSV(deref(table), c_write_options, stream.get())) 1045 else: 1046 raise TypeError(f"Expected Table or RecordBatch, got '{type(data)}'") 1047 1048 1049cdef class CSVWriter(_CRecordBatchWriter): 1050 """ 1051 Writer to create a CSV file. 1052 1053 Parameters 1054 ---------- 1055 sink : str, path, pyarrow.OutputStream or file-like object 1056 The location where to write the CSV data. 1057 schema : pyarrow.Schema 1058 The schema of the data to be written. 1059 write_options : pyarrow.csv.WriteOptions 1060 Options to configure writing the CSV data. 1061 memory_pool : MemoryPool, optional 1062 Pool for temporary allocations. 1063 """ 1064 1065 def __init__(self, sink, Schema schema, *, 1066 WriteOptions write_options=None, MemoryPool memory_pool=None): 1067 cdef: 1068 shared_ptr[COutputStream] c_stream 1069 shared_ptr[CSchema] c_schema = pyarrow_unwrap_schema(schema) 1070 CCSVWriteOptions c_write_options 1071 CMemoryPool* c_memory_pool = maybe_unbox_memory_pool(memory_pool) 1072 _get_write_options(write_options, &c_write_options) 1073 c_write_options.io_context = CIOContext(c_memory_pool) 1074 get_writer(sink, &c_stream) 1075 with nogil: 1076 self.writer = GetResultValue(MakeCSVWriter( 1077 c_stream, c_schema, c_write_options)) 1078