// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// This example showcases various ways to work with Datasets. It's
// intended to be paired with the documentation.

#include <arrow/api.h>
#include <arrow/compute/cast.h>
#include <arrow/compute/exec/expression.h>
#include <arrow/dataset/dataset.h>
#include <arrow/dataset/discovery.h>
#include <arrow/dataset/file_base.h>
#include <arrow/dataset/file_ipc.h>
#include <arrow/dataset/file_parquet.h>
#include <arrow/dataset/scanner.h>
#include <arrow/filesystem/filesystem.h>
#include <arrow/ipc/writer.h>
#include <arrow/util/iterator.h>
#include <parquet/arrow/writer.h>

#include <cstdlib>
#include <iostream>
#include <vector>

namespace ds = arrow::dataset;
namespace fs = arrow::fs;
namespace cp = arrow::compute;

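// Helper macro for this example: print the error message and abort on any
// failing arrow::Status. The do { ... } while (0) wrapper lets the macro be
// used like a single statement, e.g. in an if/else branch followed by a
// semicolon.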
#define ABORT_ON_FAILURE(expr)                     \
  do {                                             \
    arrow::Status status_ = (expr);                \
    if (!status_.ok()) {                           \
      std::cerr << status_.message() << std::endl; \
      abort();                                     \
    }                                              \
  } while (0)

// (Doc section: Reading Datasets)
// Generate some data for the rest of this example.
std::shared_ptr<arrow::Table> CreateTable() {
  auto schema =
      arrow::schema({arrow::field("a", arrow::int64()), arrow::field("b", arrow::int64()),
                     arrow::field("c", arrow::int64())});
  std::shared_ptr<arrow::Array> array_a;
  std::shared_ptr<arrow::Array> array_b;
  std::shared_ptr<arrow::Array> array_c;
  arrow::NumericBuilder<arrow::Int64Type> builder;
  ABORT_ON_FAILURE(builder.AppendValues({0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
  ABORT_ON_FAILURE(builder.Finish(&array_a));
  builder.Reset();
  ABORT_ON_FAILURE(builder.AppendValues({9, 8, 7, 6, 5, 4, 3, 2, 1, 0}));
  ABORT_ON_FAILURE(builder.Finish(&array_b));
  builder.Reset();
  ABORT_ON_FAILURE(builder.AppendValues({1, 2, 1, 2, 1, 2, 1, 2, 1, 2}));
  ABORT_ON_FAILURE(builder.Finish(&array_c));
  return arrow::Table::Make(schema, {array_a, array_b, array_c});
}

// Set up a dataset by writing two Parquet files.
std::string CreateExampleParquetDataset(const std::shared_ptr<fs::FileSystem>& filesystem,
                                        const std::string& root_path) {
  auto base_path = root_path + "/parquet_dataset";
  ABORT_ON_FAILURE(filesystem->CreateDir(base_path));
  // Create an Arrow Table
  auto table = CreateTable();
  // Write it into two Parquet files
  auto output = filesystem->OpenOutputStream(base_path + "/data1.parquet").ValueOrDie();
  ABORT_ON_FAILURE(parquet::arrow::WriteTable(
      *table->Slice(0, 5), arrow::default_memory_pool(), output, /*chunk_size=*/2048));
  output = filesystem->OpenOutputStream(base_path + "/data2.parquet").ValueOrDie();
  ABORT_ON_FAILURE(parquet::arrow::WriteTable(
      *table->Slice(5), arrow::default_memory_pool(), output, /*chunk_size=*/2048));
  return base_path;
}
// (Doc section: Reading Datasets)

// (Doc section: Reading different file formats)
// Set up a dataset by writing two Feather files.
std::string CreateExampleFeatherDataset(const std::shared_ptr<fs::FileSystem>& filesystem,
                                        const std::string& root_path) {
  auto base_path = root_path + "/feather_dataset";
  ABORT_ON_FAILURE(filesystem->CreateDir(base_path));
  // Create an Arrow Table
  auto table = CreateTable();
  // Write it into two Feather files
  auto output = filesystem->OpenOutputStream(base_path + "/data1.feather").ValueOrDie();
  auto writer = arrow::ipc::MakeFileWriter(output.get(), table->schema()).ValueOrDie();
  ABORT_ON_FAILURE(writer->WriteTable(*table->Slice(0, 5)));
  ABORT_ON_FAILURE(writer->Close());
  output = filesystem->OpenOutputStream(base_path + "/data2.feather").ValueOrDie();
  writer = arrow::ipc::MakeFileWriter(output.get(), table->schema()).ValueOrDie();
  ABORT_ON_FAILURE(writer->WriteTable(*table->Slice(5)));
  ABORT_ON_FAILURE(writer->Close());
  return base_path;
}
// (Doc section: Reading different file formats)

// (Doc section: Reading and writing partitioned data)
// Set up a dataset by writing files with partitioning.
std::string CreateExampleParquetHivePartitionedDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem, const std::string& root_path) {
  auto base_path = root_path + "/parquet_dataset";
  ABORT_ON_FAILURE(filesystem->CreateDir(base_path));
  // Create an Arrow Table
  auto schema = arrow::schema(
      {arrow::field("a", arrow::int64()), arrow::field("b", arrow::int64()),
       arrow::field("c", arrow::int64()), arrow::field("part", arrow::utf8())});
  std::vector<std::shared_ptr<arrow::Array>> arrays(4);
  arrow::NumericBuilder<arrow::Int64Type> builder;
  ABORT_ON_FAILURE(builder.AppendValues({0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
  ABORT_ON_FAILURE(builder.Finish(&arrays[0]));
  builder.Reset();
  ABORT_ON_FAILURE(builder.AppendValues({9, 8, 7, 6, 5, 4, 3, 2, 1, 0}));
  ABORT_ON_FAILURE(builder.Finish(&arrays[1]));
  builder.Reset();
  ABORT_ON_FAILURE(builder.AppendValues({1, 2, 1, 2, 1, 2, 1, 2, 1, 2}));
  ABORT_ON_FAILURE(builder.Finish(&arrays[2]));
  arrow::StringBuilder string_builder;
  ABORT_ON_FAILURE(
      string_builder.AppendValues({"a", "a", "a", "a", "a", "b", "b", "b", "b", "b"}));
  ABORT_ON_FAILURE(string_builder.Finish(&arrays[3]));
  auto table = arrow::Table::Make(schema, arrays);
  // Write it using Datasets
  auto dataset = std::make_shared<ds::InMemoryDataset>(table);
  auto scanner_builder = dataset->NewScan().ValueOrDie();
  auto scanner = scanner_builder->Finish().ValueOrDie();

  // The partition schema determines which fields are part of the partitioning.
  auto partition_schema = arrow::schema({arrow::field("part", arrow::utf8())});
  // We'll use Hive-style partitioning, which creates directories with "key=value" pairs.
  auto partitioning = std::make_shared<ds::HivePartitioning>(partition_schema);
  // We'll write Parquet files.
  auto format = std::make_shared<ds::ParquetFileFormat>();
  ds::FileSystemDatasetWriteOptions write_options;
  write_options.file_write_options = format->DefaultWriteOptions();
  write_options.filesystem = filesystem;
  write_options.base_dir = base_path;
  write_options.partitioning = partitioning;
  write_options.basename_template = "part{i}.parquet";
  ABORT_ON_FAILURE(ds::FileSystemDataset::Write(write_options, scanner));
  return base_path;
}
// (Doc section: Reading and writing partitioned data)
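
// With the Hive-style partitioning above, the dataset lands in a layout like:
//   parquet_dataset/part=a/part0.parquet
//   parquet_dataset/part=b/part0.parquet
// where the "{i}" in basename_template is replaced by an automatically
// incremented integer, keeping file names unique within each directory.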

// (Doc section: Dataset discovery)
// Read the whole dataset with the given format, without partitioning.
std::shared_ptr<arrow::Table> ScanWholeDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  // Create a dataset by scanning the filesystem for files
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  auto factory = ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
                                                    ds::FileSystemFactoryOptions())
                     .ValueOrDie();
  auto dataset = factory->Finish().ValueOrDie();
  // Print out the fragments
  for (const auto& fragment : dataset->GetFragments().ValueOrDie()) {
    std::cout << "Found fragment: " << (*fragment)->ToString() << std::endl;
  }
  // Read the entire dataset as a Table
  auto scan_builder = dataset->NewScan().ValueOrDie();
  auto scanner = scan_builder->Finish().ValueOrDie();
  return scanner->ToTable().ValueOrDie();
}
// (Doc section: Dataset discovery)
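
// A minimal sketch (no corresponding doc section): consume the same dataset as
// a stream of record batches instead of one materialized Table. This assumes
// Scanner::ScanBatches() is available in this version of Arrow; streaming
// keeps memory use bounded for datasets larger than RAM.
int64_t CountDatasetRowsStreaming(const std::shared_ptr<fs::FileSystem>& filesystem,
                                  const std::shared_ptr<ds::FileFormat>& format,
                                  const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  auto factory = ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
                                                    ds::FileSystemFactoryOptions())
                     .ValueOrDie();
  auto dataset = factory->Finish().ValueOrDie();
  auto scanner = dataset->NewScan().ValueOrDie()->Finish().ValueOrDie();
  int64_t num_rows = 0;
  // Each element is a TaggedRecordBatch: a batch plus the fragment it came from.
  for (auto maybe_batch : scanner->ScanBatches().ValueOrDie()) {
    num_rows += (*maybe_batch).record_batch->num_rows();
  }
  return num_rows;
}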

// (Doc section: Filtering data)
// Read a dataset, but select only column "b" and only rows where b < 4.
//
// This is useful when you only want a few columns from a dataset. Where possible,
// Datasets will push down the column selection such that less work is done.
std::shared_ptr<arrow::Table> FilterAndSelectDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  auto factory = ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
                                                    ds::FileSystemFactoryOptions())
                     .ValueOrDie();
  auto dataset = factory->Finish().ValueOrDie();
  // Read specified columns with a row filter
  auto scan_builder = dataset->NewScan().ValueOrDie();
  ABORT_ON_FAILURE(scan_builder->Project({"b"}));
  ABORT_ON_FAILURE(scan_builder->Filter(cp::less(cp::field_ref("b"), cp::literal(4))));
  auto scanner = scan_builder->Finish().ValueOrDie();
  return scanner->ToTable().ValueOrDie();
}
// (Doc section: Filtering data)
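
// A hedged variant of FilterAndSelectDataset: predicates compose with
// cp::and_, cp::or_, and cp::not_, and a conjunction is still pushed down to
// each fragment where possible. Here we keep rows where b < 4 AND c == 1.
std::shared_ptr<arrow::Table> FilterCompoundDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  auto factory = ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
                                                    ds::FileSystemFactoryOptions())
                     .ValueOrDie();
  auto dataset = factory->Finish().ValueOrDie();
  auto scan_builder = dataset->NewScan().ValueOrDie();
  // Combine two predicates into one filter expression.
  ABORT_ON_FAILURE(
      scan_builder->Filter(cp::and_(cp::less(cp::field_ref("b"), cp::literal(4)),
                                    cp::equal(cp::field_ref("c"), cp::literal(1)))));
  auto scanner = scan_builder->Finish().ValueOrDie();
  return scanner->ToTable().ValueOrDie();
}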

// (Doc section: Projecting columns)
// Read a dataset, but with column projection.
//
// This is useful to derive new columns from existing data. For example, here we
// demonstrate casting a column to a different type, and turning a numeric column into a
// boolean column based on a predicate. You could also rename columns or perform
// computations involving multiple columns.
std::shared_ptr<arrow::Table> ProjectDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  auto factory = ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
                                                    ds::FileSystemFactoryOptions())
                     .ValueOrDie();
  auto dataset = factory->Finish().ValueOrDie();
  // Project columns with expressions, one output name per expression
  auto scan_builder = dataset->NewScan().ValueOrDie();
  ABORT_ON_FAILURE(scan_builder->Project(
      {
          // Leave column "a" as-is.
          cp::field_ref("a"),
          // Cast column "b" to float32.
          cp::call("cast", {cp::field_ref("b")},
                   arrow::compute::CastOptions::Safe(arrow::float32())),
          // Derive a boolean column from "c".
          cp::equal(cp::field_ref("c"), cp::literal(1)),
      },
      {"a_renamed", "b_as_float32", "c_1"}));
  auto scanner = scan_builder->Finish().ValueOrDie();
  return scanner->ToTable().ValueOrDie();
}
// (Doc section: Projecting columns)
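
// Note: Project() replaces the scanned schema entirely, so any column not
// listed in the projection is dropped from the output. To keep a column
// unchanged, include a cp::field_ref for it, as with "a" above.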

// (Doc section: Projecting columns #2)
// Read a dataset, but with column projection.
//
// This time, we read all original columns plus one derived column. This simply combines
// the previous two examples: selecting a subset of columns by name, and deriving new
// columns with an expression.
std::shared_ptr<arrow::Table> SelectAndProjectDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  auto factory = ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
                                                    ds::FileSystemFactoryOptions())
                     .ValueOrDie();
  auto dataset = factory->Finish().ValueOrDie();
  // Project all original columns plus one derived column
  auto scan_builder = dataset->NewScan().ValueOrDie();
  std::vector<std::string> names;
  std::vector<cp::Expression> exprs;
  // Read all the original columns.
  for (const auto& field : dataset->schema()->fields()) {
    names.push_back(field->name());
    exprs.push_back(cp::field_ref(field->name()));
  }
  // Also derive a new column.
  names.emplace_back("b_large");
  exprs.push_back(cp::greater(cp::field_ref("b"), cp::literal(1)));
  ABORT_ON_FAILURE(scan_builder->Project(exprs, names));
  auto scanner = scan_builder->Finish().ValueOrDie();
  return scanner->ToTable().ValueOrDie();
}
// (Doc section: Projecting columns #2)

// (Doc section: Reading and writing partitioned data #2)
// Read an entire dataset, but with partitioning information.
std::shared_ptr<arrow::Table> ScanPartitionedDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  selector.recursive = true;  // Make sure to search subdirectories
  ds::FileSystemFactoryOptions options;
  // We'll use Hive-style partitioning. We'll let Arrow Datasets infer the partition
  // schema.
  options.partitioning = ds::HivePartitioning::MakeFactory();
  auto factory = ds::FileSystemDatasetFactory::Make(filesystem, selector, format, options)
                     .ValueOrDie();
  auto dataset = factory->Finish().ValueOrDie();
  // Print out the fragments
  for (const auto& fragment : dataset->GetFragments().ValueOrDie()) {
    std::cout << "Found fragment: " << (*fragment)->ToString() << std::endl;
    std::cout << "Partition expression: "
              << (*fragment)->partition_expression().ToString() << std::endl;
  }
  auto scan_builder = dataset->NewScan().ValueOrDie();
  auto scanner = scan_builder->Finish().ValueOrDie();
  return scanner->ToTable().ValueOrDie();
}
// (Doc section: Reading and writing partitioned data #2)
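
// A minimal sketch of the non-Hive alternative, assuming a layout of bare
// value directories ("b/part0.parquet" rather than "part=b/part0.parquet"):
// DirectoryPartitioning cannot recover field names from the paths, so they
// must be given explicitly.
//
//   ds::FileSystemFactoryOptions options;
//   options.partitioning = ds::DirectoryPartitioning::MakeFactory({"part"});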

// (Doc section: Reading and writing partitioned data #3)
// Read an entire dataset, but with partitioning information. Also, filter the dataset on
// the partition values.
std::shared_ptr<arrow::Table> FilterPartitionedDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  selector.recursive = true;
  ds::FileSystemFactoryOptions options;
  options.partitioning = ds::HivePartitioning::MakeFactory();
  auto factory = ds::FileSystemDatasetFactory::Make(filesystem, selector, format, options)
                     .ValueOrDie();
  auto dataset = factory->Finish().ValueOrDie();
  auto scan_builder = dataset->NewScan().ValueOrDie();
  // Filter based on the partition values. This means that files whose partition
  // expressions don't match the filter won't even be read.
  ABORT_ON_FAILURE(
      scan_builder->Filter(cp::equal(cp::field_ref("part"), cp::literal("b"))));
  auto scanner = scan_builder->Finish().ValueOrDie();
  return scanner->ToTable().ValueOrDie();
}
// (Doc section: Reading and writing partitioned data #3)
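
// A minimal sketch (no corresponding doc section) combining the pieces above,
// assuming the same Hive-partitioned layout: scan only partition "b" and
// write that subset back out as a new Hive-partitioned Parquet dataset. The
// target directory name is illustrative; the write pattern mirrors
// CreateExampleParquetHivePartitionedDataset.
std::string WriteFilteredDataset(const std::shared_ptr<fs::FileSystem>& filesystem,
                                 const std::shared_ptr<ds::FileFormat>& format,
                                 const std::string& base_dir,
                                 const std::string& root_path) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  selector.recursive = true;
  ds::FileSystemFactoryOptions options;
  options.partitioning = ds::HivePartitioning::MakeFactory();
  auto factory = ds::FileSystemDatasetFactory::Make(filesystem, selector, format, options)
                     .ValueOrDie();
  auto dataset = factory->Finish().ValueOrDie();
  // Scan only the "b" partition; the inferred "part" column stays in the output.
  auto scan_builder = dataset->NewScan().ValueOrDie();
  ABORT_ON_FAILURE(
      scan_builder->Filter(cp::equal(cp::field_ref("part"), cp::literal("b"))));
  auto scanner = scan_builder->Finish().ValueOrDie();
  // Write the filtered scan back out, partitioned on "part" again.
  auto target_path = root_path + "/parquet_dataset_filtered";
  ABORT_ON_FAILURE(filesystem->CreateDir(target_path));
  auto write_format = std::make_shared<ds::ParquetFileFormat>();
  ds::FileSystemDatasetWriteOptions write_options;
  write_options.file_write_options = write_format->DefaultWriteOptions();
  write_options.filesystem = filesystem;
  write_options.base_dir = target_path;
  write_options.partitioning = std::make_shared<ds::HivePartitioning>(
      arrow::schema({arrow::field("part", arrow::utf8())}));
  write_options.basename_template = "part{i}.parquet";
  ABORT_ON_FAILURE(ds::FileSystemDataset::Write(write_options, scanner));
  return target_path;
}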

int main(int argc, char** argv) {
  if (argc < 3) {
    // Fake success for CI purposes.
    return EXIT_SUCCESS;
  }

  std::string uri = argv[1];
  std::string format_name = argv[2];
  std::string mode = argc > 3 ? argv[3] : "no_filter";
  std::string root_path;
  auto fs = fs::FileSystemFromUri(uri, &root_path).ValueOrDie();

  std::string base_path;
  std::shared_ptr<ds::FileFormat> format;
  if (format_name == "feather") {
    format = std::make_shared<ds::IpcFileFormat>();
    base_path = CreateExampleFeatherDataset(fs, root_path);
  } else if (format_name == "parquet") {
    format = std::make_shared<ds::ParquetFileFormat>();
    base_path = CreateExampleParquetDataset(fs, root_path);
  } else if (format_name == "parquet_hive") {
    format = std::make_shared<ds::ParquetFileFormat>();
    base_path = CreateExampleParquetHivePartitionedDataset(fs, root_path);
  } else {
    std::cerr << "Unknown format: " << format_name << std::endl;
    std::cerr << "Supported formats: feather, parquet, parquet_hive" << std::endl;
    return EXIT_FAILURE;
  }

  std::shared_ptr<arrow::Table> table;
  if (mode == "no_filter") {
    table = ScanWholeDataset(fs, format, base_path);
  } else if (mode == "filter") {
    table = FilterAndSelectDataset(fs, format, base_path);
  } else if (mode == "project") {
    table = ProjectDataset(fs, format, base_path);
  } else if (mode == "select_project") {
    table = SelectAndProjectDataset(fs, format, base_path);
  } else if (mode == "partitioned") {
    table = ScanPartitionedDataset(fs, format, base_path);
  } else if (mode == "filter_partitioned") {
    table = FilterPartitionedDataset(fs, format, base_path);
  } else {
    std::cerr << "Unknown mode: " << mode << std::endl;
    std::cerr << "Supported modes: no_filter, filter, project, select_project, "
                 "partitioned, filter_partitioned"
              << std::endl;
    return EXIT_FAILURE;
  }
  std::cout << "Read " << table->num_rows() << " rows" << std::endl;
  std::cout << table->ToString() << std::endl;
  return EXIT_SUCCESS;
}