1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17
18 #include "arrow/dataset/dataset.h"
19
20 #include "arrow/dataset/dataset_internal.h"
21 #include "arrow/dataset/discovery.h"
22 #include "arrow/dataset/partition.h"
23 #include "arrow/dataset/test_util.h"
24 #include "arrow/filesystem/mockfs.h"
25 #include "arrow/stl.h"
26 #include "arrow/testing/generator.h"
27 #include "arrow/util/optional.h"
28
29 namespace arrow {
30 namespace dataset {
31
32 class TestInMemoryFragment : public DatasetFixtureMixin {};
33
34 using RecordBatchVector = std::vector<std::shared_ptr<RecordBatch>>;
35
TEST_F(TestInMemoryFragment,Scan)36 TEST_F(TestInMemoryFragment, Scan) {
37 constexpr int64_t kBatchSize = 1024;
38 constexpr int64_t kNumberBatches = 16;
39
40 SetSchema({field("i32", int32()), field("f64", float64())});
41 auto batch = ConstantArrayGenerator::Zeroes(kBatchSize, schema_);
42 auto reader = ConstantArrayGenerator::Repeat(kNumberBatches, batch);
43
44 // Creates a InMemoryFragment of the same repeated batch.
45 RecordBatchVector batches = {static_cast<size_t>(kNumberBatches), batch};
46 auto fragment = std::make_shared<InMemoryFragment>(batches);
47
48 AssertFragmentEquals(reader.get(), fragment.get());
49 }
50
51 class TestInMemoryDataset : public DatasetFixtureMixin {};
52
TEST_F(TestInMemoryDataset,ReplaceSchema)53 TEST_F(TestInMemoryDataset, ReplaceSchema) {
54 constexpr int64_t kBatchSize = 1;
55 constexpr int64_t kNumberBatches = 1;
56
57 SetSchema({field("i32", int32()), field("f64", float64())});
58 auto batch = ConstantArrayGenerator::Zeroes(kBatchSize, schema_);
59 auto reader = ConstantArrayGenerator::Repeat(kNumberBatches, batch);
60
61 auto dataset = std::make_shared<InMemoryDataset>(
62 schema_, RecordBatchVector{static_cast<size_t>(kNumberBatches), batch});
63
64 // drop field
65 ASSERT_OK(dataset->ReplaceSchema(schema({field("i32", int32())})).status());
66 // add field (will be materialized as null during projection)
67 ASSERT_OK(dataset->ReplaceSchema(schema({field("str", utf8())})).status());
68 // incompatible type
69 ASSERT_RAISES(TypeError,
70 dataset->ReplaceSchema(schema({field("i32", utf8())})).status());
71 // incompatible nullability
72 ASSERT_RAISES(
73 TypeError,
74 dataset->ReplaceSchema(schema({field("f64", float64(), /*nullable=*/false)}))
75 .status());
76 // add non-nullable field
77 ASSERT_RAISES(TypeError,
78 dataset->ReplaceSchema(schema({field("str", utf8(), /*nullable=*/false)}))
79 .status());
80 }
81
// Scanning every fragment of an InMemoryDataset built from kNumberBatches copies of
// one batch must reproduce that batch kNumberBatches times.
TEST_F(TestInMemoryDataset, GetFragments) {
  constexpr int64_t kBatchSize = 1024;
  constexpr int64_t kNumberBatches = 16;

  SetSchema({field("i32", int32()), field("f64", float64())});
  auto batch = ConstantArrayGenerator::Zeroes(kBatchSize, schema_);

  RecordBatchVector dataset_batches(static_cast<size_t>(kNumberBatches), batch);
  auto dataset = std::make_shared<InMemoryDataset>(schema_, dataset_batches);

  auto expected_reader = ConstantArrayGenerator::Repeat(kNumberBatches, batch);
  AssertDatasetEquals(expected_reader.get(), dataset.get());
}
95
// Regression test: the InMemoryFragment(RecordBatchVector) constructor used to rely
// on undefined behavior (the evaluation order of its arguments), which could leave
// the fragment with an empty schema. The fragment's physical schema must equal the
// schema of the batch it was built from.
TEST_F(TestInMemoryDataset, InMemoryFragment) {
  constexpr int64_t kBatchSize = 1024;

  SetSchema({field("i32", int32()), field("f64", float64())});
  auto batch = ConstantArrayGenerator::Zeroes(kBatchSize, schema_);

  auto fragment = std::make_shared<InMemoryFragment>(RecordBatchVector{batch});

  ASSERT_OK_AND_ASSIGN(auto physical_schema, fragment->ReadPhysicalSchema());
  AssertSchemaEqual(batch->schema(), physical_schema);
}
109
110 class TestUnionDataset : public DatasetFixtureMixin {};
111
TEST_F(TestUnionDataset,ReplaceSchema)112 TEST_F(TestUnionDataset, ReplaceSchema) {
113 constexpr int64_t kBatchSize = 1;
114 constexpr int64_t kNumberBatches = 1;
115
116 SetSchema({field("i32", int32()), field("f64", float64())});
117 auto batch = ConstantArrayGenerator::Zeroes(kBatchSize, schema_);
118
119 std::vector<std::shared_ptr<RecordBatch>> batches{static_cast<size_t>(kNumberBatches),
120 batch};
121
122 DatasetVector children = {
123 std::make_shared<InMemoryDataset>(schema_, batches),
124 std::make_shared<InMemoryDataset>(schema_, batches),
125 };
126
127 const int64_t total_batches = children.size() * kNumberBatches;
128 auto reader = ConstantArrayGenerator::Repeat(total_batches, batch);
129
130 ASSERT_OK_AND_ASSIGN(auto dataset, UnionDataset::Make(schema_, children));
131 AssertDatasetEquals(reader.get(), dataset.get());
132
133 // drop field
134 ASSERT_OK(dataset->ReplaceSchema(schema({field("i32", int32())})).status());
135 // add nullable field (will be materialized as null during projection)
136 ASSERT_OK(dataset->ReplaceSchema(schema({field("str", utf8())})).status());
137 // incompatible type
138 ASSERT_RAISES(TypeError,
139 dataset->ReplaceSchema(schema({field("i32", utf8())})).status());
140 // incompatible nullability
141 ASSERT_RAISES(
142 TypeError,
143 dataset->ReplaceSchema(schema({field("f64", float64(), /*nullable=*/false)}))
144 .status());
145 // add non-nullable field
146 ASSERT_RAISES(TypeError,
147 dataset->ReplaceSchema(schema({field("str", utf8(), /*nullable=*/false)}))
148 .status());
149 }
150
// Builds a complete tree of datasets kCompleteBinaryTreeDepth levels deep. The
// bottom level is an InMemoryDataset holding kChildPerNode batches, and every level
// above it is a UnionDataset with kChildPerNode (shared) children. Scanning the root
// must therefore yield kChildPerNode^kCompleteBinaryTreeDepth copies of the batch.
TEST_F(TestUnionDataset, GetFragments) {
  constexpr int64_t kBatchSize = 1024;
  constexpr int64_t kChildPerNode = 2;
  constexpr int64_t kCompleteBinaryTreeDepth = 4;

  SetSchema({field("i32", int32()), field("f64", float64())});
  auto batch = ConstantArrayGenerator::Zeroes(kBatchSize, schema_);

  // Bottom level: a single InMemoryDataset with kChildPerNode batches.
  std::shared_ptr<Dataset> node = std::make_shared<InMemoryDataset>(
      schema_, RecordBatchVector{static_cast<size_t>(kChildPerNode), batch});
  int64_t n_leaves = kChildPerNode;

  // Every remaining level wraps kChildPerNode references to the level below in a
  // UnionDataset. The leaf count is derived from both constants, so changing either
  // one keeps the expected stream in sync with the tree actually built.
  for (int64_t level = 1; level < kCompleteBinaryTreeDepth; ++level) {
    ASSERT_OK_AND_ASSIGN(
        node, UnionDataset::Make(
                  schema_, DatasetVector{static_cast<size_t>(kChildPerNode), node}));
    n_leaves *= kChildPerNode;
  }

  auto reader = ConstantArrayGenerator::Repeat(n_leaves, batch);
  AssertDatasetEquals(reader.get(), node.get());
}
185
// Scanning a UnionDataset of two identical InMemoryDatasets yields the batches of
// both children back to back.
TEST_F(TestUnionDataset, TrivialScan) {
  constexpr int64_t kNumberBatches = 16;
  constexpr int64_t kBatchSize = 1024;

  SetSchema({field("i32", int32()), field("f64", float64())});
  auto batch = ConstantArrayGenerator::Zeroes(kBatchSize, schema_);

  RecordBatchVector child_batches(static_cast<size_t>(kNumberBatches), batch);
  DatasetVector children;
  children.push_back(std::make_shared<InMemoryDataset>(schema_, child_batches));
  children.push_back(std::make_shared<InMemoryDataset>(schema_, child_batches));

  ASSERT_OK_AND_ASSIGN(auto union_dataset, UnionDataset::Make(schema_, children));

  const auto total_batches = static_cast<int64_t>(children.size()) * kNumberBatches;
  auto expected_reader = ConstantArrayGenerator::Repeat(total_batches, batch);
  AssertDatasetEquals(expected_reader.get(), union_dataset.get());
}
207
TEST(TestProjector, CheckProjectable) {
  // Fluent helper: Assert(from).ProjectableTo(to) expects CheckProjectable to
  // succeed for Schema(from) -> Schema(to); NotProjectableTo(to, substr) expects a
  // TypeError whose message contains `substr` (any message when `substr` is empty).
  struct Assert {
    // `from` is a sink parameter: move it into the schema instead of copying.
    explicit Assert(FieldVector from) : from_(std::move(from)) {}
    Schema from_;

    void ProjectableTo(FieldVector to) {
      ARROW_EXPECT_OK(CheckProjectable(from_, Schema(to)));
    }

    void NotProjectableTo(FieldVector to, std::string substr = "") {
      EXPECT_RAISES_WITH_MESSAGE_THAT(TypeError, testing::HasSubstr(substr),
                                      CheckProjectable(from_, Schema(to)));
    }
  };

  auto i8 = field("i8", int8());
  auto u16 = field("u16", uint16());
  auto str = field("str", utf8());
  auto i8_req = field("i8", int8(), false);
  auto u16_req = field("u16", uint16(), false);
  auto str_req = field("str", utf8(), false);
  auto str_nil = field("str", null());

  // trivial
  Assert({}).ProjectableTo({});
  Assert({i8}).ProjectableTo({i8});
  Assert({i8, u16_req}).ProjectableTo({i8, u16_req});

  // reorder
  Assert({i8, u16}).ProjectableTo({u16, i8});
  Assert({i8, str, u16}).ProjectableTo({u16, i8, str});

  // drop field(s)
  Assert({i8}).ProjectableTo({});

  // add field(s): only nullable fields may be added
  Assert({}).ProjectableTo({i8});
  Assert({}).ProjectableTo({i8, u16});
  Assert({}).NotProjectableTo({u16_req},
                              "is not nullable and does not exist in origin schema");
  Assert({i8}).NotProjectableTo({u16_req, i8});

  // change nullability: relaxing is allowed, tightening is not
  Assert({i8}).NotProjectableTo({i8_req},
                                "not nullable but is not required in origin schema");
  Assert({i8_req}).ProjectableTo({i8});
  Assert({str_nil}).ProjectableTo({str});
  Assert({str_nil}).NotProjectableTo({str_req});

  // change field type
  Assert({i8}).NotProjectableTo({field("i8", utf8())},
                                "fields had matching names but differing types");
}
261
262 class TestEndToEnd : public TestUnionDataset {
SetUp()263 void SetUp() override {
264 bool nullable = false;
265 SetSchema({
266 field("region", utf8(), nullable),
267 field("model", utf8(), nullable),
268 field("sales", float64(), nullable),
269 // partition columns
270 field("year", int32()),
271 field("month", int32()),
272 field("country", utf8()),
273 });
274
275 using PathAndContent = std::vector<std::pair<std::string, std::string>>;
276 auto files = PathAndContent{
277 {"/dataset/2018/01/US/dat.json", R"([
278 {"region": "NY", "model": "3", "sales": 742.0},
279 {"region": "NY", "model": "S", "sales": 304.125},
280 {"region": "NY", "model": "X", "sales": 136.25},
281 {"region": "NY", "model": "Y", "sales": 27.5}
282 ])"},
283 {"/dataset/2018/01/CA/dat.json", R"([
284 {"region": "CA", "model": "3", "sales": 512},
285 {"region": "CA", "model": "S", "sales": 978},
286 {"region": "CA", "model": "X", "sales": 1.0},
287 {"region": "CA", "model": "Y", "sales": 69}
288 ])"},
289 {"/dataset/2019/01/US/dat.json", R"([
290 {"region": "QC", "model": "3", "sales": 273.5},
291 {"region": "QC", "model": "S", "sales": 13},
292 {"region": "QC", "model": "X", "sales": 54},
293 {"region": "QC", "model": "Y", "sales": 21}
294 ])"},
295 {"/dataset/2019/01/CA/dat.json", R"([
296 {"region": "QC", "model": "3", "sales": 152.25},
297 {"region": "QC", "model": "S", "sales": 10},
298 {"region": "QC", "model": "X", "sales": 42},
299 {"region": "QC", "model": "Y", "sales": 37}
300 ])"},
301 {"/dataset/.pesky", "garbage content"},
302 };
303
304 auto mock_fs = std::make_shared<fs::internal::MockFileSystem>(fs::kNoTime);
305 for (const auto& f : files) {
306 ARROW_EXPECT_OK(mock_fs->CreateFile(f.first, f.second, /*recursive=*/true));
307 }
308
309 fs_ = mock_fs;
310 }
311
312 protected:
313 std::shared_ptr<fs::FileSystem> fs_;
314 };
315
TEST_F(TestEndToEnd, EndToEndSingleDataset) {
  // The dataset API is divided into 3 parts:
  //  - Creation
  //  - Querying
  //  - Consuming

  // Creation.
  //
  // A UnionDataset is the union of one or more Datasets with the same schema.
  // Examples of Datasets: FileSystemDataset, OdbcDataset, FlightDataset.

  // A Dataset is composed of Fragments. Each Fragment can yield multiple
  // RecordBatches. Datasets can be created manually or "discovered" via the
  // DatasetFactory interface.
  std::shared_ptr<DatasetFactory> factory;

  // The user must specify which FileFormat is used to create FileFragments.
  // This option is specific to FileSystemDataset (and its factory). The format only
  // covers the physical columns; partition columns are derived from file paths.
  auto format_schema = SchemaFromColumnNames(schema_, {"region", "model", "sales"});
  auto format = std::make_shared<JSONRecordBatchFileFormat>(format_schema);

  // A selector is used to crawl the files and directories of a filesystem. If the
  // options in FileSelector are not enough, the FileSystemDatasetFactory class also
  // supports an explicit list of fs::FileInfo instead of the selector.
  fs::FileSelector s;
  s.base_dir = "/dataset";
  s.recursive = true;

  // Further options can be given to the factory mechanism via the
  // FileSystemFactoryOptions configuration class. See its docstring for more
  // information. Here, entries whose names start with "." (such as
  // "/dataset/.pesky") are ignored.
  FileSystemFactoryOptions options;
  options.selector_ignore_prefixes = {"."};

  // Partition expressions can be discovered for Datasets and Fragments. This
  // metadata is then used in conjunction with the query filter to apply the
  // predicate pushdown optimization.
  //
  // The DirectoryPartitioning is a partitioning where the path is split with the
  // directory separator character and the components are parsed as values of the
  // corresponding fields in its schema.
  //
  // Since a PartitioningFactory is specified instead of an explicit Partitioning,
  // the types of the partition fields will be inferred.
  //
  // - "/2019"                   -> {"year": 2019}
  // - "/2019/01"                -> {"year": 2019, "month": 1}
  // - "/2019/01/CA"             -> {"year": 2019, "month": 1, "country": "CA"}
  // - "/2019/01/CA/a_file.json" -> {"year": 2019, "month": 1, "country": "CA"}
  options.partitioning = DirectoryPartitioning::MakeFactory({"year", "month", "country"});

  ASSERT_OK_AND_ASSIGN(factory, FileSystemDatasetFactory::Make(fs_, s, format, options));

  // Fragments might have compatible but slightly different schemas, e.g. a schema
  // that evolved by adding/renaming columns. In this case, the unified schema is
  // passed to the dataset constructor. The inspected_schema may optionally be
  // modified before being finalized.
  InspectOptions inspect_options;
  inspect_options.fragments = InspectOptions::kInspectAllFragments;
  ASSERT_OK_AND_ASSIGN(auto inspected_schema, factory->Inspect(inspect_options));
  EXPECT_EQ(*schema_, *inspected_schema);

  // Build the Dataset, with partition expressions attached to fragments (files).
  ASSERT_OK_AND_ASSIGN(auto source, factory->Finish(inspected_schema));

  // Wrap the single Dataset in a UnionDataset.
  ASSERT_OK_AND_ASSIGN(auto dataset, UnionDataset::Make(inspected_schema, {source}));

  // Querying.
  //
  // The Scan operator materializes data from I/O into memory. Avoiding data
  // transfer is a critical optimization done by analytical engines. Thus, a Scan
  // can take multiple options, notably a subset of columns and a filter
  // expression.
  ASSERT_OK_AND_ASSIGN(auto scanner_builder, dataset->NewScan());

  // An optional subset of columns can be provided. This will trickle down to
  // Fragment drivers. The net effect is that only columns of interest will be
  // materialized if the Fragment supports it. This is the major benefit of using a
  // column-major format versus a row-major format.
  //
  // This API decouples the Dataset/Fragment implementation and column projection
  // from the query part.
  //
  // For example, a ParquetFileFragment may read only the necessary byte ranges,
  // and an OdbcFragment could convert the projection to a SELECT statement. The
  // CsvFileFragment wouldn't benefit from this as much, but can still benefit from
  // skipping the conversion of unneeded columns.
  std::vector<std::string> columns{"sales", "model", "country"};
  ASSERT_OK(scanner_builder->Project(columns));

  // An optional filter expression may also be specified. The filter expression is
  // evaluated against input rows. Only rows for which the filter evaluates to true
  // are yielded. Predicate pushdown optimizations are applied using partition
  // information if available.
  //
  // This API decouples predicate pushdown from the Dataset implementation and
  // partition discovery.
  //
  // The following filter exercises both predicate pushdown and post-filtering:
  // `year` is a partition field and can be resolved from partition information,
  // whereas `sales` is not and must be evaluated against the materialized rows.
  auto filter = and_(equal(field_ref("year"), literal(2019)),
                     greater(field_ref("sales"), literal(100.0)));
  ASSERT_OK(scanner_builder->Filter(filter));

  ASSERT_OK_AND_ASSIGN(auto scanner, scanner_builder->Finish());

  // Consuming.
  //
  // In the simplest case, consumption is simply conversion to a Table.
  ASSERT_OK_AND_ASSIGN(auto table, scanner->ToTable());

  auto expected = TableFromJSON(scanner_builder->projected_schema(), {R"([
    {"sales": 152.25, "model": "3", "country": "CA"},
    {"sales": 273.5, "model": "3", "country": "US"}
  ])"});
  AssertTablesEqual(*expected, *table, /*same_chunk_layout=*/false, /*flatten=*/true);
}
434
SchemaFromNames(const std::vector<std::string> names)435 inline std::shared_ptr<Schema> SchemaFromNames(const std::vector<std::string> names) {
436 std::vector<std::shared_ptr<Field>> fields;
437 for (const auto& name : names) {
438 fields.push_back(field(name, int32()));
439 }
440
441 return schema(fields);
442 }
443
444 class TestSchemaUnification : public TestUnionDataset {
445 public:
446 using i32 = util::optional<int32_t>;
447 using PathAndContent = std::vector<std::pair<std::string, std::string>>;
448
SetUp()449 void SetUp() override {
450 // The following test creates 2 sources with divergent but compatible
451 // schemas. Each source have a common partitioning where the
452 // fields are not materialized in the data fragments.
453 //
454 // Each data is composed of 2 data fragments with divergent but
455 // compatible schemas. The data fragment within a source share at
456 // least one column.
457 //
458 // Thus, the fixture helps verifying various scenarios where the Scanner
459 // must fix the RecordBatches to align with the final unified schema exposed
460 // to the consumer.
461 static constexpr auto ds1_df1 = "/dataset/alpha/part_ds=1/part_df=1/data.json";
462 static constexpr auto ds1_df2 = "/dataset/alpha/part_ds=1/part_df=2/data.json";
463 static constexpr auto ds2_df1 = "/dataset/beta/part_ds=2/part_df=1/data.json";
464 static constexpr auto ds2_df2 = "/dataset/beta/part_ds=2/part_df=2/data.json";
465 auto files = PathAndContent{
466 // First Dataset
467 {ds1_df1, R"([{"phy_1": 111, "phy_2": 211}])"},
468 {ds1_df2, R"([{"phy_2": 212, "phy_3": 312}])"},
469 // Second Dataset
470 {ds2_df1, R"([{"phy_3": 321, "phy_4": 421}])"},
471 {ds2_df2, R"([{"phy_4": 422, "phy_2": 222}])"},
472 };
473
474 auto mock_fs = std::make_shared<fs::internal::MockFileSystem>(fs::kNoTime);
475 for (const auto& f : files) {
476 ARROW_EXPECT_OK(mock_fs->CreateFile(f.first, f.second, /* recursive */ true));
477 }
478 fs_ = mock_fs;
479
480 auto get_source =
481 [this](std::string base,
482 std::vector<std::string> paths) -> Result<std::shared_ptr<Dataset>> {
483 auto resolver = [](const FileSource& source) -> std::shared_ptr<Schema> {
484 auto path = source.path();
485 // A different schema for each data fragment.
486 if (path == ds1_df1) {
487 return SchemaFromNames({"phy_1", "phy_2"});
488 } else if (path == ds1_df2) {
489 return SchemaFromNames({"phy_2", "phy_3"});
490 } else if (path == ds2_df1) {
491 return SchemaFromNames({"phy_3", "phy_4"});
492 } else if (path == ds2_df2) {
493 return SchemaFromNames({"phy_4", "phy_2"});
494 }
495
496 return nullptr;
497 };
498
499 auto format = std::make_shared<JSONRecordBatchFileFormat>(resolver);
500
501 FileSystemFactoryOptions options;
502 options.partition_base_dir = base;
503 options.partitioning =
504 std::make_shared<HivePartitioning>(SchemaFromNames({"part_ds", "part_df"}));
505
506 ARROW_ASSIGN_OR_RAISE(auto factory,
507 FileSystemDatasetFactory::Make(fs_, paths, format, options));
508
509 ARROW_ASSIGN_OR_RAISE(auto schema, factory->Inspect());
510
511 return factory->Finish(schema);
512 };
513
514 schema_ = SchemaFromNames({"phy_1", "phy_2", "phy_3", "phy_4", "part_ds", "part_df"});
515 ASSERT_OK_AND_ASSIGN(auto ds1, get_source("/dataset/alpha", {ds1_df1, ds1_df2}));
516 ASSERT_OK_AND_ASSIGN(auto ds2, get_source("/dataset/beta", {ds2_df1, ds2_df2}));
517
518 // FIXME(bkietz) this is a hack: allow differing schemas for the purposes of this
519 // test
520 class DisparateSchemasUnionDataset : public UnionDataset {
521 public:
522 DisparateSchemasUnionDataset(std::shared_ptr<Schema> schema, DatasetVector children)
523 : UnionDataset(std::move(schema), std::move(children)) {}
524 };
525 dataset_ =
526 std::make_shared<DisparateSchemasUnionDataset>(schema_, DatasetVector{ds1, ds2});
527 }
528
529 template <typename TupleType>
AssertScanEquals(std::shared_ptr<Scanner> scanner,const std::vector<TupleType> & expected_rows)530 void AssertScanEquals(std::shared_ptr<Scanner> scanner,
531 const std::vector<TupleType>& expected_rows) {
532 std::vector<std::string> columns;
533 for (const auto& field : scanner->options()->projected_schema->fields()) {
534 columns.push_back(field->name());
535 }
536
537 ASSERT_OK_AND_ASSIGN(auto actual, scanner->ToTable());
538 std::shared_ptr<Table> expected;
539 ASSERT_OK(stl::TableFromTupleRange(default_memory_pool(), expected_rows, columns,
540 &expected));
541 AssertTablesEqual(*expected, *actual, false, true);
542 }
543
544 template <typename TupleType>
AssertBuilderEquals(std::shared_ptr<ScannerBuilder> builder,const std::vector<TupleType> & expected_rows)545 void AssertBuilderEquals(std::shared_ptr<ScannerBuilder> builder,
546 const std::vector<TupleType>& expected_rows) {
547 ASSERT_OK_AND_ASSIGN(auto scanner, builder->Finish());
548 AssertScanEquals(scanner, expected_rows);
549 }
550
551 protected:
552 std::shared_ptr<fs::FileSystem> fs_;
553 std::shared_ptr<Dataset> dataset_;
554 };
555
556 using util::nullopt;
557
TEST_F(TestSchemaUnification,SelectStar)558 TEST_F(TestSchemaUnification, SelectStar) {
559 // This is a `SELECT * FROM dataset` where it ensures:
560 //
561 // - proper re-ordering of columns
562 // - materializing missing physical columns in Fragments
563 // - materializing missing partition columns extracted from Partitioning
564 ASSERT_OK_AND_ASSIGN(auto scan_builder, dataset_->NewScan());
565
566 using TupleType = std::tuple<i32, i32, i32, i32, i32, i32>;
567 std::vector<TupleType> rows = {
568 TupleType(111, 211, nullopt, nullopt, 1, 1),
569 TupleType(nullopt, 212, 312, nullopt, 1, 2),
570 TupleType(nullopt, nullopt, 321, 421, 2, 1),
571 TupleType(nullopt, 222, nullopt, 422, 2, 2),
572 };
573
574 AssertBuilderEquals(scan_builder, rows);
575 }
576
// Like SelectStar, but restricted to the physical columns.
TEST_F(TestSchemaUnification, SelectPhysicalColumns) {
  ASSERT_OK_AND_ASSIGN(auto scan_builder, dataset_->NewScan());
  ASSERT_OK(scan_builder->Project({"phy_1", "phy_2", "phy_3", "phy_4"}));

  using TupleType = std::tuple<i32, i32, i32, i32>;
  const std::vector<TupleType> expected_rows{
      TupleType(111, 211, nullopt, nullopt),
      TupleType(nullopt, 212, 312, nullopt),
      TupleType(nullopt, nullopt, 321, 421),
      TupleType(nullopt, 222, nullopt, 422),
  };
  AssertBuilderEquals(scan_builder, expected_rows);
}
592
// Projects a subset of physical columns in an order that differs from every
// Fragment's physical layout.
TEST_F(TestSchemaUnification, SelectSomeReorderedPhysicalColumns) {
  ASSERT_OK_AND_ASSIGN(auto scan_builder, dataset_->NewScan());
  ASSERT_OK(scan_builder->Project({"phy_2", "phy_1", "phy_4"}));

  using TupleType = std::tuple<i32, i32, i32>;
  const std::vector<TupleType> expected_rows{
      TupleType(211, 111, nullopt),
      TupleType(212, nullopt, nullopt),
      TupleType(nullopt, nullopt, 421),
      TupleType(222, nullopt, 422),
  };
  AssertBuilderEquals(scan_builder, expected_rows);
}
608
// Projects a subset of physical columns under a filter that mixes partition
// (virtual) columns with physical columns missing from some Fragments. This checks
// that a non-trivial filter still evaluates when some referenced columns are not
// materialized.
TEST_F(TestSchemaUnification, SelectPhysicalColumnsFilterPartitionColumn) {
  ASSERT_OK_AND_ASSIGN(auto scan_builder, dataset_->NewScan());
  ASSERT_OK(scan_builder->Project({"phy_2", "phy_3", "phy_4"}));

  auto df1_with_phy2_211 = and_(equal(field_ref("part_df"), literal(1)),
                                equal(field_ref("phy_2"), literal(211)));
  auto ds2_without_phy4_422 = and_(equal(field_ref("part_ds"), literal(2)),
                                   not_equal(field_ref("phy_4"), literal(422)));
  ASSERT_OK(scan_builder->Filter(or_(df1_with_phy2_211, ds2_without_phy4_422)));

  using TupleType = std::tuple<i32, i32, i32>;
  const std::vector<TupleType> expected_rows{
      TupleType(211, nullopt, nullopt),
      TupleType(nullopt, 321, 421),
  };
  AssertBuilderEquals(scan_builder, expected_rows);
}
630
// Projects only a synthetic column computed from a physical and a partition column.
TEST_F(TestSchemaUnification, SelectSyntheticColumn) {
  ASSERT_OK_AND_ASSIGN(auto scan_builder, dataset_->NewScan());
  ASSERT_OK(scan_builder->Project(
      {call("add", {field_ref("phy_1"), field_ref("part_df")})}, {"phy_1 + part_df"}));

  ASSERT_OK_AND_ASSIGN(auto scanner, scan_builder->Finish());
  AssertSchemaEqual(Schema({field("phy_1 + part_df", int32())}),
                    *scanner->options()->projected_schema);

  // Only the first fragment has phy_1; everywhere else the sum is null.
  using TupleType = std::tuple<i32>;
  std::vector<TupleType> rows = {
      TupleType(111 + 1),
      TupleType(nullopt),
      TupleType(nullopt),
      TupleType(nullopt),
  };

  // Scan the very scanner whose projected schema was just checked, instead of
  // finishing the builder a second time into a different Scanner.
  AssertScanEquals(scanner, rows);
}
651
// Projects only partition (virtual) columns. Checks that:
//  - virtual columns are materialized,
//  - Fragments yield the right number of rows even though no physical column is
//    selected.
TEST_F(TestSchemaUnification, SelectPartitionColumns) {
  using TupleType = std::tuple<i32, i32>;
  const std::vector<TupleType> expected_rows{
      TupleType(1, 1),
      TupleType(1, 2),
      TupleType(2, 1),
      TupleType(2, 2),
  };

  ASSERT_OK_AND_ASSIGN(auto scan_builder, dataset_->NewScan());
  ASSERT_OK(scan_builder->Project({"part_ds", "part_df"}));
  AssertBuilderEquals(scan_builder, expected_rows);
}
668
// Projects re-ordered virtual columns with a filter on a physical column.
TEST_F(TestSchemaUnification, SelectPartitionColumnsFilterPhysicalColumn) {
  ASSERT_OK_AND_ASSIGN(auto scan_builder, dataset_->NewScan());
  auto phy1_is_111 = equal(field_ref("phy_1"), literal(111));
  ASSERT_OK(scan_builder->Filter(phy1_is_111));
  ASSERT_OK(scan_builder->Project({"part_df", "part_ds"}));

  // Only the first fragment (part_ds=1, part_df=1) has phy_1 == 111.
  using TupleType = std::tuple<i32, i32>;
  const std::vector<TupleType> expected_rows{TupleType(1, 1)};
  AssertBuilderEquals(scan_builder, expected_rows);
}
681
// Projects a re-ordered mix of physical and virtual columns, filtered on a physical
// column that is not itself projected.
TEST_F(TestSchemaUnification, SelectMixedColumnsAndFilter) {
  ASSERT_OK_AND_ASSIGN(auto scan_builder, dataset_->NewScan());
  auto phy2_at_least_212 = greater_equal(field_ref("phy_2"), literal(212));
  ASSERT_OK(scan_builder->Filter(phy2_at_least_212));
  ASSERT_OK(scan_builder->Project({"part_df", "phy_3", "part_ds", "phy_1"}));

  using TupleType = std::tuple<i32, i32, i32, i32>;
  const std::vector<TupleType> expected_rows{
      TupleType(2, 312, 1, nullopt),
      TupleType(2, nullopt, 2, nullopt),
  };
  AssertBuilderEquals(scan_builder, expected_rows);
}
696
// A dictionary-typed Hive partition field must be materialized with its dictionary
// type when it is the only projected column and the filter references a physical
// column.
TEST(TestDictPartitionColumn, SelectPartitionColumnFilterPhysicalColumn) {
  auto partition_field = field("part", dictionary(int32(), utf8()));
  auto path = "/dataset/part=one/data.json";
  // Named so that it does not shadow arrow::dictionary(), which is called just above.
  auto dictionary_values = ArrayFromJSON(utf8(), R"(["one"])");

  auto mock_fs = std::make_shared<fs::internal::MockFileSystem>(fs::kNoTime);
  ARROW_EXPECT_OK(mock_fs->CreateFile(path, R"([ {"phy_1": 111, "phy_2": 211} ])",
                                      /*recursive=*/true));

  auto physical_schema = SchemaFromNames({"phy_1", "phy_2"});
  auto format = std::make_shared<JSONRecordBatchFileFormat>(
      [=](const FileSource&) { return physical_schema; });

  FileSystemFactoryOptions options;
  options.partition_base_dir = "/dataset";
  options.partitioning = std::make_shared<HivePartitioning>(
      schema({partition_field}), ArrayVector{dictionary_values});

  ASSERT_OK_AND_ASSIGN(auto factory,
                       FileSystemDatasetFactory::Make(mock_fs, {path}, format, options));

  // Named so that it does not shadow arrow::schema(), which is called above.
  ASSERT_OK_AND_ASSIGN(auto inspected_schema, factory->Inspect());

  ASSERT_OK_AND_ASSIGN(auto dataset, factory->Finish(inspected_schema));

  // Select the single virtual (partition) column with a filter on a physical column.
  ASSERT_OK_AND_ASSIGN(auto scan_builder, dataset->NewScan());
  ASSERT_OK(scan_builder->Filter(equal(field_ref("phy_1"), literal(111))));
  ASSERT_OK(scan_builder->Project({"part"}));

  ASSERT_OK_AND_ASSIGN(auto scanner, scan_builder->Finish());
  ASSERT_OK_AND_ASSIGN(auto table, scanner->ToTable());
  AssertArraysEqual(*table->column(0)->chunk(0),
                    *ArrayFromJSON(partition_field->type(), R"(["one"])"));
}
732
733 } // namespace dataset
734 } // namespace arrow
735