1 // Licensed to the Apache Software Foundation (ASF) under one 2 // or more contributor license agreements. See the NOTICE file 3 // distributed with this work for additional information 4 // regarding copyright ownership. The ASF licenses this file 5 // to you under the Apache License, Version 2.0 (the 6 // "License"); you may not use this file except in compliance 7 // with the License. You may obtain a copy of the License at 8 // 9 // http://www.apache.org/licenses/LICENSE-2.0 10 // 11 // Unless required by applicable law or agreed to in writing, 12 // software distributed under the License is distributed on an 13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 14 // KIND, either express or implied. See the License for the 15 // specific language governing permissions and limitations 16 // under the License. 17 18 // This API is EXPERIMENTAL. 19 20 #pragma once 21 22 #include <functional> 23 #include <memory> 24 #include <string> 25 #include <utility> 26 #include <vector> 27 28 #include "arrow/dataset/type_fwd.h" 29 #include "arrow/dataset/visibility.h" 30 #include "arrow/util/macros.h" 31 32 namespace arrow { 33 namespace dataset { 34 35 /// \brief A granular piece of a Dataset, such as an individual file. 36 /// 37 /// A Fragment can be read/scanned separately from other fragments. It yields a 38 /// collection of RecordBatches when scanned, encapsulated in one or more 39 /// ScanTasks. 40 /// 41 /// Note that Fragments have well defined physical schemas which are reconciled by 42 /// the Datasets which contain them; these physical schemas may differ from a parent 43 /// Dataset's schema and the physical schemas of sibling Fragments. 44 class ARROW_DS_EXPORT Fragment { 45 public: 46 /// \brief Return the physical schema of the Fragment. 47 /// 48 /// The physical schema is also called the writer schema. 49 /// This method is blocking and may suffer from high latency filesystem. 50 virtual Result<std::shared_ptr<Schema>> ReadPhysicalSchema() = 0; 51 52 /// \brief Scan returns an iterator of ScanTasks, each of which yields 53 /// RecordBatches from this Fragment. 54 /// 55 /// Note that batches yielded using this method will not be filtered and may not align 56 /// with the Fragment's schema. In particular, note that columns referenced by the 57 /// filter may be present in yielded batches even if they are not projected (so that 58 /// they are available when a filter is applied). Additionally, explicitly projected 59 /// columns may be absent if they were not present in this fragment. 60 /// 61 /// To receive a record batch stream which is fully filtered and projected, use Scanner. 62 virtual Result<ScanTaskIterator> Scan(std::shared_ptr<ScanOptions> options, 63 std::shared_ptr<ScanContext> context) = 0; 64 65 /// \brief Return true if the fragment can benefit from parallel scanning. 66 virtual bool splittable() const = 0; 67 68 virtual std::string type_name() const = 0; 69 70 /// \brief An expression which evaluates to true for all data viewed by this 71 /// Fragment. partition_expression()72 const std::shared_ptr<Expression>& partition_expression() const { 73 return partition_expression_; 74 } 75 76 virtual ~Fragment() = default; 77 78 protected: 79 Fragment() = default; 80 explicit Fragment(std::shared_ptr<Expression> partition_expression); 81 82 std::shared_ptr<Expression> partition_expression_ = scalar(true); 83 }; 84 85 /// \brief A trivial Fragment that yields ScanTask out of a fixed set of 86 /// RecordBatch. 87 class ARROW_DS_EXPORT InMemoryFragment : public Fragment { 88 public: 89 InMemoryFragment(std::shared_ptr<Schema> schema, RecordBatchVector record_batches, 90 std::shared_ptr<Expression> = scalar(true)); 91 explicit InMemoryFragment(RecordBatchVector record_batches, 92 std::shared_ptr<Expression> = scalar(true)); 93 94 Result<std::shared_ptr<Schema>> ReadPhysicalSchema() override; 95 96 Result<ScanTaskIterator> Scan(std::shared_ptr<ScanOptions> options, 97 std::shared_ptr<ScanContext> context) override; 98 splittable()99 bool splittable() const override { return false; } 100 type_name()101 std::string type_name() const override { return "in-memory"; } 102 103 protected: 104 std::shared_ptr<Schema> schema_; 105 RecordBatchVector record_batches_; 106 }; 107 108 /// \brief A container of zero or more Fragments. 109 /// 110 /// A Dataset acts as a union of Fragments, e.g. files deeply nested in a 111 /// directory. A Dataset has a schema to which Fragments must align during a 112 /// scan operation. This is analogous to Avro's reader and writer schema. 113 class ARROW_DS_EXPORT Dataset : public std::enable_shared_from_this<Dataset> { 114 public: 115 /// \brief Begin to build a new Scan operation against this Dataset 116 Result<std::shared_ptr<ScannerBuilder>> NewScan(std::shared_ptr<ScanContext> context); 117 Result<std::shared_ptr<ScannerBuilder>> NewScan(); 118 119 /// \brief GetFragments returns an iterator of Fragments given a predicate. 120 FragmentIterator GetFragments(std::shared_ptr<Expression> predicate = scalar(true)); 121 schema()122 const std::shared_ptr<Schema>& schema() const { return schema_; } 123 124 /// \brief An expression which evaluates to true for all data viewed by this Dataset. 125 /// May be null, which indicates no information is available. partition_expression()126 const std::shared_ptr<Expression>& partition_expression() const { 127 return partition_expression_; 128 } 129 130 /// \brief The name identifying the kind of Dataset 131 virtual std::string type_name() const = 0; 132 133 /// \brief Return a copy of this Dataset with a different schema. 134 /// 135 /// The copy will view the same Fragments. If the new schema is not compatible with the 136 /// original dataset's schema then an error will be raised. 137 virtual Result<std::shared_ptr<Dataset>> ReplaceSchema( 138 std::shared_ptr<Schema> schema) const = 0; 139 140 virtual ~Dataset() = default; 141 142 protected: Dataset(std::shared_ptr<Schema> schema)143 explicit Dataset(std::shared_ptr<Schema> schema) : schema_(std::move(schema)) {} 144 145 Dataset(std::shared_ptr<Schema> schema, 146 std::shared_ptr<Expression> partition_expression); 147 148 virtual FragmentIterator GetFragmentsImpl(std::shared_ptr<Expression> predicate) = 0; 149 150 std::shared_ptr<Schema> schema_; 151 std::shared_ptr<Expression> partition_expression_ = scalar(true); 152 }; 153 154 /// \brief A Source which yields fragments wrapping a stream of record batches. 155 /// 156 /// The record batches must match the schema provided to the source at construction. 157 class ARROW_DS_EXPORT InMemoryDataset : public Dataset { 158 public: 159 class RecordBatchGenerator { 160 public: 161 virtual ~RecordBatchGenerator() = default; 162 virtual RecordBatchIterator Get() const = 0; 163 }; 164 InMemoryDataset(std::shared_ptr<Schema> schema,std::shared_ptr<RecordBatchGenerator> get_batches)165 InMemoryDataset(std::shared_ptr<Schema> schema, 166 std::shared_ptr<RecordBatchGenerator> get_batches) 167 : Dataset(std::move(schema)), get_batches_(std::move(get_batches)) {} 168 169 // Convenience constructor taking a fixed list of batches 170 InMemoryDataset(std::shared_ptr<Schema> schema, RecordBatchVector batches); 171 172 explicit InMemoryDataset(std::shared_ptr<Table> table); 173 type_name()174 std::string type_name() const override { return "in-memory"; } 175 176 Result<std::shared_ptr<Dataset>> ReplaceSchema( 177 std::shared_ptr<Schema> schema) const override; 178 179 protected: 180 FragmentIterator GetFragmentsImpl(std::shared_ptr<Expression> predicate) override; 181 182 std::shared_ptr<RecordBatchGenerator> get_batches_; 183 }; 184 185 /// \brief A Dataset wrapping child Datasets. 186 class ARROW_DS_EXPORT UnionDataset : public Dataset { 187 public: 188 /// \brief Construct a UnionDataset wrapping child Datasets. 189 /// 190 /// \param[in] schema the schema of the resulting dataset. 191 /// \param[in] children one or more child Datasets. Their schemas must be identical to 192 /// schema. 193 static Result<std::shared_ptr<UnionDataset>> Make(std::shared_ptr<Schema> schema, 194 DatasetVector children); 195 children()196 const DatasetVector& children() const { return children_; } 197 type_name()198 std::string type_name() const override { return "union"; } 199 200 Result<std::shared_ptr<Dataset>> ReplaceSchema( 201 std::shared_ptr<Schema> schema) const override; 202 203 protected: 204 FragmentIterator GetFragmentsImpl(std::shared_ptr<Expression> predicate) override; 205 UnionDataset(std::shared_ptr<Schema> schema,DatasetVector children)206 explicit UnionDataset(std::shared_ptr<Schema> schema, DatasetVector children) 207 : Dataset(std::move(schema)), children_(std::move(children)) {} 208 209 DatasetVector children_; 210 211 friend class UnionDatasetFactory; 212 }; 213 214 } // namespace dataset 215 } // namespace arrow 216