1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements.  See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership.  The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License.  You may obtain a copy of the License at
8 //
9 //   http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied.  See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17 
18 #include "arrow/csv/reader.h"
19 
20 #include <cstdint>
21 #include <cstring>
22 #include <functional>
23 #include <limits>
24 #include <memory>
25 #include <sstream>
26 #include <string>
27 #include <unordered_map>
28 #include <utility>
29 #include <vector>
30 
31 #include "arrow/array.h"
32 #include "arrow/buffer.h"
33 #include "arrow/csv/chunker.h"
34 #include "arrow/csv/column_builder.h"
35 #include "arrow/csv/column_decoder.h"
36 #include "arrow/csv/options.h"
37 #include "arrow/csv/parser.h"
38 #include "arrow/io/interfaces.h"
39 #include "arrow/result.h"
40 #include "arrow/status.h"
41 #include "arrow/table.h"
42 #include "arrow/type.h"
43 #include "arrow/type_fwd.h"
44 #include "arrow/util/async_generator.h"
45 #include "arrow/util/future.h"
46 #include "arrow/util/iterator.h"
47 #include "arrow/util/logging.h"
48 #include "arrow/util/macros.h"
49 #include "arrow/util/optional.h"
50 #include "arrow/util/task_group.h"
51 #include "arrow/util/thread_pool.h"
52 #include "arrow/util/utf8.h"
53 #include "arrow/util/vector.h"
54 
55 namespace arrow {
56 namespace csv {
57 
58 using internal::Executor;
59 
60 namespace {
61 
62 struct ConversionSchema {
63   struct Column {
64     std::string name;
65     // Physical column index in CSV file
66     int32_t index;
67     // If true, make a column of nulls
68     bool is_missing;
69     // If set, convert the CSV column to this type
70     // If unset (and is_missing is false), infer the type from the CSV column
71     std::shared_ptr<DataType> type;
72   };
73 
NullColumnarrow::csv::__anon660eeea90111::ConversionSchema74   static Column NullColumn(std::string col_name, std::shared_ptr<DataType> type) {
75     return Column{std::move(col_name), -1, true, std::move(type)};
76   }
77 
TypedColumnarrow::csv::__anon660eeea90111::ConversionSchema78   static Column TypedColumn(std::string col_name, int32_t col_index,
79                             std::shared_ptr<DataType> type) {
80     return Column{std::move(col_name), col_index, false, std::move(type)};
81   }
82 
InferredColumnarrow::csv::__anon660eeea90111::ConversionSchema83   static Column InferredColumn(std::string col_name, int32_t col_index) {
84     return Column{std::move(col_name), col_index, false, nullptr};
85   }
86 
87   std::vector<Column> columns;
88 };
89 
90 // An iterator of Buffers that makes sure there is no straddling CRLF sequence.
91 class CSVBufferIterator {
92  public:
Make(Iterator<std::shared_ptr<Buffer>> buffer_iterator)93   static Iterator<std::shared_ptr<Buffer>> Make(
94       Iterator<std::shared_ptr<Buffer>> buffer_iterator) {
95     Transformer<std::shared_ptr<Buffer>, std::shared_ptr<Buffer>> fn =
96         CSVBufferIterator();
97     return MakeTransformedIterator(std::move(buffer_iterator), fn);
98   }
99 
MakeAsync(AsyncGenerator<std::shared_ptr<Buffer>> buffer_iterator)100   static AsyncGenerator<std::shared_ptr<Buffer>> MakeAsync(
101       AsyncGenerator<std::shared_ptr<Buffer>> buffer_iterator) {
102     Transformer<std::shared_ptr<Buffer>, std::shared_ptr<Buffer>> fn =
103         CSVBufferIterator();
104     return MakeTransformedGenerator(std::move(buffer_iterator), fn);
105   }
106 
operator ()(std::shared_ptr<Buffer> buf)107   Result<TransformFlow<std::shared_ptr<Buffer>>> operator()(std::shared_ptr<Buffer> buf) {
108     if (buf == nullptr) {
109       // EOF
110       return TransformFinish();
111     }
112 
113     int64_t offset = 0;
114     if (first_buffer_) {
115       ARROW_ASSIGN_OR_RAISE(auto data, util::SkipUTF8BOM(buf->data(), buf->size()));
116       offset += data - buf->data();
117       DCHECK_GE(offset, 0);
118       first_buffer_ = false;
119     }
120 
121     if (trailing_cr_ && buf->data()[offset] == '\n') {
122       // Skip '\r\n' line separator that started at the end of previous buffer
123       ++offset;
124     }
125 
126     trailing_cr_ = (buf->data()[buf->size() - 1] == '\r');
127     buf = SliceBuffer(buf, offset);
128     if (buf->size() == 0) {
129       // EOF
130       return TransformFinish();
131     } else {
132       return TransformYield(buf);
133     }
134   }
135 
136  protected:
137   bool first_buffer_ = true;
138   // Whether there was a trailing CR at the end of last received buffer
139   bool trailing_cr_ = false;
140 };
141 
142 struct CSVBlock {
143   // (partial + completion + buffer) is an entire delimited CSV buffer.
144   std::shared_ptr<Buffer> partial;
145   std::shared_ptr<Buffer> completion;
146   std::shared_ptr<Buffer> buffer;
147   int64_t block_index;
148   bool is_final;
149   int64_t bytes_skipped;
150   std::function<Status(int64_t)> consume_bytes;
151 };
152 
153 }  // namespace
154 }  // namespace csv
155 
156 template <>
157 struct IterationTraits<csv::CSVBlock> {
Endarrow::IterationTraits158   static csv::CSVBlock End() { return csv::CSVBlock{{}, {}, {}, -1, true, 0, {}}; }
IsEndarrow::IterationTraits159   static bool IsEnd(const csv::CSVBlock& val) { return val.block_index < 0; }
160 };
161 
162 namespace csv {
163 namespace {
164 
// Base class for callables that transform a stream of raw data buffers into a
// stream of delimited CSV blocks (see CSVBlock).  The end-of-stream token required
// by the iterator APIs (e.g. Visit) is provided by IterationTraits<CSVBlock>::End(),
// which yields a block with a negative block_index.
169 class BlockReader {
170  public:
BlockReader(std::unique_ptr<Chunker> chunker,std::shared_ptr<Buffer> first_buffer,int64_t skip_rows)171   BlockReader(std::unique_ptr<Chunker> chunker, std::shared_ptr<Buffer> first_buffer,
172               int64_t skip_rows)
173       : chunker_(std::move(chunker)),
174         partial_(std::make_shared<Buffer>("")),
175         buffer_(std::move(first_buffer)),
176         skip_rows_(skip_rows) {}
177 
178  protected:
179   std::unique_ptr<Chunker> chunker_;
180   std::shared_ptr<Buffer> partial_, buffer_;
181   int64_t skip_rows_;
182   int64_t block_index_ = 0;
183   // Whether there was a trailing CR at the end of last received buffer
184   bool trailing_cr_ = false;
185 };
186 
187 // An object that reads delimited CSV blocks for serial use.
188 // The number of bytes consumed should be notified after each read,
189 // using CSVBlock::consume_bytes.
190 class SerialBlockReader : public BlockReader {
191  public:
192   using BlockReader::BlockReader;
193 
MakeIterator(Iterator<std::shared_ptr<Buffer>> buffer_iterator,std::unique_ptr<Chunker> chunker,std::shared_ptr<Buffer> first_buffer,int64_t skip_rows)194   static Iterator<CSVBlock> MakeIterator(
195       Iterator<std::shared_ptr<Buffer>> buffer_iterator, std::unique_ptr<Chunker> chunker,
196       std::shared_ptr<Buffer> first_buffer, int64_t skip_rows) {
197     auto block_reader =
198         std::make_shared<SerialBlockReader>(std::move(chunker), first_buffer, skip_rows);
199     // Wrap shared pointer in callable
200     Transformer<std::shared_ptr<Buffer>, CSVBlock> block_reader_fn =
201         [block_reader](std::shared_ptr<Buffer> buf) {
202           return (*block_reader)(std::move(buf));
203         };
204     return MakeTransformedIterator(std::move(buffer_iterator), block_reader_fn);
205   }
206 
MakeAsyncIterator(AsyncGenerator<std::shared_ptr<Buffer>> buffer_generator,std::unique_ptr<Chunker> chunker,std::shared_ptr<Buffer> first_buffer,int64_t skip_rows)207   static AsyncGenerator<CSVBlock> MakeAsyncIterator(
208       AsyncGenerator<std::shared_ptr<Buffer>> buffer_generator,
209       std::unique_ptr<Chunker> chunker, std::shared_ptr<Buffer> first_buffer,
210       int64_t skip_rows) {
211     auto block_reader =
212         std::make_shared<SerialBlockReader>(std::move(chunker), first_buffer, skip_rows);
213     // Wrap shared pointer in callable
214     Transformer<std::shared_ptr<Buffer>, CSVBlock> block_reader_fn =
215         [block_reader](std::shared_ptr<Buffer> next) {
216           return (*block_reader)(std::move(next));
217         };
218     return MakeTransformedGenerator(std::move(buffer_generator), block_reader_fn);
219   }
220 
operator ()(std::shared_ptr<Buffer> next_buffer)221   Result<TransformFlow<CSVBlock>> operator()(std::shared_ptr<Buffer> next_buffer) {
222     if (buffer_ == nullptr) {
223       return TransformFinish();
224     }
225 
226     bool is_final = (next_buffer == nullptr);
227     int64_t bytes_skipped = 0;
228 
229     if (skip_rows_) {
230       bytes_skipped += partial_->size();
231       auto orig_size = buffer_->size();
232       RETURN_NOT_OK(
233           chunker_->ProcessSkip(partial_, buffer_, is_final, &skip_rows_, &buffer_));
234       bytes_skipped += orig_size - buffer_->size();
235       auto empty = std::make_shared<Buffer>(nullptr, 0);
236       if (skip_rows_) {
237         // Still have rows beyond this buffer to skip return empty block
238         partial_ = std::move(buffer_);
239         buffer_ = next_buffer;
240         return TransformYield<CSVBlock>(CSVBlock{empty, empty, empty, block_index_++,
241                                                  is_final, bytes_skipped,
242                                                  [](int64_t) { return Status::OK(); }});
243       }
244       partial_ = std::move(empty);
245     }
246 
247     std::shared_ptr<Buffer> completion;
248 
249     if (is_final) {
250       // End of file reached => compute completion from penultimate block
251       RETURN_NOT_OK(chunker_->ProcessFinal(partial_, buffer_, &completion, &buffer_));
252     } else {
253       // Get completion of partial from previous block.
254       RETURN_NOT_OK(
255           chunker_->ProcessWithPartial(partial_, buffer_, &completion, &buffer_));
256     }
257     int64_t bytes_before_buffer = partial_->size() + completion->size();
258 
259     auto consume_bytes = [this, bytes_before_buffer,
260                           next_buffer](int64_t nbytes) -> Status {
261       DCHECK_GE(nbytes, 0);
262       auto offset = nbytes - bytes_before_buffer;
263       if (offset < 0) {
264         // Should not happen
265         return Status::Invalid("CSV parser got out of sync with chunker");
266       }
267       partial_ = SliceBuffer(buffer_, offset);
268       buffer_ = next_buffer;
269       return Status::OK();
270     };
271 
272     return TransformYield<CSVBlock>(CSVBlock{partial_, completion, buffer_,
273                                              block_index_++, is_final, bytes_skipped,
274                                              std::move(consume_bytes)});
275   }
276 };
277 
278 // An object that reads delimited CSV blocks for threaded use.
279 class ThreadedBlockReader : public BlockReader {
280  public:
281   using BlockReader::BlockReader;
282 
MakeAsyncIterator(AsyncGenerator<std::shared_ptr<Buffer>> buffer_generator,std::unique_ptr<Chunker> chunker,std::shared_ptr<Buffer> first_buffer,int64_t skip_rows)283   static AsyncGenerator<CSVBlock> MakeAsyncIterator(
284       AsyncGenerator<std::shared_ptr<Buffer>> buffer_generator,
285       std::unique_ptr<Chunker> chunker, std::shared_ptr<Buffer> first_buffer,
286       int64_t skip_rows) {
287     auto block_reader = std::make_shared<ThreadedBlockReader>(std::move(chunker),
288                                                               first_buffer, skip_rows);
289     // Wrap shared pointer in callable
290     Transformer<std::shared_ptr<Buffer>, CSVBlock> block_reader_fn =
291         [block_reader](std::shared_ptr<Buffer> next) { return (*block_reader)(next); };
292     return MakeTransformedGenerator(std::move(buffer_generator), block_reader_fn);
293   }
294 
operator ()(std::shared_ptr<Buffer> next_buffer)295   Result<TransformFlow<CSVBlock>> operator()(std::shared_ptr<Buffer> next_buffer) {
296     if (buffer_ == nullptr) {
297       // EOF
298       return TransformFinish();
299     }
300 
301     bool is_final = (next_buffer == nullptr);
302 
303     auto current_partial = std::move(partial_);
304     auto current_buffer = std::move(buffer_);
305     int64_t bytes_skipped = 0;
306 
307     if (skip_rows_) {
308       auto orig_size = current_buffer->size();
309       bytes_skipped = current_partial->size();
310       RETURN_NOT_OK(chunker_->ProcessSkip(current_partial, current_buffer, is_final,
311                                           &skip_rows_, &current_buffer));
312       bytes_skipped += orig_size - current_buffer->size();
313       current_partial = std::make_shared<Buffer>(nullptr, 0);
314       if (skip_rows_) {
315         partial_ = std::move(current_buffer);
316         buffer_ = std::move(next_buffer);
317         return TransformYield<CSVBlock>(CSVBlock{current_partial,
318                                                  current_partial,
319                                                  current_partial,
320                                                  block_index_++,
321                                                  is_final,
322                                                  bytes_skipped,
323                                                  {}});
324       }
325     }
326 
327     std::shared_ptr<Buffer> whole, completion, next_partial;
328 
329     if (is_final) {
330       // End of file reached => compute completion from penultimate block
331       RETURN_NOT_OK(
332           chunker_->ProcessFinal(current_partial, current_buffer, &completion, &whole));
333     } else {
334       // Get completion of partial from previous block.
335       std::shared_ptr<Buffer> starts_with_whole;
336       // Get completion of partial from previous block.
337       RETURN_NOT_OK(chunker_->ProcessWithPartial(current_partial, current_buffer,
338                                                  &completion, &starts_with_whole));
339 
340       // Get a complete CSV block inside `partial + block`, and keep
341       // the rest for the next iteration.
342       RETURN_NOT_OK(chunker_->Process(starts_with_whole, &whole, &next_partial));
343     }
344 
345     partial_ = std::move(next_partial);
346     buffer_ = std::move(next_buffer);
347 
348     return TransformYield<CSVBlock>(CSVBlock{
349         current_partial, completion, whole, block_index_++, is_final, bytes_skipped, {}});
350   }
351 };
352 
353 struct ParsedBlock {
354   std::shared_ptr<BlockParser> parser;
355   int64_t block_index;
356   int64_t bytes_parsed_or_skipped;
357 };
358 
359 struct DecodedBlock {
360   std::shared_ptr<RecordBatch> record_batch;
361   // Represents the number of input bytes represented by this batch
362   // This will include bytes skipped when skipping rows after the header
363   int64_t bytes_processed;
364 };
365 
366 }  // namespace
367 
368 }  // namespace csv
369 
370 template <>
371 struct IterationTraits<csv::ParsedBlock> {
Endarrow::IterationTraits372   static csv::ParsedBlock End() { return csv::ParsedBlock{nullptr, -1, -1}; }
IsEndarrow::IterationTraits373   static bool IsEnd(const csv::ParsedBlock& val) { return val.block_index < 0; }
374 };
375 
376 template <>
377 struct IterationTraits<csv::DecodedBlock> {
Endarrow::IterationTraits378   static csv::DecodedBlock End() { return csv::DecodedBlock{nullptr, -1}; }
IsEndarrow::IterationTraits379   static bool IsEnd(const csv::DecodedBlock& val) { return val.bytes_processed < 0; }
380 };
381 
382 namespace csv {
383 namespace {
384 
385 // A function object that takes in a buffer of CSV data and returns a parsed batch of CSV
386 // data (CSVBlock -> ParsedBlock) for use with MakeMappedGenerator.
387 // The parsed batch contains a list of offsets for each of the columns so that columns
388 // can be individually scanned
389 //
390 // This operator is not re-entrant
391 class BlockParsingOperator {
392  public:
BlockParsingOperator(io::IOContext io_context,ParseOptions parse_options,int num_csv_cols,int64_t first_row)393   BlockParsingOperator(io::IOContext io_context, ParseOptions parse_options,
394                        int num_csv_cols, int64_t first_row)
395       : io_context_(io_context),
396         parse_options_(parse_options),
397         num_csv_cols_(num_csv_cols),
398         count_rows_(first_row >= 0),
399         num_rows_seen_(first_row) {}
400 
operator ()(const CSVBlock & block)401   Result<ParsedBlock> operator()(const CSVBlock& block) {
402     constexpr int32_t max_num_rows = std::numeric_limits<int32_t>::max();
403     auto parser = std::make_shared<BlockParser>(
404         io_context_.pool(), parse_options_, num_csv_cols_, num_rows_seen_, max_num_rows);
405 
406     std::shared_ptr<Buffer> straddling;
407     std::vector<util::string_view> views;
408     if (block.partial->size() != 0 || block.completion->size() != 0) {
409       if (block.partial->size() == 0) {
410         straddling = block.completion;
411       } else if (block.completion->size() == 0) {
412         straddling = block.partial;
413       } else {
414         ARROW_ASSIGN_OR_RAISE(
415             straddling,
416             ConcatenateBuffers({block.partial, block.completion}, io_context_.pool()));
417       }
418       views = {util::string_view(*straddling), util::string_view(*block.buffer)};
419     } else {
420       views = {util::string_view(*block.buffer)};
421     }
422     uint32_t parsed_size;
423     if (block.is_final) {
424       RETURN_NOT_OK(parser->ParseFinal(views, &parsed_size));
425     } else {
426       RETURN_NOT_OK(parser->Parse(views, &parsed_size));
427     }
428     if (count_rows_) {
429       num_rows_seen_ += parser->total_num_rows();
430     }
431     RETURN_NOT_OK(block.consume_bytes(parsed_size));
432     return ParsedBlock{std::move(parser), block.block_index,
433                        static_cast<int64_t>(parsed_size) + block.bytes_skipped};
434   }
435 
436  private:
437   io::IOContext io_context_;
438   ParseOptions parse_options_;
439   int num_csv_cols_;
440   bool count_rows_;
441   int64_t num_rows_seen_;
442 };
443 
444 // A function object that takes in parsed batch of CSV data and decodes it to an arrow
445 // record batch (ParsedBlock -> DecodedBlock) for use with MakeMappedGenerator.
446 class BlockDecodingOperator {
447  public:
operator ()(const ParsedBlock & block)448   Future<DecodedBlock> operator()(const ParsedBlock& block) {
449     DCHECK(!state_->column_decoders.empty());
450     std::vector<Future<std::shared_ptr<Array>>> decoded_array_futs;
451     for (auto& decoder : state_->column_decoders) {
452       decoded_array_futs.push_back(decoder->Decode(block.parser));
453     }
454     auto bytes_parsed_or_skipped = block.bytes_parsed_or_skipped;
455     auto decoded_arrays_fut = All(std::move(decoded_array_futs));
456     auto state = state_;
457     return decoded_arrays_fut.Then(
458         [state, bytes_parsed_or_skipped](
459             const std::vector<Result<std::shared_ptr<Array>>>& maybe_decoded_arrays)
460             -> Result<DecodedBlock> {
461           ARROW_ASSIGN_OR_RAISE(auto decoded_arrays,
462                                 internal::UnwrapOrRaise(maybe_decoded_arrays));
463 
464           ARROW_ASSIGN_OR_RAISE(auto batch,
465                                 state->DecodedArraysToBatch(std::move(decoded_arrays)));
466           return DecodedBlock{std::move(batch), bytes_parsed_or_skipped};
467         });
468   }
469 
Make(io::IOContext io_context,ConvertOptions convert_options,ConversionSchema conversion_schema)470   static Result<BlockDecodingOperator> Make(io::IOContext io_context,
471                                             ConvertOptions convert_options,
472                                             ConversionSchema conversion_schema) {
473     BlockDecodingOperator op(std::move(io_context), std::move(convert_options),
474                              std::move(conversion_schema));
475     RETURN_NOT_OK(op.state_->MakeColumnDecoders(io_context));
476     return op;
477   }
478 
479  private:
BlockDecodingOperator(io::IOContext io_context,ConvertOptions convert_options,ConversionSchema conversion_schema)480   BlockDecodingOperator(io::IOContext io_context, ConvertOptions convert_options,
481                         ConversionSchema conversion_schema)
482       : state_(std::make_shared<State>(std::move(io_context), std::move(convert_options),
483                                        std::move(conversion_schema))) {}
484 
485   struct State {
Statearrow::csv::__anon660eeea90811::BlockDecodingOperator::State486     State(io::IOContext io_context, ConvertOptions convert_options,
487           ConversionSchema conversion_schema)
488         : convert_options(std::move(convert_options)),
489           conversion_schema(std::move(conversion_schema)) {}
490 
DecodedArraysToBatcharrow::csv::__anon660eeea90811::BlockDecodingOperator::State491     Result<std::shared_ptr<RecordBatch>> DecodedArraysToBatch(
492         std::vector<std::shared_ptr<Array>> arrays) {
493       const auto n_rows = arrays[0]->length();
494 
495       if (schema == nullptr) {
496         FieldVector fields(arrays.size());
497         for (size_t i = 0; i < arrays.size(); ++i) {
498           fields[i] = field(conversion_schema.columns[i].name, arrays[i]->type());
499         }
500 
501         if (n_rows == 0) {
502           // No rows so schema is not reliable. return RecordBatch but do not set schema
503           return RecordBatch::Make(arrow::schema(std::move(fields)), n_rows,
504                                    std::move(arrays));
505         }
506 
507         schema = arrow::schema(std::move(fields));
508       }
509 
510       return RecordBatch::Make(schema, n_rows, std::move(arrays));
511     }
512 
513     // Make column decoders from conversion schema
MakeColumnDecodersarrow::csv::__anon660eeea90811::BlockDecodingOperator::State514     Status MakeColumnDecoders(io::IOContext io_context) {
515       for (const auto& column : conversion_schema.columns) {
516         std::shared_ptr<ColumnDecoder> decoder;
517         if (column.is_missing) {
518           ARROW_ASSIGN_OR_RAISE(decoder,
519                                 ColumnDecoder::MakeNull(io_context.pool(), column.type));
520         } else if (column.type != nullptr) {
521           ARROW_ASSIGN_OR_RAISE(
522               decoder, ColumnDecoder::Make(io_context.pool(), column.type, column.index,
523                                            convert_options));
524         } else {
525           ARROW_ASSIGN_OR_RAISE(
526               decoder,
527               ColumnDecoder::Make(io_context.pool(), column.index, convert_options));
528         }
529         column_decoders.push_back(std::move(decoder));
530       }
531       return Status::OK();
532     }
533 
534     ConvertOptions convert_options;
535     ConversionSchema conversion_schema;
536     std::vector<std::shared_ptr<ColumnDecoder>> column_decoders;
537     std::shared_ptr<Schema> schema;
538   };
539 
540   std::shared_ptr<State> state_;
541 };
542 
543 /////////////////////////////////////////////////////////////////////////
544 // Base class for common functionality
545 
546 class ReaderMixin {
547  public:
ReaderMixin(io::IOContext io_context,std::shared_ptr<io::InputStream> input,const ReadOptions & read_options,const ParseOptions & parse_options,const ConvertOptions & convert_options,bool count_rows)548   ReaderMixin(io::IOContext io_context, std::shared_ptr<io::InputStream> input,
549               const ReadOptions& read_options, const ParseOptions& parse_options,
550               const ConvertOptions& convert_options, bool count_rows)
551       : io_context_(std::move(io_context)),
552         read_options_(read_options),
553         parse_options_(parse_options),
554         convert_options_(convert_options),
555         count_rows_(count_rows),
556         num_rows_seen_(count_rows_ ? 1 : -1),
557         input_(std::move(input)) {}
558 
559  protected:
560   // Read header and column names from buffer, create column builders
561   // Returns the # of bytes consumed
ProcessHeader(const std::shared_ptr<Buffer> & buf,std::shared_ptr<Buffer> * rest)562   Result<int64_t> ProcessHeader(const std::shared_ptr<Buffer>& buf,
563                                 std::shared_ptr<Buffer>* rest) {
564     const uint8_t* data = buf->data();
565     const auto data_end = data + buf->size();
566     DCHECK_GT(data_end - data, 0);
567 
568     if (read_options_.skip_rows) {
569       // Skip initial rows (potentially invalid CSV data)
570       auto num_skipped_rows = SkipRows(data, static_cast<uint32_t>(data_end - data),
571                                        read_options_.skip_rows, &data);
572       if (num_skipped_rows < read_options_.skip_rows) {
573         return Status::Invalid(
574             "Could not skip initial ", read_options_.skip_rows,
575             " rows from CSV file, "
576             "either file is too short or header is larger than block size");
577       }
578       if (count_rows_) {
579         num_rows_seen_ += num_skipped_rows;
580       }
581     }
582 
583     if (read_options_.column_names.empty()) {
584       // Parse one row (either to read column names or to know the number of columns)
585       BlockParser parser(io_context_.pool(), parse_options_, num_csv_cols_,
586                          num_rows_seen_, 1);
587       uint32_t parsed_size = 0;
588       RETURN_NOT_OK(parser.Parse(
589           util::string_view(reinterpret_cast<const char*>(data), data_end - data),
590           &parsed_size));
591       if (parser.num_rows() != 1) {
592         return Status::Invalid(
593             "Could not read first row from CSV file, either "
594             "file is too short or header is larger than block size");
595       }
596       if (parser.num_cols() == 0) {
597         return Status::Invalid("No columns in CSV file");
598       }
599 
600       if (read_options_.autogenerate_column_names) {
601         column_names_ = GenerateColumnNames(parser.num_cols());
602       } else {
603         // Read column names from header row
604         auto visit = [&](const uint8_t* data, uint32_t size, bool quoted) -> Status {
605           column_names_.emplace_back(reinterpret_cast<const char*>(data), size);
606           return Status::OK();
607         };
608         RETURN_NOT_OK(parser.VisitLastRow(visit));
609         DCHECK_EQ(static_cast<size_t>(parser.num_cols()), column_names_.size());
610         // Skip parsed header row
611         data += parsed_size;
612         if (count_rows_) {
613           ++num_rows_seen_;
614         }
615       }
616     } else {
617       column_names_ = read_options_.column_names;
618     }
619 
620     if (count_rows_) {
621       // increase rows seen to skip past rows which will be skipped
622       num_rows_seen_ += read_options_.skip_rows_after_names;
623     }
624 
625     auto bytes_consumed = data - buf->data();
626     *rest = SliceBuffer(buf, bytes_consumed);
627 
628     num_csv_cols_ = static_cast<int32_t>(column_names_.size());
629     DCHECK_GT(num_csv_cols_, 0);
630 
631     RETURN_NOT_OK(MakeConversionSchema());
632     return bytes_consumed;
633   }
634 
GenerateColumnNames(int32_t num_cols)635   std::vector<std::string> GenerateColumnNames(int32_t num_cols) {
636     std::vector<std::string> res;
637     res.reserve(num_cols);
638     for (int32_t i = 0; i < num_cols; ++i) {
639       std::stringstream ss;
640       ss << "f" << i;
641       res.push_back(ss.str());
642     }
643     return res;
644   }
645 
646   // Make conversion schema from options and parsed CSV header
MakeConversionSchema()647   Status MakeConversionSchema() {
648     // Append a column converted from CSV data
649     auto append_csv_column = [&](std::string col_name, int32_t col_index) {
650       // Does the named column have a fixed type?
651       auto it = convert_options_.column_types.find(col_name);
652       if (it == convert_options_.column_types.end()) {
653         conversion_schema_.columns.push_back(
654             ConversionSchema::InferredColumn(std::move(col_name), col_index));
655       } else {
656         conversion_schema_.columns.push_back(
657             ConversionSchema::TypedColumn(std::move(col_name), col_index, it->second));
658       }
659     };
660 
661     // Append a column of nulls
662     auto append_null_column = [&](std::string col_name) {
663       // If the named column has a fixed type, use it, otherwise use null()
664       std::shared_ptr<DataType> type;
665       auto it = convert_options_.column_types.find(col_name);
666       if (it == convert_options_.column_types.end()) {
667         type = null();
668       } else {
669         type = it->second;
670       }
671       conversion_schema_.columns.push_back(
672           ConversionSchema::NullColumn(std::move(col_name), std::move(type)));
673     };
674 
675     if (convert_options_.include_columns.empty()) {
676       // Include all columns in CSV file order
677       for (int32_t col_index = 0; col_index < num_csv_cols_; ++col_index) {
678         append_csv_column(column_names_[col_index], col_index);
679       }
680     } else {
681       // Include columns from `include_columns` (in that order)
682       // Compute indices of columns in the CSV file
683       std::unordered_map<std::string, int32_t> col_indices;
684       col_indices.reserve(column_names_.size());
685       for (int32_t i = 0; i < static_cast<int32_t>(column_names_.size()); ++i) {
686         col_indices.emplace(column_names_[i], i);
687       }
688 
689       for (const auto& col_name : convert_options_.include_columns) {
690         auto it = col_indices.find(col_name);
691         if (it != col_indices.end()) {
692           append_csv_column(col_name, it->second);
693         } else if (convert_options_.include_missing_columns) {
694           append_null_column(col_name);
695         } else {
696           return Status::KeyError("Column '", col_name,
697                                   "' in include_columns "
698                                   "does not exist in CSV file");
699         }
700       }
701     }
702     return Status::OK();
703   }
704 
705   struct ParseResult {
706     std::shared_ptr<BlockParser> parser;
707     int64_t parsed_bytes;
708   };
709 
Parse(const std::shared_ptr<Buffer> & partial,const std::shared_ptr<Buffer> & completion,const std::shared_ptr<Buffer> & block,int64_t block_index,bool is_final)710   Result<ParseResult> Parse(const std::shared_ptr<Buffer>& partial,
711                             const std::shared_ptr<Buffer>& completion,
712                             const std::shared_ptr<Buffer>& block, int64_t block_index,
713                             bool is_final) {
714     static constexpr int32_t max_num_rows = std::numeric_limits<int32_t>::max();
715     auto parser = std::make_shared<BlockParser>(
716         io_context_.pool(), parse_options_, num_csv_cols_, num_rows_seen_, max_num_rows);
717 
718     std::shared_ptr<Buffer> straddling;
719     std::vector<util::string_view> views;
720     if (partial->size() != 0 || completion->size() != 0) {
721       if (partial->size() == 0) {
722         straddling = completion;
723       } else if (completion->size() == 0) {
724         straddling = partial;
725       } else {
726         ARROW_ASSIGN_OR_RAISE(
727             straddling, ConcatenateBuffers({partial, completion}, io_context_.pool()));
728       }
729       views = {util::string_view(*straddling), util::string_view(*block)};
730     } else {
731       views = {util::string_view(*block)};
732     }
733     uint32_t parsed_size;
734     if (is_final) {
735       RETURN_NOT_OK(parser->ParseFinal(views, &parsed_size));
736     } else {
737       RETURN_NOT_OK(parser->Parse(views, &parsed_size));
738     }
739     if (count_rows_) {
740       num_rows_seen_ += parser->total_num_rows();
741     }
742     return ParseResult{std::move(parser), static_cast<int64_t>(parsed_size)};
743   }
744 
745   io::IOContext io_context_;
746   ReadOptions read_options_;
747   ParseOptions parse_options_;
748   ConvertOptions convert_options_;
749 
750   // Number of columns in the CSV file
751   int32_t num_csv_cols_ = -1;
752   // Whether num_rows_seen_ tracks the number of rows seen in the CSV being parsed
753   bool count_rows_;
754   // Number of rows seen in the csv. Not used if count_rows is false
755   int64_t num_rows_seen_;
756   // Column names in the CSV file
757   std::vector<std::string> column_names_;
758   ConversionSchema conversion_schema_;
759 
760   std::shared_ptr<io::InputStream> input_;
761   std::shared_ptr<internal::TaskGroup> task_group_;
762 };
763 
764 /////////////////////////////////////////////////////////////////////////
765 // Base class for one-shot table readers
766 
767 class BaseTableReader : public ReaderMixin, public csv::TableReader {
768  public:
769   using ReaderMixin::ReaderMixin;
770 
771   virtual Status Init() = 0;
772 
ReadAsync()773   Future<std::shared_ptr<Table>> ReadAsync() override {
774     return Future<std::shared_ptr<Table>>::MakeFinished(Read());
775   }
776 
777  protected:
778   // Make column builders from conversion schema
MakeColumnBuilders()779   Status MakeColumnBuilders() {
780     for (const auto& column : conversion_schema_.columns) {
781       std::shared_ptr<ColumnBuilder> builder;
782       if (column.is_missing) {
783         ARROW_ASSIGN_OR_RAISE(builder, ColumnBuilder::MakeNull(io_context_.pool(),
784                                                                column.type, task_group_));
785       } else if (column.type != nullptr) {
786         ARROW_ASSIGN_OR_RAISE(
787             builder, ColumnBuilder::Make(io_context_.pool(), column.type, column.index,
788                                          convert_options_, task_group_));
789       } else {
790         ARROW_ASSIGN_OR_RAISE(builder,
791                               ColumnBuilder::Make(io_context_.pool(), column.index,
792                                                   convert_options_, task_group_));
793       }
794       column_builders_.push_back(std::move(builder));
795     }
796     return Status::OK();
797   }
798 
ParseAndInsert(const std::shared_ptr<Buffer> & partial,const std::shared_ptr<Buffer> & completion,const std::shared_ptr<Buffer> & block,int64_t block_index,bool is_final)799   Result<int64_t> ParseAndInsert(const std::shared_ptr<Buffer>& partial,
800                                  const std::shared_ptr<Buffer>& completion,
801                                  const std::shared_ptr<Buffer>& block,
802                                  int64_t block_index, bool is_final) {
803     ARROW_ASSIGN_OR_RAISE(auto result,
804                           Parse(partial, completion, block, block_index, is_final));
805     RETURN_NOT_OK(ProcessData(result.parser, block_index));
806     return result.parsed_bytes;
807   }
808 
809   // Trigger conversion of parsed block data
ProcessData(const std::shared_ptr<BlockParser> & parser,int64_t block_index)810   Status ProcessData(const std::shared_ptr<BlockParser>& parser, int64_t block_index) {
811     for (auto& builder : column_builders_) {
812       builder->Insert(block_index, parser);
813     }
814     return Status::OK();
815   }
816 
MakeTable()817   Result<std::shared_ptr<Table>> MakeTable() {
818     DCHECK_EQ(column_builders_.size(), conversion_schema_.columns.size());
819 
820     std::vector<std::shared_ptr<Field>> fields;
821     std::vector<std::shared_ptr<ChunkedArray>> columns;
822 
823     for (int32_t i = 0; i < static_cast<int32_t>(column_builders_.size()); ++i) {
824       const auto& column = conversion_schema_.columns[i];
825       ARROW_ASSIGN_OR_RAISE(auto array, column_builders_[i]->Finish());
826       fields.push_back(::arrow::field(column.name, array->type()));
827       columns.emplace_back(std::move(array));
828     }
829     return Table::Make(schema(std::move(fields)), std::move(columns));
830   }
831 
832   // Column builders for target Table (in ConversionSchema order)
833   std::vector<std::shared_ptr<ColumnBuilder>> column_builders_;
834 };
835 
836 /////////////////////////////////////////////////////////////////////////
837 // Base class for streaming readers
838 
/// Streaming (one record batch at a time) CSV reader.
///
/// Pipeline built by Init(): input stream -> background generator on the IO
/// executor -> transfer to the CPU executor -> chunking into CSV blocks ->
/// BlockParsingOperator -> BlockDecodingOperator -> record batches.
///
/// Must be owned by a std::shared_ptr: initialization uses shared_from_this().
class StreamingReaderImpl : public ReaderMixin,
                            public csv::StreamingReader,
                            public std::enable_shared_from_this<StreamingReaderImpl> {
 public:
  StreamingReaderImpl(io::IOContext io_context, std::shared_ptr<io::InputStream> input,
                      const ReadOptions& read_options, const ParseOptions& parse_options,
                      const ConvertOptions& convert_options, bool count_rows)
      : ReaderMixin(io_context, std::move(input), read_options, parse_options,
                    convert_options, count_rows),
        bytes_decoded_(std::make_shared<std::atomic<int64_t>>(0)) {}

  /// Build the buffer pipeline and read the first buffer (which holds the header).
  /// The returned future completes once the first non-empty decoded block, or the
  /// end of input, has been reached (see InitFromBlock()).
  Future<> Init(Executor* cpu_executor) {
    ARROW_ASSIGN_OR_RAISE(auto istream_it,
                          io::MakeInputStreamIterator(input_, read_options_.block_size));

    // TODO Consider exposing readahead as a read option (ARROW-12090)
    ARROW_ASSIGN_OR_RAISE(auto bg_it, MakeBackgroundGenerator(std::move(istream_it),
                                                              io_context_.executor()));

    // Hop from the IO executor onto the CPU executor for parsing/decoding.
    auto transferred_it = MakeTransferredGenerator(bg_it, cpu_executor);

    auto buffer_generator = CSVBufferIterator::MakeAsync(std::move(transferred_it));

    // Readahead depth used later when threads are enabled.
    int max_readahead = cpu_executor->GetCapacity();
    auto self = shared_from_this();

    return buffer_generator().Then([self, buffer_generator, max_readahead](
                                       const std::shared_ptr<Buffer>& first_buffer) {
      return self->InitAfterFirstBuffer(first_buffer, buffer_generator, max_readahead);
    });
  }

  /// Schema of the produced batches; taken from the first decoded block.
  std::shared_ptr<Schema> schema() const override { return schema_; }

  /// Bytes decoded so far and handed to the caller (header bytes included).
  int64_t bytes_read() const override { return bytes_decoded_->load(); }

  /// Blocking wrapper: waits on ReadNextAsync().
  Status ReadNext(std::shared_ptr<RecordBatch>* batch) override {
    auto next_fut = ReadNextAsync();
    auto next_result = next_fut.result();
    return std::move(next_result).Value(batch);
  }

  /// Next record batch; a null batch marks the end of the stream.
  Future<std::shared_ptr<RecordBatch>> ReadNextAsync() override {
    return record_batch_gen_();
  }

 protected:
  /// Consume the header from the first buffer, then build the
  /// block -> parsed block -> decoded block generator chain and pull its first
  /// decoded block.
  Future<> InitAfterFirstBuffer(const std::shared_ptr<Buffer>& first_buffer,
                                AsyncGenerator<std::shared_ptr<Buffer>> buffer_generator,
                                int max_readahead) {
    if (first_buffer == nullptr) {
      return Status::Invalid("Empty CSV file");
    }

    std::shared_ptr<Buffer> after_header;
    ARROW_ASSIGN_OR_RAISE(auto header_bytes_consumed,
                          ProcessHeader(first_buffer, &after_header));
    bytes_decoded_->fetch_add(header_bytes_consumed);

    auto parser_op =
        BlockParsingOperator(io_context_, parse_options_, num_csv_cols_, num_rows_seen_);
    ARROW_ASSIGN_OR_RAISE(
        auto decoder_op,
        BlockDecodingOperator::Make(io_context_, convert_options_, conversion_schema_));

    auto block_gen = SerialBlockReader::MakeAsyncIterator(
        std::move(buffer_generator), MakeChunker(parse_options_), std::move(after_header),
        read_options_.skip_rows_after_names);
    auto parsed_block_gen =
        MakeMappedGenerator(std::move(block_gen), std::move(parser_op));
    auto rb_gen = MakeMappedGenerator(std::move(parsed_block_gen), std::move(decoder_op));

    auto self = shared_from_this();
    return rb_gen().Then([self, rb_gen, max_readahead](const DecodedBlock& first_block) {
      return self->InitFromBlock(first_block, std::move(rb_gen), max_readahead, 0);
    });
  }

  /// Finish initialization from a decoded block.
  ///
  /// - No batch (end of input): the reader yields only null batches.
  /// - Empty batch: record its schema, accumulate its byte count into
  ///   `prev_bytes_processed`, and recurse on the next block.
  /// - Non-empty batch: install record_batch_gen_, which re-emits this block
  ///   first and then the rest (with readahead when use_threads is set).
  ///
  /// NOTE(review): if end of input is reached after only empty blocks, the
  /// accumulated `prev_bytes_processed` is never added to bytes_decoded_.
  Future<> InitFromBlock(const DecodedBlock& block,
                         AsyncGenerator<DecodedBlock> batch_gen, int max_readahead,
                         int64_t prev_bytes_processed) {
    if (!block.record_batch) {
      // End of file just return null batches
      record_batch_gen_ = MakeEmptyGenerator<std::shared_ptr<RecordBatch>>();
      return Status::OK();
    }

    // Set before the empty check so an all-empty input still exposes a schema.
    schema_ = block.record_batch->schema();

    if (block.record_batch->num_rows() == 0) {
      // Keep consuming blocks until the first non empty block is found
      auto self = shared_from_this();
      prev_bytes_processed += block.bytes_processed;
      return batch_gen().Then([self, batch_gen, max_readahead,
                               prev_bytes_processed](const DecodedBlock& next_block) {
        return self->InitFromBlock(next_block, std::move(batch_gen), max_readahead,
                                   prev_bytes_processed);
      });
    }

    AsyncGenerator<DecodedBlock> readahead_gen;
    if (read_options_.use_threads) {
      readahead_gen = MakeReadaheadGenerator(std::move(batch_gen), max_readahead);
    } else {
      readahead_gen = std::move(batch_gen);
    }

    // The block we already pulled must be delivered first.
    AsyncGenerator<DecodedBlock> restarted_gen =
        MakeGeneratorStartsWith({block}, std::move(readahead_gen));

    // Copy the shared counter so the lambda does not capture `this`.
    auto bytes_decoded = bytes_decoded_;
    auto unwrap_and_record_bytes =
        [bytes_decoded, prev_bytes_processed](
            const DecodedBlock& block) mutable -> Result<std::shared_ptr<RecordBatch>> {
      // Bytes of skipped empty blocks are credited with the first delivered batch.
      bytes_decoded->fetch_add(block.bytes_processed + prev_bytes_processed);
      prev_bytes_processed = 0;
      return block.record_batch;
    };

    auto unwrapped =
        MakeMappedGenerator(std::move(restarted_gen), std::move(unwrap_and_record_bytes));

    record_batch_gen_ = MakeCancellable(std::move(unwrapped), io_context_.stop_token());
    return Status::OK();
  }

  std::shared_ptr<Schema> schema_;
  AsyncGenerator<std::shared_ptr<RecordBatch>> record_batch_gen_;
  // bytes which have been decoded and asked for by the caller
  std::shared_ptr<std::atomic<int64_t>> bytes_decoded_;
};
970 
971 /////////////////////////////////////////////////////////////////////////
972 // Serial TableReader implementation
973 
974 class SerialTableReader : public BaseTableReader {
975  public:
976   using BaseTableReader::BaseTableReader;
977 
Init()978   Status Init() override {
979     ARROW_ASSIGN_OR_RAISE(auto istream_it,
980                           io::MakeInputStreamIterator(input_, read_options_.block_size));
981 
982     // Since we're converting serially, no need to readahead more than one block
983     int32_t block_queue_size = 1;
984     ARROW_ASSIGN_OR_RAISE(auto rh_it,
985                           MakeReadaheadIterator(std::move(istream_it), block_queue_size));
986     buffer_iterator_ = CSVBufferIterator::Make(std::move(rh_it));
987     return Status::OK();
988   }
989 
Read()990   Result<std::shared_ptr<Table>> Read() override {
991     task_group_ = internal::TaskGroup::MakeSerial(io_context_.stop_token());
992 
993     // First block
994     ARROW_ASSIGN_OR_RAISE(auto first_buffer, buffer_iterator_.Next());
995     if (first_buffer == nullptr) {
996       return Status::Invalid("Empty CSV file");
997     }
998     RETURN_NOT_OK(ProcessHeader(first_buffer, &first_buffer));
999     RETURN_NOT_OK(MakeColumnBuilders());
1000 
1001     auto block_iterator = SerialBlockReader::MakeIterator(
1002         std::move(buffer_iterator_), MakeChunker(parse_options_), std::move(first_buffer),
1003         read_options_.skip_rows_after_names);
1004     while (true) {
1005       RETURN_NOT_OK(io_context_.stop_token().Poll());
1006 
1007       ARROW_ASSIGN_OR_RAISE(auto maybe_block, block_iterator.Next());
1008       if (IsIterationEnd(maybe_block)) {
1009         // EOF
1010         break;
1011       }
1012       ARROW_ASSIGN_OR_RAISE(
1013           int64_t parsed_bytes,
1014           ParseAndInsert(maybe_block.partial, maybe_block.completion, maybe_block.buffer,
1015                          maybe_block.block_index, maybe_block.is_final));
1016       RETURN_NOT_OK(maybe_block.consume_bytes(parsed_bytes));
1017     }
1018     // Finish conversion, create schema and table
1019     RETURN_NOT_OK(task_group_->Finish());
1020     return MakeTable();
1021   }
1022 
1023  protected:
1024   Iterator<std::shared_ptr<Buffer>> buffer_iterator_;
1025 };
1026 
/// Table reader that chunks the input serially but parses and converts each
/// block as a separate task on a threaded task group (CPU executor).
///
/// Must be owned by a std::shared_ptr: ReadAsync() uses shared_from_this().
class AsyncThreadedTableReader
    : public BaseTableReader,
      public std::enable_shared_from_this<AsyncThreadedTableReader> {
 public:
  using BaseTableReader::BaseTableReader;

  AsyncThreadedTableReader(io::IOContext io_context,
                           std::shared_ptr<io::InputStream> input,
                           const ReadOptions& read_options,
                           const ParseOptions& parse_options,
                           const ConvertOptions& convert_options, Executor* cpu_executor)
      // Count rows is currently not supported during parallel read
      : BaseTableReader(std::move(io_context), input, read_options, parse_options,
                        convert_options, /*count_rows=*/false),
        cpu_executor_(cpu_executor) {}

  ~AsyncThreadedTableReader() override {
    if (task_group_) {
      // In case of error, make sure all pending tasks are finished before
      // we start destroying BaseTableReader members
      ARROW_UNUSED(task_group_->Finish());
    }
  }

  /// Build the asynchronous buffer generator: background reads on the IO
  /// executor, transferred to the CPU executor.
  Status Init() override {
    ARROW_ASSIGN_OR_RAISE(auto istream_it,
                          io::MakeInputStreamIterator(input_, read_options_.block_size));

    // Readahead is sized from the CPU executor's capacity; readahead_restart is
    // presumably the queue level at which background reading resumes (see
    // MakeBackgroundGenerator) -- at least 1.
    int max_readahead = cpu_executor_->GetCapacity();
    int readahead_restart = std::max(1, max_readahead / 2);

    ARROW_ASSIGN_OR_RAISE(
        auto bg_it, MakeBackgroundGenerator(std::move(istream_it), io_context_.executor(),
                                            max_readahead, readahead_restart));

    auto transferred_it = MakeTransferredGenerator(bg_it, cpu_executor_);
    buffer_generator_ = CSVBufferIterator::MakeAsync(std::move(transferred_it));
    return Status::OK();
  }

  /// Blocking wrapper: waits on ReadAsync().
  Result<std::shared_ptr<Table>> Read() override { return ReadAsync().result(); }

  /// Read the whole input: one parse-and-insert task per block is appended to a
  /// threaded task group; the table is assembled once all tasks have finished.
  Future<std::shared_ptr<Table>> ReadAsync() override {
    task_group_ =
        internal::TaskGroup::MakeThreaded(cpu_executor_, io_context_.stop_token());

    auto self = shared_from_this();
    return ProcessFirstBuffer().Then([self](const std::shared_ptr<Buffer>& first_buffer) {
      auto block_generator = ThreadedBlockReader::MakeAsyncIterator(
          self->buffer_generator_, MakeChunker(self->parse_options_),
          std::move(first_buffer), self->read_options_.skip_rows_after_names);

      std::function<Status(CSVBlock)> block_visitor =
          [self](CSVBlock maybe_block) -> Status {
        // The logic in VisitAsyncGenerator ensures that we will never be
        // passed an empty block (visit does not call with the end token) so
        // we can be assured maybe_block has a value.
        DCHECK_GE(maybe_block.block_index, 0);
        DCHECK(!maybe_block.consume_bytes);

        // Launch parse task
        self->task_group_->Append([self, maybe_block] {
          return self
              ->ParseAndInsert(maybe_block.partial, maybe_block.completion,
                               maybe_block.buffer, maybe_block.block_index,
                               maybe_block.is_final)
              .status();
        });
        return Status::OK();
      };

      return VisitAsyncGenerator(std::move(block_generator), block_visitor)
          .Then([self]() -> Future<> {
            // By this point we've added all top level tasks so it is safe to call
            // FinishAsync
            return self->task_group_->FinishAsync();
          })
          .Then([self]() -> Result<std::shared_ptr<Table>> {
            // Finish conversion, create schema and table
            return self->MakeTable();
          });
    });
  }

 protected:
  /// Pull the first buffer, consume the header and create the column builders.
  /// Resolves to the remainder of the first buffer after the header.
  ///
  /// NOTE(review): the continuation captures raw `this`; lifetime presumably
  /// relies on ReadAsync() chaining a continuation holding `self` onto the
  /// returned future.
  Future<std::shared_ptr<Buffer>> ProcessFirstBuffer() {
    // First block
    auto first_buffer_future = buffer_generator_();
    return first_buffer_future.Then([this](const std::shared_ptr<Buffer>& first_buffer)
                                        -> Result<std::shared_ptr<Buffer>> {
      if (first_buffer == nullptr) {
        return Status::Invalid("Empty CSV file");
      }
      std::shared_ptr<Buffer> first_buffer_processed;
      RETURN_NOT_OK(ProcessHeader(first_buffer, &first_buffer_processed));
      RETURN_NOT_OK(MakeColumnBuilders());
      return first_buffer_processed;
    });
  }

  Executor* cpu_executor_;
  AsyncGenerator<std::shared_ptr<Buffer>> buffer_generator_;
};
1130 
MakeTableReader(MemoryPool * pool,io::IOContext io_context,std::shared_ptr<io::InputStream> input,const ReadOptions & read_options,const ParseOptions & parse_options,const ConvertOptions & convert_options)1131 Result<std::shared_ptr<TableReader>> MakeTableReader(
1132     MemoryPool* pool, io::IOContext io_context, std::shared_ptr<io::InputStream> input,
1133     const ReadOptions& read_options, const ParseOptions& parse_options,
1134     const ConvertOptions& convert_options) {
1135   RETURN_NOT_OK(parse_options.Validate());
1136   RETURN_NOT_OK(read_options.Validate());
1137   RETURN_NOT_OK(convert_options.Validate());
1138   std::shared_ptr<BaseTableReader> reader;
1139   if (read_options.use_threads) {
1140     auto cpu_executor = internal::GetCpuThreadPool();
1141     reader = std::make_shared<AsyncThreadedTableReader>(
1142         io_context, input, read_options, parse_options, convert_options, cpu_executor);
1143   } else {
1144     reader = std::make_shared<SerialTableReader>(io_context, input, read_options,
1145                                                  parse_options, convert_options,
1146                                                  /*count_rows=*/true);
1147   }
1148   RETURN_NOT_OK(reader->Init());
1149   return reader;
1150 }
1151 
MakeStreamingReader(io::IOContext io_context,std::shared_ptr<io::InputStream> input,internal::Executor * cpu_executor,const ReadOptions & read_options,const ParseOptions & parse_options,const ConvertOptions & convert_options)1152 Future<std::shared_ptr<StreamingReader>> MakeStreamingReader(
1153     io::IOContext io_context, std::shared_ptr<io::InputStream> input,
1154     internal::Executor* cpu_executor, const ReadOptions& read_options,
1155     const ParseOptions& parse_options, const ConvertOptions& convert_options) {
1156   RETURN_NOT_OK(parse_options.Validate());
1157   RETURN_NOT_OK(read_options.Validate());
1158   RETURN_NOT_OK(convert_options.Validate());
1159   std::shared_ptr<StreamingReaderImpl> reader;
1160   reader = std::make_shared<StreamingReaderImpl>(
1161       io_context, input, read_options, parse_options, convert_options,
1162       /*count_rows=*/!read_options.use_threads || cpu_executor->GetCapacity() == 1);
1163   return reader->Init(cpu_executor).Then([reader] {
1164     return std::dynamic_pointer_cast<StreamingReader>(reader);
1165   });
1166 }
1167 
1168 /////////////////////////////////////////////////////////////////////////
1169 // Row count implementation
1170 
1171 class CSVRowCounter : public ReaderMixin,
1172                       public std::enable_shared_from_this<CSVRowCounter> {
1173  public:
CSVRowCounter(io::IOContext io_context,Executor * cpu_executor,std::shared_ptr<io::InputStream> input,const ReadOptions & read_options,const ParseOptions & parse_options)1174   CSVRowCounter(io::IOContext io_context, Executor* cpu_executor,
1175                 std::shared_ptr<io::InputStream> input, const ReadOptions& read_options,
1176                 const ParseOptions& parse_options)
1177       : ReaderMixin(io_context, std::move(input), read_options, parse_options,
1178                     ConvertOptions::Defaults(), /*count_rows=*/true),
1179         cpu_executor_(cpu_executor),
1180         row_count_(0) {}
1181 
Count()1182   Future<int64_t> Count() {
1183     auto self = shared_from_this();
1184     return Init(self).Then([self]() { return self->DoCount(self); });
1185   }
1186 
1187  private:
Init(const std::shared_ptr<CSVRowCounter> & self)1188   Future<> Init(const std::shared_ptr<CSVRowCounter>& self) {
1189     ARROW_ASSIGN_OR_RAISE(auto istream_it,
1190                           io::MakeInputStreamIterator(input_, read_options_.block_size));
1191     // TODO Consider exposing readahead as a read option (ARROW-12090)
1192     ARROW_ASSIGN_OR_RAISE(auto bg_it, MakeBackgroundGenerator(std::move(istream_it),
1193                                                               io_context_.executor()));
1194     auto transferred_it = MakeTransferredGenerator(bg_it, cpu_executor_);
1195     auto buffer_generator = CSVBufferIterator::MakeAsync(std::move(transferred_it));
1196 
1197     return buffer_generator().Then(
1198         [self, buffer_generator](std::shared_ptr<Buffer> first_buffer) {
1199           if (!first_buffer) {
1200             return Status::Invalid("Empty CSV file");
1201           }
1202           RETURN_NOT_OK(self->ProcessHeader(first_buffer, &first_buffer));
1203           self->block_generator_ = SerialBlockReader::MakeAsyncIterator(
1204               buffer_generator, MakeChunker(self->parse_options_),
1205               std::move(first_buffer), 0);
1206           return Status::OK();
1207         });
1208   }
1209 
DoCount(const std::shared_ptr<CSVRowCounter> & self)1210   Future<int64_t> DoCount(const std::shared_ptr<CSVRowCounter>& self) {
1211     // count_cb must return a value instead of Status/Future<> to work with
1212     // MakeMappedGenerator, and it must use a type with a valid end value to work with
1213     // IterationEnd.
1214     std::function<Result<util::optional<int64_t>>(const CSVBlock&)> count_cb =
1215         [self](const CSVBlock& maybe_block) -> Result<util::optional<int64_t>> {
1216       ARROW_ASSIGN_OR_RAISE(
1217           auto parser,
1218           self->Parse(maybe_block.partial, maybe_block.completion, maybe_block.buffer,
1219                       maybe_block.block_index, maybe_block.is_final));
1220       RETURN_NOT_OK(maybe_block.consume_bytes(parser.parsed_bytes));
1221       int32_t total_row_count = parser.parser->total_num_rows();
1222       self->row_count_ += total_row_count;
1223       return total_row_count;
1224     };
1225     auto count_gen = MakeMappedGenerator(block_generator_, std::move(count_cb));
1226     return DiscardAllFromAsyncGenerator(count_gen).Then(
1227         [self]() { return self->row_count_; });
1228   }
1229 
1230   Executor* cpu_executor_;
1231   AsyncGenerator<CSVBlock> block_generator_;
1232   int64_t row_count_;
1233 };
1234 
1235 }  // namespace
1236 
1237 /////////////////////////////////////////////////////////////////////////
1238 // Factory functions
1239 
Make(io::IOContext io_context,std::shared_ptr<io::InputStream> input,const ReadOptions & read_options,const ParseOptions & parse_options,const ConvertOptions & convert_options)1240 Result<std::shared_ptr<TableReader>> TableReader::Make(
1241     io::IOContext io_context, std::shared_ptr<io::InputStream> input,
1242     const ReadOptions& read_options, const ParseOptions& parse_options,
1243     const ConvertOptions& convert_options) {
1244   return MakeTableReader(io_context.pool(), io_context, std::move(input), read_options,
1245                          parse_options, convert_options);
1246 }
1247 
Make(MemoryPool * pool,io::IOContext io_context,std::shared_ptr<io::InputStream> input,const ReadOptions & read_options,const ParseOptions & parse_options,const ConvertOptions & convert_options)1248 Result<std::shared_ptr<TableReader>> TableReader::Make(
1249     MemoryPool* pool, io::IOContext io_context, std::shared_ptr<io::InputStream> input,
1250     const ReadOptions& read_options, const ParseOptions& parse_options,
1251     const ConvertOptions& convert_options) {
1252   return MakeTableReader(pool, io_context, std::move(input), read_options, parse_options,
1253                          convert_options);
1254 }
1255 
Make(MemoryPool * pool,std::shared_ptr<io::InputStream> input,const ReadOptions & read_options,const ParseOptions & parse_options,const ConvertOptions & convert_options)1256 Result<std::shared_ptr<StreamingReader>> StreamingReader::Make(
1257     MemoryPool* pool, std::shared_ptr<io::InputStream> input,
1258     const ReadOptions& read_options, const ParseOptions& parse_options,
1259     const ConvertOptions& convert_options) {
1260   auto io_context = io::IOContext(pool);
1261   auto cpu_executor = internal::GetCpuThreadPool();
1262   auto reader_fut = MakeStreamingReader(io_context, std::move(input), cpu_executor,
1263                                         read_options, parse_options, convert_options);
1264   auto reader_result = reader_fut.result();
1265   ARROW_ASSIGN_OR_RAISE(auto reader, reader_result);
1266   return reader;
1267 }
1268 
Make(io::IOContext io_context,std::shared_ptr<io::InputStream> input,const ReadOptions & read_options,const ParseOptions & parse_options,const ConvertOptions & convert_options)1269 Result<std::shared_ptr<StreamingReader>> StreamingReader::Make(
1270     io::IOContext io_context, std::shared_ptr<io::InputStream> input,
1271     const ReadOptions& read_options, const ParseOptions& parse_options,
1272     const ConvertOptions& convert_options) {
1273   auto cpu_executor = internal::GetCpuThreadPool();
1274   auto reader_fut = MakeStreamingReader(io_context, std::move(input), cpu_executor,
1275                                         read_options, parse_options, convert_options);
1276   auto reader_result = reader_fut.result();
1277   ARROW_ASSIGN_OR_RAISE(auto reader, reader_result);
1278   return reader;
1279 }
1280 
MakeAsync(io::IOContext io_context,std::shared_ptr<io::InputStream> input,internal::Executor * cpu_executor,const ReadOptions & read_options,const ParseOptions & parse_options,const ConvertOptions & convert_options)1281 Future<std::shared_ptr<StreamingReader>> StreamingReader::MakeAsync(
1282     io::IOContext io_context, std::shared_ptr<io::InputStream> input,
1283     internal::Executor* cpu_executor, const ReadOptions& read_options,
1284     const ParseOptions& parse_options, const ConvertOptions& convert_options) {
1285   return MakeStreamingReader(io_context, std::move(input), cpu_executor, read_options,
1286                              parse_options, convert_options);
1287 }
1288 
CountRowsAsync(io::IOContext io_context,std::shared_ptr<io::InputStream> input,internal::Executor * cpu_executor,const ReadOptions & read_options,const ParseOptions & parse_options)1289 Future<int64_t> CountRowsAsync(io::IOContext io_context,
1290                                std::shared_ptr<io::InputStream> input,
1291                                internal::Executor* cpu_executor,
1292                                const ReadOptions& read_options,
1293                                const ParseOptions& parse_options) {
1294   RETURN_NOT_OK(parse_options.Validate());
1295   RETURN_NOT_OK(read_options.Validate());
1296   auto counter = std::make_shared<CSVRowCounter>(
1297       io_context, cpu_executor, std::move(input), read_options, parse_options);
1298   return counter->Count();
1299 }
1300 
1301 }  // namespace csv
1302 
1303 }  // namespace arrow
1304