1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17
#include <algorithm>
#include <cstdlib>
#include <iostream>
#include <memory>
#include <numeric>
#include <sstream>
#include <string>
#include <utility>
#include <vector>

#include "benchmark/benchmark.h"

#include <aws/core/auth/AWSCredentials.h>
#include <aws/s3/S3Client.h>
#include <aws/s3/model/CreateBucketRequest.h>
#include <aws/s3/model/HeadBucketRequest.h>
#include <aws/s3/model/PutObjectRequest.h>

#include "arrow/filesystem/s3_internal.h"
#include "arrow/filesystem/s3_test_util.h"
#include "arrow/filesystem/s3fs.h"
#include "arrow/io/caching.h"
#include "arrow/io/interfaces.h"
#include "arrow/status.h"
#include "arrow/table.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/testing/random.h"
#include "arrow/util/range.h"

#include "parquet/arrow/reader.h"
#include "parquet/arrow/writer.h"
#include "parquet/properties.h"
44
45 namespace arrow {
46 namespace fs {
47
48 using ::arrow::fs::internal::ConnectRetryStrategy;
49 using ::arrow::fs::internal::OutcomeToStatus;
50 using ::arrow::fs::internal::ToAwsString;
51
52 // Environment variables to configure the S3 test environment
53 static const char* kEnvBucketName = "ARROW_TEST_S3_BUCKET";
54 static const char* kEnvSkipSetup = "ARROW_TEST_S3_SKIP_SETUP";
55 static const char* kEnvAwsRegion = "ARROW_TEST_S3_REGION";
56
57 // Set up Minio and create the test bucket and files.
58 class MinioFixture : public benchmark::Fixture {
59 public:
SetUp(const::benchmark::State & state)60 void SetUp(const ::benchmark::State& state) override {
61 minio_.reset(new MinioTestServer());
62 ASSERT_OK(minio_->Start());
63
64 const char* region_str = std::getenv(kEnvAwsRegion);
65 if (region_str) {
66 region_ = region_str;
67 std::cerr << "Using region from environment: " << region_ << std::endl;
68 } else {
69 std::cerr << "Using default region" << std::endl;
70 }
71
72 const char* bucket_str = std::getenv(kEnvBucketName);
73 if (bucket_str) {
74 bucket_ = bucket_str;
75 std::cerr << "Using bucket from environment: " << bucket_ << std::endl;
76 } else {
77 bucket_ = "bucket";
78 std::cerr << "Using default bucket: " << bucket_ << std::endl;
79 }
80
81 client_config_.endpointOverride = ToAwsString(minio_->connect_string());
82 client_config_.scheme = Aws::Http::Scheme::HTTP;
83 if (!region_.empty()) {
84 client_config_.region = ToAwsString(region_);
85 }
86 client_config_.retryStrategy = std::make_shared<ConnectRetryStrategy>();
87 credentials_ = {ToAwsString(minio_->access_key()), ToAwsString(minio_->secret_key())};
88 bool use_virtual_addressing = false;
89 client_.reset(
90 new Aws::S3::S3Client(credentials_, client_config_,
91 Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
92 use_virtual_addressing));
93
94 MakeFileSystem();
95
96 const char* skip_str = std::getenv(kEnvSkipSetup);
97 const std::string skip = skip_str ? std::string(skip_str) : "";
98 if (!skip.empty()) {
99 std::cerr << "Skipping creation of bucket/objects as requested" << std::endl;
100 } else {
101 ASSERT_OK(MakeBucket());
102 ASSERT_OK(MakeObject("bytes_1mib", 1024 * 1024));
103 ASSERT_OK(MakeObject("bytes_100mib", 100 * 1024 * 1024));
104 ASSERT_OK(MakeObject("bytes_500mib", 500 * 1024 * 1024));
105 ASSERT_OK(MakeParquetObject(bucket_ + "/pq_c402_r250k", 400, 250000));
106 }
107 }
108
MakeFileSystem()109 void MakeFileSystem() {
110 options_.ConfigureAccessKey(minio_->access_key(), minio_->secret_key());
111 options_.scheme = "http";
112 if (!region_.empty()) {
113 options_.region = region_;
114 }
115 options_.endpoint_override = minio_->connect_string();
116 ASSERT_OK_AND_ASSIGN(fs_, S3FileSystem::Make(options_));
117 }
118
119 /// Set up bucket if it doesn't exist.
120 ///
121 /// When using Minio we'll have a fresh setup each time, but
122 /// otherwise we may have a leftover bucket.
MakeBucket()123 Status MakeBucket() {
124 Aws::S3::Model::HeadBucketRequest head;
125 head.SetBucket(ToAwsString(bucket_));
126 const Status st = OutcomeToStatus(client_->HeadBucket(head));
127 if (st.ok()) {
128 // Bucket exists already
129 return st;
130 }
131 Aws::S3::Model::CreateBucketRequest req;
132 req.SetBucket(ToAwsString(bucket_));
133 return OutcomeToStatus(client_->CreateBucket(req));
134 }
135
136 /// Make an object with dummy data.
MakeObject(const std::string & name,int size)137 Status MakeObject(const std::string& name, int size) {
138 Aws::S3::Model::PutObjectRequest req;
139 req.SetBucket(ToAwsString(bucket_));
140 req.SetKey(ToAwsString(name));
141 req.SetBody(std::make_shared<std::stringstream>(std::string(size, 'a')));
142 return OutcomeToStatus(client_->PutObject(req));
143 }
144
145 /// Make an object with Parquet data.
146 /// Appends integer columns to the beginning (to act as indices).
MakeParquetObject(const std::string & path,int num_columns,int num_rows)147 Status MakeParquetObject(const std::string& path, int num_columns, int num_rows) {
148 std::vector<std::shared_ptr<ChunkedArray>> columns;
149 std::vector<std::shared_ptr<Field>> fields;
150
151 {
152 arrow::random::RandomArrayGenerator generator(0);
153 std::shared_ptr<Array> values = generator.Int64(num_rows, 0, 1e10, 0);
154 columns.push_back(std::make_shared<ChunkedArray>(values));
155 fields.push_back(::arrow::field("timestamp", values->type()));
156 }
157 {
158 arrow::random::RandomArrayGenerator generator(1);
159 std::shared_ptr<Array> values = generator.Int32(num_rows, 0, 1e9, 0);
160 columns.push_back(std::make_shared<ChunkedArray>(values));
161 fields.push_back(::arrow::field("val", values->type()));
162 }
163
164 for (int i = 0; i < num_columns; i++) {
165 arrow::random::RandomArrayGenerator generator(i);
166 std::shared_ptr<Array> values = generator.Float64(num_rows, -1.e10, 1e10, 0);
167 std::stringstream ss;
168 ss << "col" << i;
169 columns.push_back(std::make_shared<ChunkedArray>(values));
170 fields.push_back(::arrow::field(ss.str(), values->type()));
171 }
172 auto schema = std::make_shared<::arrow::Schema>(fields);
173
174 std::shared_ptr<Table> table = Table::Make(schema, columns);
175
176 std::shared_ptr<io::OutputStream> sink;
177 ARROW_ASSIGN_OR_RAISE(sink, fs_->OpenOutputStream(path));
178 RETURN_NOT_OK(
179 parquet::arrow::WriteTable(*table, arrow::default_memory_pool(), sink, num_rows));
180
181 return Status::OK();
182 }
183
TearDown(const::benchmark::State & state)184 void TearDown(const ::benchmark::State& state) override {
185 ASSERT_OK(minio_->Stop());
186 // Delete temporary directory, freeing up disk space
187 minio_.reset();
188 }
189
190 protected:
191 std::unique_ptr<MinioTestServer> minio_;
192 std::string region_;
193 std::string bucket_;
194 Aws::Client::ClientConfiguration client_config_;
195 Aws::Auth::AWSCredentials credentials_;
196 std::unique_ptr<Aws::S3::S3Client> client_;
197 S3Options options_;
198 std::shared_ptr<S3FileSystem> fs_;
199 };
200
201 /// Set up/tear down the AWS SDK globally.
202 /// (GBenchmark doesn't run GTest environments.)
203 class S3BenchmarkEnvironment {
204 public:
S3BenchmarkEnvironment()205 S3BenchmarkEnvironment() { s3_env->SetUp(); }
~S3BenchmarkEnvironment()206 ~S3BenchmarkEnvironment() { s3_env->TearDown(); }
207 };
208
209 S3BenchmarkEnvironment env{};
210
211 /// Read the entire file into memory in one go to measure bandwidth.
NaiveRead(benchmark::State & st,S3FileSystem * fs,const std::string & path)212 static void NaiveRead(benchmark::State& st, S3FileSystem* fs, const std::string& path) {
213 int64_t total_bytes = 0;
214 int total_items = 0;
215 for (auto _ : st) {
216 std::shared_ptr<io::RandomAccessFile> file;
217 std::shared_ptr<Buffer> buf;
218 int64_t size;
219 ASSERT_OK_AND_ASSIGN(file, fs->OpenInputFile(path));
220 ASSERT_OK_AND_ASSIGN(size, file->GetSize());
221 ASSERT_OK_AND_ASSIGN(buf, file->ReadAt(0, size));
222 total_bytes += buf->size();
223 total_items += 1;
224 }
225 st.SetBytesProcessed(total_bytes);
226 st.SetItemsProcessed(total_items);
227 std::cerr << "Read the file " << total_items << " times" << std::endl;
228 }
229
230 constexpr int64_t kChunkSize = 5 * 1024 * 1024;
231
232 /// Mimic the Parquet reader, reading the file in small chunks.
ChunkedRead(benchmark::State & st,S3FileSystem * fs,const std::string & path)233 static void ChunkedRead(benchmark::State& st, S3FileSystem* fs, const std::string& path) {
234 int64_t total_bytes = 0;
235 int total_items = 0;
236 for (auto _ : st) {
237 std::shared_ptr<io::RandomAccessFile> file;
238 std::shared_ptr<Buffer> buf;
239 int64_t size = 0;
240 ASSERT_OK_AND_ASSIGN(file, fs->OpenInputFile(path));
241 ASSERT_OK_AND_ASSIGN(size, file->GetSize());
242 total_items += 1;
243
244 int64_t offset = 0;
245 while (offset < size) {
246 const int64_t read = std::min(size, kChunkSize);
247 ASSERT_OK_AND_ASSIGN(buf, file->ReadAt(offset, read));
248 total_bytes += buf->size();
249 offset += buf->size();
250 }
251 }
252 st.SetBytesProcessed(total_bytes);
253 st.SetItemsProcessed(total_items);
254 std::cerr << "Read the file " << total_items << " times" << std::endl;
255 }
256
257 /// Read the file in small chunks, but using read coalescing.
CoalescedRead(benchmark::State & st,S3FileSystem * fs,const std::string & path)258 static void CoalescedRead(benchmark::State& st, S3FileSystem* fs,
259 const std::string& path) {
260 int64_t total_bytes = 0;
261 int total_items = 0;
262 for (auto _ : st) {
263 std::shared_ptr<io::RandomAccessFile> file;
264 std::shared_ptr<Buffer> buf;
265 int64_t size = 0;
266 ASSERT_OK_AND_ASSIGN(file, fs->OpenInputFile(path));
267 ASSERT_OK_AND_ASSIGN(size, file->GetSize());
268 total_items += 1;
269
270 io::internal::ReadRangeCache cache(file, {},
271 io::CacheOptions{8192, 64 * 1024 * 1024});
272 std::vector<io::ReadRange> ranges;
273
274 int64_t offset = 0;
275 while (offset < size) {
276 const int64_t read = std::min(size, kChunkSize);
277 ranges.push_back(io::ReadRange{offset, read});
278 offset += read;
279 }
280 ASSERT_OK(cache.Cache(ranges));
281
282 offset = 0;
283 while (offset < size) {
284 const int64_t read = std::min(size, kChunkSize);
285 ASSERT_OK_AND_ASSIGN(buf, cache.Read({offset, read}));
286 total_bytes += buf->size();
287 offset += read;
288 }
289 }
290 st.SetBytesProcessed(total_bytes);
291 st.SetItemsProcessed(total_items);
292 std::cerr << "Read the file " << total_items << " times" << std::endl;
293 }
294
295 /// Read a Parquet file from S3.
ParquetRead(benchmark::State & st,S3FileSystem * fs,const std::string & path,std::vector<int> column_indices,bool pre_buffer,std::string read_strategy)296 static void ParquetRead(benchmark::State& st, S3FileSystem* fs, const std::string& path,
297 std::vector<int> column_indices, bool pre_buffer,
298 std::string read_strategy) {
299 int64_t total_bytes = 0;
300 int total_items = 0;
301
302 parquet::ArrowReaderProperties properties;
303 properties.set_use_threads(true);
304 properties.set_pre_buffer(pre_buffer);
305 parquet::ReaderProperties parquet_properties = parquet::default_reader_properties();
306
307 for (auto _ : st) {
308 std::shared_ptr<io::RandomAccessFile> file;
309 int64_t size = 0;
310 ASSERT_OK_AND_ASSIGN(file, fs->OpenInputFile(path));
311 ASSERT_OK_AND_ASSIGN(size, file->GetSize());
312
313 std::unique_ptr<parquet::arrow::FileReader> reader;
314 parquet::arrow::FileReaderBuilder builder;
315 ASSERT_OK(builder.Open(file, parquet_properties));
316 ASSERT_OK(builder.properties(properties)->Build(&reader));
317
318 std::shared_ptr<Table> table;
319
320 if (read_strategy == "ReadTable") {
321 ASSERT_OK(reader->ReadTable(column_indices, &table));
322 } else {
323 std::shared_ptr<RecordBatchReader> rb_reader;
324 ASSERT_OK(reader->GetRecordBatchReader({0}, column_indices, &rb_reader));
325 ASSERT_OK(rb_reader->ReadAll(&table));
326 }
327
328 // TODO: actually measure table memory usage
329 total_bytes += size;
330 total_items += 1;
331 }
332 st.SetBytesProcessed(total_bytes);
333 st.SetItemsProcessed(total_items);
334 }
335
336 /// Helper function used in the macros below to template benchmarks.
ParquetReadAll(benchmark::State & st,S3FileSystem * fs,const std::string & bucket,int64_t file_rows,int64_t file_cols,bool pre_buffer,std::string read_strategy)337 static void ParquetReadAll(benchmark::State& st, S3FileSystem* fs,
338 const std::string& bucket, int64_t file_rows,
339 int64_t file_cols, bool pre_buffer,
340 std::string read_strategy) {
341 std::vector<int> column_indices(file_cols);
342 std::iota(column_indices.begin(), column_indices.end(), 0);
343 std::stringstream ss;
344 ss << bucket << "/pq_c" << file_cols << "_r" << file_rows << "k";
345 ParquetRead(st, fs, ss.str(), column_indices, false, read_strategy);
346 }
347
348 /// Helper function used in the macros below to template benchmarks.
ParquetReadSome(benchmark::State & st,S3FileSystem * fs,const std::string & bucket,int64_t file_rows,int64_t file_cols,std::vector<int> cols_to_read,bool pre_buffer,std::string read_strategy)349 static void ParquetReadSome(benchmark::State& st, S3FileSystem* fs,
350 const std::string& bucket, int64_t file_rows,
351 int64_t file_cols, std::vector<int> cols_to_read,
352 bool pre_buffer, std::string read_strategy) {
353 std::stringstream ss;
354 ss << bucket << "/pq_c" << file_cols << "_r" << file_rows << "k";
355 ParquetRead(st, fs, ss.str(), cols_to_read, false, read_strategy);
356 }
357
BENCHMARK_DEFINE_F(MinioFixture,ReadAll1Mib)358 BENCHMARK_DEFINE_F(MinioFixture, ReadAll1Mib)(benchmark::State& st) {
359 NaiveRead(st, fs_.get(), bucket_ + "/bytes_1mib");
360 }
361 BENCHMARK_REGISTER_F(MinioFixture, ReadAll1Mib)->UseRealTime();
BENCHMARK_DEFINE_F(MinioFixture,ReadAll100Mib)362 BENCHMARK_DEFINE_F(MinioFixture, ReadAll100Mib)(benchmark::State& st) {
363 NaiveRead(st, fs_.get(), bucket_ + "/bytes_100mib");
364 }
365 BENCHMARK_REGISTER_F(MinioFixture, ReadAll100Mib)->UseRealTime();
BENCHMARK_DEFINE_F(MinioFixture,ReadAll500Mib)366 BENCHMARK_DEFINE_F(MinioFixture, ReadAll500Mib)(benchmark::State& st) {
367 NaiveRead(st, fs_.get(), bucket_ + "/bytes_500mib");
368 }
369 BENCHMARK_REGISTER_F(MinioFixture, ReadAll500Mib)->UseRealTime();
370
BENCHMARK_DEFINE_F(MinioFixture,ReadChunked100Mib)371 BENCHMARK_DEFINE_F(MinioFixture, ReadChunked100Mib)(benchmark::State& st) {
372 ChunkedRead(st, fs_.get(), bucket_ + "/bytes_100mib");
373 }
374 BENCHMARK_REGISTER_F(MinioFixture, ReadChunked100Mib)->UseRealTime();
BENCHMARK_DEFINE_F(MinioFixture,ReadChunked500Mib)375 BENCHMARK_DEFINE_F(MinioFixture, ReadChunked500Mib)(benchmark::State& st) {
376 ChunkedRead(st, fs_.get(), bucket_ + "/bytes_500mib");
377 }
378 BENCHMARK_REGISTER_F(MinioFixture, ReadChunked500Mib)->UseRealTime();
379
BENCHMARK_DEFINE_F(MinioFixture,ReadCoalesced100Mib)380 BENCHMARK_DEFINE_F(MinioFixture, ReadCoalesced100Mib)(benchmark::State& st) {
381 CoalescedRead(st, fs_.get(), bucket_ + "/bytes_100mib");
382 }
383 BENCHMARK_REGISTER_F(MinioFixture, ReadCoalesced100Mib)->UseRealTime();
BENCHMARK_DEFINE_F(MinioFixture,ReadCoalesced500Mib)384 BENCHMARK_DEFINE_F(MinioFixture, ReadCoalesced500Mib)(benchmark::State& st) {
385 CoalescedRead(st, fs_.get(), bucket_ + "/bytes_500mib");
386 }
387 BENCHMARK_REGISTER_F(MinioFixture, ReadCoalesced500Mib)->UseRealTime();
388
389 // Helpers to generate various multiple benchmarks for a given Parquet file.
390
391 // NAME: the base name of the benchmark.
392 // ROWS: the number of rows in the Parquet file.
393 // COLS: the number of columns in the Parquet file.
394 // STRATEGY: how to read the file (ReadTable or GetRecordBatchReader)
395 #define PQ_BENCHMARK_IMPL(NAME, ROWS, COLS, STRATEGY) \
396 BENCHMARK_DEFINE_F(MinioFixture, NAME##STRATEGY##AllNaive)(benchmark::State & st) { \
397 ParquetReadAll(st, fs_.get(), bucket_, ROWS, COLS, false, #STRATEGY); \
398 } \
399 BENCHMARK_REGISTER_F(MinioFixture, NAME##STRATEGY##AllNaive)->UseRealTime(); \
400 BENCHMARK_DEFINE_F(MinioFixture, NAME##STRATEGY##AllCoalesced) \
401 (benchmark::State & st) { \
402 ParquetReadAll(st, fs_.get(), bucket_, ROWS, COLS, true, #STRATEGY); \
403 } \
404 BENCHMARK_REGISTER_F(MinioFixture, NAME##STRATEGY##AllCoalesced)->UseRealTime();
405
406 // COL_INDICES: a vector specifying a subset of column indices to read.
407 #define PQ_BENCHMARK_PICK_IMPL(NAME, ROWS, COLS, COL_INDICES, STRATEGY) \
408 BENCHMARK_DEFINE_F(MinioFixture, NAME##STRATEGY##PickNaive)(benchmark::State & st) { \
409 ParquetReadSome(st, fs_.get(), bucket_, ROWS, COLS, COL_INDICES, false, #STRATEGY); \
410 } \
411 BENCHMARK_REGISTER_F(MinioFixture, NAME##STRATEGY##PickNaive)->UseRealTime(); \
412 BENCHMARK_DEFINE_F(MinioFixture, NAME##STRATEGY##PickCoalesced) \
413 (benchmark::State & st) { \
414 ParquetReadSome(st, fs_.get(), bucket_, ROWS, COLS, COL_INDICES, true, #STRATEGY); \
415 } \
416 BENCHMARK_REGISTER_F(MinioFixture, NAME##STRATEGY##PickCoalesced)->UseRealTime();
417
418 #define PQ_BENCHMARK(ROWS, COLS) \
419 PQ_BENCHMARK_IMPL(ReadParquet_c##COLS##_r##ROWS##K_, ROWS, COLS, \
420 GetRecordBatchReader); \
421 PQ_BENCHMARK_IMPL(ReadParquet_c##COLS##_r##ROWS##K_, ROWS, COLS, ReadTable);
422
423 #define PQ_BENCHMARK_PICK(NAME, ROWS, COLS, COL_INDICES) \
424 PQ_BENCHMARK_PICK_IMPL(ReadParquet_c##COLS##_r##ROWS##K_##NAME##_, ROWS, COLS, \
425 COL_INDICES, GetRecordBatchReader); \
426 PQ_BENCHMARK_PICK_IMPL(ReadParquet_c##COLS##_r##ROWS##K_##NAME##_, ROWS, COLS, \
427 COL_INDICES, ReadTable);
428
429 // Test a Parquet file with 250k rows, 402 columns.
430 PQ_BENCHMARK(250, 402);
431 // Scenario A: test selecting a small set of contiguous columns, and a "far" column.
432 PQ_BENCHMARK_PICK(A, 250, 402, (std::vector<int>{0, 1, 2, 3, 4, 90}));
433 // Scenario B: test selecting a large set of contiguous columns.
434 PQ_BENCHMARK_PICK(B, 250, 402, (::arrow::internal::Iota(41)));
435
436 } // namespace fs
437 } // namespace arrow
438