// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

// This API is EXPERIMENTAL.
19 
20 #pragma once
21 
22 #include <functional>
23 #include <memory>
24 #include <string>
25 #include <utility>
26 #include <vector>
27 
28 #include "arrow/dataset/expression.h"
29 #include "arrow/dataset/type_fwd.h"
30 #include "arrow/dataset/visibility.h"
31 #include "arrow/util/macros.h"
32 #include "arrow/util/mutex.h"
33 
34 namespace arrow {
35 namespace dataset {
36 
37 /// \brief A granular piece of a Dataset, such as an individual file.
38 ///
39 /// A Fragment can be read/scanned separately from other fragments. It yields a
40 /// collection of RecordBatches when scanned, encapsulated in one or more
41 /// ScanTasks.
42 ///
43 /// Note that Fragments have well defined physical schemas which are reconciled by
44 /// the Datasets which contain them; these physical schemas may differ from a parent
45 /// Dataset's schema and the physical schemas of sibling Fragments.
46 class ARROW_DS_EXPORT Fragment {
47  public:
48   /// \brief Return the physical schema of the Fragment.
49   ///
50   /// The physical schema is also called the writer schema.
51   /// This method is blocking and may suffer from high latency filesystem.
52   /// The schema is cached after being read once, or may be specified at construction.
53   Result<std::shared_ptr<Schema>> ReadPhysicalSchema();
54 
55   /// \brief Scan returns an iterator of ScanTasks, each of which yields
56   /// RecordBatches from this Fragment.
57   ///
58   /// Note that batches yielded using this method will not be filtered and may not align
59   /// with the Fragment's schema. In particular, note that columns referenced by the
60   /// filter may be present in yielded batches even if they are not projected (so that
61   /// they are available when a filter is applied). Additionally, explicitly projected
62   /// columns may be absent if they were not present in this fragment.
63   ///
64   /// To receive a record batch stream which is fully filtered and projected, use Scanner.
65   virtual Result<ScanTaskIterator> Scan(std::shared_ptr<ScanOptions> options,
66                                         std::shared_ptr<ScanContext> context) = 0;
67 
68   /// \brief Return true if the fragment can benefit from parallel scanning.
69   virtual bool splittable() const = 0;
70 
71   virtual std::string type_name() const = 0;
72 
73   /// \brief An expression which evaluates to true for all data viewed by this
74   /// Fragment.
partition_expression()75   const Expression& partition_expression() const { return partition_expression_; }
76 
77   virtual ~Fragment() = default;
78 
79  protected:
80   Fragment() = default;
81   explicit Fragment(Expression partition_expression,
82                     std::shared_ptr<Schema> physical_schema);
83 
84   virtual Result<std::shared_ptr<Schema>> ReadPhysicalSchemaImpl() = 0;
85 
86   util::Mutex physical_schema_mutex_;
87   Expression partition_expression_ = literal(true);
88   std::shared_ptr<Schema> physical_schema_;
89 };
90 
91 /// \brief A trivial Fragment that yields ScanTask out of a fixed set of
92 /// RecordBatch.
93 class ARROW_DS_EXPORT InMemoryFragment : public Fragment {
94  public:
95   InMemoryFragment(std::shared_ptr<Schema> schema, RecordBatchVector record_batches,
96                    Expression = literal(true));
97   explicit InMemoryFragment(RecordBatchVector record_batches, Expression = literal(true));
98 
99   Result<ScanTaskIterator> Scan(std::shared_ptr<ScanOptions> options,
100                                 std::shared_ptr<ScanContext> context) override;
101 
splittable()102   bool splittable() const override { return false; }
103 
type_name()104   std::string type_name() const override { return "in-memory"; }
105 
106  protected:
107   Result<std::shared_ptr<Schema>> ReadPhysicalSchemaImpl() override;
108 
109   RecordBatchVector record_batches_;
110 };
111 
112 /// \brief A container of zero or more Fragments.
113 ///
114 /// A Dataset acts as a union of Fragments, e.g. files deeply nested in a
115 /// directory. A Dataset has a schema to which Fragments must align during a
116 /// scan operation. This is analogous to Avro's reader and writer schema.
117 class ARROW_DS_EXPORT Dataset : public std::enable_shared_from_this<Dataset> {
118  public:
119   /// \brief Begin to build a new Scan operation against this Dataset
120   Result<std::shared_ptr<ScannerBuilder>> NewScan(std::shared_ptr<ScanContext> context);
121   Result<std::shared_ptr<ScannerBuilder>> NewScan();
122 
123   /// \brief GetFragments returns an iterator of Fragments given a predicate.
124   Result<FragmentIterator> GetFragments(Expression predicate);
125   Result<FragmentIterator> GetFragments();
126 
schema()127   const std::shared_ptr<Schema>& schema() const { return schema_; }
128 
129   /// \brief An expression which evaluates to true for all data viewed by this Dataset.
130   /// May be null, which indicates no information is available.
partition_expression()131   const Expression& partition_expression() const { return partition_expression_; }
132 
133   /// \brief The name identifying the kind of Dataset
134   virtual std::string type_name() const = 0;
135 
136   /// \brief Return a copy of this Dataset with a different schema.
137   ///
138   /// The copy will view the same Fragments. If the new schema is not compatible with the
139   /// original dataset's schema then an error will be raised.
140   virtual Result<std::shared_ptr<Dataset>> ReplaceSchema(
141       std::shared_ptr<Schema> schema) const = 0;
142 
143   virtual ~Dataset() = default;
144 
145  protected:
Dataset(std::shared_ptr<Schema> schema)146   explicit Dataset(std::shared_ptr<Schema> schema) : schema_(std::move(schema)) {}
147 
148   Dataset(std::shared_ptr<Schema> schema, Expression partition_expression);
149 
150   virtual Result<FragmentIterator> GetFragmentsImpl(Expression predicate) = 0;
151 
152   std::shared_ptr<Schema> schema_;
153   Expression partition_expression_ = literal(true);
154 };
155 
156 /// \brief A Source which yields fragments wrapping a stream of record batches.
157 ///
158 /// The record batches must match the schema provided to the source at construction.
159 class ARROW_DS_EXPORT InMemoryDataset : public Dataset {
160  public:
161   class RecordBatchGenerator {
162    public:
163     virtual ~RecordBatchGenerator() = default;
164     virtual RecordBatchIterator Get() const = 0;
165   };
166 
InMemoryDataset(std::shared_ptr<Schema> schema,std::shared_ptr<RecordBatchGenerator> get_batches)167   InMemoryDataset(std::shared_ptr<Schema> schema,
168                   std::shared_ptr<RecordBatchGenerator> get_batches)
169       : Dataset(std::move(schema)), get_batches_(std::move(get_batches)) {}
170 
171   // Convenience constructor taking a fixed list of batches
172   InMemoryDataset(std::shared_ptr<Schema> schema, RecordBatchVector batches);
173 
174   explicit InMemoryDataset(std::shared_ptr<Table> table);
175 
type_name()176   std::string type_name() const override { return "in-memory"; }
177 
178   Result<std::shared_ptr<Dataset>> ReplaceSchema(
179       std::shared_ptr<Schema> schema) const override;
180 
181  protected:
182   Result<FragmentIterator> GetFragmentsImpl(Expression predicate) override;
183 
184   std::shared_ptr<RecordBatchGenerator> get_batches_;
185 };
186 
187 /// \brief A Dataset wrapping child Datasets.
188 class ARROW_DS_EXPORT UnionDataset : public Dataset {
189  public:
190   /// \brief Construct a UnionDataset wrapping child Datasets.
191   ///
192   /// \param[in] schema the schema of the resulting dataset.
193   /// \param[in] children one or more child Datasets. Their schemas must be identical to
194   /// schema.
195   static Result<std::shared_ptr<UnionDataset>> Make(std::shared_ptr<Schema> schema,
196                                                     DatasetVector children);
197 
children()198   const DatasetVector& children() const { return children_; }
199 
type_name()200   std::string type_name() const override { return "union"; }
201 
202   Result<std::shared_ptr<Dataset>> ReplaceSchema(
203       std::shared_ptr<Schema> schema) const override;
204 
205  protected:
206   Result<FragmentIterator> GetFragmentsImpl(Expression predicate) override;
207 
UnionDataset(std::shared_ptr<Schema> schema,DatasetVector children)208   explicit UnionDataset(std::shared_ptr<Schema> schema, DatasetVector children)
209       : Dataset(std::move(schema)), children_(std::move(children)) {}
210 
211   DatasetVector children_;
212 
213   friend class UnionDatasetFactory;
214 };
215 
216 }  // namespace dataset
217 }  // namespace arrow
218