1 /*
2  * Copyright (c) Facebook, Inc. and its affiliates.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
#include <folly/AtomicHashMap.h>

#include <algorithm>
#include <atomic>
#include <chrono>
#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <memory>
#include <random>
#include <string>
#include <thread>
#include <vector>

#include <glog/logging.h>

#include <folly/Benchmark.h>
#include <folly/Conv.h>
#include <folly/portability/Atomic.h>
#include <folly/portability/GTest.h>
#include <folly/portability/SysTime.h>

using folly::AtomicHashArray;
using folly::AtomicHashMap;
using folly::StringPiece;
using std::string;
using std::vector;
36 
37 // Tunables:
38 DEFINE_double(targetLoadFactor, 0.75, "Target memory utilization fraction.");
39 DEFINE_double(maxLoadFactor, 0.80, "Max before growth.");
40 DEFINE_int32(numThreads, 8, "Threads to use for concurrency tests.");
41 DEFINE_int64(numBMElements, 12 * 1000 * 1000, "Size of maps for benchmarks.");
42 
43 const double LF = FLAGS_maxLoadFactor / FLAGS_targetLoadFactor;
44 const int maxBMElements = int(FLAGS_numBMElements * LF); // hit our target LF.
45 
// Returns a timestamp in microseconds, measured from an arbitrary fixed point.
// Callers in this file only subtract two readings to get an elapsed time, so a
// monotonic clock is used: unlike gettimeofday(), std::chrono::steady_clock
// never jumps backwards or forwards when the wall clock is adjusted.
static int64_t nowInUsec() {
  const auto sinceEpoch = std::chrono::steady_clock::now().time_since_epoch();
  return static_cast<int64_t>(
      std::chrono::duration_cast<std::chrono::microseconds>(sinceEpoch)
          .count());
}
51 
// Basic sanity checks for a map with std::string values: insert/find, the
// "insert never overwrites" rule, and mutating a value through find().
TEST(Ahm, BasicStrings) {
  using AHM = AtomicHashMap<int64_t, string>;
  AHM myMap(1024);
  EXPECT_TRUE(myMap.begin() == myMap.end());

  const int kCount = 100;
  for (int key = 0; key < kCount; ++key) {
    myMap.insert(std::make_pair(key, folly::to<string>(key)));
  }
  for (int key = 0; key < kCount; ++key) {
    EXPECT_EQ(myMap.find(key)->second, folly::to<string>(key));
  }

  // A second insert with an existing key must leave the first value alone.
  myMap.insert(std::make_pair(999, "A"));
  myMap.insert(std::make_pair(999, "B"));
  EXPECT_EQ(myMap.find(999)->second, "A");

  // Values are writable through the iterator returned by find().
  myMap.find(999)->second = "B";
  myMap.find(999)->second = "C";
  EXPECT_EQ(myMap.find(999)->second, "C");
  EXPECT_EQ(myMap.find(999)->first, 999);
}
72 
// Exercises a map with a move-only value type: four different insertion
// paths, lookups, and erasing every fourth key.
TEST(Ahm, BasicNoncopyable) {
  using AHM = AtomicHashMap<int64_t, std::unique_ptr<int>>;
  AHM myMap(1024);
  EXPECT_TRUE(myMap.begin() == myMap.end());

  // insert(pair)
  for (int k = 0; k < 50; ++k) {
    myMap.insert(std::make_pair(k, std::make_unique<int>(k)));
  }
  // insert(key, value)
  for (int k = 50; k < 100; ++k) {
    myMap.insert(k, std::make_unique<int>(k));
  }
  // emplace constructing the unique_ptr from a raw pointer
  for (int k = 100; k < 150; ++k) {
    myMap.emplace(k, new int(k));
  }
  // emplace with an explicit deleter argument
  for (int k = 150; k < 200; ++k) {
    myMap.emplace(k, new int(k), std::default_delete<int>());
  }

  for (int k = 0; k < 200; ++k) {
    EXPECT_EQ(*(myMap.find(k)->second), k);
  }

  for (int k = 0; k < 200; k += 4) {
    myMap.erase(k);
  }
  for (int k = 0; k < 200; k += 4) {
    EXPECT_EQ(myMap.find(k), myMap.end());
  }
}
100 
101 typedef int32_t KeyT;
102 typedef int32_t ValueT;
103 
104 typedef AtomicHashMap<KeyT, ValueT> AHMapT;
105 typedef AHMapT::value_type RecordT;
106 typedef AtomicHashArray<KeyT, ValueT> AHArrayT;
107 AHArrayT::Config config;
108 typedef folly::QuadraticProbingAtomicHashMap<KeyT, ValueT> QPAHMapT;
109 QPAHMapT::Config qpConfig;
110 static AHArrayT::SmartPtr globalAHA(nullptr);
111 static std::unique_ptr<AHMapT> globalAHM;
112 static std::unique_ptr<QPAHMapT> globalQPAHM;
113 
// Maps a key to a deterministic value (integer division by 3, truncating
// toward zero), so stored values can be re-derived and checked later.
static int genVal(int key) {
  constexpr int kDivisor = 3;
  const int value = key / kDivisor;
  return value;
}
118 
119 static bool legalKey(const char* a);
120 
121 struct EqTraits {
operator ()EqTraits122   bool operator()(const char* a, const char* b) {
123     return legalKey(a) && (strcmp(a, b) == 0);
124   }
operator ()EqTraits125   bool operator()(const char* a, const char& b) {
126     return legalKey(a) && (a[0] != '\0') && (a[0] == b);
127   }
operator ()EqTraits128   bool operator()(const char* a, const StringPiece b) {
129     return legalKey(a) && (strlen(a) == b.size()) &&
130         (strcmp(a, b.begin()) == 0);
131   }
132 };
133 
134 struct HashTraits {
operator ()HashTraits135   size_t operator()(const char* a) {
136     size_t result = 0;
137     while (a[0] != 0) {
138       result += static_cast<size_t>(*(a++));
139     }
140     return result;
141   }
operator ()HashTraits142   size_t operator()(const char& a) { return static_cast<size_t>(a); }
operator ()HashTraits143   size_t operator()(const StringPiece a) {
144     size_t result = 0;
145     for (const auto& ch : a) {
146       result += static_cast<size_t>(ch);
147     }
148     return result;
149   }
150 };
151 
152 typedef AtomicHashMap<const char*, int64_t, HashTraits, EqTraits> AHMCstrInt;
153 AHMCstrInt::Config cstrIntCfg;
154 
legalKey(const char * a)155 static bool legalKey(const char* a) {
156   return a != cstrIntCfg.emptyKey && a != cstrIntCfg.lockedKey &&
157       a != cstrIntCfg.erasedKey;
158 }
159 
// Heterogeneous lookup on a const char*-keyed map: find by C string, by a
// single char (hit and miss), and by StringPiece. Each hit is checked against
// end() before being dereferenced, so a failed lookup fails the test cleanly
// instead of dereferencing the end iterator.
TEST(Ahm, BasicLookup) {
  AHMCstrInt myMap(1024, cstrIntCfg);
  EXPECT_TRUE(myMap.begin() == myMap.end());
  myMap.insert(std::make_pair("f", 42));
  {
    // Look up the C string itself.
    auto it = myMap.find("f");
    ASSERT_TRUE(it != myMap.end());
    EXPECT_EQ(42, it->second);
  }
  {
    // Look up a single char, successfully.
    auto it = myMap.find<char>('f');
    ASSERT_TRUE(it != myMap.end());
    EXPECT_EQ(42, it->second);
  }
  {
    // Look up a single char, unsuccessfully.
    auto it = myMap.find<char>('g');
    EXPECT_TRUE(it == myMap.end());
  }
  {
    // Look up a string piece, successfully.
    const StringPiece piece("f");
    auto it = myMap.find(piece);
    ASSERT_TRUE(it != myMap.end());
    EXPECT_EQ(42, it->second);
  }
}
182 
TEST(Ahm,grow)183 TEST(Ahm, grow) {
184   VLOG(1) << "Overhead: " << sizeof(AHArrayT) << " (array) "
185           << sizeof(AHMapT) + sizeof(AHArrayT) << " (map/set) Bytes.";
186   uint64_t numEntries = 10000;
187   float sizeFactor = 0.46f;
188 
189   std::unique_ptr<AHMapT> m(new AHMapT(int(numEntries * sizeFactor), config));
190 
191   // load map - make sure we succeed and the index is accurate
192   bool success = true;
193   for (uint64_t i = 0; i < numEntries; i++) {
194     auto ret = m->insert(RecordT(i, genVal(i)));
195     success &= ret.second;
196     success &= (m->findAt(ret.first.getIndex())->second == genVal(i));
197   }
198   // Overwrite vals to make sure there are no dups
199   // Every insert should fail because the keys are already in the map.
200   success = true;
201   for (uint64_t i = 0; i < numEntries; i++) {
202     auto ret = m->insert(RecordT(i, genVal(i * 2)));
203     success &= (ret.second == false); // fail on collision
204     success &= (ret.first->second == genVal(i)); // return the previous value
205     success &= (m->findAt(ret.first.getIndex())->second == genVal(i));
206   }
207   EXPECT_TRUE(success);
208 
209   // check correctness
210   EXPECT_GT(m->numSubMaps(), 1); // make sure we grew
211   success = true;
212   EXPECT_EQ(m->size(), numEntries);
213   for (size_t i = 0; i < numEntries; i++) {
214     success &= (m->find(i)->second == genVal(i));
215   }
216   EXPECT_TRUE(success);
217 
218   // Check findAt
219   success = true;
220   AHMapT::const_iterator retIt;
221   for (int32_t i = 0; i < int32_t(numEntries); i++) {
222     retIt = m->find(i);
223     retIt = m->findAt(retIt.getIndex());
224     success &= (retIt->second == genVal(i));
225     // We use a uint32_t index so that this comparison is between two
226     // variables of the same type.
227     success &= (retIt->first == i);
228   }
229   EXPECT_TRUE(success);
230 
231   // Try modifying value
232   m->find(8)->second = 5309;
233   EXPECT_EQ(m->find(8)->second, 5309);
234 
235   // check clear()
236   m->clear();
237   success = true;
238   for (uint64_t i = 0; i < numEntries / 2; i++) {
239     success &= m->insert(RecordT(i, genVal(i))).second;
240   }
241   EXPECT_TRUE(success);
242   EXPECT_EQ(m->size(), numEntries / 2);
243 }
244 
// Iterating a grown (multi-submap) map must visit every entry exactly once,
// each carrying the value genVal(key).
TEST(Ahm, iterator) {
  const int numEntries = 10000;
  const float sizeFactor = .46f;
  auto m = std::make_unique<AHMapT>(int(numEntries * sizeFactor), config);

  for (int key = 0; key < numEntries; ++key) {
    m->insert(RecordT(key, genVal(key)));
  }

  bool success = true;
  int count = 0;
  for (const auto& entry : *m) {
    success &= (entry.second == genVal(entry.first));
    ++count;
  }
  EXPECT_TRUE(success);
  EXPECT_EQ(count, numEntries);
}
264 
265 class Counters {
266  private:
267   // Note: Unfortunately can't currently put a std::atomic<int64_t> in
268   // the value in ahm since it doesn't support types that are both non-copy
269   // and non-move constructible yet.
270   AtomicHashMap<int64_t, int64_t> ahm;
271 
272  public:
Counters(size_t numCounters)273   explicit Counters(size_t numCounters) : ahm(numCounters) {}
274 
increment(int64_t obj_id)275   void increment(int64_t obj_id) {
276     auto ret = ahm.insert(std::make_pair(obj_id, 1));
277     if (!ret.second) {
278       // obj_id already exists, increment count
279       __sync_fetch_and_add(&ret.first->second, 1);
280     }
281   }
282 
getValue(int64_t obj_id)283   int64_t getValue(int64_t obj_id) {
284     auto ret = ahm.find(obj_id);
285     return ret != ahm.end() ? ret->second : 0;
286   }
287 
288   // export the counters without blocking increments
toString()289   string toString() {
290     string ret = "{\n";
291     ret.reserve(ahm.size() * 32);
292     for (const auto& e : ahm) {
293       ret += folly::to<string>("  [", e.first, ":", e.second, "]\n");
294     }
295     ret += "}\n";
296     return ret;
297   }
298 };
299 
300 // If you get an error "terminate called without an active exception", there
301 // might be too many threads getting created - decrease numKeys and/or mult.
TEST(Ahm,counter)302 TEST(Ahm, counter) {
303   const int numKeys = 10;
304   const int mult = 10;
305   Counters c(numKeys);
306   vector<int64_t> keys;
307   FOR_EACH_RANGE (i, 1, numKeys) { keys.push_back(i); }
308   vector<std::thread> threads;
309   for (auto key : keys) {
310     FOR_EACH_RANGE (i, 0, key * mult) {
311       threads.push_back(std::thread([&, key] { c.increment(key); }));
312     }
313   }
314   for (auto& t : threads) {
315     t.join();
316   }
317   string str = c.toString();
318   for (auto key : keys) {
319     int val = key * mult;
320     EXPECT_EQ(val, c.getValue(key));
321     EXPECT_NE(
322         string::npos, str.find(folly::to<string>("[", key, ":", val, "]")));
323   }
324 }
325 
326 class Integer {
327  public:
Integer(KeyT v=0)328   explicit Integer(KeyT v = 0) : v_(v) {}
329 
330   Integer(const Integer&) = default;
331 
operator =(const Integer & a)332   Integer& operator=(const Integer& a) {
333     static bool throwException_ = false;
334     throwException_ = !throwException_;
335     if (throwException_) {
336       throw 1;
337     }
338     v_ = a.v_;
339     return *this;
340   }
341 
operator ==(const Integer & a) const342   bool operator==(const Integer& a) const { return v_ == a.v_; }
343 
344  private:
345   KeyT v_;
346 };
347 
// Integer's assignment operator throws intermittently. Every insert that
// completes must be findable with the right value, every insert that throws
// must leave its key absent, and size() must equal the number of successes.
TEST(Ahm, map_exception_safety) {
  using MyMapT = AtomicHashMap<KeyT, Integer>;

  const int numEntries = 10000;
  const float sizeFactor = 0.46f;
  auto m = std::make_unique<MyMapT>(int(numEntries * sizeFactor));

  bool success = true;
  int count = 0;
  for (int key = 0; key < numEntries; ++key) {
    try {
      m->insert(key, Integer(genVal(key)));
      success &= (m->find(key)->second == Integer(genVal(key)));
      ++count;
    } catch (...) {
      success &= !m->count(key);
    }
  }
  EXPECT_EQ(count, m->size());
  EXPECT_TRUE(success);
}
369 
// Repeatedly fills the map and then erases every key, checking count(),
// size() and erase() results at each step.
TEST(Ahm, basicErase) {
  const size_t numEntries = 3000;

  auto s = std::make_unique<AHMapT>(numEntries, config);
  // Several fill/drain rounds so that more than one subMap gets exercised.
  for (int round = 0; round < 4; ++round) {
    // Each key is absent before its insert and present after it.
    bool success = true;
    for (size_t key = 0; key < numEntries; ++key) {
      success &= !(s->count(key));
      auto ret = s->insert(RecordT(key, key));
      success &= s->count(key);
      success &= ret.second;
    }
    EXPECT_TRUE(success);
    EXPECT_EQ(s->size(), numEntries);

    // Erasing a key succeeds once, shrinks size() by one, removes the key,
    // and a repeated erase of the same key reports nothing erased.
    success = true;
    for (size_t key = 0; key < numEntries; ++key) {
      success &= s->erase(key);
      success &= (s->size() == numEntries - 1 - key);
      success &= !(s->count(key));
      success &= !(s->erase(key));
    }
    EXPECT_TRUE(success);
  }
  VLOG(1) << "Final number of subMaps = " << s->numSubMaps();
}
401 
402 namespace {
403 
randomizeKey(int key)404 inline KeyT randomizeKey(int key) {
405   // We deterministically randomize the key to more accurately simulate
406   // real-world usage, and to avoid pathalogical performance patterns (e.g.
407   // those related to std::hash<int64_t>()(1) == 1).
408   //
409   // Use a hash function we don't normally use for ints to avoid interactions.
410   return folly::hash::jenkins_rev_mix32(key);
411 }
412 
413 int numOpsPerThread = 0;
414 
insertThread(void * jj)415 void* insertThread(void* jj) {
416   int64_t j = (int64_t)jj;
417   for (int i = 0; i < numOpsPerThread; ++i) {
418     KeyT key = randomizeKey(i + j * numOpsPerThread);
419     globalAHM->insert(key, genVal(key));
420   }
421   return nullptr;
422 }
423 
qpInsertThread(void * jj)424 void* qpInsertThread(void* jj) {
425   int64_t j = (int64_t)jj;
426   for (int i = 0; i < numOpsPerThread; ++i) {
427     KeyT key = randomizeKey(i + j * numOpsPerThread);
428     globalQPAHM->insert(key, genVal(key));
429   }
430   return nullptr;
431 }
432 
insertThreadArr(void * jj)433 void* insertThreadArr(void* jj) {
434   int64_t j = (int64_t)jj;
435   for (int i = 0; i < numOpsPerThread; ++i) {
436     KeyT key = randomizeKey(i + j * numOpsPerThread);
437     globalAHA->insert(std::make_pair(key, genVal(key)));
438   }
439   return nullptr;
440 }
441 
442 std::atomic<bool> runThreadsCreatedAllThreads;
runThreads(void * (* mainFunc)(void *),int numThreads,void ** statuses)443 void runThreads(void* (*mainFunc)(void*), int numThreads, void** statuses) {
444   folly::BenchmarkSuspender susp;
445   runThreadsCreatedAllThreads.store(false);
446   vector<std::thread> threads;
447   for (int64_t j = 0; j < numThreads; j++) {
448     threads.emplace_back([statuses, mainFunc, j]() {
449       auto ret = mainFunc((void*)j);
450       if (statuses != nullptr) {
451         statuses[j] = ret;
452       }
453     });
454   }
455   susp.dismiss();
456 
457   runThreadsCreatedAllThreads.store(true);
458   for (size_t i = 0; i < threads.size(); ++i) {
459     threads[i].join();
460   }
461 }
462 
runThreads(void * (* mainFunc)(void *))463 void runThreads(void* (*mainFunc)(void*)) {
464   runThreads(mainFunc, FLAGS_numThreads, nullptr);
465 }
466 
467 } // namespace
468 
TEST(Ahm,collision_test)469 TEST(Ahm, collision_test) {
470   const int numInserts = 1000000 / 4;
471 
472   // Doing the same number on each thread so we collide.
473   numOpsPerThread = numInserts;
474 
475   float sizeFactor = 0.46f;
476   int entrySize = sizeof(KeyT) + sizeof(ValueT);
477   VLOG(1) << "Testing " << numInserts << " unique " << entrySize
478           << " Byte entries replicated in " << FLAGS_numThreads
479           << " threads with " << FLAGS_maxLoadFactor * 100.0
480           << "% max load factor.";
481 
482   globalAHM = std::make_unique<AHMapT>(int(numInserts * sizeFactor), config);
483 
484   size_t sizeInit = globalAHM->capacity();
485   VLOG(1) << "  Initial capacity: " << sizeInit;
486 
487   double start = nowInUsec();
488   runThreads([](void*) -> void* { // collisionInsertThread
489     for (int i = 0; i < numOpsPerThread; ++i) {
490       KeyT key = randomizeKey(i);
491       globalAHM->insert(key, genVal(key));
492     }
493     return nullptr;
494   });
495   double elapsed = nowInUsec() - start;
496 
497   size_t finalCap = globalAHM->capacity();
498   size_t sizeAHM = globalAHM->size();
499   VLOG(1) << elapsed / sizeAHM << " usec per " << FLAGS_numThreads
500           << " duplicate inserts (atomic).";
501   VLOG(1) << "  Final capacity: " << finalCap << " in "
502           << globalAHM->numSubMaps() << " sub maps ("
503           << sizeAHM * 100 / finalCap << "% load factor, "
504           << (finalCap - sizeInit) * 100 / sizeInit << "% growth).";
505 
506   // check correctness
507   EXPECT_EQ(sizeAHM, numInserts);
508   bool success = true;
509   for (int i = 0; i < numInserts; ++i) {
510     KeyT key = randomizeKey(i);
511     success &= (globalAHM->find(key)->second == genVal(key));
512   }
513   EXPECT_TRUE(success);
514 
515   // check colliding finds
516   start = nowInUsec();
517   runThreads([](void*) -> void* { // collisionFindThread
518     KeyT key(0);
519     for (int i = 0; i < numOpsPerThread; ++i) {
520       globalAHM->find(key);
521     }
522     return nullptr;
523   });
524 
525   elapsed = nowInUsec() - start;
526 
527   VLOG(1) << elapsed / sizeAHM << " usec per " << FLAGS_numThreads
528           << " duplicate finds (atomic).";
529 }
530 
531 namespace {
532 
533 const int kInsertPerThread = 100000;
534 int raceFinalSizeEstimate;
535 
raceIterateThread()536 void raceIterateThread() {
537   int count = 0;
538 
539   AHMapT::iterator it = globalAHM->begin();
540   AHMapT::iterator end = globalAHM->end();
541   for (; it != end; ++it) {
542     ++count;
543     if (count > raceFinalSizeEstimate) {
544       EXPECT_FALSE("Infinite loop in iterator.");
545       return;
546     }
547   }
548 }
549 
raceInsertRandomThread()550 void raceInsertRandomThread() {
551   for (int i = 0; i < kInsertPerThread; ++i) {
552     KeyT key = rand();
553     globalAHM->insert(key, genVal(key));
554   }
555 }
556 
557 } // namespace
558 
559 // Test for race conditions when inserting and iterating at the same time and
560 // creating multiple submaps.
TEST(Ahm,race_insert_iterate_thread_test)561 TEST(Ahm, race_insert_iterate_thread_test) {
562   const int kInsertThreads = 20;
563   const int kIterateThreads = 20;
564   raceFinalSizeEstimate = kInsertThreads * kInsertPerThread;
565 
566   VLOG(1) << "Testing iteration and insertion with " << kInsertThreads
567           << " threads inserting and " << kIterateThreads
568           << " threads iterating.";
569 
570   globalAHM = std::make_unique<AHMapT>(raceFinalSizeEstimate / 9, config);
571 
572   vector<std::thread> threads;
573   for (auto j = 0u; j < kInsertThreads; ++j) {
574     threads.emplace_back(raceInsertRandomThread);
575   }
576   for (auto j = 0u; j < kIterateThreads; ++j) {
577     threads.emplace_back(raceIterateThread);
578   }
579   for (auto& thread : threads) {
580     thread.join();
581   }
582   VLOG(1) << "Ended up with " << globalAHM->numSubMaps() << " submaps";
583   VLOG(1) << "Final size of map " << globalAHM->size();
584 }
585 
586 namespace {
587 
588 const int kTestEraseInsertions = 200000;
589 std::atomic<int32_t> insertedLevel;
590 
testEraseInsertThread(void *)591 void* testEraseInsertThread(void*) {
592   for (int i = 0; i < kTestEraseInsertions; ++i) {
593     KeyT key = randomizeKey(i);
594     globalAHM->insert(key, genVal(key));
595     insertedLevel.store(i, std::memory_order_release);
596   }
597   insertedLevel.store(kTestEraseInsertions, std::memory_order_release);
598   return nullptr;
599 }
600 
testEraseEraseThread(void *)601 void* testEraseEraseThread(void*) {
602   for (int i = 0; i < kTestEraseInsertions; ++i) {
603     /*
604      * Make sure that we don't get ahead of the insert thread, because
605      * part of the condition for this unit test succeeding is that the
606      * map ends up empty.
607      *
608      * Note, there is a subtle case here when a new submap is
609      * allocated: the erasing thread might get 0 from count(key)
610      * because it hasn't seen numSubMaps_ update yet.  To avoid this
611      * race causing problems for the test (it's ok for real usage), we
612      * lag behind the inserter by more than just element.
613      */
614     const int lag = 10;
615     int currentLevel;
616     do {
617       currentLevel = insertedLevel.load(std::memory_order_acquire);
618       if (currentLevel == kTestEraseInsertions) {
619         currentLevel += lag + 1;
620       }
621     } while (currentLevel - lag < i);
622 
623     KeyT key = randomizeKey(i);
624     while (globalAHM->count(key)) {
625       if (globalAHM->erase(key)) {
626         break;
627       }
628     }
629   }
630   return nullptr;
631 }
632 
633 } // namespace
634 
635 // Here we have a single thread inserting some values, and several threads
636 // racing to delete the values in the order they were inserted.
TEST(Ahm,thread_erase_insert_race)637 TEST(Ahm, thread_erase_insert_race) {
638   const int kInsertThreads = 1;
639   const int kEraseThreads = 10;
640 
641   VLOG(1) << "Testing insertion and erase with " << kInsertThreads
642           << " thread inserting and " << kEraseThreads << " threads erasing.";
643 
644   globalAHM = std::make_unique<AHMapT>(kTestEraseInsertions / 4, config);
645 
646   vector<pthread_t> threadIds;
647   for (int64_t j = 0; j < kInsertThreads + kEraseThreads; j++) {
648     pthread_t tid;
649     void* (*thread)(void*) =
650         (j < kInsertThreads ? testEraseInsertThread : testEraseEraseThread);
651     if (pthread_create(&tid, nullptr, thread, (void*)j) != 0) {
652       LOG(ERROR) << "Could not start thread";
653     } else {
654       threadIds.push_back(tid);
655     }
656   }
657   for (size_t i = 0; i < threadIds.size(); i++) {
658     pthread_join(threadIds[i], nullptr);
659   }
660 
661   EXPECT_TRUE(globalAHM->empty());
662   EXPECT_EQ(globalAHM->size(), 0);
663 
664   VLOG(1) << "Ended up with " << globalAHM->numSubMaps() << " submaps";
665 }
666 
667 // Repro for T#483734: Duplicate AHM inserts due to incorrect AHA return value.
668 typedef AtomicHashArray<int32_t, int32_t> AHA;
669 AHA::Config configRace;
670 auto atomicHashArrayInsertRaceArray = AHA::create(2, configRace);
atomicHashArrayInsertRaceThread(void *)671 void* atomicHashArrayInsertRaceThread(void* /* j */) {
672   AHA* arr = atomicHashArrayInsertRaceArray.get();
673   uintptr_t numInserted = 0;
674   while (!runThreadsCreatedAllThreads.load()) {
675     ;
676   }
677   for (int i = 0; i < 2; i++) {
678     if (arr->insert(RecordT(randomizeKey(i), 0)).first != arr->end()) {
679       numInserted++;
680     }
681   }
682   return (void*)numInserted;
683 }
// Many rounds of four threads racing to insert the same two keys: every
// thread must report as many successful inserts as the array ends up holding.
TEST(Ahm, atomic_hash_array_insert_race) {
  AHA* const arr = atomicHashArrayInsertRaceArray.get();
  const int numIterations = 5000;
  constexpr int numThreads = 4;
  void* statuses[numThreads];
  for (int round = 0; round < numIterations; ++round) {
    arr->clear();
    runThreads(atomicHashArrayInsertRaceThread, numThreads, statuses);
    EXPECT_GE(arr->size(), 1);
    for (void* status : statuses) {
      EXPECT_EQ(arr->size(), reinterpret_cast<uintptr_t>(status));
    }
  }
}
698 
699 // Repro for T#5841499. Race between erase() and find() on the same key.
TEST(Ahm,erase_find_race)700 TEST(Ahm, erase_find_race) {
701   const uint64_t limit = 10000;
702   AtomicHashMap<uint64_t, uint64_t> map(limit + 10);
703   std::atomic<uint64_t> key{1};
704 
705   // Invariant: all values are equal to their keys.
706   // At any moment there is one or two consecutive keys in the map.
707 
708   std::thread write_thread([&]() {
709     while (true) {
710       uint64_t k = ++key;
711       if (k > limit) {
712         break;
713       }
714       map.insert(k + 1, k + 1);
715       map.erase(k);
716     }
717   });
718 
719   std::thread read_thread([&]() {
720     while (true) {
721       uint64_t k = key.load();
722       if (k > limit) {
723         break;
724       }
725 
726       auto it = map.find(k);
727       if (it != map.end()) {
728         ASSERT_EQ(k, it->second);
729       }
730     }
731   });
732 
733   read_thread.join();
734   write_thread.join();
735 }
736 
737 // Erase right after insert race bug repro (t9130653)
TEST(Ahm,erase_after_insert_race)738 TEST(Ahm, erase_after_insert_race) {
739   const uint64_t limit = 10000;
740   const size_t num_threads = 100;
741   const size_t num_iters = 500;
742   AtomicHashMap<uint64_t, uint64_t> map(limit + 10);
743 
744   std::atomic<bool> go{false};
745   std::vector<std::thread> ts;
746   for (size_t i = 0; i < num_threads; ++i) {
747     ts.emplace_back([&]() {
748       while (!go) {
749         continue;
750       }
751       for (size_t n = 0; n < num_iters; ++n) {
752         map.erase(1);
753         map.insert(1, 1);
754       }
755     });
756   }
757 
758   go = true;
759 
760   for (auto& t : ts) {
761     t.join();
762   }
763 }
764 
765 // Repro for a bug when iterator didn't skip empty submaps.
TEST(Ahm,iterator_skips_empty_submaps)766 TEST(Ahm, iterator_skips_empty_submaps) {
767   AtomicHashMap<uint64_t, uint64_t>::Config conf;
768   conf.growthFactor = 1;
769 
770   AtomicHashMap<uint64_t, uint64_t> map(1, conf);
771 
772   map.insert(1, 1);
773   map.insert(2, 2);
774   map.insert(3, 3);
775 
776   map.erase(2);
777 
778   auto it = map.find(1);
779 
780   ASSERT_NE(map.end(), it);
781   ASSERT_EQ(1, it->first);
782   ASSERT_EQ(1, it->second);
783 
784   ++it;
785 
786   ASSERT_NE(map.end(), it);
787   ASSERT_EQ(3, it->first);
788   ASSERT_EQ(3, it->second);
789 
790   ++it;
791   ASSERT_EQ(map.end(), it);
792 }
793 
794 namespace {
795 
loadGlobalAha()796 void loadGlobalAha() {
797   std::cout << "loading global AHA with " << FLAGS_numThreads
798             << " threads...\n";
799   uint64_t start = nowInUsec();
800   globalAHA = AHArrayT::create(maxBMElements, config);
801   numOpsPerThread = FLAGS_numBMElements / FLAGS_numThreads;
802   CHECK_EQ(0, FLAGS_numBMElements % FLAGS_numThreads)
803       << "kNumThreads must evenly divide kNumInserts.";
804   runThreads(insertThreadArr);
805   uint64_t elapsed = nowInUsec() - start;
806   std::cout << "  took " << elapsed / 1000 << " ms ("
807             << (elapsed * 1000 / FLAGS_numBMElements) << " ns/insert).\n";
808   EXPECT_EQ(globalAHA->size(), FLAGS_numBMElements);
809 }
810 
loadGlobalAhm()811 void loadGlobalAhm() {
812   std::cout << "loading global AHM with " << FLAGS_numThreads
813             << " threads...\n";
814   uint64_t start = nowInUsec();
815   globalAHM = std::make_unique<AHMapT>(maxBMElements, config);
816   numOpsPerThread = FLAGS_numBMElements / FLAGS_numThreads;
817   runThreads(insertThread);
818   uint64_t elapsed = nowInUsec() - start;
819   std::cout << "  took " << elapsed / 1000 << " ms ("
820             << (elapsed * 1000 / FLAGS_numBMElements) << " ns/insert).\n";
821   EXPECT_EQ(globalAHM->size(), FLAGS_numBMElements);
822 }
823 
loadGlobalQPAhm()824 void loadGlobalQPAhm() {
825   std::cout << "loading global QPAHM with " << FLAGS_numThreads
826             << " threads...\n";
827   uint64_t start = nowInUsec();
828   globalQPAHM = std::make_unique<QPAHMapT>(maxBMElements, qpConfig);
829   numOpsPerThread = FLAGS_numBMElements / FLAGS_numThreads;
830   runThreads(qpInsertThread);
831   uint64_t elapsed = nowInUsec() - start;
832   std::cout << "  took " << elapsed / 1000 << " ms ("
833             << (elapsed * 1000 / FLAGS_numBMElements) << " ns/insert).\n";
834   EXPECT_EQ(globalQPAHM->size(), FLAGS_numBMElements);
835 }
836 
837 } // namespace
838 
// Single-threaded lookups of keys that loadGlobalAha inserted into globalAHA.
BENCHMARK(st_aha_find, iters) {
  CHECK_LE(iters, FLAGS_numBMElements);
  for (size_t n = 0; n < iters; ++n) {
    const KeyT key = randomizeKey(n);
    folly::doNotOptimizeAway(globalAHA->find(key)->second);
  }
}
846 
// Single-threaded lookups of keys that loadGlobalAhm inserted into globalAHM.
BENCHMARK(st_ahm_find, iters) {
  CHECK_LE(iters, FLAGS_numBMElements);
  for (size_t n = 0; n < iters; ++n) {
    const KeyT key = randomizeKey(n);
    folly::doNotOptimizeAway(globalAHM->find(key)->second);
  }
}
854 
BENCHMARK(st_qpahm_find,iters)855 BENCHMARK(st_qpahm_find, iters) {
856   CHECK_LE(iters, FLAGS_numBMElements);
857   for (size_t i = 0; i < iters; i++) {
858     KeyT key = randomizeKey(i);
859     folly::doNotOptimizeAway(globalQPAHM->find(key)->second);
860   }
861 }
862 
863 BENCHMARK_DRAW_LINE();
864 
// Multi-threaded globalAHM lookups intended to miss: keys are not randomized,
// and each worker's range starts at 100x its slice offset.
BENCHMARK(mt_ahm_miss, iters) {
  CHECK_LE(iters, FLAGS_numBMElements);
  numOpsPerThread = iters / FLAGS_numThreads;
  runThreads([](void* jj) -> void* {
    const int64_t worker = reinterpret_cast<int64_t>(jj);
    const int64_t base = worker * numOpsPerThread * 100;
    while (!runThreadsCreatedAllThreads.load()) {
      // wait until every worker exists
    }
    for (int op = 0; op < numOpsPerThread; ++op) {
      const KeyT key = op + base;
      folly::doNotOptimizeAway(globalAHM->find(key) == globalAHM->end());
    }
    return nullptr;
  });
}
880 
// Multi-threaded globalQPAHM lookups intended to miss: keys are not
// randomized, and each worker's range starts at 100x its slice offset.
BENCHMARK(mt_qpahm_miss, iters) {
  CHECK_LE(iters, FLAGS_numBMElements);
  numOpsPerThread = iters / FLAGS_numThreads;
  runThreads([](void* jj) -> void* {
    const int64_t worker = reinterpret_cast<int64_t>(jj);
    const int64_t base = worker * numOpsPerThread * 100;
    while (!runThreadsCreatedAllThreads.load()) {
      // wait until every worker exists
    }
    for (int op = 0; op < numOpsPerThread; ++op) {
      const KeyT key = op + base;
      folly::doNotOptimizeAway(globalQPAHM->find(key) == globalQPAHM->end());
    }
    return nullptr;
  });
}
896 
// Single-threaded globalAHM lookups of randomized keys drawn from a range
// offset by iters * 100, intended to miss.
BENCHMARK(st_ahm_miss, iters) {
  CHECK_LE(iters, FLAGS_numBMElements);
  const auto missOffset = iters * 100;
  for (size_t n = 0; n < iters; ++n) {
    const KeyT key = randomizeKey(n + missOffset);
    folly::doNotOptimizeAway(globalAHM->find(key) == globalAHM->end());
  }
}
904 
// Single-threaded globalQPAHM lookups of randomized keys drawn from a range
// offset by iters * 100, intended to miss.
BENCHMARK(st_qpahm_miss, iters) {
  CHECK_LE(iters, FLAGS_numBMElements);
  const auto missOffset = iters * 100;
  for (size_t n = 0; n < iters; ++n) {
    const KeyT key = randomizeKey(n + missOffset);
    folly::doNotOptimizeAway(globalQPAHM->find(key) == globalQPAHM->end());
  }
}
912 
// Multi-threaded globalAHM workload: one in every 128 operations inserts a
// key from a range offset by 100x; the rest look up previously loaded keys.
BENCHMARK(mt_ahm_find_insert_mix, iters) {
  CHECK_LE(iters, FLAGS_numBMElements);
  numOpsPerThread = iters / FLAGS_numThreads;
  runThreads([](void* jj) -> void* {
    const int64_t worker = reinterpret_cast<int64_t>(jj);
    while (!runThreadsCreatedAllThreads.load()) {
      // wait until every worker exists
    }
    for (int op = 0; op < numOpsPerThread; ++op) {
      const bool doInsert = (op % 128) == 0; // ~1% insert mix
      if (doInsert) {
        const KeyT key = randomizeKey(op + worker * numOpsPerThread * 100);
        globalAHM->insert(key, genVal(key));
      } else {
        const KeyT key = randomizeKey(op + worker * numOpsPerThread);
        folly::doNotOptimizeAway(globalAHM->find(key)->second);
      }
    }
    return nullptr;
  });
}
933 
// Multi-threaded globalQPAHM workload: one in every 128 operations inserts a
// key from a range offset by 100x; the rest look up previously loaded keys.
BENCHMARK(mt_qpahm_find_insert_mix, iters) {
  CHECK_LE(iters, FLAGS_numBMElements);
  numOpsPerThread = iters / FLAGS_numThreads;
  runThreads([](void* jj) -> void* {
    const int64_t worker = reinterpret_cast<int64_t>(jj);
    while (!runThreadsCreatedAllThreads.load()) {
      // wait until every worker exists
    }
    for (int op = 0; op < numOpsPerThread; ++op) {
      const bool doInsert = (op % 128) == 0; // ~1% insert mix
      if (doInsert) {
        const KeyT key = randomizeKey(op + worker * numOpsPerThread * 100);
        globalQPAHM->insert(key, genVal(key));
      } else {
        const KeyT key = randomizeKey(op + worker * numOpsPerThread);
        folly::doNotOptimizeAway(globalQPAHM->find(key)->second);
      }
    }
    return nullptr;
  });
}
954 
// Multi-threaded lookups in globalAHA. Worker j probes the randomized keys
// for indices [j * numOpsPerThread, (j + 1) * numOpsPerThread).
BENCHMARK(mt_aha_find, iters) {
  CHECK_LE(iters, FLAGS_numBMElements);
  numOpsPerThread = iters / FLAGS_numThreads;
  runThreads([](void* threadArg) -> void* {
    // The opaque argument is reinterpreted as this worker's integer index.
    const int64_t threadIdx = reinterpret_cast<int64_t>(threadArg);
    // Busy-wait until runThreads flags that all workers have been created.
    while (!runThreadsCreatedAllThreads.load()) {
    }
    const auto sliceBase = threadIdx * numOpsPerThread;
    for (int op = 0; op < numOpsPerThread; ++op) {
      const KeyT probe = randomizeKey(op + sliceBase);
      // Dereferences find() directly, so this assumes the key is present.
      folly::doNotOptimizeAway(globalAHA->find(probe)->second);
    }
    return nullptr;
  });
}
970 
// Multi-threaded lookups in globalAHM. Worker j probes the randomized keys
// for indices [j * numOpsPerThread, (j + 1) * numOpsPerThread).
BENCHMARK(mt_ahm_find, iters) {
  CHECK_LE(iters, FLAGS_numBMElements);
  numOpsPerThread = iters / FLAGS_numThreads;
  runThreads([](void* threadArg) -> void* {
    // The opaque argument is reinterpreted as this worker's integer index.
    const int64_t threadIdx = reinterpret_cast<int64_t>(threadArg);
    // Busy-wait until runThreads flags that all workers have been created.
    while (!runThreadsCreatedAllThreads.load()) {
    }
    const auto sliceBase = threadIdx * numOpsPerThread;
    for (int op = 0; op < numOpsPerThread; ++op) {
      const KeyT probe = randomizeKey(op + sliceBase);
      // Dereferences find() directly, so this assumes the key is present.
      folly::doNotOptimizeAway(globalAHM->find(probe)->second);
    }
    return nullptr;
  });
}
986 
// Multi-threaded lookups in globalQPAHM. Worker j probes the randomized keys
// for indices [j * numOpsPerThread, (j + 1) * numOpsPerThread).
BENCHMARK(mt_qpahm_find, iters) {
  CHECK_LE(iters, FLAGS_numBMElements);
  numOpsPerThread = iters / FLAGS_numThreads;
  runThreads([](void* threadArg) -> void* {
    // The opaque argument is reinterpreted as this worker's integer index.
    const int64_t threadIdx = reinterpret_cast<int64_t>(threadArg);
    // Busy-wait until runThreads flags that all workers have been created.
    while (!runThreadsCreatedAllThreads.load()) {
    }
    const auto sliceBase = threadIdx * numOpsPerThread;
    for (int op = 0; op < numOpsPerThread; ++op) {
      const KeyT probe = randomizeKey(op + sliceBase);
      // Dereferences find() directly, so this assumes the key is present.
      folly::doNotOptimizeAway(globalQPAHM->find(probe)->second);
    }
    return nullptr;
  });
}
1002 
1003 KeyT k;
BENCHMARK(st_baseline_modulus_and_random,iters)1004 BENCHMARK(st_baseline_modulus_and_random, iters) {
1005   for (size_t i = 0; i < iters; ++i) {
1006     k = randomizeKey(i) % iters;
1007   }
1008 }
1009 
// Insertion benchmarks go last because mt_*_insert replaces the global maps.
1011 
// Multi-threaded insert benchmark. Untimed setup splits iters across
// FLAGS_numThreads workers and replaces globalAHM with a fresh map sized to
// iters * LF; only runThreads(insertThread) is timed.
BENCHMARK(mt_ahm_insert, iters) {
  BENCHMARK_SUSPEND {
    numOpsPerThread = iters / FLAGS_numThreads;
    const int capacity = static_cast<int>(iters * LF);
    globalAHM = std::make_unique<AHMapT>(capacity, config);
  }
  runThreads(insertThread);
}
1019 
// Multi-threaded insert benchmark. Untimed setup splits iters across
// FLAGS_numThreads workers and replaces globalQPAHM with a fresh map sized to
// iters * LF; only runThreads(qpInsertThread) is timed.
BENCHMARK(mt_qpahm_insert, iters) {
  BENCHMARK_SUSPEND {
    numOpsPerThread = iters / FLAGS_numThreads;
    const int capacity = static_cast<int>(iters * LF);
    globalQPAHM = std::make_unique<QPAHMapT>(capacity, qpConfig);
  }
  runThreads(qpInsertThread);
}
1027 
// Single-threaded insert benchmark into a freshly built local AHMapT sized to
// iters * LF. Only the insert loop is timed: the map is constructed while the
// suspender is active, and the suspender is re-armed before the map is
// destroyed, so tearing down the populated map is not billed to the inserts.
BENCHMARK(st_ahm_insert, iters) {
  folly::BenchmarkSuspender susp;
  auto ahm = std::make_unique<AHMapT>(static_cast<int>(iters * LF), config);
  susp.dismiss();

  for (size_t i = 0; i < iters; i++) {
    KeyT key = randomizeKey(i);
    ahm->insert(key, genVal(key));
  }

  // `ahm` is declared after `susp`, so it is destroyed first -- now while the
  // timer is suspended again.
  susp.rehire();
}
1038 
// Single-threaded insert benchmark into a freshly built local QPAHMapT sized
// to iters * LF. Only the insert loop is timed: the map is constructed while
// the suspender is active, and the suspender is re-armed before the map is
// destroyed, so tearing down the populated map is not billed to the inserts.
BENCHMARK(st_qpahm_insert, iters) {
  folly::BenchmarkSuspender susp;
  auto ahm =
      std::make_unique<QPAHMapT>(static_cast<int>(iters * LF), qpConfig);
  susp.dismiss();

  for (size_t i = 0; i < iters; i++) {
    KeyT key = randomizeKey(i);
    ahm->insert(key, genVal(key));
  }

  // `ahm` is declared after `susp`, so it is destroyed first -- now while the
  // timer is suspended again.
  susp.rehire();
}
1049 
benchmarkSetup()1050 void benchmarkSetup() {
1051   config.maxLoadFactor = FLAGS_maxLoadFactor;
1052   qpConfig.maxLoadFactor = FLAGS_maxLoadFactor;
1053   configRace.maxLoadFactor = 0.5;
1054   int numCores = sysconf(_SC_NPROCESSORS_ONLN);
1055   loadGlobalAha();
1056   loadGlobalAhm();
1057   loadGlobalQPAhm();
1058   string numIters =
1059       folly::to<string>(std::min(1000000, int(FLAGS_numBMElements)));
1060 
1061   gflags::SetCommandLineOptionWithMode(
1062       "bm_max_iters", numIters.c_str(), gflags::SET_FLAG_IF_DEFAULT);
1063   gflags::SetCommandLineOptionWithMode(
1064       "bm_min_iters", numIters.c_str(), gflags::SET_FLAG_IF_DEFAULT);
1065   string numCoresStr = folly::to<string>(numCores);
1066   gflags::SetCommandLineOptionWithMode(
1067       "numThreads", numCoresStr.c_str(), gflags::SET_FLAG_IF_DEFAULT);
1068 
1069   std::cout << "\nRunning AHM benchmarks on machine with " << numCores
1070             << " logical cores.\n"
1071                "  num elements per map: "
1072             << FLAGS_numBMElements << "\n"
1073             << "  num threads for mt tests: " << FLAGS_numThreads << "\n"
1074             << "  AHM load factor: " << FLAGS_targetLoadFactor << "\n\n";
1075 }
1076 
main(int argc,char ** argv)1077 int main(int argc, char** argv) {
1078   testing::InitGoogleTest(&argc, argv);
1079   gflags::ParseCommandLineFlags(&argc, &argv, true);
1080   auto ret = RUN_ALL_TESTS();
1081   if (!ret && FLAGS_benchmark) {
1082     benchmarkSetup();
1083     folly::runBenchmarks();
1084   }
1085   return ret;
1086 }
1087 
1088 /*
1089 loading global AHA with 8 threads...
1090   took 487 ms (40 ns/insert).
1091 loading global AHM with 8 threads...
1092   took 478 ms (39 ns/insert).
1093 loading global QPAHM with 8 threads...
1094   took 478 ms (39 ns/insert).
1095 
1096 Running AHM benchmarks on machine with 24 logical cores.
1097   num elements per map: 12000000
1098   num threads for mt tests: 24
1099   AHM load factor: 0.75
1100 
1101 ============================================================================
1102 folly/test/AtomicHashMapTest.cpp                relative  time/iter  iters/s
1103 ============================================================================
1104 st_aha_find                                                 92.63ns   10.80M
1105 st_ahm_find                                                107.78ns    9.28M
1106 st_qpahm_find                                               90.69ns   11.03M
1107 ----------------------------------------------------------------------------
1108 mt_ahm_miss                                                  2.09ns  477.36M
1109 mt_qpahm_miss                                                1.37ns  728.82M
1110 st_ahm_miss                                                241.07ns    4.15M
1111 st_qpahm_miss                                              223.17ns    4.48M
1112 mt_ahm_find_insert_mix                                       8.05ns  124.24M
1113 mt_qpahm_find_insert_mix                                     9.10ns  109.85M
1114 mt_aha_find                                                  6.82ns  146.68M
1115 mt_ahm_find                                                  7.95ns  125.77M
1116 mt_qpahm_find                                                6.81ns  146.83M
1117 st_baseline_modulus_and_random                               6.02ns  166.03M
1118 mt_ahm_insert                                               14.29ns   69.97M
1119 mt_qpahm_insert                                             11.68ns   85.61M
1120 st_ahm_insert                                              125.39ns    7.98M
1121 st_qpahm_insert                                            128.76ns    7.77M
1122 ============================================================================
1123 */
1124