// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include "arrow/dataset/file_base.h"

#include <arrow/compute/exec/exec_plan.h>

#include <algorithm>
#include <unordered_map>
#include <vector>

#include "arrow/compute/exec/forest_internal.h"
#include "arrow/compute/exec/subtree_internal.h"
#include "arrow/dataset/dataset_internal.h"
#include "arrow/dataset/dataset_writer.h"
#include "arrow/dataset/scanner.h"
#include "arrow/dataset/scanner_internal.h"
#include "arrow/filesystem/filesystem.h"
#include "arrow/filesystem/path_util.h"
#include "arrow/io/compressed.h"
#include "arrow/io/interfaces.h"
#include "arrow/io/memory.h"
#include "arrow/util/compression.h"
#include "arrow/util/iterator.h"
#include "arrow/util/logging.h"  // for DCHECK_OK
#include "arrow/util/macros.h"
#include "arrow/util/make_unique.h"
#include "arrow/util/map.h"
#include "arrow/util/string.h"
#include "arrow/util/task_group.h"
#include "arrow/util/variant.h"

namespace arrow {

using internal::checked_cast;
using internal::checked_pointer_cast;

namespace dataset {

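// Open the source for random access, trying each backing variant in turn: a
// filesystem path, an in-memory buffer, then a user-provided opener.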
Result<std::shared_ptr<io::RandomAccessFile>> FileSource::Open() const {
  if (filesystem_) {
    return filesystem_->OpenInputFile(file_info_);
  }

  if (buffer_) {
    return std::make_shared<io::BufferReader>(buffer_);
  }

  return custom_open_();
}

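// Open the source as an input stream, wrapping it in a decompressor when
// needed. If no compression is given it is guessed from the file extension;
// an unrecognized extension falls back to an uncompressed stream. For example,
// a source pointing at "data.csv.gz" opened with OpenCompressed(util::nullopt)
// yields a gzip-decompressing stream.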
Result<std::shared_ptr<io::InputStream>> FileSource::OpenCompressed(
    util::optional<Compression::type> compression) const {
  ARROW_ASSIGN_OR_RAISE(auto file, Open());
  auto actual_compression = Compression::type::UNCOMPRESSED;
  if (!compression.has_value()) {
    // Guess compression from file extension
    auto extension = fs::internal::GetAbstractPathExtension(path());
    if (extension == "gz") {
      actual_compression = Compression::type::GZIP;
    } else {
      auto maybe_compression = util::Codec::GetCompressionType(extension);
      if (maybe_compression.ok()) {
        ARROW_ASSIGN_OR_RAISE(actual_compression, maybe_compression);
      }
    }
  } else {
    actual_compression = compression.value();
  }
  if (actual_compression == Compression::type::UNCOMPRESSED) {
    return file;
  }
  ARROW_ASSIGN_OR_RAISE(auto codec, util::Codec::Create(actual_compression));
  return io::CompressedInputStream::Make(codec.get(), std::move(file));
}

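// By default a format cannot cheaply count rows, so report "unknown"
// (nullopt); formats with row-count metadata should override this.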
Future<util::optional<int64_t>> FileFormat::CountRows(
    const std::shared_ptr<FileFragment>&, compute::Expression,
    const std::shared_ptr<ScanOptions>&) {
  return Future<util::optional<int64_t>>::MakeFinished(util::nullopt);
}

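// Convenience overloads: default the partition expression to literal(true)
// and/or leave the physical schema null so it is inferred on first use.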
Result<std::shared_ptr<FileFragment>> FileFormat::MakeFragment(
    FileSource source, std::shared_ptr<Schema> physical_schema) {
  return MakeFragment(std::move(source), compute::literal(true),
                      std::move(physical_schema));
}

Result<std::shared_ptr<FileFragment>> FileFormat::MakeFragment(
    FileSource source, compute::Expression partition_expression) {
  return MakeFragment(std::move(source), std::move(partition_expression), nullptr);
}

Result<std::shared_ptr<FileFragment>> FileFormat::MakeFragment(
    FileSource source, compute::Expression partition_expression,
    std::shared_ptr<Schema> physical_schema) {
  return std::shared_ptr<FileFragment>(
      new FileFragment(std::move(source), shared_from_this(),
                       std::move(partition_expression), std::move(physical_schema)));
}

// The following implementation of ScanBatchesAsync is both ugly and terribly
// inefficient.  Each format should provide its own efficient implementation;
// however, this is a reasonable starting point and suffices for a dummy/mock
// format.
Result<RecordBatchGenerator> FileFormat::ScanBatchesAsync(
    const std::shared_ptr<ScanOptions>& scan_options,
    const std::shared_ptr<FileFragment>& file) const {
  ARROW_ASSIGN_OR_RAISE(auto scan_task_it, ScanFile(scan_options, file));
  // Wraps the synchronous ScanTaskIterator in an async generator: each call
  // pulls the next batch from the current scan task, advancing to the next
  // task when the current batch iterator is exhausted.
  struct State {
    State(std::shared_ptr<ScanOptions> scan_options, ScanTaskIterator scan_task_it)
        : scan_options(std::move(scan_options)),
          scan_task_it(std::move(scan_task_it)),
          current_rb_it(),
          finished(false) {}

    std::shared_ptr<ScanOptions> scan_options;
    ScanTaskIterator scan_task_it;
    RecordBatchIterator current_rb_it;
    bool finished;
  };
  struct Generator {
    Future<std::shared_ptr<RecordBatch>> operator()() {
      while (!state->finished) {
        if (!state->current_rb_it) {
          RETURN_NOT_OK(PumpScanTask());
          if (state->finished) {
            return AsyncGeneratorEnd<std::shared_ptr<RecordBatch>>();
          }
        }
        ARROW_ASSIGN_OR_RAISE(auto next_batch, state->current_rb_it.Next());
        if (IsIterationEnd(next_batch)) {
          state->current_rb_it = RecordBatchIterator();
        } else {
          return Future<std::shared_ptr<RecordBatch>>::MakeFinished(next_batch);
        }
      }
      return AsyncGeneratorEnd<std::shared_ptr<RecordBatch>>();
    }
    Status PumpScanTask() {
      ARROW_ASSIGN_OR_RAISE(auto next_task, state->scan_task_it.Next());
      if (IsIterationEnd(next_task)) {
        state->finished = true;
      } else {
        ARROW_ASSIGN_OR_RAISE(state->current_rb_it, next_task->Execute());
      }
      return Status::OK();
    }
    std::shared_ptr<State> state;
  };
  return Generator{std::make_shared<State>(scan_options, std::move(scan_task_it))};
}

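// FileFragment methods delegate to the FileFormat associated with the source.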
Result<std::shared_ptr<Schema>> FileFragment::ReadPhysicalSchemaImpl() {
  return format_->Inspect(source_);
}

Result<ScanTaskIterator> FileFragment::Scan(std::shared_ptr<ScanOptions> options) {
  auto self = std::dynamic_pointer_cast<FileFragment>(shared_from_this());
  return format_->ScanFile(options, self);
}

Result<RecordBatchGenerator> FileFragment::ScanBatchesAsync(
    const std::shared_ptr<ScanOptions>& options) {
  auto self = std::dynamic_pointer_cast<FileFragment>(shared_from_this());
  return format_->ScanBatchesAsync(options, self);
}

Future<util::optional<int64_t>> FileFragment::CountRows(
    compute::Expression predicate, const std::shared_ptr<ScanOptions>& options) {
  ARROW_ASSIGN_OR_RAISE(predicate, compute::SimplifyWithGuarantee(std::move(predicate),
                                                                  partition_expression_));
  if (!predicate.IsSatisfiable()) {
    return Future<util::optional<int64_t>>::MakeFinished(0);
  }
  auto self = checked_pointer_cast<FileFragment>(shared_from_this());
  return format()->CountRows(self, std::move(predicate), options);
}

struct FileSystemDataset::FragmentSubtrees {
  // Forest for skipping fragments based on extracted subtree expressions
  compute::Forest forest;
  // fragment indices and subtree expressions in forest order
  std::vector<util::Variant<int, compute::Expression>> fragments_and_subtrees;
};

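// Assemble a FileSystemDataset and eagerly build the subtree forest used to
// prune fragments by partition expression in GetFragmentsImpl.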
Result<std::shared_ptr<FileSystemDataset>> FileSystemDataset::Make(
    std::shared_ptr<Schema> schema, compute::Expression root_partition,
    std::shared_ptr<FileFormat> format, std::shared_ptr<fs::FileSystem> filesystem,
    std::vector<std::shared_ptr<FileFragment>> fragments,
    std::shared_ptr<Partitioning> partitioning) {
  std::shared_ptr<FileSystemDataset> out(
      new FileSystemDataset(std::move(schema), std::move(root_partition)));
  out->format_ = std::move(format);
  out->filesystem_ = std::move(filesystem);
  out->fragments_ = std::move(fragments);
  out->partitioning_ = std::move(partitioning);
  out->SetupSubtreePruning();
  return out;
}

Result<std::shared_ptr<Dataset>> FileSystemDataset::ReplaceSchema(
    std::shared_ptr<Schema> schema) const {
  RETURN_NOT_OK(CheckProjectable(*schema_, *schema));
  return Make(std::move(schema), partition_expression_, format_, filesystem_, fragments_);
}

std::vector<std::string> FileSystemDataset::files() const {
  std::vector<std::string> files;

  for (const auto& fragment : fragments_) {
    files.push_back(fragment->source().path());
  }

  return files;
}

std::string FileSystemDataset::ToString() const {
  std::string repr = "FileSystemDataset:";

  if (fragments_.empty()) {
    return repr + " []";
  }

  for (const auto& fragment : fragments_) {
    repr += "\n" + fragment->source().path();

    const auto& partition = fragment->partition_expression();
    if (partition != compute::literal(true)) {
      repr += ": " + partition.ToString();
    }
  }

  return repr;
}

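// Encode each fragment's partition expression into guarantees, sort so that
// shared prefixes are adjacent, then arrange the entries into a forest in
// which a subtree's common guarantee expression precedes its fragments.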
void FileSystemDataset::SetupSubtreePruning() {
  subtrees_ = std::make_shared<FragmentSubtrees>();
  compute::SubtreeImpl impl;

  auto encoded = impl.EncodeGuarantees(
      [&](int index) { return fragments_[index]->partition_expression(); },
      static_cast<int>(fragments_.size()));

  std::sort(encoded.begin(), encoded.end(), compute::SubtreeImpl::ByGuarantee());

  for (const auto& e : encoded) {
    if (e.index) {
      subtrees_->fragments_and_subtrees.emplace_back(*e.index);
    } else {
      subtrees_->fragments_and_subtrees.emplace_back(impl.GetSubtreeExpression(e));
    }
  }

  subtrees_->forest = compute::Forest(static_cast<int>(encoded.size()),
                                      compute::SubtreeImpl::IsAncestor{encoded});
}

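// Walk the subtree forest depth-first, simplifying the predicate against each
// subtree's guarantee; an unsatisfiable subtree is skipped without visiting
// the fragments beneath it.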
Result<FragmentIterator> FileSystemDataset::GetFragmentsImpl(
    compute::Expression predicate) {
  if (predicate == compute::literal(true)) {
    // trivial predicate; skip subtree pruning
    return MakeVectorIterator(FragmentVector(fragments_.begin(), fragments_.end()));
  }

  std::vector<int> fragment_indices;

  std::vector<compute::Expression> predicates{predicate};
  RETURN_NOT_OK(subtrees_->forest.Visit(
      [&](compute::Forest::Ref ref) -> Result<bool> {
        if (auto fragment_index =
                util::get_if<int>(&subtrees_->fragments_and_subtrees[ref.i])) {
          fragment_indices.push_back(*fragment_index);
          return false;
        }

        const auto& subtree_expr =
            util::get<compute::Expression>(subtrees_->fragments_and_subtrees[ref.i]);
        ARROW_ASSIGN_OR_RAISE(auto simplified,
                              SimplifyWithGuarantee(predicates.back(), subtree_expr));

        if (!simplified.IsSatisfiable()) {
          return false;
        }

        predicates.push_back(std::move(simplified));
        return true;
      },
      [&](compute::Forest::Ref ref) { predicates.pop_back(); }));

  std::sort(fragment_indices.begin(), fragment_indices.end());

  FragmentVector fragments(fragment_indices.size());
  std::transform(fragment_indices.begin(), fragment_indices.end(), fragments.begin(),
                 [this](int i) { return fragments_[i]; });

  return MakeVectorIterator(std::move(fragments));
}

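// Drain the reader, writing batches one at a time until it is exhausted.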
Status FileWriter::Write(RecordBatchReader* batches) {
  while (true) {
    ARROW_ASSIGN_OR_RAISE(auto batch, batches->Next());
    if (batch == nullptr) break;
    RETURN_NOT_OK(Write(batch));
  }
  return Status::OK();
}

Status FileWriter::Finish() {
  RETURN_NOT_OK(FinishInternal());
  return destination_->Close();
}

namespace {

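// Sink consumer that partitions each incoming batch and forwards the groups
// to a DatasetWriter, closing the backpressure toggle while the writer is
// over capacity and reopening it once the writer has room again.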
class DatasetWritingSinkNodeConsumer : public compute::SinkNodeConsumer {
 public:
  DatasetWritingSinkNodeConsumer(std::shared_ptr<Schema> schema,
                                 std::unique_ptr<internal::DatasetWriter> dataset_writer,
                                 FileSystemDatasetWriteOptions write_options,
                                 std::shared_ptr<util::AsyncToggle> backpressure_toggle)
      : schema_(std::move(schema)),
        dataset_writer_(std::move(dataset_writer)),
        write_options_(std::move(write_options)),
        backpressure_toggle_(std::move(backpressure_toggle)) {}

  Status Consume(compute::ExecBatch batch) {
    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<RecordBatch> record_batch,
                          batch.ToRecordBatch(schema_));
    return WriteNextBatch(std::move(record_batch), batch.guarantee);
  }

  Future<> Finish() {
    RETURN_NOT_OK(task_group_.AddTask([this] { return dataset_writer_->Finish(); }));
    return task_group_.End();
  }

 private:
  Status WriteNextBatch(std::shared_ptr<RecordBatch> batch,
                        compute::Expression guarantee) {
    ARROW_ASSIGN_OR_RAISE(auto groups, write_options_.partitioning->Partition(batch));
    batch.reset();  // drop to hopefully conserve memory

    if (groups.batches.size() > static_cast<size_t>(write_options_.max_partitions)) {
      return Status::Invalid("Fragment would be written into ", groups.batches.size(),
                             " partitions. This exceeds the maximum of ",
                             write_options_.max_partitions);
    }

    for (std::size_t index = 0; index < groups.batches.size(); index++) {
      auto partition_expression = and_(groups.expressions[index], guarantee);
      auto next_batch = groups.batches[index];
      ARROW_ASSIGN_OR_RAISE(std::string destination,
                            write_options_.partitioning->Format(partition_expression));
      RETURN_NOT_OK(task_group_.AddTask([this, next_batch, destination] {
        Future<> has_room = dataset_writer_->WriteRecordBatch(next_batch, destination);
        if (!has_room.is_finished() && backpressure_toggle_) {
          backpressure_toggle_->Close();
          return has_room.Then([this] { backpressure_toggle_->Open(); });
        }
        return has_room;
      }));
    }
    return Status::OK();
  }

  std::shared_ptr<Schema> schema_;
  std::unique_ptr<internal::DatasetWriter> dataset_writer_;
  FileSystemDatasetWriteOptions write_options_;
  std::shared_ptr<util::AsyncToggle> backpressure_toggle_;
  util::SerializedAsyncTaskGroup task_group_;
};

}  // namespace

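// Write a dataset by running an exec plan: scan -> filter -> project -> write.
// A minimal usage sketch (illustrative only; assumes `dataset` and
// `write_options` are already constructed):
//
//   ARROW_ASSIGN_OR_RAISE(auto builder, dataset->NewScan());
//   RETURN_NOT_OK(builder->UseAsync(true));  // writing requires async scanning
//   ARROW_ASSIGN_OR_RAISE(auto scanner, builder->Finish());
//   RETURN_NOT_OK(FileSystemDataset::Write(write_options, scanner));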
Status FileSystemDataset::Write(const FileSystemDatasetWriteOptions& write_options,
                                std::shared_ptr<Scanner> scanner) {
  if (!scanner->options()->use_async) {
    return Status::Invalid(
        "A dataset write operation was invoked on a scanner that was configured for "
        "synchronous scanning.  Dataset writing requires a scanner configured for "
        "asynchronous scanning.  Please recreate the scanner with the use_async or "
        "UseAsync option set to true");
  }
  const io::IOContext& io_context = scanner->options()->io_context;
  std::shared_ptr<compute::ExecContext> exec_context =
      std::make_shared<compute::ExecContext>(io_context.pool(),
                                             ::arrow::internal::GetCpuThreadPool());

  ARROW_ASSIGN_OR_RAISE(auto plan, compute::ExecPlan::Make(exec_context.get()));

  // Recover the projection expressions and output field names from the
  // scanner's projection (a make_struct call) so they can seed the project
  // node below.
  auto exprs = scanner->options()->projection.call()->arguments;
  auto names = checked_cast<const compute::MakeStructOptions*>(
                   scanner->options()->projection.call()->options.get())
                   ->field_names;
  std::shared_ptr<Dataset> dataset = scanner->dataset();
  std::shared_ptr<util::AsyncToggle> backpressure_toggle =
      std::make_shared<util::AsyncToggle>();

  RETURN_NOT_OK(
      compute::Declaration::Sequence(
          {
              {"scan", ScanNodeOptions{dataset, scanner->options(), backpressure_toggle}},
              {"filter", compute::FilterNodeOptions{scanner->options()->filter}},
              {"project",
               compute::ProjectNodeOptions{std::move(exprs), std::move(names)}},
              {"write",
               WriteNodeOptions{write_options, scanner->options()->projected_schema,
                                backpressure_toggle}},
          })
          .AddToPlan(plan.get()));

  RETURN_NOT_OK(plan->StartProducing());
  return plan->finished().status();
}

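// Factory for the "write" ExecNode: validates the input count, then wraps a
// DatasetWritingSinkNodeConsumer in a standard consuming sink node.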
Result<compute::ExecNode*> MakeWriteNode(compute::ExecPlan* plan,
                                         std::vector<compute::ExecNode*> inputs,
                                         const compute::ExecNodeOptions& options) {
  if (inputs.size() != 1) {
    return Status::Invalid("Write SinkNode requires exactly 1 input, got ",
                           inputs.size());
  }

  const WriteNodeOptions write_node_options =
      checked_cast<const WriteNodeOptions&>(options);
  const FileSystemDatasetWriteOptions& write_options = write_node_options.write_options;
  const std::shared_ptr<Schema>& schema = write_node_options.schema;
  const std::shared_ptr<util::AsyncToggle>& backpressure_toggle =
      write_node_options.backpressure_toggle;

  ARROW_ASSIGN_OR_RAISE(auto dataset_writer,
                        internal::DatasetWriter::Make(write_options));

  std::shared_ptr<DatasetWritingSinkNodeConsumer> consumer =
      std::make_shared<DatasetWritingSinkNodeConsumer>(
          schema, std::move(dataset_writer), write_options, backpressure_toggle);

  ARROW_ASSIGN_OR_RAISE(
      auto node,
      compute::MakeExecNode("consuming_sink", plan, std::move(inputs),
                            compute::ConsumingSinkNodeOptions{std::move(consumer)}));

  return node;
}

namespace internal {
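// Register the "write" node factory with the given exec node registry.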
void InitializeDatasetWriter(arrow::compute::ExecFactoryRegistry* registry) {
  DCHECK_OK(registry->AddFactory("write", MakeWriteNode));
}
}  // namespace internal

}  // namespace dataset

}  // namespace arrow