1 /*
2 * Copyright (c) Facebook, Inc. and its affiliates.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
#include <folly/AtomicHashMap.h>

#include <atomic>
#include <cstring>
#include <limits>
#include <memory>
#include <random>
#include <system_error>
#include <thread>

#include <glog/logging.h>

#include <folly/Benchmark.h>
#include <folly/Conv.h>
#include <folly/portability/Atomic.h>
#include <folly/portability/GTest.h>
#include <folly/portability/SysTime.h>
30
31 using folly::AtomicHashArray;
32 using folly::AtomicHashMap;
33 using folly::StringPiece;
34 using std::string;
35 using std::vector;
36
37 // Tunables:
38 DEFINE_double(targetLoadFactor, 0.75, "Target memory utilization fraction.");
39 DEFINE_double(maxLoadFactor, 0.80, "Max before growth.");
40 DEFINE_int32(numThreads, 8, "Threads to use for concurrency tests.");
41 DEFINE_int64(numBMElements, 12 * 1000 * 1000, "Size of maps for benchmarks.");
42
43 const double LF = FLAGS_maxLoadFactor / FLAGS_targetLoadFactor;
44 const int maxBMElements = int(FLAGS_numBMElements * LF); // hit our target LF.
45
// Returns the current wall-clock time, as reported by gettimeofday(), in
// microseconds since the Unix epoch.
static int64_t nowInUsec() {
  timeval now;
  gettimeofday(&now, nullptr);
  const int64_t wholeSeconds = now.tv_sec;
  return wholeSeconds * 1000000 + now.tv_usec;
}
51
// Round-trips int64 -> string mappings, then checks that insert() never
// overwrites an existing key while values remain writable through find().
TEST(Ahm, BasicStrings) {
  using AHM = AtomicHashMap<int64_t, string>;
  AHM myMap(1024);
  EXPECT_TRUE(myMap.begin() == myMap.end());

  for (int n = 0; n < 100; ++n) {
    myMap.insert(std::make_pair(n, folly::to<string>(n)));
  }
  for (int n = 0; n < 100; ++n) {
    EXPECT_EQ(myMap.find(n)->second, folly::to<string>(n));
  }

  // A second insert for the same key must leave the first value in place.
  myMap.insert(std::make_pair(999, "A"));
  myMap.insert(std::make_pair(999, "B"));
  EXPECT_EQ(myMap.find(999)->second, "A");

  // Values can be modified in place through the returned iterator.
  myMap.find(999)->second = "B";
  myMap.find(999)->second = "C";
  EXPECT_EQ(myMap.find(999)->second, "C");
  EXPECT_EQ(myMap.find(999)->first, 999);
}
72
// Stores move-only values (unique_ptr<int>) through each insertion API:
// insert(pair), insert(key, value), emplace(key, raw pointer) and emplace with
// an explicit deleter. Then verifies lookups and erase().
TEST(Ahm, BasicNoncopyable) {
  using AHM = AtomicHashMap<int64_t, std::unique_ptr<int>>;
  AHM myMap(1024);
  EXPECT_TRUE(myMap.begin() == myMap.end());

  int n = 0;
  for (; n < 50; ++n) {
    myMap.insert(std::make_pair(n, std::make_unique<int>(n)));
  }
  for (; n < 100; ++n) {
    myMap.insert(n, std::make_unique<int>(n));
  }
  for (; n < 150; ++n) {
    myMap.emplace(n, new int(n));
  }
  for (; n < 200; ++n) {
    myMap.emplace(n, new int(n), std::default_delete<int>());
  }

  for (int key = 0; key < 200; ++key) {
    EXPECT_EQ(*(myMap.find(key)->second), key);
  }

  // Erase every fourth key, then confirm each of them is gone.
  for (int key = 0; key < 200; key += 4) {
    myMap.erase(key);
  }
  for (int key = 0; key < 200; key += 4) {
    EXPECT_EQ(myMap.find(key), myMap.end());
  }
}
100
101 typedef int32_t KeyT;
102 typedef int32_t ValueT;
103
104 typedef AtomicHashMap<KeyT, ValueT> AHMapT;
105 typedef AHMapT::value_type RecordT;
106 typedef AtomicHashArray<KeyT, ValueT> AHArrayT;
107 AHArrayT::Config config;
108 typedef folly::QuadraticProbingAtomicHashMap<KeyT, ValueT> QPAHMapT;
109 QPAHMapT::Config qpConfig;
110 static AHArrayT::SmartPtr globalAHA(nullptr);
111 static std::unique_ptr<AHMapT> globalAHM;
112 static std::unique_ptr<QPAHMapT> globalQPAHM;
113
// Derives the value stored for `key` (integer division by 3, truncating
// toward zero), so any lookup can be validated without remembering what was
// inserted.
static int genVal(int key) {
  const int divisor = 3;
  return key / divisor;
}
118
119 static bool legalKey(const char* a);
120
121 struct EqTraits {
operator ()EqTraits122 bool operator()(const char* a, const char* b) {
123 return legalKey(a) && (strcmp(a, b) == 0);
124 }
operator ()EqTraits125 bool operator()(const char* a, const char& b) {
126 return legalKey(a) && (a[0] != '\0') && (a[0] == b);
127 }
operator ()EqTraits128 bool operator()(const char* a, const StringPiece b) {
129 return legalKey(a) && (strlen(a) == b.size()) &&
130 (strcmp(a, b.begin()) == 0);
131 }
132 };
133
134 struct HashTraits {
operator ()HashTraits135 size_t operator()(const char* a) {
136 size_t result = 0;
137 while (a[0] != 0) {
138 result += static_cast<size_t>(*(a++));
139 }
140 return result;
141 }
operator ()HashTraits142 size_t operator()(const char& a) { return static_cast<size_t>(a); }
operator ()HashTraits143 size_t operator()(const StringPiece a) {
144 size_t result = 0;
145 for (const auto& ch : a) {
146 result += static_cast<size_t>(ch);
147 }
148 return result;
149 }
150 };
151
152 typedef AtomicHashMap<const char*, int64_t, HashTraits, EqTraits> AHMCstrInt;
153 AHMCstrInt::Config cstrIntCfg;
154
legalKey(const char * a)155 static bool legalKey(const char* a) {
156 return a != cstrIntCfg.emptyKey && a != cstrIntCfg.lockedKey &&
157 a != cstrIntCfg.erasedKey;
158 }
159
// Exercises heterogeneous lookup on AHMCstrInt: by C string, by single char
// (hit and miss) and by StringPiece.
TEST(Ahm, BasicLookup) {
  AHMCstrInt myMap(1024, cstrIntCfg);
  EXPECT_TRUE(myMap.begin() == myMap.end());
  myMap.insert(std::make_pair("f", 42));
  EXPECT_EQ(42, myMap.find("f")->second);

  // Single-char probe that should hit.
  auto charHit = myMap.find<char>('f');
  EXPECT_EQ(42, charHit->second);

  // Single-char probe that should miss.
  auto charMiss = myMap.find<char>('g');
  EXPECT_TRUE(charMiss == myMap.end());

  // StringPiece probe that should hit.
  const StringPiece piece("f");
  auto pieceHit = myMap.find(piece);
  EXPECT_EQ(42, pieceHit->second);
}
182
// Inserts numEntries keys into a map sized for fewer than half of them, which
// forces it to grow into more than one submap. Then checks:
// insert()/findAt() consistency, that duplicate inserts fail and keep the old
// value, find()/findAt() results, in-place value updates, and clear().
TEST(Ahm, grow) {
  VLOG(1) << "Overhead: " << sizeof(AHArrayT) << " (array) "
          << sizeof(AHMapT) + sizeof(AHArrayT) << " (map/set) Bytes.";
  uint64_t numEntries = 10000;
  float sizeFactor = 0.46f;

  std::unique_ptr<AHMapT> m(new AHMapT(int(numEntries * sizeFactor), config));

  // load map - make sure we succeed and the index is accurate
  bool success = true;
  for (uint64_t i = 0; i < numEntries; i++) {
    auto ret = m->insert(RecordT(i, genVal(i)));
    success &= ret.second;
    success &= (m->findAt(ret.first.getIndex())->second == genVal(i));
  }
  // Check the load results before `success` is reused below.
  EXPECT_TRUE(success);

  // Overwrite vals to make sure there are no dups
  // Every insert should fail because the keys are already in the map.
  success = true;
  for (uint64_t i = 0; i < numEntries; i++) {
    auto ret = m->insert(RecordT(i, genVal(i * 2)));
    success &= (ret.second == false); // fail on collision
    success &= (ret.first->second == genVal(i)); // return the previous value
    success &= (m->findAt(ret.first.getIndex())->second == genVal(i));
  }
  EXPECT_TRUE(success);

  // check correctness
  EXPECT_GT(m->numSubMaps(), 1); // make sure we grew
  success = true;
  EXPECT_EQ(m->size(), numEntries);
  for (size_t i = 0; i < numEntries; i++) {
    success &= (m->find(i)->second == genVal(i));
  }
  EXPECT_TRUE(success);

  // Check findAt
  success = true;
  AHMapT::const_iterator retIt;
  for (int32_t i = 0; i < int32_t(numEntries); i++) {
    retIt = m->find(i);
    retIt = m->findAt(retIt.getIndex());
    success &= (retIt->second == genVal(i));
    // The loop index is int32_t, the same type as KeyT, so this comparison is
    // between two values of the same type.
    success &= (retIt->first == i);
  }
  EXPECT_TRUE(success);

  // Try modifying value
  m->find(8)->second = 5309;
  EXPECT_EQ(m->find(8)->second, 5309);

  // check clear()
  m->clear();
  success = true;
  for (uint64_t i = 0; i < numEntries / 2; i++) {
    success &= m->insert(RecordT(i, genVal(i))).second;
  }
  EXPECT_TRUE(success);
  EXPECT_EQ(m->size(), numEntries / 2);
}
244
// Inserts more entries than the initial size hint, then checks that a full
// iteration visits exactly numEntries elements, each holding genVal(key).
TEST(Ahm, iterator) {
  int numEntries = 10000;
  float sizeFactor = .46f;
  std::unique_ptr<AHMapT> m(new AHMapT(int(numEntries * sizeFactor), config));

  for (int key = 0; key < numEntries; ++key) {
    m->insert(RecordT(key, genVal(key)));
  }

  bool allMatch = true;
  int visited = 0;
  for (const auto& entry : *m) {
    allMatch &= (entry.second == genVal(entry.first));
    ++visited;
  }
  EXPECT_TRUE(allMatch);
  EXPECT_EQ(visited, numEntries);
}
264
265 class Counters {
266 private:
267 // Note: Unfortunately can't currently put a std::atomic<int64_t> in
268 // the value in ahm since it doesn't support types that are both non-copy
269 // and non-move constructible yet.
270 AtomicHashMap<int64_t, int64_t> ahm;
271
272 public:
Counters(size_t numCounters)273 explicit Counters(size_t numCounters) : ahm(numCounters) {}
274
increment(int64_t obj_id)275 void increment(int64_t obj_id) {
276 auto ret = ahm.insert(std::make_pair(obj_id, 1));
277 if (!ret.second) {
278 // obj_id already exists, increment count
279 __sync_fetch_and_add(&ret.first->second, 1);
280 }
281 }
282
getValue(int64_t obj_id)283 int64_t getValue(int64_t obj_id) {
284 auto ret = ahm.find(obj_id);
285 return ret != ahm.end() ? ret->second : 0;
286 }
287
288 // export the counters without blocking increments
toString()289 string toString() {
290 string ret = "{\n";
291 ret.reserve(ahm.size() * 32);
292 for (const auto& e : ahm) {
293 ret += folly::to<string>(" [", e.first, ":", e.second, "]\n");
294 }
295 ret += "}\n";
296 return ret;
297 }
298 };
299
300 // If you get an error "terminate called without an active exception", there
301 // might be too many threads getting created - decrease numKeys and/or mult.
TEST(Ahm,counter)302 TEST(Ahm, counter) {
303 const int numKeys = 10;
304 const int mult = 10;
305 Counters c(numKeys);
306 vector<int64_t> keys;
307 FOR_EACH_RANGE (i, 1, numKeys) { keys.push_back(i); }
308 vector<std::thread> threads;
309 for (auto key : keys) {
310 FOR_EACH_RANGE (i, 0, key * mult) {
311 threads.push_back(std::thread([&, key] { c.increment(key); }));
312 }
313 }
314 for (auto& t : threads) {
315 t.join();
316 }
317 string str = c.toString();
318 for (auto key : keys) {
319 int val = key * mult;
320 EXPECT_EQ(val, c.getValue(key));
321 EXPECT_NE(
322 string::npos, str.find(folly::to<string>("[", key, ":", val, "]")));
323 }
324 }
325
326 class Integer {
327 public:
Integer(KeyT v=0)328 explicit Integer(KeyT v = 0) : v_(v) {}
329
330 Integer(const Integer&) = default;
331
operator =(const Integer & a)332 Integer& operator=(const Integer& a) {
333 static bool throwException_ = false;
334 throwException_ = !throwException_;
335 if (throwException_) {
336 throw 1;
337 }
338 v_ = a.v_;
339 return *this;
340 }
341
operator ==(const Integer & a) const342 bool operator==(const Integer& a) const { return v_ == a.v_; }
343
344 private:
345 KeyT v_;
346 };
347
// Inserts Integer values (whose copy-assignment throws on alternating calls).
// A throwing insert must leave its key absent, and a successful one must be
// findable. The final map size must equal the number of successful inserts.
TEST(Ahm, map_exception_safety) {
  using MyMapT = AtomicHashMap<KeyT, Integer>;

  int numEntries = 10000;
  float sizeFactor = 0.46f;
  std::unique_ptr<MyMapT> m(new MyMapT(int(numEntries * sizeFactor)));

  bool success = true;
  int count = 0;
  for (int key = 0; key < numEntries; ++key) {
    try {
      m->insert(key, Integer(genVal(key)));
      success &= (m->find(key)->second == Integer(genVal(key)));
      ++count;
    } catch (...) {
      // The insert threw: the key must not have been left in the map.
      success &= !m->count(key);
    }
  }
  EXPECT_EQ(count, m->size());
  EXPECT_TRUE(success);
}
369
// Fills the map with numEntries keys and erases them all, four times over,
// checking count()/size()/erase() at every step. The repeated refills are
// meant to exercise more than one submap.
TEST(Ahm, basicErase) {
  size_t numEntries = 3000;

  std::unique_ptr<AHMapT> s(new AHMapT(numEntries, config));
  for (int round = 0; round < 4; ++round) {
    // Insert every key, checking it is absent before and present after.
    bool ok = true;
    for (size_t key = 0; key < numEntries; ++key) {
      ok &= !(s->count(key));
      auto inserted = s->insert(RecordT(key, key));
      ok &= s->count(key);
      ok &= inserted.second;
    }
    EXPECT_TRUE(ok);
    EXPECT_EQ(s->size(), numEntries);

    // Erase every key: the size drops by one each time, the key disappears,
    // and a second erase of the same key fails.
    ok = true;
    for (size_t key = 0; key < numEntries; ++key) {
      ok &= s->erase(key);
      ok &= (s->size() == numEntries - 1 - key);
      ok &= !(s->count(key));
      ok &= !(s->erase(key));
    }
    EXPECT_TRUE(ok);
  }
  VLOG(1) << "Final number of subMaps = " << s->numSubMaps();
}
401
402 namespace {
403
randomizeKey(int key)404 inline KeyT randomizeKey(int key) {
405 // We deterministically randomize the key to more accurately simulate
406 // real-world usage, and to avoid pathalogical performance patterns (e.g.
407 // those related to std::hash<int64_t>()(1) == 1).
408 //
409 // Use a hash function we don't normally use for ints to avoid interactions.
410 return folly::hash::jenkins_rev_mix32(key);
411 }
412
413 int numOpsPerThread = 0;
414
insertThread(void * jj)415 void* insertThread(void* jj) {
416 int64_t j = (int64_t)jj;
417 for (int i = 0; i < numOpsPerThread; ++i) {
418 KeyT key = randomizeKey(i + j * numOpsPerThread);
419 globalAHM->insert(key, genVal(key));
420 }
421 return nullptr;
422 }
423
qpInsertThread(void * jj)424 void* qpInsertThread(void* jj) {
425 int64_t j = (int64_t)jj;
426 for (int i = 0; i < numOpsPerThread; ++i) {
427 KeyT key = randomizeKey(i + j * numOpsPerThread);
428 globalQPAHM->insert(key, genVal(key));
429 }
430 return nullptr;
431 }
432
insertThreadArr(void * jj)433 void* insertThreadArr(void* jj) {
434 int64_t j = (int64_t)jj;
435 for (int i = 0; i < numOpsPerThread; ++i) {
436 KeyT key = randomizeKey(i + j * numOpsPerThread);
437 globalAHA->insert(std::make_pair(key, genVal(key)));
438 }
439 return nullptr;
440 }
441
442 std::atomic<bool> runThreadsCreatedAllThreads;
runThreads(void * (* mainFunc)(void *),int numThreads,void ** statuses)443 void runThreads(void* (*mainFunc)(void*), int numThreads, void** statuses) {
444 folly::BenchmarkSuspender susp;
445 runThreadsCreatedAllThreads.store(false);
446 vector<std::thread> threads;
447 for (int64_t j = 0; j < numThreads; j++) {
448 threads.emplace_back([statuses, mainFunc, j]() {
449 auto ret = mainFunc((void*)j);
450 if (statuses != nullptr) {
451 statuses[j] = ret;
452 }
453 });
454 }
455 susp.dismiss();
456
457 runThreadsCreatedAllThreads.store(true);
458 for (size_t i = 0; i < threads.size(); ++i) {
459 threads[i].join();
460 }
461 }
462
runThreads(void * (* mainFunc)(void *))463 void runThreads(void* (*mainFunc)(void*)) {
464 runThreads(mainFunc, FLAGS_numThreads, nullptr);
465 }
466
467 } // namespace
468
// Every worker inserts the same numInserts keys, so inserts collide across
// threads. Afterwards every worker looks up key 0 numInserts times.
TEST(Ahm, collision_test) {
  const int numInserts = 1000000 / 4;

  // Doing the same number on each thread so we collide.
  numOpsPerThread = numInserts;

  float sizeFactor = 0.46f;
  int entrySize = sizeof(KeyT) + sizeof(ValueT);
  VLOG(1) << "Testing " << numInserts << " unique " << entrySize
          << " Byte entries replicated in " << FLAGS_numThreads
          << " threads with " << FLAGS_maxLoadFactor * 100.0
          << "% max load factor.";

  globalAHM = std::make_unique<AHMapT>(int(numInserts * sizeFactor), config);

  size_t sizeInit = globalAHM->capacity();
  VLOG(1) << " Initial capacity: " << sizeInit;

  // Worker: inserts randomizeKey(0 .. numOpsPerThread-1), the same keys on
  // every thread.
  auto collisionInsertThread = [](void*) -> void* {
    for (int op = 0; op < numOpsPerThread; ++op) {
      const KeyT key = randomizeKey(op);
      globalAHM->insert(key, genVal(key));
    }
    return nullptr;
  };
  // Worker: repeatedly looks up one shared key.
  auto collisionFindThread = [](void*) -> void* {
    const KeyT key(0);
    for (int op = 0; op < numOpsPerThread; ++op) {
      globalAHM->find(key);
    }
    return nullptr;
  };

  double start = nowInUsec();
  runThreads(collisionInsertThread);
  double elapsed = nowInUsec() - start;

  size_t finalCap = globalAHM->capacity();
  size_t sizeAHM = globalAHM->size();
  VLOG(1) << elapsed / sizeAHM << " usec per " << FLAGS_numThreads
          << " duplicate inserts (atomic).";
  VLOG(1) << " Final capacity: " << finalCap << " in "
          << globalAHM->numSubMaps() << " sub maps ("
          << sizeAHM * 100 / finalCap << "% load factor, "
          << (finalCap - sizeInit) * 100 / sizeInit << "% growth).";

  // check correctness
  EXPECT_EQ(sizeAHM, numInserts);
  bool allFound = true;
  for (int idx = 0; idx < numInserts; ++idx) {
    const KeyT key = randomizeKey(idx);
    allFound &= (globalAHM->find(key)->second == genVal(key));
  }
  EXPECT_TRUE(allFound);

  // check colliding finds
  start = nowInUsec();
  runThreads(collisionFindThread);
  elapsed = nowInUsec() - start;

  VLOG(1) << elapsed / sizeAHM << " usec per " << FLAGS_numThreads
          << " duplicate finds (atomic).";
}
530
531 namespace {
532
533 const int kInsertPerThread = 100000;
534 int raceFinalSizeEstimate;
535
raceIterateThread()536 void raceIterateThread() {
537 int count = 0;
538
539 AHMapT::iterator it = globalAHM->begin();
540 AHMapT::iterator end = globalAHM->end();
541 for (; it != end; ++it) {
542 ++count;
543 if (count > raceFinalSizeEstimate) {
544 EXPECT_FALSE("Infinite loop in iterator.");
545 return;
546 }
547 }
548 }
549
raceInsertRandomThread()550 void raceInsertRandomThread() {
551 for (int i = 0; i < kInsertPerThread; ++i) {
552 KeyT key = rand();
553 globalAHM->insert(key, genVal(key));
554 }
555 }
556
557 } // namespace
558
559 // Test for race conditions when inserting and iterating at the same time and
560 // creating multiple submaps.
TEST(Ahm,race_insert_iterate_thread_test)561 TEST(Ahm, race_insert_iterate_thread_test) {
562 const int kInsertThreads = 20;
563 const int kIterateThreads = 20;
564 raceFinalSizeEstimate = kInsertThreads * kInsertPerThread;
565
566 VLOG(1) << "Testing iteration and insertion with " << kInsertThreads
567 << " threads inserting and " << kIterateThreads
568 << " threads iterating.";
569
570 globalAHM = std::make_unique<AHMapT>(raceFinalSizeEstimate / 9, config);
571
572 vector<std::thread> threads;
573 for (auto j = 0u; j < kInsertThreads; ++j) {
574 threads.emplace_back(raceInsertRandomThread);
575 }
576 for (auto j = 0u; j < kIterateThreads; ++j) {
577 threads.emplace_back(raceIterateThread);
578 }
579 for (auto& thread : threads) {
580 thread.join();
581 }
582 VLOG(1) << "Ended up with " << globalAHM->numSubMaps() << " submaps";
583 VLOG(1) << "Final size of map " << globalAHM->size();
584 }
585
namespace {

// Number of keys the single inserter adds (and the erasers remove).
const int kTestEraseInsertions = 200000;
// Progress published by the inserter: index i of the most recently inserted
// key (stored with release ordering), or kTestEraseInsertions once every key
// has been inserted. Erasers read it with acquire ordering.
std::atomic<int32_t> insertedLevel;

// Inserter: inserts randomizeKey(i) -> genVal for i in
// [0, kTestEraseInsertions) into globalAHM, in order, publishing progress
// through insertedLevel after each insert.
void* testEraseInsertThread(void*) {
  for (int i = 0; i < kTestEraseInsertions; ++i) {
    KeyT key = randomizeKey(i);
    globalAHM->insert(key, genVal(key));
    insertedLevel.store(i, std::memory_order_release);
  }
  insertedLevel.store(kTestEraseInsertions, std::memory_order_release);
  return nullptr;
}

// Eraser: removes randomizeKey(i) for each i, in insertion order, staying more
// than `lag` keys behind the inserter. Several erasers race on the same keys.
// For each key an eraser keeps trying until the key is absent or its own
// erase() succeeds.
void* testEraseEraseThread(void*) {
  for (int i = 0; i < kTestEraseInsertions; ++i) {
    /*
     * Make sure that we don't get ahead of the insert thread, because
     * part of the condition for this unit test succeeding is that the
     * map ends up empty.
     *
     * Note, there is a subtle case here when a new submap is
     * allocated: the erasing thread might get 0 from count(key)
     * because it hasn't seen numSubMaps_ update yet. To avoid this
     * race causing problems for the test (it's ok for real usage), we
     * lag behind the inserter by more than just one element.
     */
    const int lag = 10;
    int currentLevel;
    // Busy-wait until the inserter is far enough ahead of key i.
    do {
      currentLevel = insertedLevel.load(std::memory_order_acquire);
      // Once the inserter has finished, lift the lag so the tail can be erased.
      if (currentLevel == kTestEraseInsertions) {
        currentLevel += lag + 1;
      }
    } while (currentLevel - lag < i);

    KeyT key = randomizeKey(i);
    // Another eraser may win the race for this key; stop as soon as the key
    // is gone or our own erase() succeeds.
    while (globalAHM->count(key)) {
      if (globalAHM->erase(key)) {
        break;
      }
    }
  }
  return nullptr;
}

} // namespace
634
635 // Here we have a single thread inserting some values, and several threads
636 // racing to delete the values in the order they were inserted.
TEST(Ahm,thread_erase_insert_race)637 TEST(Ahm, thread_erase_insert_race) {
638 const int kInsertThreads = 1;
639 const int kEraseThreads = 10;
640
641 VLOG(1) << "Testing insertion and erase with " << kInsertThreads
642 << " thread inserting and " << kEraseThreads << " threads erasing.";
643
644 globalAHM = std::make_unique<AHMapT>(kTestEraseInsertions / 4, config);
645
646 vector<pthread_t> threadIds;
647 for (int64_t j = 0; j < kInsertThreads + kEraseThreads; j++) {
648 pthread_t tid;
649 void* (*thread)(void*) =
650 (j < kInsertThreads ? testEraseInsertThread : testEraseEraseThread);
651 if (pthread_create(&tid, nullptr, thread, (void*)j) != 0) {
652 LOG(ERROR) << "Could not start thread";
653 } else {
654 threadIds.push_back(tid);
655 }
656 }
657 for (size_t i = 0; i < threadIds.size(); i++) {
658 pthread_join(threadIds[i], nullptr);
659 }
660
661 EXPECT_TRUE(globalAHM->empty());
662 EXPECT_EQ(globalAHM->size(), 0);
663
664 VLOG(1) << "Ended up with " << globalAHM->numSubMaps() << " submaps";
665 }
666
// Repro for T#483734: Duplicate AHM inserts due to incorrect AHA return value.
//
// A tiny AtomicHashArray (created with size argument 2), shared by the
// insert-race worker threads below.
typedef AtomicHashArray<int32_t, int32_t> AHA;
AHA::Config configRace;
// NOTE(review): this array is created during static initialization, using
// configRace as it is at that moment. Later assignments to configRace (e.g.
// in benchmarkSetup()) do not affect it.
auto atomicHashArrayInsertRaceArray = AHA::create(2, configRace);
atomicHashArrayInsertRaceThread(void *)671 void* atomicHashArrayInsertRaceThread(void* /* j */) {
672 AHA* arr = atomicHashArrayInsertRaceArray.get();
673 uintptr_t numInserted = 0;
674 while (!runThreadsCreatedAllThreads.load()) {
675 ;
676 }
677 for (int i = 0; i < 2; i++) {
678 if (arr->insert(RecordT(randomizeKey(i), 0)).first != arr->end()) {
679 numInserted++;
680 }
681 }
682 return (void*)numInserted;
683 }
// Clears the shared array and races numThreads workers inserting the same two
// keys, many times over. Every worker's count of non-end() insert results must
// equal the array's final size.
TEST(Ahm, atomic_hash_array_insert_race) {
  AHA* arr = atomicHashArrayInsertRaceArray.get();
  int numIterations = 5000;
  constexpr int numThreads = 4;
  void* statuses[numThreads];
  for (int iteration = 0; iteration < numIterations; ++iteration) {
    arr->clear();
    runThreads(atomicHashArrayInsertRaceThread, numThreads, statuses);
    EXPECT_GE(arr->size(), 1);
    for (void* status : statuses) {
      EXPECT_EQ(arr->size(), reinterpret_cast<uintptr_t>(status));
    }
  }
}
698
699 // Repro for T#5841499. Race between erase() and find() on the same key.
TEST(Ahm,erase_find_race)700 TEST(Ahm, erase_find_race) {
701 const uint64_t limit = 10000;
702 AtomicHashMap<uint64_t, uint64_t> map(limit + 10);
703 std::atomic<uint64_t> key{1};
704
705 // Invariant: all values are equal to their keys.
706 // At any moment there is one or two consecutive keys in the map.
707
708 std::thread write_thread([&]() {
709 while (true) {
710 uint64_t k = ++key;
711 if (k > limit) {
712 break;
713 }
714 map.insert(k + 1, k + 1);
715 map.erase(k);
716 }
717 });
718
719 std::thread read_thread([&]() {
720 while (true) {
721 uint64_t k = key.load();
722 if (k > limit) {
723 break;
724 }
725
726 auto it = map.find(k);
727 if (it != map.end()) {
728 ASSERT_EQ(k, it->second);
729 }
730 }
731 });
732
733 read_thread.join();
734 write_thread.join();
735 }
736
737 // Erase right after insert race bug repro (t9130653)
TEST(Ahm,erase_after_insert_race)738 TEST(Ahm, erase_after_insert_race) {
739 const uint64_t limit = 10000;
740 const size_t num_threads = 100;
741 const size_t num_iters = 500;
742 AtomicHashMap<uint64_t, uint64_t> map(limit + 10);
743
744 std::atomic<bool> go{false};
745 std::vector<std::thread> ts;
746 for (size_t i = 0; i < num_threads; ++i) {
747 ts.emplace_back([&]() {
748 while (!go) {
749 continue;
750 }
751 for (size_t n = 0; n < num_iters; ++n) {
752 map.erase(1);
753 map.insert(1, 1);
754 }
755 });
756 }
757
758 go = true;
759
760 for (auto& t : ts) {
761 t.join();
762 }
763 }
764
765 // Repro for a bug when iterator didn't skip empty submaps.
TEST(Ahm,iterator_skips_empty_submaps)766 TEST(Ahm, iterator_skips_empty_submaps) {
767 AtomicHashMap<uint64_t, uint64_t>::Config conf;
768 conf.growthFactor = 1;
769
770 AtomicHashMap<uint64_t, uint64_t> map(1, conf);
771
772 map.insert(1, 1);
773 map.insert(2, 2);
774 map.insert(3, 3);
775
776 map.erase(2);
777
778 auto it = map.find(1);
779
780 ASSERT_NE(map.end(), it);
781 ASSERT_EQ(1, it->first);
782 ASSERT_EQ(1, it->second);
783
784 ++it;
785
786 ASSERT_NE(map.end(), it);
787 ASSERT_EQ(3, it->first);
788 ASSERT_EQ(3, it->second);
789
790 ++it;
791 ASSERT_EQ(map.end(), it);
792 }
793
794 namespace {
795
loadGlobalAha()796 void loadGlobalAha() {
797 std::cout << "loading global AHA with " << FLAGS_numThreads
798 << " threads...\n";
799 uint64_t start = nowInUsec();
800 globalAHA = AHArrayT::create(maxBMElements, config);
801 numOpsPerThread = FLAGS_numBMElements / FLAGS_numThreads;
802 CHECK_EQ(0, FLAGS_numBMElements % FLAGS_numThreads)
803 << "kNumThreads must evenly divide kNumInserts.";
804 runThreads(insertThreadArr);
805 uint64_t elapsed = nowInUsec() - start;
806 std::cout << " took " << elapsed / 1000 << " ms ("
807 << (elapsed * 1000 / FLAGS_numBMElements) << " ns/insert).\n";
808 EXPECT_EQ(globalAHA->size(), FLAGS_numBMElements);
809 }
810
loadGlobalAhm()811 void loadGlobalAhm() {
812 std::cout << "loading global AHM with " << FLAGS_numThreads
813 << " threads...\n";
814 uint64_t start = nowInUsec();
815 globalAHM = std::make_unique<AHMapT>(maxBMElements, config);
816 numOpsPerThread = FLAGS_numBMElements / FLAGS_numThreads;
817 runThreads(insertThread);
818 uint64_t elapsed = nowInUsec() - start;
819 std::cout << " took " << elapsed / 1000 << " ms ("
820 << (elapsed * 1000 / FLAGS_numBMElements) << " ns/insert).\n";
821 EXPECT_EQ(globalAHM->size(), FLAGS_numBMElements);
822 }
823
loadGlobalQPAhm()824 void loadGlobalQPAhm() {
825 std::cout << "loading global QPAHM with " << FLAGS_numThreads
826 << " threads...\n";
827 uint64_t start = nowInUsec();
828 globalQPAHM = std::make_unique<QPAHMapT>(maxBMElements, qpConfig);
829 numOpsPerThread = FLAGS_numBMElements / FLAGS_numThreads;
830 runThreads(qpInsertThread);
831 uint64_t elapsed = nowInUsec() - start;
832 std::cout << " took " << elapsed / 1000 << " ms ("
833 << (elapsed * 1000 / FLAGS_numBMElements) << " ns/insert).\n";
834 EXPECT_EQ(globalQPAHM->size(), FLAGS_numBMElements);
835 }
836
837 } // namespace
838
// Single-threaded successful lookups of the first `iters` keys loaded into
// globalAHA.
BENCHMARK(st_aha_find, iters) {
  CHECK_LE(iters, FLAGS_numBMElements);
  size_t idx = 0;
  while (idx < iters) {
    const KeyT key = randomizeKey(idx);
    folly::doNotOptimizeAway(globalAHA->find(key)->second);
    ++idx;
  }
}
846
// Single-threaded successful lookups of the first `iters` keys loaded into
// globalAHM.
BENCHMARK(st_ahm_find, iters) {
  CHECK_LE(iters, FLAGS_numBMElements);
  size_t idx = 0;
  while (idx < iters) {
    const KeyT key = randomizeKey(idx);
    folly::doNotOptimizeAway(globalAHM->find(key)->second);
    ++idx;
  }
}
854
// Single-threaded successful lookups of the first `iters` keys loaded into
// globalQPAHM.
BENCHMARK(st_qpahm_find, iters) {
  CHECK_LE(iters, FLAGS_numBMElements);
  size_t idx = 0;
  while (idx < iters) {
    const KeyT key = randomizeKey(idx);
    folly::doNotOptimizeAway(globalQPAHM->find(key)->second);
    ++idx;
  }
}
862
863 BENCHMARK_DRAW_LINE();
864
BENCHMARK(mt_ahm_miss,iters)865 BENCHMARK(mt_ahm_miss, iters) {
866 CHECK_LE(iters, FLAGS_numBMElements);
867 numOpsPerThread = iters / FLAGS_numThreads;
868 runThreads([](void* jj) -> void* {
869 int64_t j = (int64_t)jj;
870 while (!runThreadsCreatedAllThreads.load()) {
871 ;
872 }
873 for (int i = 0; i < numOpsPerThread; ++i) {
874 KeyT key = i + j * numOpsPerThread * 100;
875 folly::doNotOptimizeAway(globalAHM->find(key) == globalAHM->end());
876 }
877 return nullptr;
878 });
879 }
880
// Multi-threaded lookups in globalQPAHM of keys that are not passed through
// randomizeKey() (each thread gets a range spaced 100x apart), so they are
// expected, though not guaranteed, to miss.
BENCHMARK(mt_qpahm_miss, iters) {
  CHECK_LE(iters, FLAGS_numBMElements);
  numOpsPerThread = iters / FLAGS_numThreads;
  auto missWorker = [](void* jj) -> void* {
    const auto threadIdx = reinterpret_cast<int64_t>(jj);
    // Wait until every worker exists so they all start together.
    while (!runThreadsCreatedAllThreads.load()) {
    }
    const int64_t base = threadIdx * numOpsPerThread * 100;
    for (int op = 0; op < numOpsPerThread; ++op) {
      const KeyT key = op + base;
      folly::doNotOptimizeAway(globalQPAHM->find(key) == globalQPAHM->end());
    }
    return nullptr;
  };
  runThreads(missWorker);
}
896
// Single-threaded lookups in globalAHM of randomized keys whose indices start
// at iters * 100, beyond the loaded index range, so they are expected to miss.
BENCHMARK(st_ahm_miss, iters) {
  CHECK_LE(iters, FLAGS_numBMElements);
  const size_t offset = iters * 100;
  for (size_t n = 0; n < iters; ++n) {
    const KeyT key = randomizeKey(n + offset);
    folly::doNotOptimizeAway(globalAHM->find(key) == globalAHM->end());
  }
}
904
// Single-threaded lookups in globalQPAHM of randomized keys whose indices
// start at iters * 100, beyond the loaded index range, so they are expected
// to miss.
BENCHMARK(st_qpahm_miss, iters) {
  CHECK_LE(iters, FLAGS_numBMElements);
  const size_t offset = iters * 100;
  for (size_t n = 0; n < iters; ++n) {
    const KeyT key = randomizeKey(n + offset);
    folly::doNotOptimizeAway(globalQPAHM->find(key) == globalQPAHM->end());
  }
}
912
// Multi-threaded mix on globalAHM: 127 of every 128 ops look up a loaded key.
// The remaining op inserts a key whose index is spread 100x wider.
BENCHMARK(mt_ahm_find_insert_mix, iters) {
  CHECK_LE(iters, FLAGS_numBMElements);
  numOpsPerThread = iters / FLAGS_numThreads;
  auto mixWorker = [](void* jj) -> void* {
    const auto threadIdx = reinterpret_cast<int64_t>(jj);
    while (!runThreadsCreatedAllThreads.load()) {
    }
    for (int op = 0; op < numOpsPerThread; ++op) {
      const bool doInsert = (op % 128) == 0; // ~1% insert mix
      if (!doInsert) {
        const KeyT key = randomizeKey(op + threadIdx * numOpsPerThread);
        folly::doNotOptimizeAway(globalAHM->find(key)->second);
      } else {
        const KeyT key = randomizeKey(op + threadIdx * numOpsPerThread * 100);
        globalAHM->insert(key, genVal(key));
      }
    }
    return nullptr;
  };
  runThreads(mixWorker);
}
933
// Multi-threaded mix on globalQPAHM: 127 of every 128 ops look up a loaded
// key. The remaining op inserts a key whose index is spread 100x wider.
BENCHMARK(mt_qpahm_find_insert_mix, iters) {
  CHECK_LE(iters, FLAGS_numBMElements);
  numOpsPerThread = iters / FLAGS_numThreads;
  auto mixWorker = [](void* jj) -> void* {
    const auto threadIdx = reinterpret_cast<int64_t>(jj);
    while (!runThreadsCreatedAllThreads.load()) {
    }
    for (int op = 0; op < numOpsPerThread; ++op) {
      const bool doInsert = (op % 128) == 0; // ~1% insert mix
      if (!doInsert) {
        const KeyT key = randomizeKey(op + threadIdx * numOpsPerThread);
        folly::doNotOptimizeAway(globalQPAHM->find(key)->second);
      } else {
        const KeyT key = randomizeKey(op + threadIdx * numOpsPerThread * 100);
        globalQPAHM->insert(key, genVal(key));
      }
    }
    return nullptr;
  };
  runThreads(mixWorker);
}
954
// Multi-threaded successful lookups in globalAHA. Each thread covers a
// disjoint slice of the first `iters` loaded indices.
BENCHMARK(mt_aha_find, iters) {
  CHECK_LE(iters, FLAGS_numBMElements);
  numOpsPerThread = iters / FLAGS_numThreads;
  auto findWorker = [](void* jj) -> void* {
    const auto threadIdx = reinterpret_cast<int64_t>(jj);
    while (!runThreadsCreatedAllThreads.load()) {
    }
    const int64_t firstIndex = threadIdx * numOpsPerThread;
    for (int op = 0; op < numOpsPerThread; ++op) {
      const KeyT key = randomizeKey(op + firstIndex);
      folly::doNotOptimizeAway(globalAHA->find(key)->second);
    }
    return nullptr;
  };
  runThreads(findWorker);
}
970
// Multi-threaded successful lookups in globalAHM. Each thread covers a
// disjoint slice of the first `iters` loaded indices.
BENCHMARK(mt_ahm_find, iters) {
  CHECK_LE(iters, FLAGS_numBMElements);
  numOpsPerThread = iters / FLAGS_numThreads;
  auto findWorker = [](void* jj) -> void* {
    const auto threadIdx = reinterpret_cast<int64_t>(jj);
    while (!runThreadsCreatedAllThreads.load()) {
    }
    const int64_t firstIndex = threadIdx * numOpsPerThread;
    for (int op = 0; op < numOpsPerThread; ++op) {
      const KeyT key = randomizeKey(op + firstIndex);
      folly::doNotOptimizeAway(globalAHM->find(key)->second);
    }
    return nullptr;
  };
  runThreads(findWorker);
}
986
// Multi-threaded successful lookups in globalQPAHM. Each thread covers a
// disjoint slice of the first `iters` loaded indices.
BENCHMARK(mt_qpahm_find, iters) {
  CHECK_LE(iters, FLAGS_numBMElements);
  numOpsPerThread = iters / FLAGS_numThreads;
  auto findWorker = [](void* jj) -> void* {
    const auto threadIdx = reinterpret_cast<int64_t>(jj);
    while (!runThreadsCreatedAllThreads.load()) {
    }
    const int64_t firstIndex = threadIdx * numOpsPerThread;
    for (int op = 0; op < numOpsPerThread; ++op) {
      const KeyT key = randomizeKey(op + firstIndex);
      folly::doNotOptimizeAway(globalQPAHM->find(key)->second);
    }
    return nullptr;
  };
  runThreads(findWorker);
}
1002
1003 KeyT k;
BENCHMARK(st_baseline_modulus_and_random,iters)1004 BENCHMARK(st_baseline_modulus_and_random, iters) {
1005 for (size_t i = 0; i < iters; ++i) {
1006 k = randomizeKey(i) % iters;
1007 }
1008 }
1009
1010 // insertions go last because they reset the map
1011
BENCHMARK(mt_ahm_insert,iters)1012 BENCHMARK(mt_ahm_insert, iters) {
1013 BENCHMARK_SUSPEND {
1014 globalAHM = std::make_unique<AHMapT>(int(iters * LF), config);
1015 numOpsPerThread = iters / FLAGS_numThreads;
1016 }
1017 runThreads(insertThread);
1018 }
1019
// Multi-threaded inserts of `iters` keys into a freshly created globalQPAHM.
// Map creation is excluded from timing.
BENCHMARK(mt_qpahm_insert, iters) {
  folly::BenchmarkSuspender setupTimer;
  globalQPAHM = std::make_unique<QPAHMapT>(int(iters * LF), qpConfig);
  numOpsPerThread = iters / FLAGS_numThreads;
  setupTimer.dismiss();

  runThreads(qpInsertThread);
}
1027
// Single-threaded inserts of `iters` randomized keys into a new, local
// AtomicHashMap. Map creation is excluded from timing.
BENCHMARK(st_ahm_insert, iters) {
  std::unique_ptr<AHMapT> ahm;
  BENCHMARK_SUSPEND {
    ahm = std::make_unique<AHMapT>(int(iters * LF), config);
  }

  for (size_t n = 0; n < iters; ++n) {
    const KeyT key = randomizeKey(n);
    ahm->insert(key, genVal(key));
  }
}
1038
// Single-threaded inserts of `iters` randomized keys into a new, local
// QuadraticProbingAtomicHashMap. Map creation is excluded from timing.
BENCHMARK(st_qpahm_insert, iters) {
  std::unique_ptr<QPAHMapT> ahm;
  BENCHMARK_SUSPEND {
    ahm = std::make_unique<QPAHMapT>(int(iters * LF), qpConfig);
  }

  for (size_t n = 0; n < iters; ++n) {
    const KeyT key = randomizeKey(n);
    ahm->insert(key, genVal(key));
  }
}
1049
benchmarkSetup()1050 void benchmarkSetup() {
1051 config.maxLoadFactor = FLAGS_maxLoadFactor;
1052 qpConfig.maxLoadFactor = FLAGS_maxLoadFactor;
1053 configRace.maxLoadFactor = 0.5;
1054 int numCores = sysconf(_SC_NPROCESSORS_ONLN);
1055 loadGlobalAha();
1056 loadGlobalAhm();
1057 loadGlobalQPAhm();
1058 string numIters =
1059 folly::to<string>(std::min(1000000, int(FLAGS_numBMElements)));
1060
1061 gflags::SetCommandLineOptionWithMode(
1062 "bm_max_iters", numIters.c_str(), gflags::SET_FLAG_IF_DEFAULT);
1063 gflags::SetCommandLineOptionWithMode(
1064 "bm_min_iters", numIters.c_str(), gflags::SET_FLAG_IF_DEFAULT);
1065 string numCoresStr = folly::to<string>(numCores);
1066 gflags::SetCommandLineOptionWithMode(
1067 "numThreads", numCoresStr.c_str(), gflags::SET_FLAG_IF_DEFAULT);
1068
1069 std::cout << "\nRunning AHM benchmarks on machine with " << numCores
1070 << " logical cores.\n"
1071 " num elements per map: "
1072 << FLAGS_numBMElements << "\n"
1073 << " num threads for mt tests: " << FLAGS_numThreads << "\n"
1074 << " AHM load factor: " << FLAGS_targetLoadFactor << "\n\n";
1075 }
1076
main(int argc,char ** argv)1077 int main(int argc, char** argv) {
1078 testing::InitGoogleTest(&argc, argv);
1079 gflags::ParseCommandLineFlags(&argc, &argv, true);
1080 auto ret = RUN_ALL_TESTS();
1081 if (!ret && FLAGS_benchmark) {
1082 benchmarkSetup();
1083 folly::runBenchmarks();
1084 }
1085 return ret;
1086 }
1087
1088 /*
1089 loading global AHA with 8 threads...
1090 took 487 ms (40 ns/insert).
1091 loading global AHM with 8 threads...
1092 took 478 ms (39 ns/insert).
1093 loading global QPAHM with 8 threads...
1094 took 478 ms (39 ns/insert).
1095
1096 Running AHM benchmarks on machine with 24 logical cores.
1097 num elements per map: 12000000
1098 num threads for mt tests: 24
1099 AHM load factor: 0.75
1100
1101 ============================================================================
1102 folly/test/AtomicHashMapTest.cpp relative time/iter iters/s
1103 ============================================================================
1104 st_aha_find 92.63ns 10.80M
1105 st_ahm_find 107.78ns 9.28M
1106 st_qpahm_find 90.69ns 11.03M
1107 ----------------------------------------------------------------------------
1108 mt_ahm_miss 2.09ns 477.36M
1109 mt_qpahm_miss 1.37ns 728.82M
1110 st_ahm_miss 241.07ns 4.15M
1111 st_qpahm_miss 223.17ns 4.48M
1112 mt_ahm_find_insert_mix 8.05ns 124.24M
1113 mt_qpahm_find_insert_mix 9.10ns 109.85M
1114 mt_aha_find 6.82ns 146.68M
1115 mt_ahm_find 7.95ns 125.77M
1116 mt_qpahm_find 6.81ns 146.83M
1117 st_baseline_modulus_and_random 6.02ns 166.03M
1118 mt_ahm_insert 14.29ns 69.97M
1119 mt_qpahm_insert 11.68ns 85.61M
1120 st_ahm_insert 125.39ns 7.98M
1121 st_qpahm_insert 128.76ns 7.77M
1122 ============================================================================
1123 */
1124