// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// This example showcases various ways to work with Datasets. It's
// intended to be paired with the documentation.

#include <arrow/api.h>
#include <arrow/compute/cast.h>
#include <arrow/compute/exec/expression.h>
#include <arrow/dataset/dataset.h>
#include <arrow/dataset/discovery.h>
#include <arrow/dataset/file_base.h>
#include <arrow/dataset/file_ipc.h>
#include <arrow/dataset/file_parquet.h>
#include <arrow/dataset/scanner.h>
#include <arrow/filesystem/filesystem.h>
#include <arrow/ipc/writer.h>
#include <arrow/util/iterator.h>
#include <parquet/arrow/writer.h>

#include <iostream>
#include <vector>

namespace ds = arrow::dataset;
namespace fs = arrow::fs;
namespace cp = arrow::compute;
#define ABORT_ON_FAILURE(expr)                     \
  do {                                             \
    arrow::Status status_ = (expr);                \
    if (!status_.ok()) {                           \
      std::cerr << status_.message() << std::endl; \
      abort();                                     \
    }                                              \
  } while (0)
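// Note: ABORT_ON_FAILURE is for example brevity only. Library code would typically
// propagate errors via arrow::Status/arrow::Result, e.g. with the
// ARROW_RETURN_NOT_OK and ARROW_ASSIGN_OR_RAISE convenience macros, instead of
// aborting the process.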

// (Doc section: Reading Datasets)
// Generate some data for the rest of this example.
std::shared_ptr<arrow::Table> CreateTable() {
  auto schema =
      arrow::schema({arrow::field("a", arrow::int64()), arrow::field("b", arrow::int64()),
                     arrow::field("c", arrow::int64())});
  std::shared_ptr<arrow::Array> array_a;
  std::shared_ptr<arrow::Array> array_b;
  std::shared_ptr<arrow::Array> array_c;
  arrow::NumericBuilder<arrow::Int64Type> builder;
  ABORT_ON_FAILURE(builder.AppendValues({0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
  ABORT_ON_FAILURE(builder.Finish(&array_a));
  builder.Reset();
  ABORT_ON_FAILURE(builder.AppendValues({9, 8, 7, 6, 5, 4, 3, 2, 1, 0}));
  ABORT_ON_FAILURE(builder.Finish(&array_b));
  builder.Reset();
  ABORT_ON_FAILURE(builder.AppendValues({1, 2, 1, 2, 1, 2, 1, 2, 1, 2}));
  ABORT_ON_FAILURE(builder.Finish(&array_c));
  return arrow::Table::Make(schema, {array_a, array_b, array_c});
}
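// The resulting table has 10 rows and three int64 columns: "a" counts up from 0 to
// 9, "b" counts down from 9 to 0, and "c" alternates between 1 and 2.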

// Set up a dataset by writing two Parquet files.
std::string CreateExampleParquetDataset(const std::shared_ptr<fs::FileSystem>& filesystem,
                                        const std::string& root_path) {
  auto base_path = root_path + "/parquet_dataset";
  ABORT_ON_FAILURE(filesystem->CreateDir(base_path));
  // Create an Arrow Table
  auto table = CreateTable();
  // Write it into two Parquet files
  auto output = filesystem->OpenOutputStream(base_path + "/data1.parquet").ValueOrDie();
  ABORT_ON_FAILURE(parquet::arrow::WriteTable(
      *table->Slice(0, 5), arrow::default_memory_pool(), output, /*chunk_size=*/2048));
  output = filesystem->OpenOutputStream(base_path + "/data2.parquet").ValueOrDie();
  ABORT_ON_FAILURE(parquet::arrow::WriteTable(
      *table->Slice(5), arrow::default_memory_pool(), output, /*chunk_size=*/2048));
  return base_path;
}
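// This produces <root_path>/parquet_dataset/data1.parquet (rows 0-4) and
// data2.parquet (rows 5-9).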
// (Doc section: Reading Datasets)

// (Doc section: Reading different file formats)
// Set up a dataset by writing two Feather files.
std::string CreateExampleFeatherDataset(const std::shared_ptr<fs::FileSystem>& filesystem,
                                        const std::string& root_path) {
  auto base_path = root_path + "/feather_dataset";
  ABORT_ON_FAILURE(filesystem->CreateDir(base_path));
  // Create an Arrow Table
  auto table = CreateTable();
  // Write it into two Feather files
  auto output = filesystem->OpenOutputStream(base_path + "/data1.feather").ValueOrDie();
  auto writer = arrow::ipc::MakeFileWriter(output.get(), table->schema()).ValueOrDie();
  ABORT_ON_FAILURE(writer->WriteTable(*table->Slice(0, 5)));
  ABORT_ON_FAILURE(writer->Close());
  output = filesystem->OpenOutputStream(base_path + "/data2.feather").ValueOrDie();
  writer = arrow::ipc::MakeFileWriter(output.get(), table->schema()).ValueOrDie();
  ABORT_ON_FAILURE(writer->WriteTable(*table->Slice(5)));
  ABORT_ON_FAILURE(writer->Close());
  return base_path;
}
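// Feather v2 files use the Arrow IPC file format on disk, which is why main() below
// pairs this dataset with ds::IpcFileFormat.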
// (Doc section: Reading different file formats)

// (Doc section: Reading and writing partitioned data)
// Set up a dataset by writing files with partitioning
std::string CreateExampleParquetHivePartitionedDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem, const std::string& root_path) {
  auto base_path = root_path + "/parquet_dataset";
  ABORT_ON_FAILURE(filesystem->CreateDir(base_path));
  // Create an Arrow Table
  auto schema = arrow::schema(
      {arrow::field("a", arrow::int64()), arrow::field("b", arrow::int64()),
       arrow::field("c", arrow::int64()), arrow::field("part", arrow::utf8())});
  std::vector<std::shared_ptr<arrow::Array>> arrays(4);
  arrow::NumericBuilder<arrow::Int64Type> builder;
  ABORT_ON_FAILURE(builder.AppendValues({0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
  ABORT_ON_FAILURE(builder.Finish(&arrays[0]));
  builder.Reset();
  ABORT_ON_FAILURE(builder.AppendValues({9, 8, 7, 6, 5, 4, 3, 2, 1, 0}));
  ABORT_ON_FAILURE(builder.Finish(&arrays[1]));
  builder.Reset();
  ABORT_ON_FAILURE(builder.AppendValues({1, 2, 1, 2, 1, 2, 1, 2, 1, 2}));
  ABORT_ON_FAILURE(builder.Finish(&arrays[2]));
  arrow::StringBuilder string_builder;
  ABORT_ON_FAILURE(
      string_builder.AppendValues({"a", "a", "a", "a", "a", "b", "b", "b", "b", "b"}));
  ABORT_ON_FAILURE(string_builder.Finish(&arrays[3]));
  auto table = arrow::Table::Make(schema, arrays);
  // Write it using Datasets
  auto dataset = std::make_shared<ds::InMemoryDataset>(table);
  auto scanner_builder = dataset->NewScan().ValueOrDie();
  auto scanner = scanner_builder->Finish().ValueOrDie();

  // The partition schema determines which fields are part of the partitioning.
  auto partition_schema = arrow::schema({arrow::field("part", arrow::utf8())});
  // We'll use Hive-style partitioning, which creates directories with "key=value" pairs.
  auto partitioning = std::make_shared<ds::HivePartitioning>(partition_schema);
  // We'll write Parquet files.
  auto format = std::make_shared<ds::ParquetFileFormat>();
  ds::FileSystemDatasetWriteOptions write_options;
  write_options.file_write_options = format->DefaultWriteOptions();
  write_options.filesystem = filesystem;
  write_options.base_dir = base_path;
  write_options.partitioning = partitioning;
  write_options.basename_template = "part{i}.parquet";
  ABORT_ON_FAILURE(ds::FileSystemDataset::Write(write_options, scanner));
  return base_path;
}
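// With Hive-style partitioning on "part" and the basename template above, the
// resulting layout should look something like:
//   parquet_dataset/part=a/part0.parquet
//   parquet_dataset/part=b/part0.parquet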
// (Doc section: Reading and writing partitioned data)

// (Doc section: Dataset discovery)
// Read the whole dataset with the given format, without partitioning.
std::shared_ptr<arrow::Table> ScanWholeDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  // Create a dataset by scanning the filesystem for files
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  auto factory = ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
                                                    ds::FileSystemFactoryOptions())
                     .ValueOrDie();
  auto dataset = factory->Finish().ValueOrDie();
  // Print out the fragments
  for (const auto& fragment : dataset->GetFragments().ValueOrDie()) {
    std::cout << "Found fragment: " << (*fragment)->ToString() << std::endl;
  }
  // Read the entire dataset as a Table
  auto scan_builder = dataset->NewScan().ValueOrDie();
  auto scanner = scan_builder->Finish().ValueOrDie();
  return scanner->ToTable().ValueOrDie();
}
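// For a FileSystemDataset, each fragment corresponds to a single file, so the loop
// above prints one line per discovered data file (two for the example datasets).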
// (Doc section: Dataset discovery)

// (Doc section: Filtering data)
// Read a dataset, but select only column "b" and only rows where b < 4.
//
// This is useful when you only want a few columns from a dataset. Where possible,
// Datasets will push down the column selection to the file format, so that less
// data needs to be read and decoded.
std::shared_ptr<arrow::Table> FilterAndSelectDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  auto factory = ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
                                                    ds::FileSystemFactoryOptions())
                     .ValueOrDie();
  auto dataset = factory->Finish().ValueOrDie();
  // Read specified columns with a row filter
  auto scan_builder = dataset->NewScan().ValueOrDie();
  ABORT_ON_FAILURE(scan_builder->Project({"b"}));
  ABORT_ON_FAILURE(scan_builder->Filter(cp::less(cp::field_ref("b"), cp::literal(4))));
  auto scanner = scan_builder->Finish().ValueOrDie();
  return scanner->ToTable().ValueOrDie();
}
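// With the example data, column "b" holds the values 9 down to 0, so the filter
// b < 4 keeps exactly the four rows where b is 3, 2, 1 or 0.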
// (Doc section: Filtering data)

// (Doc section: Projecting columns)
// Read a dataset, but with column projection.
//
// This is useful to derive new columns from existing data. For example, here we
// demonstrate casting a column to a different type, and turning a numeric column into a
// boolean column based on a predicate. You could also rename columns or perform
// computations involving multiple columns.
std::shared_ptr<arrow::Table> ProjectDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  auto factory = ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
                                                    ds::FileSystemFactoryOptions())
                     .ValueOrDie();
  auto dataset = factory->Finish().ValueOrDie();
  // Project (derive) new columns while scanning
  auto scan_builder = dataset->NewScan().ValueOrDie();
  ABORT_ON_FAILURE(scan_builder->Project(
      {
          // Leave column "a" as-is.
          cp::field_ref("a"),
          // Cast column "b" to float32.
          cp::call("cast", {cp::field_ref("b")},
                   arrow::compute::CastOptions::Safe(arrow::float32())),
          // Derive a boolean column from "c".
          cp::equal(cp::field_ref("c"), cp::literal(1)),
      },
      {"a_renamed", "b_as_float32", "c_1"}));
  auto scanner = scan_builder->Finish().ValueOrDie();
  return scanner->ToTable().ValueOrDie();
}
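// The resulting table contains only the three projected columns:
// a_renamed (int64), b_as_float32 (float), and c_1 (bool).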
// (Doc section: Projecting columns)

// (Doc section: Projecting columns #2)
// Read a dataset, but with column projection.
//
// This time, we read all original columns plus one derived column. This simply combines
// the previous two examples: selecting a subset of columns by name, and deriving new
// columns with an expression.
std::shared_ptr<arrow::Table> SelectAndProjectDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  auto factory = ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
                                                    ds::FileSystemFactoryOptions())
                     .ValueOrDie();
  auto dataset = factory->Finish().ValueOrDie();
  // Project all of the original columns plus a derived one
  auto scan_builder = dataset->NewScan().ValueOrDie();
  std::vector<std::string> names;
  std::vector<cp::Expression> exprs;
  // Read all the original columns.
  for (const auto& field : dataset->schema()->fields()) {
    names.push_back(field->name());
    exprs.push_back(cp::field_ref(field->name()));
  }
  // Also derive a new column.
  names.emplace_back("b_large");
  exprs.push_back(cp::greater(cp::field_ref("b"), cp::literal(1)));
  ABORT_ON_FAILURE(scan_builder->Project(exprs, names));
  auto scanner = scan_builder->Finish().ValueOrDie();
  return scanner->ToTable().ValueOrDie();
}
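// With the plain example datasets this yields columns a, b and c plus the derived
// boolean b_large. Because the projection copies every field of the dataset schema,
// any extra columns (such as an inferred partition column) would be carried along
// automatically.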
// (Doc section: Projecting columns #2)

// (Doc section: Reading and writing partitioned data #2)
// Read an entire dataset, but with partitioning information.
std::shared_ptr<arrow::Table> ScanPartitionedDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  selector.recursive = true;  // Make sure to search subdirectories
  ds::FileSystemFactoryOptions options;
  // We'll use Hive-style partitioning. We'll let Arrow Datasets infer the partition
  // schema.
  options.partitioning = ds::HivePartitioning::MakeFactory();
  auto factory = ds::FileSystemDatasetFactory::Make(filesystem, selector, format, options)
                     .ValueOrDie();
  auto dataset = factory->Finish().ValueOrDie();
  // Print out the fragments
  for (const auto& fragment : dataset->GetFragments().ValueOrDie()) {
    std::cout << "Found fragment: " << (*fragment)->ToString() << std::endl;
    std::cout << "Partition expression: "
              << (*fragment)->partition_expression().ToString() << std::endl;
  }
  auto scan_builder = dataset->NewScan().ValueOrDie();
  auto scanner = scan_builder->Finish().ValueOrDie();
  return scanner->ToTable().ValueOrDie();
}
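// For the example dataset, the printed partition expressions should look something
// like (part == "a") and (part == "b").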
// (Doc section: Reading and writing partitioned data #2)

// (Doc section: Reading and writing partitioned data #3)
// Read an entire dataset, but with partitioning information. Also, filter the dataset on
// the partition values.
std::shared_ptr<arrow::Table> FilterPartitionedDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  selector.recursive = true;
  ds::FileSystemFactoryOptions options;
  options.partitioning = ds::HivePartitioning::MakeFactory();
  auto factory = ds::FileSystemDatasetFactory::Make(filesystem, selector, format, options)
                     .ValueOrDie();
  auto dataset = factory->Finish().ValueOrDie();
  auto scan_builder = dataset->NewScan().ValueOrDie();
  // Filter based on the partition values. This will mean that we won't even read the
  // files whose partition expressions don't match the filter.
  ABORT_ON_FAILURE(
      scan_builder->Filter(cp::equal(cp::field_ref("part"), cp::literal("b"))));
  auto scanner = scan_builder->Finish().ValueOrDie();
  return scanner->ToTable().ValueOrDie();
}
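// Since the filter only involves the partition key, the files under part=a are
// never opened; only the five rows written with part == "b" are read.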
// (Doc section: Reading and writing partitioned data #3)

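// Example invocations (assuming the compiled binary is named dataset_example):
//   ./dataset_example file:///tmp parquet no_filter
//   ./dataset_example file:///tmp parquet_hive filter_partitioned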
int main(int argc, char** argv) {
  if (argc < 3) {
    // Fake success for CI purposes.
    return EXIT_SUCCESS;
  }

  std::string uri = argv[1];
  std::string format_name = argv[2];
  std::string mode = argc > 3 ? argv[3] : "no_filter";
  std::string root_path;
  auto fs = fs::FileSystemFromUri(uri, &root_path).ValueOrDie();

  std::string base_path;
  std::shared_ptr<ds::FileFormat> format;
  if (format_name == "feather") {
    format = std::make_shared<ds::IpcFileFormat>();
    base_path = CreateExampleFeatherDataset(fs, root_path);
  } else if (format_name == "parquet") {
    format = std::make_shared<ds::ParquetFileFormat>();
    base_path = CreateExampleParquetDataset(fs, root_path);
  } else if (format_name == "parquet_hive") {
    format = std::make_shared<ds::ParquetFileFormat>();
    base_path = CreateExampleParquetHivePartitionedDataset(fs, root_path);
  } else {
    std::cerr << "Unknown format: " << format_name << std::endl;
    std::cerr << "Supported formats: feather, parquet, parquet_hive" << std::endl;
    return EXIT_FAILURE;
  }

  std::shared_ptr<arrow::Table> table;
  if (mode == "no_filter") {
    table = ScanWholeDataset(fs, format, base_path);
  } else if (mode == "filter") {
    table = FilterAndSelectDataset(fs, format, base_path);
  } else if (mode == "project") {
    table = ProjectDataset(fs, format, base_path);
  } else if (mode == "select_project") {
    table = SelectAndProjectDataset(fs, format, base_path);
  } else if (mode == "partitioned") {
    table = ScanPartitionedDataset(fs, format, base_path);
  } else if (mode == "filter_partitioned") {
    table = FilterPartitionedDataset(fs, format, base_path);
  } else {
    std::cerr << "Unknown mode: " << mode << std::endl;
    std::cerr << "Supported modes: no_filter, filter, project, select_project, "
                 "partitioned, filter_partitioned"
              << std::endl;
    return EXIT_FAILURE;
  }
  std::cout << "Read " << table->num_rows() << " rows" << std::endl;
  std::cout << table->ToString() << std::endl;
  return EXIT_SUCCESS;
}