1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements.  See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership.  The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License.  You may obtain a copy of the License at
8 //
9 //   http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied.  See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17 
18 #include "arrow/dataset/dataset.h"
19 
20 #include "arrow/dataset/dataset_internal.h"
21 #include "arrow/dataset/discovery.h"
22 #include "arrow/dataset/partition.h"
23 #include "arrow/dataset/test_util.h"
24 #include "arrow/filesystem/mockfs.h"
25 #include "arrow/stl.h"
26 #include "arrow/testing/generator.h"
27 #include "arrow/util/optional.h"
28 
29 namespace arrow {
30 namespace dataset {
31 
// Fixture for the InMemoryFragment tests; it adds nothing on top of
// DatasetFixtureMixin.
class TestInMemoryFragment : public DatasetFixtureMixin {};
33 
// Shorthand for a vector of shared RecordBatch pointers, used below to build
// in-memory fragments and datasets.
using RecordBatchVector = std::vector<std::shared_ptr<RecordBatch>>;
35 
// Scanning an InMemoryFragment must yield the batches it was constructed from.
TEST_F(TestInMemoryFragment, Scan) {
  constexpr int64_t kRowsPerBatch = 1024;
  constexpr int64_t kBatchCount = 16;

  SetSchema({field("i32", int32()), field("f64", float64())});
  auto zero_batch = ConstantArrayGenerator::Zeroes(kRowsPerBatch, schema_);

  // The fragment under test holds kBatchCount references to the same batch.
  RecordBatchVector repeated(static_cast<size_t>(kBatchCount), zero_batch);
  auto fragment = std::make_shared<InMemoryFragment>(repeated);

  // Reference reader repeating that batch the same number of times.
  auto expected_reader = ConstantArrayGenerator::Repeat(kBatchCount, zero_batch);
  AssertFragmentEquals(expected_reader.get(), fragment.get());
}
50 
// Fixture for the InMemoryDataset tests; it adds nothing on top of
// DatasetFixtureMixin.
class TestInMemoryDataset : public DatasetFixtureMixin {};
52 
// Checks which replacement schemas InMemoryDataset::ReplaceSchema accepts and
// which ones it rejects with TypeError.
TEST_F(TestInMemoryDataset, ReplaceSchema) {
  constexpr int64_t kBatchSize = 1;
  constexpr int64_t kNumberBatches = 1;

  SetSchema({field("i32", int32()), field("f64", float64())});
  auto batch = ConstantArrayGenerator::Zeroes(kBatchSize, schema_);

  // No reference reader is built: this test only inspects the Status returned
  // by ReplaceSchema and never scans the dataset.
  auto dataset = std::make_shared<InMemoryDataset>(
      schema_, RecordBatchVector{static_cast<size_t>(kNumberBatches), batch});

  // drop field
  ASSERT_OK(dataset->ReplaceSchema(schema({field("i32", int32())})).status());
  // add field (will be materialized as null during projection)
  ASSERT_OK(dataset->ReplaceSchema(schema({field("str", utf8())})).status());
  // incompatible type
  ASSERT_RAISES(TypeError,
                dataset->ReplaceSchema(schema({field("i32", utf8())})).status());
  // incompatible nullability
  ASSERT_RAISES(
      TypeError,
      dataset->ReplaceSchema(schema({field("f64", float64(), /*nullable=*/false)}))
          .status());
  // add non-nullable field
  ASSERT_RAISES(TypeError,
                dataset->ReplaceSchema(schema({field("str", utf8(), /*nullable=*/false)}))
                    .status());
}
81 
// An InMemoryDataset must expose the batches it was constructed from.
TEST_F(TestInMemoryDataset, GetFragments) {
  constexpr int64_t kRowsPerBatch = 1024;
  constexpr int64_t kBatchCount = 16;

  SetSchema({field("i32", int32()), field("f64", float64())});
  auto zero_batch = ConstantArrayGenerator::Zeroes(kRowsPerBatch, schema_);

  // Dataset under test: kBatchCount references to the same batch.
  auto dataset = std::make_shared<InMemoryDataset>(
      schema_, RecordBatchVector(static_cast<size_t>(kBatchCount), zero_batch));

  // Reference reader repeating that batch the same number of times.
  auto expected_reader = ConstantArrayGenerator::Repeat(kBatchCount, zero_batch);
  AssertDatasetEquals(expected_reader.get(), dataset.get());
}
95 
// The physical schema reported by an InMemoryFragment built from batches must be
// the schema of those batches.
TEST_F(TestInMemoryDataset, InMemoryFragment) {
  constexpr int64_t kRowsPerBatch = 1024;

  SetSchema({field("i32", int32()), field("f64", float64())});
  auto zero_batch = ConstantArrayGenerator::Zeroes(kRowsPerBatch, schema_);
  RecordBatchVector single_batch = {zero_batch};

  // Regression test: this constructor used to depend on the order in which its
  // arguments were evaluated (undefined behavior), which could produce fragments
  // with an empty schema.
  auto fragment = std::make_shared<InMemoryFragment>(single_batch);
  ASSERT_OK_AND_ASSIGN(auto physical_schema, fragment->ReadPhysicalSchema());
  AssertSchemaEqual(zero_batch->schema(), physical_schema);
}
109 
// Fixture for the UnionDataset tests; it adds nothing on top of
// DatasetFixtureMixin. Other fixtures in this file derive from it.
class TestUnionDataset : public DatasetFixtureMixin {};
111 
// Checks that a UnionDataset scans all of its children, then checks which
// replacement schemas ReplaceSchema accepts and which it rejects with TypeError.
TEST_F(TestUnionDataset, ReplaceSchema) {
  constexpr int64_t kRowsPerBatch = 1;
  constexpr int64_t kBatchesPerChild = 1;

  SetSchema({field("i32", int32()), field("f64", float64())});
  auto zero_batch = ConstantArrayGenerator::Zeroes(kRowsPerBatch, schema_);

  RecordBatchVector child_batches(static_cast<size_t>(kBatchesPerChild), zero_batch);

  // Two identical in-memory children.
  DatasetVector children = {
      std::make_shared<InMemoryDataset>(schema_, child_batches),
      std::make_shared<InMemoryDataset>(schema_, child_batches),
  };

  // The union is expected to yield every batch of every child.
  const int64_t expected_batches =
      static_cast<int64_t>(children.size()) * kBatchesPerChild;
  auto expected_reader = ConstantArrayGenerator::Repeat(expected_batches, zero_batch);

  ASSERT_OK_AND_ASSIGN(auto dataset, UnionDataset::Make(schema_, children));
  AssertDatasetEquals(expected_reader.get(), dataset.get());

  // Accepted: dropping a field.
  auto only_i32 = schema({field("i32", int32())});
  ASSERT_OK(dataset->ReplaceSchema(only_i32).status());

  // Accepted: adding a nullable field (materialized as null during projection).
  auto new_nullable = schema({field("str", utf8())});
  ASSERT_OK(dataset->ReplaceSchema(new_nullable).status());

  // Rejected: same name, incompatible type.
  auto retyped = schema({field("i32", utf8())});
  ASSERT_RAISES(TypeError, dataset->ReplaceSchema(retyped).status());

  // Rejected: incompatible nullability.
  auto f64_required = schema({field("f64", float64(), /*nullable=*/false)});
  ASSERT_RAISES(TypeError, dataset->ReplaceSchema(f64_required).status());

  // Rejected: adding a non-nullable field.
  auto new_required = schema({field("str", utf8(), /*nullable=*/false)});
  ASSERT_RAISES(TypeError, dataset->ReplaceSchema(new_required).status());
}
150 
// Scans a tree of nested UnionDatasets and checks that every leaf batch is yielded.
TEST_F(TestUnionDataset, GetFragments) {
  constexpr int64_t kRowsPerBatch = 1024;
  constexpr int64_t kFanOut = 2;
  constexpr int64_t kTreeDepth = 4;

  SetSchema({field("i32", int32()), field("f64", float64())});
  auto zero_batch = ConstantArrayGenerator::Zeroes(kRowsPerBatch, schema_);

  // Build a complete binary tree of depth kTreeDepth bottom-up. The leaves are
  // InMemoryDatasets holding kFanOut batches; every level above is a
  // UnionDataset with kFanOut copies of the level below.
  const auto fan_out = static_cast<size_t>(kFanOut);

  auto leaf =
      std::make_shared<InMemoryDataset>(schema_, RecordBatchVector(fan_out, zero_batch));

  ASSERT_OK_AND_ASSIGN(auto lower_union,
                       UnionDataset::Make(schema_, DatasetVector(fan_out, leaf)));

  ASSERT_OK_AND_ASSIGN(auto middle_union,
                       UnionDataset::Make(schema_, DatasetVector(fan_out, lower_union)));

  ASSERT_OK_AND_ASSIGN(auto root,
                       UnionDataset::Make(schema_, DatasetVector(fan_out, middle_union)));

  // 2^kTreeDepth batches are expected in total.
  auto expected_batches = 1U << kTreeDepth;
  auto expected_reader = ConstantArrayGenerator::Repeat(expected_batches, zero_batch);

  AssertDatasetEquals(expected_reader.get(), root.get());
}
185 
// A UnionDataset over two in-memory children must yield the batches of both.
TEST_F(TestUnionDataset, TrivialScan) {
  constexpr int64_t kBatchesPerChild = 16;
  constexpr int64_t kRowsPerBatch = 1024;

  SetSchema({field("i32", int32()), field("f64", float64())});
  auto zero_batch = ConstantArrayGenerator::Zeroes(kRowsPerBatch, schema_);

  RecordBatchVector child_batches(static_cast<size_t>(kBatchesPerChild), zero_batch);

  // Two identical in-memory children.
  DatasetVector children = {
      std::make_shared<InMemoryDataset>(schema_, child_batches),
      std::make_shared<InMemoryDataset>(schema_, child_batches),
  };

  const int64_t expected_batches =
      static_cast<int64_t>(children.size()) * kBatchesPerChild;
  auto expected_reader = ConstantArrayGenerator::Repeat(expected_batches, zero_batch);

  ASSERT_OK_AND_ASSIGN(auto dataset, UnionDataset::Make(schema_, children));
  AssertDatasetEquals(expected_reader.get(), dataset.get());
}
207 
// Exercises CheckProjectable on pairs of (origin schema, target schema).
TEST(TestProjector, CheckProjectable) {
  // Small helper so each case reads as `Origin({...}).ProjectableTo({...})`.
  struct Origin {
    Schema origin_;

    explicit Origin(FieldVector fields) : origin_(std::move(fields)) {}

    // Expects projection from the origin schema to `target` to be accepted.
    void ProjectableTo(FieldVector target) {
      ARROW_EXPECT_OK(CheckProjectable(origin_, Schema(std::move(target))));
    }

    // Expects projection to `target` to fail with a TypeError whose message
    // contains `substr` (the default empty string matches any message).
    void NotProjectableTo(FieldVector target, std::string substr = "") {
      EXPECT_RAISES_WITH_MESSAGE_THAT(TypeError, testing::HasSubstr(substr),
                                      CheckProjectable(origin_, Schema(std::move(target))));
    }
  };

  // Fields with default nullability.
  auto i8 = field("i8", int8());
  auto u16 = field("u16", uint16());
  auto str = field("str", utf8());
  // The same fields declared with nullable=false.
  auto i8_req = field("i8", int8(), false);
  auto u16_req = field("u16", uint16(), false);
  auto str_req = field("str", utf8(), false);
  // A field named "str" whose type is null().
  auto str_nil = field("str", null());

  // Identity projections.
  Origin({}).ProjectableTo({});
  Origin({i8}).ProjectableTo({i8});
  Origin({i8, u16_req}).ProjectableTo({i8, u16_req});

  // Reordering fields.
  Origin({i8, u16}).ProjectableTo({u16, i8});
  Origin({i8, str, u16}).ProjectableTo({u16, i8, str});

  // Dropping fields.
  Origin({i8}).ProjectableTo({});

  // Adding fields.
  Origin({}).ProjectableTo({i8});
  Origin({}).ProjectableTo({i8, u16});
  Origin({}).NotProjectableTo({u16_req},
                              "is not nullable and does not exist in origin schema");
  Origin({i8}).NotProjectableTo({u16_req, i8});

  // Changing nullability.
  Origin({i8}).NotProjectableTo({i8_req},
                                "not nullable but is not required in origin schema");
  Origin({i8_req}).ProjectableTo({i8});
  Origin({str_nil}).ProjectableTo({str});
  Origin({str_nil}).NotProjectableTo({str_req});

  // Changing a field's type.
  Origin({i8}).NotProjectableTo({field("i8", utf8())},
                                "fields had matching names but differing types");
}
261 
262 class TestEndToEnd : public TestUnionDataset {
SetUp()263   void SetUp() override {
264     bool nullable = false;
265     SetSchema({
266         field("region", utf8(), nullable),
267         field("model", utf8(), nullable),
268         field("sales", float64(), nullable),
269         // partition columns
270         field("year", int32()),
271         field("month", int32()),
272         field("country", utf8()),
273     });
274 
275     using PathAndContent = std::vector<std::pair<std::string, std::string>>;
276     auto files = PathAndContent{
277         {"/dataset/2018/01/US/dat.json", R"([
278         {"region": "NY", "model": "3", "sales": 742.0},
279         {"region": "NY", "model": "S", "sales": 304.125},
280         {"region": "NY", "model": "X", "sales": 136.25},
281         {"region": "NY", "model": "Y", "sales": 27.5}
282       ])"},
283         {"/dataset/2018/01/CA/dat.json", R"([
284         {"region": "CA", "model": "3", "sales": 512},
285         {"region": "CA", "model": "S", "sales": 978},
286         {"region": "CA", "model": "X", "sales": 1.0},
287         {"region": "CA", "model": "Y", "sales": 69}
288       ])"},
289         {"/dataset/2019/01/US/dat.json", R"([
290         {"region": "QC", "model": "3", "sales": 273.5},
291         {"region": "QC", "model": "S", "sales": 13},
292         {"region": "QC", "model": "X", "sales": 54},
293         {"region": "QC", "model": "Y", "sales": 21}
294       ])"},
295         {"/dataset/2019/01/CA/dat.json", R"([
296         {"region": "QC", "model": "3", "sales": 152.25},
297         {"region": "QC", "model": "S", "sales": 10},
298         {"region": "QC", "model": "X", "sales": 42},
299         {"region": "QC", "model": "Y", "sales": 37}
300       ])"},
301         {"/dataset/.pesky", "garbage content"},
302     };
303 
304     auto mock_fs = std::make_shared<fs::internal::MockFileSystem>(fs::kNoTime);
305     for (const auto& f : files) {
306       ARROW_EXPECT_OK(mock_fs->CreateFile(f.first, f.second, /*recursive=*/true));
307     }
308 
309     fs_ = mock_fs;
310   }
311 
312  protected:
313   std::shared_ptr<fs::FileSystem> fs_;
314 };
315 
TEST_F(TestEndToEnd,EndToEndSingleDataset)316 TEST_F(TestEndToEnd, EndToEndSingleDataset) {
317   // The dataset API is divided in 3 parts:
318   //  - Creation
319   //  - Querying
320   //  - Consuming
321 
322   // Creation.
323   //
324   // A Dataset is the union of one or more Datasets with the same schema.
325   // Example of Dataset, FileSystemDataset, OdbcDataset,
326   // FlightDataset.
327 
328   // A Dataset is composed of Fragments. Each Fragment can yield
329   // multiple RecordBatches. Datasets can be created manually or "discovered"
330   // via the DatasetFactory interface.
331   std::shared_ptr<DatasetFactory> factory;
332 
333   // The user must specify which FileFormat is used to create FileFragments.
334   // This option is specific to FileSystemDataset (and the builder).
335   auto format_schema = SchemaFromColumnNames(schema_, {"region", "model", "sales"});
336   auto format = std::make_shared<JSONRecordBatchFileFormat>(format_schema);
337 
338   // A selector is used to crawl files and directories of a
339   // filesystem. If the options in FileSelector are not enough, the
340   // FileSystemDatasetFactory class also supports an explicit list of
341   // fs::FileInfo instead of the selector.
342   fs::FileSelector s;
343   s.base_dir = "/dataset";
344   s.recursive = true;
345 
346   // Further options can be given to the factory mechanism via the
347   // FileSystemFactoryOptions configuration class. See the docstring for more
348   // information.
349   FileSystemFactoryOptions options;
350   options.selector_ignore_prefixes = {"."};
351 
352   // Partitions expressions can be discovered for Dataset and Fragments.
353   // This metadata is then used in conjunction with the query filter to apply
354   // the pushdown predicate optimization.
355   //
356   // The DirectoryPartitioning is a partitioning where the path is split with
357   // the directory separator character and the components are parsed as values
358   // of the corresponding fields in its schema.
359   //
360   // Since a PartitioningFactory is specified instead of an explicit
361   // Partitioning, the types of partition fields will be inferred.
362   //
363   // - "/2019" -> {"year": 2019}
364   // - "/2019/01 -> {"year": 2019, "month": 1}
365   // - "/2019/01/CA -> {"year": 2019, "month": 1, "country": "CA"}
366   // - "/2019/01/CA/a_file.json -> {"year": 2019, "month": 1, "country": "CA"}
367   options.partitioning = DirectoryPartitioning::MakeFactory({"year", "month", "country"});
368 
369   ASSERT_OK_AND_ASSIGN(factory, FileSystemDatasetFactory::Make(fs_, s, format, options));
370 
371   // Fragments might have compatible but slightly different schemas, e.g.
372   // schema evolved by adding/renaming columns. In this case, the schema is
373   // passed to the dataset constructor.
374   // The inspected_schema may optionally be modified before being finalized.
375   InspectOptions inspect_options;
376   inspect_options.fragments = InspectOptions::kInspectAllFragments;
377   ASSERT_OK_AND_ASSIGN(auto inspected_schema, factory->Inspect(inspect_options));
378   EXPECT_EQ(*schema_, *inspected_schema);
379 
380   // Build the Dataset where partitions are attached to fragments (files).
381   ASSERT_OK_AND_ASSIGN(auto source, factory->Finish(inspected_schema));
382 
383   // Create the Dataset from our single Dataset.
384   ASSERT_OK_AND_ASSIGN(auto dataset, UnionDataset::Make(inspected_schema, {source}));
385 
386   // Querying.
387   //
388   // The Scan operator materializes data from io into memory. Avoiding data
389   // transfer is a critical optimization done by analytical engine. Thus, a
390   // Scan can take multiple options, notably a subset of columns and a filter
391   // expression.
392   ASSERT_OK_AND_ASSIGN(auto scanner_builder, dataset->NewScan());
393 
394   // An optional subset of columns can be provided. This will trickle to
395   // Fragment drivers. The net effect is that only columns of interest will
396   // be materialized if the Fragment supports it. This is the major benefit
397   // of using a column-major format versus a row-major format.
398   //
399   // This API decouples the Dataset/Fragment implementation and column
400   // projection from the query part.
401   //
402   // For example, a ParquetFileFragment may read the necessary byte ranges
403   // exclusively, ranges, or an OdbcFragment could convert the projection to a SELECT
404   // statement. The CsvFileFragment wouldn't benefit from this as much, but
405   // can still benefit from skipping conversion of unneeded columns.
406   std::vector<std::string> columns{"sales", "model", "country"};
407   ASSERT_OK(scanner_builder->Project(columns));
408 
409   // An optional filter expression may also be specified. The filter expression
410   // is evaluated against input rows. Only rows for which the filter evaluates to true
411   // are yielded. Predicate pushdown optimizations are applied using partition
412   // information if available.
413   //
414   // This API decouples predicate pushdown from the Dataset implementation
415   // and partition discovery.
416   //
417   // The following filter tests both predicate pushdown and post filtering
418   // without partition information because `year` is a partition and `sales` is
419   // not.
420   auto filter = and_(equal(field_ref("year"), literal(2019)),
421                      greater(field_ref("sales"), literal(100.0)));
422   ASSERT_OK(scanner_builder->Filter(filter));
423 
424   ASSERT_OK_AND_ASSIGN(auto scanner, scanner_builder->Finish());
425   // In the simplest case, consumption is simply conversion to a Table.
426   ASSERT_OK_AND_ASSIGN(auto table, scanner->ToTable());
427 
428   auto expected = TableFromJSON(scanner_builder->projected_schema(), {R"([
429     {"sales": 152.25, "model": "3", "country": "CA"},
430     {"sales": 273.5, "model": "3", "country": "US"}
431   ])"});
432   AssertTablesEqual(*expected, *table, false, true);
433 }
434 
SchemaFromNames(const std::vector<std::string> names)435 inline std::shared_ptr<Schema> SchemaFromNames(const std::vector<std::string> names) {
436   std::vector<std::shared_ptr<Field>> fields;
437   for (const auto& name : names) {
438     fields.push_back(field(name, int32()));
439   }
440 
441   return schema(fields);
442 }
443 
444 class TestSchemaUnification : public TestUnionDataset {
445  public:
446   using i32 = util::optional<int32_t>;
447   using PathAndContent = std::vector<std::pair<std::string, std::string>>;
448 
SetUp()449   void SetUp() override {
450     // The following test creates 2 sources with divergent but compatible
451     // schemas. Each source have a common partitioning where the
452     // fields are not materialized in the data fragments.
453     //
454     // Each data is composed of 2 data fragments with divergent but
455     // compatible schemas. The data fragment within a source share at
456     // least one column.
457     //
458     // Thus, the fixture helps verifying various scenarios where the Scanner
459     // must fix the RecordBatches to align with the final unified schema exposed
460     // to the consumer.
461     static constexpr auto ds1_df1 = "/dataset/alpha/part_ds=1/part_df=1/data.json";
462     static constexpr auto ds1_df2 = "/dataset/alpha/part_ds=1/part_df=2/data.json";
463     static constexpr auto ds2_df1 = "/dataset/beta/part_ds=2/part_df=1/data.json";
464     static constexpr auto ds2_df2 = "/dataset/beta/part_ds=2/part_df=2/data.json";
465     auto files = PathAndContent{
466         // First Dataset
467         {ds1_df1, R"([{"phy_1": 111, "phy_2": 211}])"},
468         {ds1_df2, R"([{"phy_2": 212, "phy_3": 312}])"},
469         // Second Dataset
470         {ds2_df1, R"([{"phy_3": 321, "phy_4": 421}])"},
471         {ds2_df2, R"([{"phy_4": 422, "phy_2": 222}])"},
472     };
473 
474     auto mock_fs = std::make_shared<fs::internal::MockFileSystem>(fs::kNoTime);
475     for (const auto& f : files) {
476       ARROW_EXPECT_OK(mock_fs->CreateFile(f.first, f.second, /* recursive */ true));
477     }
478     fs_ = mock_fs;
479 
480     auto get_source =
481         [this](std::string base,
482                std::vector<std::string> paths) -> Result<std::shared_ptr<Dataset>> {
483       auto resolver = [](const FileSource& source) -> std::shared_ptr<Schema> {
484         auto path = source.path();
485         // A different schema for each data fragment.
486         if (path == ds1_df1) {
487           return SchemaFromNames({"phy_1", "phy_2"});
488         } else if (path == ds1_df2) {
489           return SchemaFromNames({"phy_2", "phy_3"});
490         } else if (path == ds2_df1) {
491           return SchemaFromNames({"phy_3", "phy_4"});
492         } else if (path == ds2_df2) {
493           return SchemaFromNames({"phy_4", "phy_2"});
494         }
495 
496         return nullptr;
497       };
498 
499       auto format = std::make_shared<JSONRecordBatchFileFormat>(resolver);
500 
501       FileSystemFactoryOptions options;
502       options.partition_base_dir = base;
503       options.partitioning =
504           std::make_shared<HivePartitioning>(SchemaFromNames({"part_ds", "part_df"}));
505 
506       ARROW_ASSIGN_OR_RAISE(auto factory,
507                             FileSystemDatasetFactory::Make(fs_, paths, format, options));
508 
509       ARROW_ASSIGN_OR_RAISE(auto schema, factory->Inspect());
510 
511       return factory->Finish(schema);
512     };
513 
514     schema_ = SchemaFromNames({"phy_1", "phy_2", "phy_3", "phy_4", "part_ds", "part_df"});
515     ASSERT_OK_AND_ASSIGN(auto ds1, get_source("/dataset/alpha", {ds1_df1, ds1_df2}));
516     ASSERT_OK_AND_ASSIGN(auto ds2, get_source("/dataset/beta", {ds2_df1, ds2_df2}));
517 
518     // FIXME(bkietz) this is a hack: allow differing schemas for the purposes of this
519     // test
520     class DisparateSchemasUnionDataset : public UnionDataset {
521      public:
522       DisparateSchemasUnionDataset(std::shared_ptr<Schema> schema, DatasetVector children)
523           : UnionDataset(std::move(schema), std::move(children)) {}
524     };
525     dataset_ =
526         std::make_shared<DisparateSchemasUnionDataset>(schema_, DatasetVector{ds1, ds2});
527   }
528 
529   template <typename TupleType>
AssertScanEquals(std::shared_ptr<Scanner> scanner,const std::vector<TupleType> & expected_rows)530   void AssertScanEquals(std::shared_ptr<Scanner> scanner,
531                         const std::vector<TupleType>& expected_rows) {
532     std::vector<std::string> columns;
533     for (const auto& field : scanner->options()->projected_schema->fields()) {
534       columns.push_back(field->name());
535     }
536 
537     ASSERT_OK_AND_ASSIGN(auto actual, scanner->ToTable());
538     std::shared_ptr<Table> expected;
539     ASSERT_OK(stl::TableFromTupleRange(default_memory_pool(), expected_rows, columns,
540                                        &expected));
541     AssertTablesEqual(*expected, *actual, false, true);
542   }
543 
544   template <typename TupleType>
AssertBuilderEquals(std::shared_ptr<ScannerBuilder> builder,const std::vector<TupleType> & expected_rows)545   void AssertBuilderEquals(std::shared_ptr<ScannerBuilder> builder,
546                            const std::vector<TupleType>& expected_rows) {
547     ASSERT_OK_AND_ASSIGN(auto scanner, builder->Finish());
548     AssertScanEquals(scanner, expected_rows);
549   }
550 
551  protected:
552   std::shared_ptr<fs::FileSystem> fs_;
553   std::shared_ptr<Dataset> dataset_;
554 };
555 
// Lets the expected-row tuples below spell a missing value as plain `nullopt`.
using util::nullopt;
557 
TEST_F(TestSchemaUnification, SelectStar) {
  // Equivalent of `SELECT * FROM dataset`. It verifies:
  //
  // - columns are re-ordered properly
  // - physical columns missing from a Fragment are materialized
  // - partition columns extracted from the Partitioning are materialized
  ASSERT_OK_AND_ASSIGN(auto builder, dataset_->NewScan());

  // Columns: phy_1, phy_2, phy_3, phy_4, part_ds, part_df.
  using Row = std::tuple<i32, i32, i32, i32, i32, i32>;
  const std::vector<Row> expected{
      Row(111, 211, nullopt, nullopt, 1, 1),
      Row(nullopt, 212, 312, nullopt, 1, 2),
      Row(nullopt, nullopt, 321, 421, 2, 1),
      Row(nullopt, 222, nullopt, 422, 2, 2),
  };

  AssertBuilderEquals(builder, expected);
}
576 
TEST_F(TestSchemaUnification, SelectPhysicalColumns) {
  // Projects all four physical columns and none of the partition columns.
  ASSERT_OK_AND_ASSIGN(auto builder, dataset_->NewScan());
  ASSERT_OK(builder->Project({"phy_1", "phy_2", "phy_3", "phy_4"}));

  using Row = std::tuple<i32, i32, i32, i32>;
  const std::vector<Row> expected{
      Row(111, 211, nullopt, nullopt),
      Row(nullopt, 212, 312, nullopt),
      Row(nullopt, nullopt, 321, 421),
      Row(nullopt, 222, nullopt, 422),
  };

  AssertBuilderEquals(builder, expected);
}
592 
TEST_F(TestSchemaUnification, SelectSomeReorderedPhysicalColumns) {
  // Projects a subset of the physical columns, in an order different from the
  // one found in the physical Fragments.
  ASSERT_OK_AND_ASSIGN(auto builder, dataset_->NewScan());
  ASSERT_OK(builder->Project({"phy_2", "phy_1", "phy_4"}));

  // Columns: phy_2, phy_1, phy_4.
  using Row = std::tuple<i32, i32, i32>;
  const std::vector<Row> expected{
      Row(211, 111, nullopt),
      Row(212, nullopt, nullopt),
      Row(nullopt, nullopt, 421),
      Row(222, nullopt, 422),
  };

  AssertBuilderEquals(builder, expected);
}
608 
TEST_F(TestSchemaUnification, SelectPhysicalColumnsFilterPartitionColumn) {
  // Projects a subset of the physical columns while filtering on a partition
  // column and on a physical column that some fragments lack. It verifies:
  //
  // - a non-trivial filter can mix virtual and physical columns even when some
  //   of those columns may not be materialized
  ASSERT_OK_AND_ASSIGN(auto builder, dataset_->NewScan());
  ASSERT_OK(builder->Project({"phy_2", "phy_3", "phy_4"}));

  // (part_df == 1 AND phy_2 == 211) OR (part_ds == 2 AND phy_4 != 422)
  auto df1_with_phy2_211 = and_(equal(field_ref("part_df"), literal(1)),
                                equal(field_ref("phy_2"), literal(211)));
  auto ds2_without_phy4_422 = and_(equal(field_ref("part_ds"), literal(2)),
                                   not_equal(field_ref("phy_4"), literal(422)));
  ASSERT_OK(builder->Filter(or_(df1_with_phy2_211, ds2_without_phy4_422)));

  using Row = std::tuple<i32, i32, i32>;
  const std::vector<Row> expected{
      Row(211, nullopt, nullopt),
      Row(nullopt, 321, 421),
  };

  AssertBuilderEquals(builder, expected);
}
630 
TEST_F(TestSchemaUnification, SelectSyntheticColumn) {
  // Projects a single computed column: the sum of a physical column and a
  // partition column.
  ASSERT_OK_AND_ASSIGN(auto builder, dataset_->NewScan());
  ASSERT_OK(builder->Project({call("add", {field_ref("phy_1"), field_ref("part_df")})},
                             {"phy_1 + part_df"}));

  // The projected schema must consist of exactly that one int32 column.
  ASSERT_OK_AND_ASSIGN(auto scanner, builder->Finish());
  AssertSchemaEqual(Schema({field("phy_1 + part_df", int32())}),
                    *scanner->options()->projected_schema);

  // Only the first fragment has phy_1; the other rows are expected to be null.
  using Row = std::tuple<i32>;
  const std::vector<Row> expected{
      Row(111 + 1),
      Row(nullopt),
      Row(nullopt),
      Row(nullopt),
  };

  AssertBuilderEquals(builder, expected);
}
651 
TEST_F(TestSchemaUnification, SelectPartitionColumns) {
  // Projects only the partition (virtual) columns. It verifies:
  //
  // - virtual columns are materialized
  // - each Fragment yields the right number of rows even though no physical
  //   column is selected
  ASSERT_OK_AND_ASSIGN(auto builder, dataset_->NewScan());
  ASSERT_OK(builder->Project({"part_ds", "part_df"}));

  using Row = std::tuple<i32, i32>;
  const std::vector<Row> expected{
      Row(1, 1),
      Row(1, 2),
      Row(2, 1),
      Row(2, 2),
  };

  AssertBuilderEquals(builder, expected);
}
668 
TEST_F(TestSchemaUnification, SelectPartitionColumnsFilterPhysicalColumn) {
  // Projects the virtual columns in reversed order while filtering on a
  // physical column.
  ASSERT_OK_AND_ASSIGN(auto builder, dataset_->NewScan());
  ASSERT_OK(builder->Filter(equal(field_ref("phy_1"), literal(111))));
  ASSERT_OK(builder->Project({"part_df", "part_ds"}));

  // Columns: part_df, part_ds.
  using Row = std::tuple<i32, i32>;
  const std::vector<Row> expected{
      Row(1, 1),
  };

  AssertBuilderEquals(builder, expected);
}
681 
TEST_F(TestSchemaUnification, SelectMixedColumnsAndFilter) {
  // Projects a re-ordered mix of physical and virtual columns, with a filter on
  // a physical column that is not part of the projection.
  ASSERT_OK_AND_ASSIGN(auto builder, dataset_->NewScan());
  ASSERT_OK(builder->Filter(greater_equal(field_ref("phy_2"), literal(212))));
  ASSERT_OK(builder->Project({"part_df", "phy_3", "part_ds", "phy_1"}));

  // Columns: part_df, phy_3, part_ds, phy_1.
  using Row = std::tuple<i32, i32, i32, i32>;
  const std::vector<Row> expected{
      Row(2, 312, 1, nullopt),
      Row(2, nullopt, 2, nullopt),
  };

  AssertBuilderEquals(builder, expected);
}
696 
// Projects a dictionary-typed hive partition column while filtering on a
// physical column, and checks the materialized partition values.
TEST(TestDictPartitionColumn, SelectPartitionColumnFilterPhysicalColumn) {
  auto partition_field = field("part", dictionary(int32(), utf8()));
  auto path = "/dataset/part=one/data.json";
  // Dictionary values handed to the partitioning for the "part" field.
  auto dictionary = ArrayFromJSON(utf8(), R"(["one"])");

  auto mock_fs = std::make_shared<fs::internal::MockFileSystem>(fs::kNoTime);
  ARROW_EXPECT_OK(mock_fs->CreateFile(path, R"([ {"phy_1": 111, "phy_2": 211} ])",
                                      /*recursive=*/true));

  // Every file is resolved to the same two-column physical schema.
  auto physical_schema = SchemaFromNames({"phy_1", "phy_2"});
  auto format = std::make_shared<JSONRecordBatchFileFormat>(
      [=](const FileSource&) { return physical_schema; });

  FileSystemFactoryOptions options;
  options.partition_base_dir = "/dataset";
  options.partitioning = std::make_shared<HivePartitioning>(schema({partition_field}),
                                                            ArrayVector{dictionary});

  ASSERT_OK_AND_ASSIGN(auto factory,
                       FileSystemDatasetFactory::Make(mock_fs, {path}, format, options));

  ASSERT_OK_AND_ASSIGN(auto schema, factory->Inspect());

  ASSERT_OK_AND_ASSIGN(auto dataset, factory->Finish(schema));

  // Selects re-ordered virtual column with a filter on a physical column
  ASSERT_OK_AND_ASSIGN(auto scan_builder, dataset->NewScan());
  ASSERT_OK(scan_builder->Filter(equal(field_ref("phy_1"), literal(111))));
  ASSERT_OK(scan_builder->Project({"part"}));

  ASSERT_OK_AND_ASSIGN(auto scanner, scan_builder->Finish());
  ASSERT_OK_AND_ASSIGN(auto table, scanner->ToTable());

  // Guard the indexing below: an unexpectedly empty result must fail the test
  // cleanly instead of reading past the end of the column/chunk vectors.
  ASSERT_EQ(1, table->num_columns());
  ASSERT_GT(table->column(0)->num_chunks(), 0);

  AssertArraysEqual(*table->column(0)->chunk(0),
                    *ArrayFromJSON(partition_field->type(), R"(["one"])"));
}
732 
733 }  // namespace dataset
734 }  // namespace arrow
735