1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17
18 #include "benchmark/benchmark.h"
19
20 #include "arrow/array.h"
21 #include "arrow/io/memory.h"
22 #include "arrow/testing/random.h"
23
24 #include "parquet/column_reader.h"
25 #include "parquet/column_writer.h"
26 #include "parquet/file_reader.h"
27 #include "parquet/metadata.h"
28 #include "parquet/platform.h"
29 #include "parquet/thrift_internal.h"
30
31 namespace parquet {
32
33 using schema::PrimitiveNode;
34
35 namespace benchmark {
36
BuildWriter(int64_t output_size,const std::shared_ptr<ArrowOutputStream> & dst,ColumnChunkMetaDataBuilder * metadata,ColumnDescriptor * schema,const WriterProperties * properties,Compression::type codec)37 std::shared_ptr<Int64Writer> BuildWriter(int64_t output_size,
38 const std::shared_ptr<ArrowOutputStream>& dst,
39 ColumnChunkMetaDataBuilder* metadata,
40 ColumnDescriptor* schema,
41 const WriterProperties* properties,
42 Compression::type codec) {
43 std::unique_ptr<PageWriter> pager =
44 PageWriter::Open(dst, codec, Codec::UseDefaultCompressionLevel(), metadata);
45 std::shared_ptr<ColumnWriter> writer =
46 ColumnWriter::Make(metadata, std::move(pager), properties);
47 return std::static_pointer_cast<Int64Writer>(writer);
48 }
49
Int64Schema(Repetition::type repetition)50 std::shared_ptr<ColumnDescriptor> Int64Schema(Repetition::type repetition) {
51 auto node = PrimitiveNode::Make("int64", repetition, Type::INT64);
52 return std::make_shared<ColumnDescriptor>(node, repetition != Repetition::REQUIRED,
53 repetition == Repetition::REPEATED);
54 }
55
SetBytesProcessed(::benchmark::State & state,Repetition::type repetition)56 void SetBytesProcessed(::benchmark::State& state, Repetition::type repetition) {
57 int64_t bytes_processed = state.iterations() * state.range(0) * sizeof(int64_t);
58 if (repetition != Repetition::REQUIRED) {
59 bytes_processed += state.iterations() * state.range(0) * sizeof(int16_t);
60 }
61 if (repetition == Repetition::REPEATED) {
62 bytes_processed += state.iterations() * state.range(0) * sizeof(int16_t);
63 }
64 state.SetBytesProcessed(state.iterations() * state.range(0) * sizeof(int16_t));
65 }
66
67 template <Repetition::type repetition,
68 Compression::type codec = Compression::UNCOMPRESSED>
BM_WriteInt64Column(::benchmark::State & state)69 static void BM_WriteInt64Column(::benchmark::State& state) {
70 format::ColumnChunk thrift_metadata;
71
72 ::arrow::random::RandomArrayGenerator rgen(1337);
73 auto values = rgen.Int64(state.range(0), 0, 1000000, 0);
74 const auto& i8_values = static_cast<const ::arrow::Int64Array&>(*values);
75
76 std::vector<int16_t> definition_levels(state.range(0), 1);
77 std::vector<int16_t> repetition_levels(state.range(0), 0);
78 std::shared_ptr<ColumnDescriptor> schema = Int64Schema(repetition);
79 std::shared_ptr<WriterProperties> properties = WriterProperties::Builder()
80 .compression(codec)
81 ->encoding(Encoding::PLAIN)
82 ->disable_dictionary()
83 ->build();
84 auto metadata = ColumnChunkMetaDataBuilder::Make(
85 properties, schema.get(), reinterpret_cast<uint8_t*>(&thrift_metadata));
86
87 while (state.KeepRunning()) {
88 auto stream = CreateOutputStream();
89 std::shared_ptr<Int64Writer> writer = BuildWriter(
90 state.range(0), stream, metadata.get(), schema.get(), properties.get(), codec);
91 writer->WriteBatch(i8_values.length(), definition_levels.data(),
92 repetition_levels.data(), i8_values.raw_values());
93 writer->Close();
94 }
95 SetBytesProcessed(state, repetition);
96 }
97
98 BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::REQUIRED)->Arg(1 << 20);
99 BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::OPTIONAL)->Arg(1 << 20);
100 BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::REPEATED)->Arg(1 << 20);
101 BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::REQUIRED, Compression::SNAPPY)
102 ->Arg(1 << 20);
103 BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::OPTIONAL, Compression::SNAPPY)
104 ->Arg(1 << 20);
105 BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::REPEATED, Compression::SNAPPY)
106 ->Arg(1 << 20);
107
108 BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::REQUIRED, Compression::LZ4)
109 ->Arg(1 << 20);
110 BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::OPTIONAL, Compression::LZ4)
111 ->Arg(1 << 20);
112 BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::REPEATED, Compression::LZ4)
113 ->Arg(1 << 20);
114
115 BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::REQUIRED, Compression::ZSTD)
116 ->Arg(1 << 20);
117 BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::OPTIONAL, Compression::ZSTD)
118 ->Arg(1 << 20);
119 BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::REPEATED, Compression::ZSTD)
120 ->Arg(1 << 20);
121
BuildReader(std::shared_ptr<Buffer> & buffer,int64_t num_values,Compression::type codec,ColumnDescriptor * schema)122 std::shared_ptr<Int64Reader> BuildReader(std::shared_ptr<Buffer>& buffer,
123 int64_t num_values, Compression::type codec,
124 ColumnDescriptor* schema) {
125 auto source = std::make_shared<::arrow::io::BufferReader>(buffer);
126 std::unique_ptr<PageReader> page_reader = PageReader::Open(source, num_values, codec);
127 return std::static_pointer_cast<Int64Reader>(
128 ColumnReader::Make(schema, std::move(page_reader)));
129 }
130
131 template <Repetition::type repetition,
132 Compression::type codec = Compression::UNCOMPRESSED>
BM_ReadInt64Column(::benchmark::State & state)133 static void BM_ReadInt64Column(::benchmark::State& state) {
134 format::ColumnChunk thrift_metadata;
135 std::vector<int64_t> values(state.range(0), 128);
136 std::vector<int16_t> definition_levels(state.range(0), 1);
137 std::vector<int16_t> repetition_levels(state.range(0), 0);
138 std::shared_ptr<ColumnDescriptor> schema = Int64Schema(repetition);
139 std::shared_ptr<WriterProperties> properties = WriterProperties::Builder()
140 .compression(codec)
141 ->encoding(Encoding::PLAIN)
142 ->disable_dictionary()
143 ->build();
144
145 auto metadata = ColumnChunkMetaDataBuilder::Make(
146 properties, schema.get(), reinterpret_cast<uint8_t*>(&thrift_metadata));
147
148 auto stream = CreateOutputStream();
149 std::shared_ptr<Int64Writer> writer = BuildWriter(
150 state.range(0), stream, metadata.get(), schema.get(), properties.get(), codec);
151 writer->WriteBatch(values.size(), definition_levels.data(), repetition_levels.data(),
152 values.data());
153 writer->Close();
154
155 PARQUET_ASSIGN_OR_THROW(auto src, stream->Finish());
156 std::vector<int64_t> values_out(state.range(1));
157 std::vector<int16_t> definition_levels_out(state.range(1));
158 std::vector<int16_t> repetition_levels_out(state.range(1));
159 while (state.KeepRunning()) {
160 std::shared_ptr<Int64Reader> reader =
161 BuildReader(src, state.range(1), codec, schema.get());
162 int64_t values_read = 0;
163 for (size_t i = 0; i < values.size(); i += values_read) {
164 reader->ReadBatch(values_out.size(), definition_levels_out.data(),
165 repetition_levels_out.data(), values_out.data(), &values_read);
166 }
167 }
168 SetBytesProcessed(state, repetition);
169 }
170
ReadColumnSetArgs(::benchmark::internal::Benchmark * bench)171 void ReadColumnSetArgs(::benchmark::internal::Benchmark* bench) {
172 // Small column, tiny reads
173 bench->Args({1024, 16});
174 // Small column, full read
175 bench->Args({1024, 1024});
176 // Midsize column, midsize reads
177 bench->Args({65536, 1024});
178 }
179
180 BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::REQUIRED)->Apply(ReadColumnSetArgs);
181
182 BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::OPTIONAL)->Apply(ReadColumnSetArgs);
183
184 BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::REPEATED)->Apply(ReadColumnSetArgs);
185
186 BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::REQUIRED, Compression::SNAPPY)
187 ->Apply(ReadColumnSetArgs);
188 BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::OPTIONAL, Compression::SNAPPY)
189 ->Apply(ReadColumnSetArgs);
190 BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::REPEATED, Compression::SNAPPY)
191 ->Apply(ReadColumnSetArgs);
192
193 BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::REQUIRED, Compression::LZ4)
194 ->Apply(ReadColumnSetArgs);
195 BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::OPTIONAL, Compression::LZ4)
196 ->Apply(ReadColumnSetArgs);
197 BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::REPEATED, Compression::LZ4)
198 ->Apply(ReadColumnSetArgs);
199
200 BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::REQUIRED, Compression::ZSTD)
201 ->Apply(ReadColumnSetArgs);
202 BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::OPTIONAL, Compression::ZSTD)
203 ->Apply(ReadColumnSetArgs);
204 BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::REPEATED, Compression::ZSTD)
205 ->Apply(ReadColumnSetArgs);
206
BM_RleEncoding(::benchmark::State & state)207 static void BM_RleEncoding(::benchmark::State& state) {
208 std::vector<int16_t> levels(state.range(0), 0);
209 int64_t n = 0;
210 std::generate(levels.begin(), levels.end(),
211 [&state, &n] { return (n++ % state.range(1)) == 0; });
212 int16_t max_level = 1;
213 int64_t rle_size = LevelEncoder::MaxBufferSize(Encoding::RLE, max_level,
214 static_cast<int>(levels.size()));
215 auto buffer_rle = AllocateBuffer();
216 PARQUET_THROW_NOT_OK(buffer_rle->Resize(rle_size));
217
218 while (state.KeepRunning()) {
219 LevelEncoder level_encoder;
220 level_encoder.Init(Encoding::RLE, max_level, static_cast<int>(levels.size()),
221 buffer_rle->mutable_data(), static_cast<int>(buffer_rle->size()));
222 level_encoder.Encode(static_cast<int>(levels.size()), levels.data());
223 }
224 state.SetBytesProcessed(state.iterations() * state.range(0) * sizeof(int16_t));
225 state.SetItemsProcessed(state.iterations() * state.range(0));
226 }
227
228 BENCHMARK(BM_RleEncoding)->RangePair(1024, 65536, 1, 16);
229
BM_RleDecoding(::benchmark::State & state)230 static void BM_RleDecoding(::benchmark::State& state) {
231 LevelEncoder level_encoder;
232 std::vector<int16_t> levels(state.range(0), 0);
233 int64_t n = 0;
234 std::generate(levels.begin(), levels.end(),
235 [&state, &n] { return (n++ % state.range(1)) == 0; });
236 int16_t max_level = 1;
237 int rle_size = LevelEncoder::MaxBufferSize(Encoding::RLE, max_level,
238 static_cast<int>(levels.size()));
239 auto buffer_rle = AllocateBuffer();
240 PARQUET_THROW_NOT_OK(buffer_rle->Resize(rle_size + sizeof(int32_t)));
241 level_encoder.Init(Encoding::RLE, max_level, static_cast<int>(levels.size()),
242 buffer_rle->mutable_data() + sizeof(int32_t), rle_size);
243 level_encoder.Encode(static_cast<int>(levels.size()), levels.data());
244 reinterpret_cast<int32_t*>(buffer_rle->mutable_data())[0] = level_encoder.len();
245
246 while (state.KeepRunning()) {
247 LevelDecoder level_decoder;
248 level_decoder.SetData(Encoding::RLE, max_level, static_cast<int>(levels.size()),
249 buffer_rle->data(), rle_size);
250 level_decoder.Decode(static_cast<int>(state.range(0)), levels.data());
251 }
252
253 state.SetBytesProcessed(state.iterations() * state.range(0) * sizeof(int16_t));
254 state.SetItemsProcessed(state.iterations() * state.range(0));
255 }
256
257 BENCHMARK(BM_RleDecoding)->RangePair(1024, 65536, 1, 16);
258
259 } // namespace benchmark
260
261 } // namespace parquet
262