1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements.  See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership.  The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License.  You may obtain a copy of the License at
8 //
9 //   http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied.  See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17 
18 #include "benchmark/benchmark.h"
19 
20 #include "arrow/array.h"
21 #include "arrow/io/memory.h"
22 #include "arrow/testing/random.h"
23 
24 #include "parquet/column_reader.h"
25 #include "parquet/column_writer.h"
26 #include "parquet/file_reader.h"
27 #include "parquet/metadata.h"
28 #include "parquet/platform.h"
29 #include "parquet/thrift_internal.h"
30 
31 namespace parquet {
32 
33 using schema::PrimitiveNode;
34 
35 namespace benchmark {
36 
BuildWriter(int64_t output_size,const std::shared_ptr<ArrowOutputStream> & dst,ColumnChunkMetaDataBuilder * metadata,ColumnDescriptor * schema,const WriterProperties * properties,Compression::type codec)37 std::shared_ptr<Int64Writer> BuildWriter(int64_t output_size,
38                                          const std::shared_ptr<ArrowOutputStream>& dst,
39                                          ColumnChunkMetaDataBuilder* metadata,
40                                          ColumnDescriptor* schema,
41                                          const WriterProperties* properties,
42                                          Compression::type codec) {
43   std::unique_ptr<PageWriter> pager =
44       PageWriter::Open(dst, codec, Codec::UseDefaultCompressionLevel(), metadata);
45   std::shared_ptr<ColumnWriter> writer =
46       ColumnWriter::Make(metadata, std::move(pager), properties);
47   return std::static_pointer_cast<Int64Writer>(writer);
48 }
49 
Int64Schema(Repetition::type repetition)50 std::shared_ptr<ColumnDescriptor> Int64Schema(Repetition::type repetition) {
51   auto node = PrimitiveNode::Make("int64", repetition, Type::INT64);
52   return std::make_shared<ColumnDescriptor>(node, repetition != Repetition::REQUIRED,
53                                             repetition == Repetition::REPEATED);
54 }
55 
SetBytesProcessed(::benchmark::State & state,Repetition::type repetition)56 void SetBytesProcessed(::benchmark::State& state, Repetition::type repetition) {
57   int64_t bytes_processed = state.iterations() * state.range(0) * sizeof(int64_t);
58   if (repetition != Repetition::REQUIRED) {
59     bytes_processed += state.iterations() * state.range(0) * sizeof(int16_t);
60   }
61   if (repetition == Repetition::REPEATED) {
62     bytes_processed += state.iterations() * state.range(0) * sizeof(int16_t);
63   }
64   state.SetBytesProcessed(state.iterations() * state.range(0) * sizeof(int16_t));
65 }
66 
67 template <Repetition::type repetition,
68           Compression::type codec = Compression::UNCOMPRESSED>
BM_WriteInt64Column(::benchmark::State & state)69 static void BM_WriteInt64Column(::benchmark::State& state) {
70   format::ColumnChunk thrift_metadata;
71 
72   ::arrow::random::RandomArrayGenerator rgen(1337);
73   auto values = rgen.Int64(state.range(0), 0, 1000000, 0);
74   const auto& i8_values = static_cast<const ::arrow::Int64Array&>(*values);
75 
76   std::vector<int16_t> definition_levels(state.range(0), 1);
77   std::vector<int16_t> repetition_levels(state.range(0), 0);
78   std::shared_ptr<ColumnDescriptor> schema = Int64Schema(repetition);
79   std::shared_ptr<WriterProperties> properties = WriterProperties::Builder()
80                                                      .compression(codec)
81                                                      ->encoding(Encoding::PLAIN)
82                                                      ->disable_dictionary()
83                                                      ->build();
84   auto metadata = ColumnChunkMetaDataBuilder::Make(
85       properties, schema.get(), reinterpret_cast<uint8_t*>(&thrift_metadata));
86 
87   while (state.KeepRunning()) {
88     auto stream = CreateOutputStream();
89     std::shared_ptr<Int64Writer> writer = BuildWriter(
90         state.range(0), stream, metadata.get(), schema.get(), properties.get(), codec);
91     writer->WriteBatch(i8_values.length(), definition_levels.data(),
92                        repetition_levels.data(), i8_values.raw_values());
93     writer->Close();
94   }
95   SetBytesProcessed(state, repetition);
96 }
97 
// Write benchmarks: every (repetition, codec) combination writes a column of
// 1 << 20 INT64 values; the Arg becomes state.range(0) in BM_WriteInt64Column.
// Uncompressed (default codec template argument).
BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::REQUIRED)->Arg(1 << 20);
BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::OPTIONAL)->Arg(1 << 20);
BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::REPEATED)->Arg(1 << 20);
// SNAPPY-compressed pages.
BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::REQUIRED, Compression::SNAPPY)
    ->Arg(1 << 20);
BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::OPTIONAL, Compression::SNAPPY)
    ->Arg(1 << 20);
BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::REPEATED, Compression::SNAPPY)
    ->Arg(1 << 20);

// LZ4-compressed pages.
BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::REQUIRED, Compression::LZ4)
    ->Arg(1 << 20);
BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::OPTIONAL, Compression::LZ4)
    ->Arg(1 << 20);
BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::REPEATED, Compression::LZ4)
    ->Arg(1 << 20);

// ZSTD-compressed pages.
BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::REQUIRED, Compression::ZSTD)
    ->Arg(1 << 20);
BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::OPTIONAL, Compression::ZSTD)
    ->Arg(1 << 20);
BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::REPEATED, Compression::ZSTD)
    ->Arg(1 << 20);
121 
BuildReader(std::shared_ptr<Buffer> & buffer,int64_t num_values,Compression::type codec,ColumnDescriptor * schema)122 std::shared_ptr<Int64Reader> BuildReader(std::shared_ptr<Buffer>& buffer,
123                                          int64_t num_values, Compression::type codec,
124                                          ColumnDescriptor* schema) {
125   auto source = std::make_shared<::arrow::io::BufferReader>(buffer);
126   std::unique_ptr<PageReader> page_reader = PageReader::Open(source, num_values, codec);
127   return std::static_pointer_cast<Int64Reader>(
128       ColumnReader::Make(schema, std::move(page_reader)));
129 }
130 
131 template <Repetition::type repetition,
132           Compression::type codec = Compression::UNCOMPRESSED>
BM_ReadInt64Column(::benchmark::State & state)133 static void BM_ReadInt64Column(::benchmark::State& state) {
134   format::ColumnChunk thrift_metadata;
135   std::vector<int64_t> values(state.range(0), 128);
136   std::vector<int16_t> definition_levels(state.range(0), 1);
137   std::vector<int16_t> repetition_levels(state.range(0), 0);
138   std::shared_ptr<ColumnDescriptor> schema = Int64Schema(repetition);
139   std::shared_ptr<WriterProperties> properties = WriterProperties::Builder()
140                                                      .compression(codec)
141                                                      ->encoding(Encoding::PLAIN)
142                                                      ->disable_dictionary()
143                                                      ->build();
144 
145   auto metadata = ColumnChunkMetaDataBuilder::Make(
146       properties, schema.get(), reinterpret_cast<uint8_t*>(&thrift_metadata));
147 
148   auto stream = CreateOutputStream();
149   std::shared_ptr<Int64Writer> writer = BuildWriter(
150       state.range(0), stream, metadata.get(), schema.get(), properties.get(), codec);
151   writer->WriteBatch(values.size(), definition_levels.data(), repetition_levels.data(),
152                      values.data());
153   writer->Close();
154 
155   PARQUET_ASSIGN_OR_THROW(auto src, stream->Finish());
156   std::vector<int64_t> values_out(state.range(1));
157   std::vector<int16_t> definition_levels_out(state.range(1));
158   std::vector<int16_t> repetition_levels_out(state.range(1));
159   while (state.KeepRunning()) {
160     std::shared_ptr<Int64Reader> reader =
161         BuildReader(src, state.range(1), codec, schema.get());
162     int64_t values_read = 0;
163     for (size_t i = 0; i < values.size(); i += values_read) {
164       reader->ReadBatch(values_out.size(), definition_levels_out.data(),
165                         repetition_levels_out.data(), values_out.data(), &values_read);
166     }
167   }
168   SetBytesProcessed(state, repetition);
169 }
170 
ReadColumnSetArgs(::benchmark::internal::Benchmark * bench)171 void ReadColumnSetArgs(::benchmark::internal::Benchmark* bench) {
172   // Small column, tiny reads
173   bench->Args({1024, 16});
174   // Small column, full read
175   bench->Args({1024, 1024});
176   // Midsize column, midsize reads
177   bench->Args({65536, 1024});
178 }
179 
// Read benchmarks: every (repetition, codec) combination runs with each
// (column size, batch size) pair registered by ReadColumnSetArgs.
// Uncompressed (default codec template argument).
BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::REQUIRED)->Apply(ReadColumnSetArgs);

BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::OPTIONAL)->Apply(ReadColumnSetArgs);

BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::REPEATED)->Apply(ReadColumnSetArgs);

// SNAPPY-compressed pages.
BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::REQUIRED, Compression::SNAPPY)
    ->Apply(ReadColumnSetArgs);
BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::OPTIONAL, Compression::SNAPPY)
    ->Apply(ReadColumnSetArgs);
BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::REPEATED, Compression::SNAPPY)
    ->Apply(ReadColumnSetArgs);

// LZ4-compressed pages.
BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::REQUIRED, Compression::LZ4)
    ->Apply(ReadColumnSetArgs);
BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::OPTIONAL, Compression::LZ4)
    ->Apply(ReadColumnSetArgs);
BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::REPEATED, Compression::LZ4)
    ->Apply(ReadColumnSetArgs);

// ZSTD-compressed pages.
BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::REQUIRED, Compression::ZSTD)
    ->Apply(ReadColumnSetArgs);
BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::OPTIONAL, Compression::ZSTD)
    ->Apply(ReadColumnSetArgs);
BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::REPEATED, Compression::ZSTD)
    ->Apply(ReadColumnSetArgs);
206 
BM_RleEncoding(::benchmark::State & state)207 static void BM_RleEncoding(::benchmark::State& state) {
208   std::vector<int16_t> levels(state.range(0), 0);
209   int64_t n = 0;
210   std::generate(levels.begin(), levels.end(),
211                 [&state, &n] { return (n++ % state.range(1)) == 0; });
212   int16_t max_level = 1;
213   int64_t rle_size = LevelEncoder::MaxBufferSize(Encoding::RLE, max_level,
214                                                  static_cast<int>(levels.size()));
215   auto buffer_rle = AllocateBuffer();
216   PARQUET_THROW_NOT_OK(buffer_rle->Resize(rle_size));
217 
218   while (state.KeepRunning()) {
219     LevelEncoder level_encoder;
220     level_encoder.Init(Encoding::RLE, max_level, static_cast<int>(levels.size()),
221                        buffer_rle->mutable_data(), static_cast<int>(buffer_rle->size()));
222     level_encoder.Encode(static_cast<int>(levels.size()), levels.data());
223   }
224   state.SetBytesProcessed(state.iterations() * state.range(0) * sizeof(int16_t));
225   state.SetItemsProcessed(state.iterations() * state.range(0));
226 }
227 
// Sweeps the level count (state.range(0)) over [1024, 65536] and the spacing of
// 1-levels (state.range(1)) over [1, 16].
BENCHMARK(BM_RleEncoding)->RangePair(1024, 65536, 1, 16);
229 
BM_RleDecoding(::benchmark::State & state)230 static void BM_RleDecoding(::benchmark::State& state) {
231   LevelEncoder level_encoder;
232   std::vector<int16_t> levels(state.range(0), 0);
233   int64_t n = 0;
234   std::generate(levels.begin(), levels.end(),
235                 [&state, &n] { return (n++ % state.range(1)) == 0; });
236   int16_t max_level = 1;
237   int rle_size = LevelEncoder::MaxBufferSize(Encoding::RLE, max_level,
238                                              static_cast<int>(levels.size()));
239   auto buffer_rle = AllocateBuffer();
240   PARQUET_THROW_NOT_OK(buffer_rle->Resize(rle_size + sizeof(int32_t)));
241   level_encoder.Init(Encoding::RLE, max_level, static_cast<int>(levels.size()),
242                      buffer_rle->mutable_data() + sizeof(int32_t), rle_size);
243   level_encoder.Encode(static_cast<int>(levels.size()), levels.data());
244   reinterpret_cast<int32_t*>(buffer_rle->mutable_data())[0] = level_encoder.len();
245 
246   while (state.KeepRunning()) {
247     LevelDecoder level_decoder;
248     level_decoder.SetData(Encoding::RLE, max_level, static_cast<int>(levels.size()),
249                           buffer_rle->data(), rle_size);
250     level_decoder.Decode(static_cast<int>(state.range(0)), levels.data());
251   }
252 
253   state.SetBytesProcessed(state.iterations() * state.range(0) * sizeof(int16_t));
254   state.SetItemsProcessed(state.iterations() * state.range(0));
255 }
256 
// Same argument sweep as the encoding benchmark: level count (state.range(0))
// over [1024, 65536], spacing of 1-levels (state.range(1)) over [1, 16].
BENCHMARK(BM_RleDecoding)->RangePair(1024, 65536, 1, 16);
258 
259 }  // namespace benchmark
260 
261 }  // namespace parquet
262