1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements.  See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership.  The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License.  You may obtain a copy of the License at
8 //
9 //   http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied.  See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17 
18 // This API is EXPERIMENTAL.
19 
20 #pragma once
21 
22 #include <functional>
23 #include <memory>
24 #include <string>
25 #include <utility>
26 #include <vector>
27 
28 #include "arrow/dataset/type_fwd.h"
29 #include "arrow/dataset/visibility.h"
30 #include "arrow/util/macros.h"
31 
32 namespace arrow {
33 namespace dataset {
34 
/// \brief A granular piece of a Dataset, such as an individual file.
///
/// A Fragment can be read/scanned separately from other fragments. It yields a
/// collection of RecordBatches when scanned, encapsulated in one or more
/// ScanTasks.
///
/// Note that Fragments have well defined physical schemas which are reconciled by
/// the Datasets which contain them; these physical schemas may differ from a parent
/// Dataset's schema and the physical schemas of sibling Fragments.
///
/// Fragment is an abstract interface: it declares pure virtual methods and its
/// constructors are protected, so it is only instantiated through subclasses.
class ARROW_DS_EXPORT Fragment {
 public:
  /// \brief Return the physical schema of the Fragment.
  ///
  /// The physical schema is also called the writer schema.
  /// This method is blocking and may incur high latency, for example when the
  /// Fragment is backed by a slow filesystem.
  virtual Result<std::shared_ptr<Schema>> ReadPhysicalSchema() = 0;

  /// \brief Scan returns an iterator of ScanTasks, each of which yields
  /// RecordBatches from this Fragment.
  ///
  /// Note that batches yielded using this method will not be filtered and may not align
  /// with the Fragment's schema. In particular, note that columns referenced by the
  /// filter may be present in yielded batches even if they are not projected (so that
  /// they are available when a filter is applied). Additionally, explicitly projected
  /// columns may be absent if they were not present in this fragment.
  ///
  /// To receive a record batch stream which is fully filtered and projected, use Scanner.
  ///
  /// \param[in] options the options of the scan
  /// \param[in] context the context of the scan
  virtual Result<ScanTaskIterator> Scan(std::shared_ptr<ScanOptions> options,
                                        std::shared_ptr<ScanContext> context) = 0;

  /// \brief Return true if the fragment can benefit from parallel scanning.
  virtual bool splittable() const = 0;

  /// \brief The name identifying the kind of Fragment.
  virtual std::string type_name() const = 0;

  /// \brief An expression which evaluates to true for all data viewed by this
  /// Fragment.
  ///
  /// The returned reference refers to a member of this Fragment and is only valid
  /// for as long as the Fragment is alive.
  const std::shared_ptr<Expression>& partition_expression() const {
    return partition_expression_;
  }

  virtual ~Fragment() = default;

 protected:
  /// Default construction leaves partition_expression_ at its in-class default,
  /// scalar(true).
  Fragment() = default;

  /// \brief Construct a Fragment with an explicit partition expression.
  ///
  /// Defined out of line.
  explicit Fragment(std::shared_ptr<Expression> partition_expression);

  /// Expression known to hold for all data in this Fragment; scalar(true) unless a
  /// constructor stores something else.
  std::shared_ptr<Expression> partition_expression_ = scalar(true);
};
84 
85 /// \brief A trivial Fragment that yields ScanTask out of a fixed set of
86 /// RecordBatch.
87 class ARROW_DS_EXPORT InMemoryFragment : public Fragment {
88  public:
89   InMemoryFragment(std::shared_ptr<Schema> schema, RecordBatchVector record_batches,
90                    std::shared_ptr<Expression> = scalar(true));
91   explicit InMemoryFragment(RecordBatchVector record_batches,
92                             std::shared_ptr<Expression> = scalar(true));
93 
94   Result<std::shared_ptr<Schema>> ReadPhysicalSchema() override;
95 
96   Result<ScanTaskIterator> Scan(std::shared_ptr<ScanOptions> options,
97                                 std::shared_ptr<ScanContext> context) override;
98 
splittable()99   bool splittable() const override { return false; }
100 
type_name()101   std::string type_name() const override { return "in-memory"; }
102 
103  protected:
104   std::shared_ptr<Schema> schema_;
105   RecordBatchVector record_batches_;
106 };
107 
/// \brief A container of zero or more Fragments.
///
/// A Dataset acts as a union of Fragments, e.g. files deeply nested in a
/// directory. A Dataset has a schema to which Fragments must align during a
/// scan operation. This is analogous to Avro's reader and writer schema.
///
/// Dataset is abstract (type_name, ReplaceSchema and GetFragmentsImpl are pure
/// virtual) and derives from std::enable_shared_from_this.
class ARROW_DS_EXPORT Dataset : public std::enable_shared_from_this<Dataset> {
 public:
  /// \brief Begin to build a new Scan operation against this Dataset
  ///
  /// \param[in] context the ScanContext to build the scan with
  Result<std::shared_ptr<ScannerBuilder>> NewScan(std::shared_ptr<ScanContext> context);

  /// \brief Overload of NewScan which takes no explicit ScanContext.
  Result<std::shared_ptr<ScannerBuilder>> NewScan();

  /// \brief GetFragments returns an iterator of Fragments given a predicate.
  ///
  /// NOTE(review): presumably forwards to GetFragmentsImpl(); defined out of line --
  /// confirm.
  ///
  /// \param[in] predicate the predicate used to select Fragments; defaults to
  /// scalar(true)
  FragmentIterator GetFragments(std::shared_ptr<Expression> predicate = scalar(true));

  /// \brief The schema of this Dataset, to which Fragments must align during a scan.
  const std::shared_ptr<Schema>& schema() const { return schema_; }

  /// \brief An expression which evaluates to true for all data viewed by this Dataset.
  /// May be null, which indicates no information is available.
  ///
  /// NOTE(review): the member is default-initialized to scalar(true), so a null value
  /// can only come from a constructor or subclass that explicitly stores one --
  /// confirm whether null is still a supported state.
  const std::shared_ptr<Expression>& partition_expression() const {
    return partition_expression_;
  }

  /// \brief The name identifying the kind of Dataset
  virtual std::string type_name() const = 0;

  /// \brief Return a copy of this Dataset with a different schema.
  ///
  /// The copy will view the same Fragments. If the new schema is not compatible with the
  /// original dataset's schema then an error will be raised.
  virtual Result<std::shared_ptr<Dataset>> ReplaceSchema(
      std::shared_ptr<Schema> schema) const = 0;

  virtual ~Dataset() = default;

 protected:
  /// \brief Construct with the given schema; partition_expression_ keeps its in-class
  /// default of scalar(true).
  explicit Dataset(std::shared_ptr<Schema> schema) : schema_(std::move(schema)) {}

  /// \brief Construct with the given schema and partition expression.
  ///
  /// Defined out of line.
  Dataset(std::shared_ptr<Schema> schema,
          std::shared_ptr<Expression> partition_expression);

  /// \brief Subclass hook which returns an iterator of Fragments for a predicate.
  ///
  /// NOTE(review): presumably invoked by GetFragments() -- confirm against the
  /// out-of-line definition.
  virtual FragmentIterator GetFragmentsImpl(std::shared_ptr<Expression> predicate) = 0;

  /// Schema of this Dataset.
  std::shared_ptr<Schema> schema_;
  /// Expression known to hold for all data in this Dataset; scalar(true) unless a
  /// constructor stores something else.
  std::shared_ptr<Expression> partition_expression_ = scalar(true);
};
153 
/// \brief A Dataset which yields fragments wrapping a stream of record batches.
///
/// The record batches must match the schema provided to the dataset at construction.
class ARROW_DS_EXPORT InMemoryDataset : public Dataset {
 public:
  /// \brief Abstract interface through which the dataset obtains its record batches.
  ///
  /// NOTE(review): Get() is const, so presumably each call yields a new iterator over
  /// the same batches -- confirm against the implementations.
  class RecordBatchGenerator {
   public:
    virtual ~RecordBatchGenerator() = default;
    /// \brief Return an iterator of record batches.
    virtual RecordBatchIterator Get() const = 0;
  };

  /// \brief Construct from a schema and a generator of record batches.
  ///
  /// \param[in] schema the schema of the dataset
  /// \param[in] get_batches the generator from which record batches are obtained
  InMemoryDataset(std::shared_ptr<Schema> schema,
                  std::shared_ptr<RecordBatchGenerator> get_batches)
      : Dataset(std::move(schema)), get_batches_(std::move(get_batches)) {}

  // Convenience constructor taking a fixed list of batches
  InMemoryDataset(std::shared_ptr<Schema> schema, RecordBatchVector batches);

  // Convenience constructor taking a Table.
  // NOTE(review): presumably the schema and batches are taken from the table; defined
  // out of line -- confirm.
  explicit InMemoryDataset(std::shared_ptr<Table> table);

  /// \brief Always "in-memory".
  std::string type_name() const override { return "in-memory"; }

  /// \brief Return a copy of this Dataset with a different schema. See
  /// Dataset::ReplaceSchema.
  Result<std::shared_ptr<Dataset>> ReplaceSchema(
      std::shared_ptr<Schema> schema) const override;

 protected:
  FragmentIterator GetFragmentsImpl(std::shared_ptr<Expression> predicate) override;

  /// Generator from which this dataset's record batches are obtained.
  std::shared_ptr<RecordBatchGenerator> get_batches_;
};
184 
/// \brief A Dataset wrapping child Datasets.
class ARROW_DS_EXPORT UnionDataset : public Dataset {
 public:
  /// \brief Construct a UnionDataset wrapping child Datasets.
  ///
  /// \param[in] schema the schema of the resulting dataset.
  /// \param[in] children one or more child Datasets. Their schemas must be identical to
  /// schema.
  static Result<std::shared_ptr<UnionDataset>> Make(std::shared_ptr<Schema> schema,
                                                    DatasetVector children);

  /// \brief The child Datasets wrapped by this UnionDataset.
  const DatasetVector& children() const { return children_; }

  /// \brief Always "union".
  std::string type_name() const override { return "union"; }

  /// \brief Return a copy of this Dataset with a different schema. See
  /// Dataset::ReplaceSchema.
  Result<std::shared_ptr<Dataset>> ReplaceSchema(
      std::shared_ptr<Schema> schema) const override;

 protected:
  FragmentIterator GetFragmentsImpl(std::shared_ptr<Expression> predicate) override;

  /// Protected constructor: stores schema and children as given and performs no
  /// validation itself. External callers construct through Make().
  explicit UnionDataset(std::shared_ptr<Schema> schema, DatasetVector children)
      : Dataset(std::move(schema)), children_(std::move(children)) {}

  /// The wrapped child Datasets.
  DatasetVector children_;

  // UnionDatasetFactory is granted access to the protected constructor and members.
  friend class UnionDatasetFactory;
};
213 
214 }  // namespace dataset
215 }  // namespace arrow
216