// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include <cstring>
#include <iostream>
#include <memory>
#include <string>

#include "arrow/io/memory.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/testing/util.h"
#include "arrow/util/cpu_info.h"
#include "arrow/util/simd.h"

#include "benchmark/benchmark.h"

namespace arrow {

using internal::CpuInfo;
static CpuInfo* cpu_info = CpuInfo::GetInstance();

static const int kNumCores = cpu_info->num_cores();
static const int64_t kL1Size = cpu_info->CacheSize(CpuInfo::L1_CACHE);
static const int64_t kL2Size = cpu_info->CacheSize(CpuInfo::L2_CACHE);
static const int64_t kL3Size = cpu_info->CacheSize(CpuInfo::L3_CACHE);

constexpr size_t kMemoryPerCore = 32 * 1024 * 1024;
using BufferPtr = std::shared_ptr<Buffer>;

#ifdef ARROW_WITH_BENCHMARKS_REFERENCE
#ifndef _MSC_VER

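// On x86, pick the widest vector type available at compile time
// (AVX-512 > AVX2 > SSE) and define matching load/store helpers.
// The *Asm variants issue the load through volatile inline asm
// ("(v)movaps" / "(v)movntdqa") so the compiler cannot optimize it away;
// the "stream" forms carry a non-temporal hint, i.e. they are meant to
// bypass the cache hierarchy.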
#ifdef ARROW_HAVE_SSE4_2

#ifdef ARROW_HAVE_AVX512

using VectorType = __m512i;
#define VectorSet _mm512_set1_epi32
#define VectorLoad _mm512_stream_load_si512
#define VectorLoadAsm(SRC, DST) \
  asm volatile("vmovaps %[src], %[dst]" : [ dst ] "=v"(DST) : [ src ] "m"(SRC) :)
#define VectorStreamLoad _mm512_stream_load_si512
#define VectorStreamLoadAsm(SRC, DST) \
  asm volatile("vmovntdqa %[src], %[dst]" : [ dst ] "=v"(DST) : [ src ] "m"(SRC) :)
#define VectorStreamWrite _mm512_stream_si512

#else

#ifdef ARROW_HAVE_AVX2

using VectorType = __m256i;
#define VectorSet _mm256_set1_epi32
#define VectorLoad _mm256_stream_load_si256
#define VectorLoadAsm(SRC, DST) \
  asm volatile("vmovaps %[src], %[dst]" : [ dst ] "=v"(DST) : [ src ] "m"(SRC) :)
#define VectorStreamLoad _mm256_stream_load_si256
#define VectorStreamLoadAsm(SRC, DST) \
  asm volatile("vmovntdqa %[src], %[dst]" : [ dst ] "=v"(DST) : [ src ] "m"(SRC) :)
#define VectorStreamWrite _mm256_stream_si256

#else  // ARROW_HAVE_AVX2 not set

using VectorType = __m128i;
#define VectorSet _mm_set1_epi32
#define VectorLoad _mm_stream_load_si128
#define VectorLoadAsm(SRC, DST) \
  asm volatile("movaps %[src], %[dst]" : [ dst ] "=x"(DST) : [ src ] "m"(SRC) :)
#define VectorStreamLoad _mm_stream_load_si128
#define VectorStreamLoadAsm(SRC, DST) \
  asm volatile("movntdqa %[src], %[dst]" : [ dst ] "=x"(DST) : [ src ] "m"(SRC) :)
#define VectorStreamWrite _mm_stream_si128

#endif  // ARROW_HAVE_AVX2
#endif  // ARROW_HAVE_AVX512

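// Measures plain (cacheable) read bandwidth. The loads go through
// volatile inline asm (VectorLoadAsm), so the compiler cannot elide them
// even though the loaded values are never consumed; the memsets and
// DoNotOptimize give a, b, c and d an observable use afterwards.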
static void Read(void* src, void* dst, size_t size) {
  const auto simd = static_cast<VectorType*>(src);
  VectorType a, b, c, d;
  (void)dst;

  for (size_t i = 0; i < size / sizeof(VectorType); i += 4) {
    VectorLoadAsm(simd[i], a);
    VectorLoadAsm(simd[i + 1], b);
    VectorLoadAsm(simd[i + 2], c);
    VectorLoadAsm(simd[i + 3], d);
  }

  memset(&a, 0, sizeof(a));
  memset(&b, 0, sizeof(b));
  memset(&c, 0, sizeof(c));
  memset(&d, 0, sizeof(d));

  benchmark::DoNotOptimize(a + b + c + d);
}

// See http://codearcana.com/posts/2013/05/18/achieving-maximum-memory-bandwidth.html
// for the usage of stream loads/writes. Or section 6.1, page 47 of
// https://akkadia.org/drepper/cpumemory.pdf .
static void StreamRead(void* src, void* dst, size_t size) {
  auto simd = static_cast<VectorType*>(src);
  VectorType a, b, c, d;
  (void)dst;

  memset(&a, 0, sizeof(a));
  memset(&b, 0, sizeof(b));
  memset(&c, 0, sizeof(c));
  memset(&d, 0, sizeof(d));

  for (size_t i = 0; i < size / sizeof(VectorType); i += 4) {
    VectorStreamLoadAsm(simd[i], a);
    VectorStreamLoadAsm(simd[i + 1], b);
    VectorStreamLoadAsm(simd[i + 2], c);
    VectorStreamLoadAsm(simd[i + 3], d);
  }

  benchmark::DoNotOptimize(a + b + c + d);
}

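// Measures write bandwidth using non-temporal stores, which are meant to
// go straight to memory without allocating cache lines for data that
// will not be re-read.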
static void StreamWrite(void* src, void* dst, size_t size) {
  auto simd = static_cast<VectorType*>(dst);
  const VectorType ones = VectorSet(1);
  (void)src;

  for (size_t i = 0; i < size / sizeof(VectorType); i += 4) {
    VectorStreamWrite(&simd[i], ones);
    VectorStreamWrite(&simd[i + 1], ones);
    VectorStreamWrite(&simd[i + 2], ones);
    VectorStreamWrite(&simd[i + 3], ones);
  }
}

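// Measures copy bandwidth: stream-load each vector from `src` and
// immediately stream-store it to `dst`, bypassing the cache in both
// directions.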
static void StreamReadWrite(void* src, void* dst, size_t size) {
  auto src_simd = static_cast<VectorType*>(src);
  auto dst_simd = static_cast<VectorType*>(dst);

  for (size_t i = 0; i < size / sizeof(VectorType); i += 4) {
    VectorStreamWrite(&dst_simd[i], VectorStreamLoad(&src_simd[i]));
    VectorStreamWrite(&dst_simd[i + 1], VectorStreamLoad(&src_simd[i + 1]));
    VectorStreamWrite(&dst_simd[i + 2], VectorStreamLoad(&src_simd[i + 2]));
    VectorStreamWrite(&dst_simd[i + 3], VectorStreamLoad(&src_simd[i + 3]));
  }
}

#endif  // ARROW_HAVE_SSE4_2

#ifdef ARROW_HAVE_ARMV8_CRYPTO

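// AArch64 variant: the same kernels expressed with 128-bit NEON vectors.
// LDNP/STNP load/store a pair of registers with a non-temporal hint,
// the closest analogue to the x86 stream instructions above.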
using VectorType = uint8x16_t;
using VectorTypeDual = uint8x16x2_t;

#define VectorSet vdupq_n_u8
#define VectorLoadAsm vld1q_u8

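// Non-temporal load of two consecutive 16-byte vectors from `src` into
// `dst[0]` and `dst[1]` via a single LDNP instruction.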
static void armv8_stream_load_pair(VectorType* src, VectorType* dst) {
  asm volatile("LDNP %[reg1], %[reg2], [%[from]]\n\t"
               : [ reg1 ] "+r"(*dst), [ reg2 ] "+r"(*(dst + 1))
               : [ from ] "r"(src));
}

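// Non-temporal store of two consecutive 16-byte vectors from `src` to
// `dst` via a single STNP instruction.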
static void armv8_stream_store_pair(VectorType* src, VectorType* dst) {
  asm volatile("STNP %[reg1], %[reg2], [%[to]]\n\t"
               : [ to ] "+r"(dst)
               : [ reg1 ] "r"(*src), [ reg2 ] "r"(*(src + 1))
               : "memory");
}

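// Non-temporal copy of two consecutive 16-byte vectors from `src` to
// `dst`, staging them through the q1/q2 registers.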
static void armv8_stream_ldst_pair(VectorType* src, VectorType* dst) {
  asm volatile(
      "LDNP q1, q2, [%[from]]\n\t"
      "STNP q1, q2, [%[to]]\n\t"
      : [ from ] "+r"(src), [ to ] "+r"(dst)
      :
      : "memory", "v0", "v1", "v2", "v3");
}

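// Measures plain (cacheable) read bandwidth using regular vld1q_u8 loads.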
static void Read(void* src, void* dst, size_t size) {
  const auto simd = static_cast<uint8_t*>(src);
  VectorType a;
  (void)dst;

  memset(&a, 0, sizeof(a));

  for (size_t i = 0; i < size; i += sizeof(VectorType)) {
    a = VectorLoadAsm(simd + i);
  }

  benchmark::DoNotOptimize(a);
}

// See http://codearcana.com/posts/2013/05/18/achieving-maximum-memory-bandwidth.html
// for the usage of stream loads/writes. Or section 6.1, page 47 of
// https://akkadia.org/drepper/cpumemory.pdf .
static void StreamRead(void* src, void* dst, size_t size) {
  auto simd = static_cast<VectorType*>(src);
  VectorType a[2];
  (void)dst;

  memset(&a, 0, sizeof(VectorTypeDual));

  for (size_t i = 0; i < size / sizeof(VectorType); i += 2) {
    armv8_stream_load_pair(simd + i, a);
  }

  benchmark::DoNotOptimize(a);
}

static void StreamWrite(void* src, void* dst, size_t size) {
  auto simd = static_cast<VectorType*>(dst);
  VectorType ones[2];
  (void)src;

  ones[0] = VectorSet(1);
  ones[1] = VectorSet(1);

  for (size_t i = 0; i < size / sizeof(VectorType); i += 2) {
    armv8_stream_store_pair(static_cast<VectorType*>(ones), simd + i);
  }
}

static void StreamReadWrite(void* src, void* dst, size_t size) {
  auto src_simd = static_cast<VectorType*>(src);
  auto dst_simd = static_cast<VectorType*>(dst);

  for (size_t i = 0; i < size / sizeof(VectorType); i += 2) {
    armv8_stream_ldst_pair(src_simd + i, dst_simd + i);
  }
}

#endif  // ARROW_HAVE_ARMV8_CRYPTO

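// Baseline: the platform memcpy, which typically dispatches to an
// optimized SIMD implementation at runtime.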
static void PlatformMemcpy(void* src, void* dst, size_t size) { memcpy(dst, src, size); }

using ApplyFn = decltype(Read);

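// Benchmark harness for the kernels above: fills a source buffer with
// random bytes, allocates a destination buffer of the same size
// (state.range(0)), and reports throughput via SetBytesProcessed.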
template <ApplyFn Apply>
static void MemoryBandwidth(benchmark::State& state) {  // NOLINT non-const reference
  const size_t buffer_size = state.range(0);
  BufferPtr src, dst;

  dst = *AllocateBuffer(buffer_size);
  src = *AllocateBuffer(buffer_size);
  random_bytes(buffer_size, 0, src->mutable_data());

  while (state.KeepRunning()) {
    Apply(src->mutable_data(), dst->mutable_data(), buffer_size);
  }

  state.SetBytesProcessed(state.iterations() * buffer_size);
}

#ifdef ARROW_HAVE_SSE4_2
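// Sized at half, exactly, and twice each cache level, so the read
// benchmark straddles the L1/L2/L3 capacity boundaries.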
static void SetCacheBandwidthArgs(benchmark::internal::Benchmark* bench) {
  auto cache_sizes = {kL1Size, kL2Size, kL3Size};
  for (auto size : cache_sizes) {
    bench->Arg(size / 2);
    bench->Arg(size);
    bench->Arg(size * 2);
  }

  bench->ArgName("size");
}

BENCHMARK_TEMPLATE(MemoryBandwidth, Read)->Apply(SetCacheBandwidthArgs);
#endif  // ARROW_HAVE_SSE4_2

static void SetMemoryBandwidthArgs(benchmark::internal::Benchmark* bench) {
  // `UseRealTime` is required due to threads; otherwise the cumulative CPU
  // time is used, which would skew the results by the number of threads.
  bench->Arg(kMemoryPerCore)->ThreadRange(1, kNumCores)->UseRealTime();
}

BENCHMARK_TEMPLATE(MemoryBandwidth, StreamRead)->Apply(SetMemoryBandwidthArgs);
BENCHMARK_TEMPLATE(MemoryBandwidth, StreamWrite)->Apply(SetMemoryBandwidthArgs);
BENCHMARK_TEMPLATE(MemoryBandwidth, StreamReadWrite)->Apply(SetMemoryBandwidthArgs);
BENCHMARK_TEMPLATE(MemoryBandwidth, PlatformMemcpy)->Apply(SetMemoryBandwidthArgs);

#endif  // _MSC_VER
#endif  // ARROW_WITH_BENCHMARKS_REFERENCE

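// Copies a 32 MiB buffer through io::FixedSizeBufferWriter, which can
// parallelize the underlying memcpy across the requested number of
// threads (set_memcopy_threads).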
static void ParallelMemoryCopy(benchmark::State& state) {  // NOLINT non-const reference
  const int64_t n_threads = state.range(0);
  const int64_t buffer_size = kMemoryPerCore;

  auto src = *AllocateBuffer(buffer_size);
  std::shared_ptr<Buffer> dst = *AllocateBuffer(buffer_size);

  random_bytes(buffer_size, 0, src->mutable_data());

  while (state.KeepRunning()) {
    io::FixedSizeBufferWriter writer(dst);
    writer.set_memcopy_threads(static_cast<int>(n_threads));
    ABORT_NOT_OK(writer.Write(src->data(), src->size()));
  }

  state.SetBytesProcessed(int64_t(state.iterations()) * buffer_size);
}

BENCHMARK(ParallelMemoryCopy)
    ->RangeMultiplier(2)
    ->Range(1, kNumCores)
    ->ArgName("threads")
    ->UseRealTime();

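// Writes `datum` repeatedly (~32 MiB in total) to a fresh
// BufferOutputStream per iteration, so the benchmarks below compare
// per-Write() overhead for tiny, small and large chunks.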
static void BenchmarkBufferOutputStream(
    const std::string& datum,
    benchmark::State& state) {  // NOLINT non-const reference
  const void* raw_data = datum.data();
  int64_t raw_nbytes = static_cast<int64_t>(datum.size());
  // Write approx. 32 MB to each BufferOutputStream
  int64_t num_raw_values = (1 << 25) / raw_nbytes;
  for (auto _ : state) {
    auto stream = *io::BufferOutputStream::Create(1024);
    for (int64_t i = 0; i < num_raw_values; ++i) {
      ABORT_NOT_OK(stream->Write(raw_data, raw_nbytes));
    }
    ABORT_NOT_OK(stream->Finish());
  }
  state.SetBytesProcessed(int64_t(state.iterations()) * num_raw_values * raw_nbytes);
}

static void BufferOutputStreamTinyWrites(
    benchmark::State& state) {  // NOLINT non-const reference
  // An 8-byte datum
  return BenchmarkBufferOutputStream("abdefghi", state);
}

static void BufferOutputStreamSmallWrites(
    benchmark::State& state) {  // NOLINT non-const reference
  // A 700-byte datum
  std::string datum;
  for (int i = 0; i < 100; ++i) {
    datum += "abcdefg";
  }
  return BenchmarkBufferOutputStream(datum, state);
}

static void BufferOutputStreamLargeWrites(
    benchmark::State& state) {  // NOLINT non-const reference
  // A 1.5 MB datum
  std::string datum(1500000, 'x');
  return BenchmarkBufferOutputStream(datum, state);
}

BENCHMARK(BufferOutputStreamTinyWrites)->UseRealTime();
BENCHMARK(BufferOutputStreamSmallWrites)->UseRealTime();
BENCHMARK(BufferOutputStreamLargeWrites)->UseRealTime();

}  // namespace arrow