1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements.  See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership.  The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License.  You may obtain a copy of the License at
8 //
9 //   http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied.  See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17 
#include <algorithm>
#include <cstdint>
#include <cstdlib>
#include <iostream>
#include <memory>
#include <numeric>
#include <sstream>
#include <string>
#include <utility>
#include <vector>

#include "benchmark/benchmark.h"

#include <aws/core/auth/AWSCredentials.h>
#include <aws/s3/S3Client.h>
#include <aws/s3/model/CreateBucketRequest.h>
#include <aws/s3/model/HeadBucketRequest.h>
#include <aws/s3/model/PutObjectRequest.h>

#include "arrow/filesystem/s3_internal.h"
#include "arrow/filesystem/s3_test_util.h"
#include "arrow/filesystem/s3fs.h"
#include "arrow/io/caching.h"
#include "arrow/io/interfaces.h"
#include "arrow/status.h"
#include "arrow/table.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/testing/random.h"
#include "arrow/util/range.h"

#include "parquet/arrow/reader.h"
#include "parquet/arrow/writer.h"
#include "parquet/properties.h"
44 
45 namespace arrow {
46 namespace fs {
47 
48 using ::arrow::fs::internal::ConnectRetryStrategy;
49 using ::arrow::fs::internal::OutcomeToStatus;
50 using ::arrow::fs::internal::ToAwsString;
51 
52 // Environment variables to configure the S3 test environment
53 static const char* kEnvBucketName = "ARROW_TEST_S3_BUCKET";
54 static const char* kEnvSkipSetup = "ARROW_TEST_S3_SKIP_SETUP";
55 static const char* kEnvAwsRegion = "ARROW_TEST_S3_REGION";
56 
57 // Set up Minio and create the test bucket and files.
58 class MinioFixture : public benchmark::Fixture {
59  public:
SetUp(const::benchmark::State & state)60   void SetUp(const ::benchmark::State& state) override {
61     minio_.reset(new MinioTestServer());
62     ASSERT_OK(minio_->Start());
63 
64     const char* region_str = std::getenv(kEnvAwsRegion);
65     if (region_str) {
66       region_ = region_str;
67       std::cerr << "Using region from environment: " << region_ << std::endl;
68     } else {
69       std::cerr << "Using default region" << std::endl;
70     }
71 
72     const char* bucket_str = std::getenv(kEnvBucketName);
73     if (bucket_str) {
74       bucket_ = bucket_str;
75       std::cerr << "Using bucket from environment: " << bucket_ << std::endl;
76     } else {
77       bucket_ = "bucket";
78       std::cerr << "Using default bucket: " << bucket_ << std::endl;
79     }
80 
81     client_config_.endpointOverride = ToAwsString(minio_->connect_string());
82     client_config_.scheme = Aws::Http::Scheme::HTTP;
83     if (!region_.empty()) {
84       client_config_.region = ToAwsString(region_);
85     }
86     client_config_.retryStrategy = std::make_shared<ConnectRetryStrategy>();
87     credentials_ = {ToAwsString(minio_->access_key()), ToAwsString(minio_->secret_key())};
88     bool use_virtual_addressing = false;
89     client_.reset(
90         new Aws::S3::S3Client(credentials_, client_config_,
91                               Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
92                               use_virtual_addressing));
93 
94     MakeFileSystem();
95 
96     const char* skip_str = std::getenv(kEnvSkipSetup);
97     const std::string skip = skip_str ? std::string(skip_str) : "";
98     if (!skip.empty()) {
99       std::cerr << "Skipping creation of bucket/objects as requested" << std::endl;
100     } else {
101       ASSERT_OK(MakeBucket());
102       ASSERT_OK(MakeObject("bytes_1mib", 1024 * 1024));
103       ASSERT_OK(MakeObject("bytes_100mib", 100 * 1024 * 1024));
104       ASSERT_OK(MakeObject("bytes_500mib", 500 * 1024 * 1024));
105       ASSERT_OK(MakeParquetObject(bucket_ + "/pq_c402_r250k", 400, 250000));
106     }
107   }
108 
MakeFileSystem()109   void MakeFileSystem() {
110     options_.ConfigureAccessKey(minio_->access_key(), minio_->secret_key());
111     options_.scheme = "http";
112     if (!region_.empty()) {
113       options_.region = region_;
114     }
115     options_.endpoint_override = minio_->connect_string();
116     ASSERT_OK_AND_ASSIGN(fs_, S3FileSystem::Make(options_));
117   }
118 
119   /// Set up bucket if it doesn't exist.
120   ///
121   /// When using Minio we'll have a fresh setup each time, but
122   /// otherwise we may have a leftover bucket.
MakeBucket()123   Status MakeBucket() {
124     Aws::S3::Model::HeadBucketRequest head;
125     head.SetBucket(ToAwsString(bucket_));
126     const Status st = OutcomeToStatus(client_->HeadBucket(head));
127     if (st.ok()) {
128       // Bucket exists already
129       return st;
130     }
131     Aws::S3::Model::CreateBucketRequest req;
132     req.SetBucket(ToAwsString(bucket_));
133     return OutcomeToStatus(client_->CreateBucket(req));
134   }
135 
136   /// Make an object with dummy data.
MakeObject(const std::string & name,int size)137   Status MakeObject(const std::string& name, int size) {
138     Aws::S3::Model::PutObjectRequest req;
139     req.SetBucket(ToAwsString(bucket_));
140     req.SetKey(ToAwsString(name));
141     req.SetBody(std::make_shared<std::stringstream>(std::string(size, 'a')));
142     return OutcomeToStatus(client_->PutObject(req));
143   }
144 
145   /// Make an object with Parquet data.
146   /// Appends integer columns to the beginning (to act as indices).
MakeParquetObject(const std::string & path,int num_columns,int num_rows)147   Status MakeParquetObject(const std::string& path, int num_columns, int num_rows) {
148     std::vector<std::shared_ptr<ChunkedArray>> columns;
149     std::vector<std::shared_ptr<Field>> fields;
150 
151     {
152       arrow::random::RandomArrayGenerator generator(0);
153       std::shared_ptr<Array> values = generator.Int64(num_rows, 0, 1e10, 0);
154       columns.push_back(std::make_shared<ChunkedArray>(values));
155       fields.push_back(::arrow::field("timestamp", values->type()));
156     }
157     {
158       arrow::random::RandomArrayGenerator generator(1);
159       std::shared_ptr<Array> values = generator.Int32(num_rows, 0, 1e9, 0);
160       columns.push_back(std::make_shared<ChunkedArray>(values));
161       fields.push_back(::arrow::field("val", values->type()));
162     }
163 
164     for (int i = 0; i < num_columns; i++) {
165       arrow::random::RandomArrayGenerator generator(i);
166       std::shared_ptr<Array> values = generator.Float64(num_rows, -1.e10, 1e10, 0);
167       std::stringstream ss;
168       ss << "col" << i;
169       columns.push_back(std::make_shared<ChunkedArray>(values));
170       fields.push_back(::arrow::field(ss.str(), values->type()));
171     }
172     auto schema = std::make_shared<::arrow::Schema>(fields);
173 
174     std::shared_ptr<Table> table = Table::Make(schema, columns);
175 
176     std::shared_ptr<io::OutputStream> sink;
177     ARROW_ASSIGN_OR_RAISE(sink, fs_->OpenOutputStream(path));
178     RETURN_NOT_OK(
179         parquet::arrow::WriteTable(*table, arrow::default_memory_pool(), sink, num_rows));
180 
181     return Status::OK();
182   }
183 
TearDown(const::benchmark::State & state)184   void TearDown(const ::benchmark::State& state) override {
185     ASSERT_OK(minio_->Stop());
186     // Delete temporary directory, freeing up disk space
187     minio_.reset();
188   }
189 
190  protected:
191   std::unique_ptr<MinioTestServer> minio_;
192   std::string region_;
193   std::string bucket_;
194   Aws::Client::ClientConfiguration client_config_;
195   Aws::Auth::AWSCredentials credentials_;
196   std::unique_ptr<Aws::S3::S3Client> client_;
197   S3Options options_;
198   std::shared_ptr<S3FileSystem> fs_;
199 };
200 
/// Set up/tear down the AWS SDK globally.
/// (GBenchmark doesn't run GTest environments.)
///
/// The constructor/destructor delegate to `s3_env`, a test-environment
/// object declared in a project header (presumably s3_test_util.h — confirm).
class S3BenchmarkEnvironment {
 public:
  S3BenchmarkEnvironment() { s3_env->SetUp(); }
  ~S3BenchmarkEnvironment() { s3_env->TearDown(); }
};

// Global instance: SDK init happens before main() runs the benchmarks and
// teardown happens at static destruction time.
S3BenchmarkEnvironment env{};
210 
211 /// Read the entire file into memory in one go to measure bandwidth.
NaiveRead(benchmark::State & st,S3FileSystem * fs,const std::string & path)212 static void NaiveRead(benchmark::State& st, S3FileSystem* fs, const std::string& path) {
213   int64_t total_bytes = 0;
214   int total_items = 0;
215   for (auto _ : st) {
216     std::shared_ptr<io::RandomAccessFile> file;
217     std::shared_ptr<Buffer> buf;
218     int64_t size;
219     ASSERT_OK_AND_ASSIGN(file, fs->OpenInputFile(path));
220     ASSERT_OK_AND_ASSIGN(size, file->GetSize());
221     ASSERT_OK_AND_ASSIGN(buf, file->ReadAt(0, size));
222     total_bytes += buf->size();
223     total_items += 1;
224   }
225   st.SetBytesProcessed(total_bytes);
226   st.SetItemsProcessed(total_items);
227   std::cerr << "Read the file " << total_items << " times" << std::endl;
228 }
229 
230 constexpr int64_t kChunkSize = 5 * 1024 * 1024;
231 
232 /// Mimic the Parquet reader, reading the file in small chunks.
ChunkedRead(benchmark::State & st,S3FileSystem * fs,const std::string & path)233 static void ChunkedRead(benchmark::State& st, S3FileSystem* fs, const std::string& path) {
234   int64_t total_bytes = 0;
235   int total_items = 0;
236   for (auto _ : st) {
237     std::shared_ptr<io::RandomAccessFile> file;
238     std::shared_ptr<Buffer> buf;
239     int64_t size = 0;
240     ASSERT_OK_AND_ASSIGN(file, fs->OpenInputFile(path));
241     ASSERT_OK_AND_ASSIGN(size, file->GetSize());
242     total_items += 1;
243 
244     int64_t offset = 0;
245     while (offset < size) {
246       const int64_t read = std::min(size, kChunkSize);
247       ASSERT_OK_AND_ASSIGN(buf, file->ReadAt(offset, read));
248       total_bytes += buf->size();
249       offset += buf->size();
250     }
251   }
252   st.SetBytesProcessed(total_bytes);
253   st.SetItemsProcessed(total_items);
254   std::cerr << "Read the file " << total_items << " times" << std::endl;
255 }
256 
257 /// Read the file in small chunks, but using read coalescing.
CoalescedRead(benchmark::State & st,S3FileSystem * fs,const std::string & path)258 static void CoalescedRead(benchmark::State& st, S3FileSystem* fs,
259                           const std::string& path) {
260   int64_t total_bytes = 0;
261   int total_items = 0;
262   for (auto _ : st) {
263     std::shared_ptr<io::RandomAccessFile> file;
264     std::shared_ptr<Buffer> buf;
265     int64_t size = 0;
266     ASSERT_OK_AND_ASSIGN(file, fs->OpenInputFile(path));
267     ASSERT_OK_AND_ASSIGN(size, file->GetSize());
268     total_items += 1;
269 
270     io::internal::ReadRangeCache cache(file, {},
271                                        io::CacheOptions{8192, 64 * 1024 * 1024});
272     std::vector<io::ReadRange> ranges;
273 
274     int64_t offset = 0;
275     while (offset < size) {
276       const int64_t read = std::min(size, kChunkSize);
277       ranges.push_back(io::ReadRange{offset, read});
278       offset += read;
279     }
280     ASSERT_OK(cache.Cache(ranges));
281 
282     offset = 0;
283     while (offset < size) {
284       const int64_t read = std::min(size, kChunkSize);
285       ASSERT_OK_AND_ASSIGN(buf, cache.Read({offset, read}));
286       total_bytes += buf->size();
287       offset += read;
288     }
289   }
290   st.SetBytesProcessed(total_bytes);
291   st.SetItemsProcessed(total_items);
292   std::cerr << "Read the file " << total_items << " times" << std::endl;
293 }
294 
/// Read a Parquet file from S3.
///
/// \param column_indices the columns to read
/// \param pre_buffer whether to enable pre-buffering (read coalescing)
///        in the Arrow Parquet reader
/// \param read_strategy "ReadTable" materializes the whole table in one call;
///        any other value streams record batches and concatenates them
static void ParquetRead(benchmark::State& st, S3FileSystem* fs, const std::string& path,
                        std::vector<int> column_indices, bool pre_buffer,
                        std::string read_strategy) {
  int64_t total_bytes = 0;
  int total_items = 0;

  parquet::ArrowReaderProperties properties;
  properties.set_use_threads(true);
  properties.set_pre_buffer(pre_buffer);
  parquet::ReaderProperties parquet_properties = parquet::default_reader_properties();

  for (auto _ : st) {
    // Re-open the file each iteration so connection setup is measured too.
    std::shared_ptr<io::RandomAccessFile> file;
    int64_t size = 0;
    ASSERT_OK_AND_ASSIGN(file, fs->OpenInputFile(path));
    ASSERT_OK_AND_ASSIGN(size, file->GetSize());

    std::unique_ptr<parquet::arrow::FileReader> reader;
    parquet::arrow::FileReaderBuilder builder;
    ASSERT_OK(builder.Open(file, parquet_properties));
    ASSERT_OK(builder.properties(properties)->Build(&reader));

    std::shared_ptr<Table> table;

    if (read_strategy == "ReadTable") {
      ASSERT_OK(reader->ReadTable(column_indices, &table));
    } else {
      // Reads row group 0 only; the fixture writes the whole table with
      // chunk_size == num_rows, i.e. a single row group.
      std::shared_ptr<RecordBatchReader> rb_reader;
      ASSERT_OK(reader->GetRecordBatchReader({0}, column_indices, &rb_reader));
      ASSERT_OK(rb_reader->ReadAll(&table));
    }

    // TODO: actually measure table memory usage
    // (currently counts the on-disk file size, not decoded bytes).
    total_bytes += size;
    total_items += 1;
  }
  st.SetBytesProcessed(total_bytes);
  st.SetItemsProcessed(total_items);
}
335 
336 /// Helper function used in the macros below to template benchmarks.
ParquetReadAll(benchmark::State & st,S3FileSystem * fs,const std::string & bucket,int64_t file_rows,int64_t file_cols,bool pre_buffer,std::string read_strategy)337 static void ParquetReadAll(benchmark::State& st, S3FileSystem* fs,
338                            const std::string& bucket, int64_t file_rows,
339                            int64_t file_cols, bool pre_buffer,
340                            std::string read_strategy) {
341   std::vector<int> column_indices(file_cols);
342   std::iota(column_indices.begin(), column_indices.end(), 0);
343   std::stringstream ss;
344   ss << bucket << "/pq_c" << file_cols << "_r" << file_rows << "k";
345   ParquetRead(st, fs, ss.str(), column_indices, false, read_strategy);
346 }
347 
348 /// Helper function used in the macros below to template benchmarks.
ParquetReadSome(benchmark::State & st,S3FileSystem * fs,const std::string & bucket,int64_t file_rows,int64_t file_cols,std::vector<int> cols_to_read,bool pre_buffer,std::string read_strategy)349 static void ParquetReadSome(benchmark::State& st, S3FileSystem* fs,
350                             const std::string& bucket, int64_t file_rows,
351                             int64_t file_cols, std::vector<int> cols_to_read,
352                             bool pre_buffer, std::string read_strategy) {
353   std::stringstream ss;
354   ss << bucket << "/pq_c" << file_cols << "_r" << file_rows << "k";
355   ParquetRead(st, fs, ss.str(), cols_to_read, false, read_strategy);
356 }
357 
// Naive whole-object reads (single ReadAt call) at three object sizes.
BENCHMARK_DEFINE_F(MinioFixture, ReadAll1Mib)(benchmark::State& st) {
  NaiveRead(st, fs_.get(), bucket_ + "/bytes_1mib");
}
BENCHMARK_REGISTER_F(MinioFixture, ReadAll1Mib)->UseRealTime();
BENCHMARK_DEFINE_F(MinioFixture, ReadAll100Mib)(benchmark::State& st) {
  NaiveRead(st, fs_.get(), bucket_ + "/bytes_100mib");
}
BENCHMARK_REGISTER_F(MinioFixture, ReadAll100Mib)->UseRealTime();
BENCHMARK_DEFINE_F(MinioFixture, ReadAll500Mib)(benchmark::State& st) {
  NaiveRead(st, fs_.get(), bucket_ + "/bytes_500mib");
}
BENCHMARK_REGISTER_F(MinioFixture, ReadAll500Mib)->UseRealTime();
370 
// Sequential chunked reads (kChunkSize per request), no coalescing.
BENCHMARK_DEFINE_F(MinioFixture, ReadChunked100Mib)(benchmark::State& st) {
  ChunkedRead(st, fs_.get(), bucket_ + "/bytes_100mib");
}
BENCHMARK_REGISTER_F(MinioFixture, ReadChunked100Mib)->UseRealTime();
BENCHMARK_DEFINE_F(MinioFixture, ReadChunked500Mib)(benchmark::State& st) {
  ChunkedRead(st, fs_.get(), bucket_ + "/bytes_500mib");
}
BENCHMARK_REGISTER_F(MinioFixture, ReadChunked500Mib)->UseRealTime();
379 
// Chunked reads issued through the ReadRangeCache (read coalescing).
BENCHMARK_DEFINE_F(MinioFixture, ReadCoalesced100Mib)(benchmark::State& st) {
  CoalescedRead(st, fs_.get(), bucket_ + "/bytes_100mib");
}
BENCHMARK_REGISTER_F(MinioFixture, ReadCoalesced100Mib)->UseRealTime();
BENCHMARK_DEFINE_F(MinioFixture, ReadCoalesced500Mib)(benchmark::State& st) {
  CoalescedRead(st, fs_.get(), bucket_ + "/bytes_500mib");
}
BENCHMARK_REGISTER_F(MinioFixture, ReadCoalesced500Mib)->UseRealTime();
388 
// Helpers to generate various multiple benchmarks for a given Parquet file.

// NAME: the base name of the benchmark.
// ROWS: the number of rows in the Parquet file.
// COLS: the number of columns in the Parquet file.
// STRATEGY: how to read the file (ReadTable or GetRecordBatchReader)
//
// Each expansion produces a "Naive" variant (passes pre_buffer=false) and a
// "Coalesced" variant (passes pre_buffer=true) reading ALL columns.
#define PQ_BENCHMARK_IMPL(NAME, ROWS, COLS, STRATEGY)                                 \
  BENCHMARK_DEFINE_F(MinioFixture, NAME##STRATEGY##AllNaive)(benchmark::State & st) { \
    ParquetReadAll(st, fs_.get(), bucket_, ROWS, COLS, false, #STRATEGY);             \
  }                                                                                   \
  BENCHMARK_REGISTER_F(MinioFixture, NAME##STRATEGY##AllNaive)->UseRealTime();        \
  BENCHMARK_DEFINE_F(MinioFixture, NAME##STRATEGY##AllCoalesced)                      \
  (benchmark::State & st) {                                                           \
    ParquetReadAll(st, fs_.get(), bucket_, ROWS, COLS, true, #STRATEGY);              \
  }                                                                                   \
  BENCHMARK_REGISTER_F(MinioFixture, NAME##STRATEGY##AllCoalesced)->UseRealTime();

// COL_INDICES: a vector specifying a subset of column indices to read.
// Same Naive/Coalesced pairing as above, but reading only COL_INDICES.
#define PQ_BENCHMARK_PICK_IMPL(NAME, ROWS, COLS, COL_INDICES, STRATEGY)                 \
  BENCHMARK_DEFINE_F(MinioFixture, NAME##STRATEGY##PickNaive)(benchmark::State & st) {  \
    ParquetReadSome(st, fs_.get(), bucket_, ROWS, COLS, COL_INDICES, false, #STRATEGY); \
  }                                                                                     \
  BENCHMARK_REGISTER_F(MinioFixture, NAME##STRATEGY##PickNaive)->UseRealTime();         \
  BENCHMARK_DEFINE_F(MinioFixture, NAME##STRATEGY##PickCoalesced)                       \
  (benchmark::State & st) {                                                             \
    ParquetReadSome(st, fs_.get(), bucket_, ROWS, COLS, COL_INDICES, true, #STRATEGY);  \
  }                                                                                     \
  BENCHMARK_REGISTER_F(MinioFixture, NAME##STRATEGY##PickCoalesced)->UseRealTime();

// Instantiate both read strategies for the full-table read.
#define PQ_BENCHMARK(ROWS, COLS)                                   \
  PQ_BENCHMARK_IMPL(ReadParquet_c##COLS##_r##ROWS##K_, ROWS, COLS, \
                    GetRecordBatchReader);                         \
  PQ_BENCHMARK_IMPL(ReadParquet_c##COLS##_r##ROWS##K_, ROWS, COLS, ReadTable);

// Instantiate both read strategies for a column-subset read.
#define PQ_BENCHMARK_PICK(NAME, ROWS, COLS, COL_INDICES)                         \
  PQ_BENCHMARK_PICK_IMPL(ReadParquet_c##COLS##_r##ROWS##K_##NAME##_, ROWS, COLS, \
                         COL_INDICES, GetRecordBatchReader);                     \
  PQ_BENCHMARK_PICK_IMPL(ReadParquet_c##COLS##_r##ROWS##K_##NAME##_, ROWS, COLS, \
                         COL_INDICES, ReadTable);

// Test a Parquet file with 250k rows, 402 columns.
PQ_BENCHMARK(250, 402);
// Scenario A: test selecting a small set of contiguous columns, and a "far" column.
PQ_BENCHMARK_PICK(A, 250, 402, (std::vector<int>{0, 1, 2, 3, 4, 90}));
// Scenario B: test selecting a large set of contiguous columns.
PQ_BENCHMARK_PICK(B, 250, 402, (::arrow::internal::Iota(41)));
435 
436 }  // namespace fs
437 }  // namespace arrow
438