1 // Licensed to the Apache Software Foundation (ASF) under one 2 // or more contributor license agreements. See the NOTICE file 3 // distributed with this work for additional information 4 // regarding copyright ownership. The ASF licenses this file 5 // to you under the Apache License, Version 2.0 (the 6 // "License"); you may not use this file except in compliance 7 // with the License. You may obtain a copy of the License at 8 // 9 // http://www.apache.org/licenses/LICENSE-2.0 10 // 11 // Unless required by applicable law or agreed to in writing, 12 // software distributed under the License is distributed on an 13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 14 // KIND, either express or implied. See the License for the 15 // specific language governing permissions and limitations 16 // under the License. 17 18 // This API is EXPERIMENTAL. 19 20 #pragma once 21 22 #include <functional> 23 #include <memory> 24 #include <string> 25 #include <utility> 26 #include <vector> 27 28 #include "arrow/dataset/expression.h" 29 #include "arrow/dataset/type_fwd.h" 30 #include "arrow/dataset/visibility.h" 31 #include "arrow/util/macros.h" 32 #include "arrow/util/mutex.h" 33 34 namespace arrow { 35 namespace dataset { 36 37 /// \brief A granular piece of a Dataset, such as an individual file. 38 /// 39 /// A Fragment can be read/scanned separately from other fragments. It yields a 40 /// collection of RecordBatches when scanned, encapsulated in one or more 41 /// ScanTasks. 42 /// 43 /// Note that Fragments have well defined physical schemas which are reconciled by 44 /// the Datasets which contain them; these physical schemas may differ from a parent 45 /// Dataset's schema and the physical schemas of sibling Fragments. 46 class ARROW_DS_EXPORT Fragment { 47 public: 48 /// \brief Return the physical schema of the Fragment. 49 /// 50 /// The physical schema is also called the writer schema. 51 /// This method is blocking and may suffer from high latency filesystem. 52 /// The schema is cached after being read once, or may be specified at construction. 53 Result<std::shared_ptr<Schema>> ReadPhysicalSchema(); 54 55 /// \brief Scan returns an iterator of ScanTasks, each of which yields 56 /// RecordBatches from this Fragment. 57 /// 58 /// Note that batches yielded using this method will not be filtered and may not align 59 /// with the Fragment's schema. In particular, note that columns referenced by the 60 /// filter may be present in yielded batches even if they are not projected (so that 61 /// they are available when a filter is applied). Additionally, explicitly projected 62 /// columns may be absent if they were not present in this fragment. 63 /// 64 /// To receive a record batch stream which is fully filtered and projected, use Scanner. 65 virtual Result<ScanTaskIterator> Scan(std::shared_ptr<ScanOptions> options, 66 std::shared_ptr<ScanContext> context) = 0; 67 68 /// \brief Return true if the fragment can benefit from parallel scanning. 69 virtual bool splittable() const = 0; 70 71 virtual std::string type_name() const = 0; 72 73 /// \brief An expression which evaluates to true for all data viewed by this 74 /// Fragment. partition_expression()75 const Expression& partition_expression() const { return partition_expression_; } 76 77 virtual ~Fragment() = default; 78 79 protected: 80 Fragment() = default; 81 explicit Fragment(Expression partition_expression, 82 std::shared_ptr<Schema> physical_schema); 83 84 virtual Result<std::shared_ptr<Schema>> ReadPhysicalSchemaImpl() = 0; 85 86 util::Mutex physical_schema_mutex_; 87 Expression partition_expression_ = literal(true); 88 std::shared_ptr<Schema> physical_schema_; 89 }; 90 91 /// \brief A trivial Fragment that yields ScanTask out of a fixed set of 92 /// RecordBatch. 93 class ARROW_DS_EXPORT InMemoryFragment : public Fragment { 94 public: 95 InMemoryFragment(std::shared_ptr<Schema> schema, RecordBatchVector record_batches, 96 Expression = literal(true)); 97 explicit InMemoryFragment(RecordBatchVector record_batches, Expression = literal(true)); 98 99 Result<ScanTaskIterator> Scan(std::shared_ptr<ScanOptions> options, 100 std::shared_ptr<ScanContext> context) override; 101 splittable()102 bool splittable() const override { return false; } 103 type_name()104 std::string type_name() const override { return "in-memory"; } 105 106 protected: 107 Result<std::shared_ptr<Schema>> ReadPhysicalSchemaImpl() override; 108 109 RecordBatchVector record_batches_; 110 }; 111 112 /// \brief A container of zero or more Fragments. 113 /// 114 /// A Dataset acts as a union of Fragments, e.g. files deeply nested in a 115 /// directory. A Dataset has a schema to which Fragments must align during a 116 /// scan operation. This is analogous to Avro's reader and writer schema. 117 class ARROW_DS_EXPORT Dataset : public std::enable_shared_from_this<Dataset> { 118 public: 119 /// \brief Begin to build a new Scan operation against this Dataset 120 Result<std::shared_ptr<ScannerBuilder>> NewScan(std::shared_ptr<ScanContext> context); 121 Result<std::shared_ptr<ScannerBuilder>> NewScan(); 122 123 /// \brief GetFragments returns an iterator of Fragments given a predicate. 124 Result<FragmentIterator> GetFragments(Expression predicate); 125 Result<FragmentIterator> GetFragments(); 126 schema()127 const std::shared_ptr<Schema>& schema() const { return schema_; } 128 129 /// \brief An expression which evaluates to true for all data viewed by this Dataset. 130 /// May be null, which indicates no information is available. partition_expression()131 const Expression& partition_expression() const { return partition_expression_; } 132 133 /// \brief The name identifying the kind of Dataset 134 virtual std::string type_name() const = 0; 135 136 /// \brief Return a copy of this Dataset with a different schema. 137 /// 138 /// The copy will view the same Fragments. If the new schema is not compatible with the 139 /// original dataset's schema then an error will be raised. 140 virtual Result<std::shared_ptr<Dataset>> ReplaceSchema( 141 std::shared_ptr<Schema> schema) const = 0; 142 143 virtual ~Dataset() = default; 144 145 protected: Dataset(std::shared_ptr<Schema> schema)146 explicit Dataset(std::shared_ptr<Schema> schema) : schema_(std::move(schema)) {} 147 148 Dataset(std::shared_ptr<Schema> schema, Expression partition_expression); 149 150 virtual Result<FragmentIterator> GetFragmentsImpl(Expression predicate) = 0; 151 152 std::shared_ptr<Schema> schema_; 153 Expression partition_expression_ = literal(true); 154 }; 155 156 /// \brief A Source which yields fragments wrapping a stream of record batches. 157 /// 158 /// The record batches must match the schema provided to the source at construction. 159 class ARROW_DS_EXPORT InMemoryDataset : public Dataset { 160 public: 161 class RecordBatchGenerator { 162 public: 163 virtual ~RecordBatchGenerator() = default; 164 virtual RecordBatchIterator Get() const = 0; 165 }; 166 InMemoryDataset(std::shared_ptr<Schema> schema,std::shared_ptr<RecordBatchGenerator> get_batches)167 InMemoryDataset(std::shared_ptr<Schema> schema, 168 std::shared_ptr<RecordBatchGenerator> get_batches) 169 : Dataset(std::move(schema)), get_batches_(std::move(get_batches)) {} 170 171 // Convenience constructor taking a fixed list of batches 172 InMemoryDataset(std::shared_ptr<Schema> schema, RecordBatchVector batches); 173 174 explicit InMemoryDataset(std::shared_ptr<Table> table); 175 type_name()176 std::string type_name() const override { return "in-memory"; } 177 178 Result<std::shared_ptr<Dataset>> ReplaceSchema( 179 std::shared_ptr<Schema> schema) const override; 180 181 protected: 182 Result<FragmentIterator> GetFragmentsImpl(Expression predicate) override; 183 184 std::shared_ptr<RecordBatchGenerator> get_batches_; 185 }; 186 187 /// \brief A Dataset wrapping child Datasets. 188 class ARROW_DS_EXPORT UnionDataset : public Dataset { 189 public: 190 /// \brief Construct a UnionDataset wrapping child Datasets. 191 /// 192 /// \param[in] schema the schema of the resulting dataset. 193 /// \param[in] children one or more child Datasets. Their schemas must be identical to 194 /// schema. 195 static Result<std::shared_ptr<UnionDataset>> Make(std::shared_ptr<Schema> schema, 196 DatasetVector children); 197 children()198 const DatasetVector& children() const { return children_; } 199 type_name()200 std::string type_name() const override { return "union"; } 201 202 Result<std::shared_ptr<Dataset>> ReplaceSchema( 203 std::shared_ptr<Schema> schema) const override; 204 205 protected: 206 Result<FragmentIterator> GetFragmentsImpl(Expression predicate) override; 207 UnionDataset(std::shared_ptr<Schema> schema,DatasetVector children)208 explicit UnionDataset(std::shared_ptr<Schema> schema, DatasetVector children) 209 : Dataset(std::move(schema)), children_(std::move(children)) {} 210 211 DatasetVector children_; 212 213 friend class UnionDatasetFactory; 214 }; 215 216 } // namespace dataset 217 } // namespace arrow 218