1 /*------------------------------------------------------------------------
2   Junction: Concurrent data structures in C++
3   Copyright (c) 2016 Jeff Preshing
4   Distributed under the Simplified BSD License.
5   Original location: https://github.com/preshing/junction
6   This software is distributed WITHOUT ANY WARRANTY; without even the
7   implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
8   See the LICENSE file for more information.
9 ------------------------------------------------------------------------*/
10 
11 #ifndef CONCURRENTMAP_H
12 #define CONCURRENTMAP_H
13 
14 #include "leapfrog.h"
15 #include "qsbr.h"
16 
17 template <typename K, typename V, class KT = DefaultKeyTraits<K>, class VT = DefaultValueTraits<V> >
18 class ConcurrentMap
19 {
20 public:
21     typedef K Key;
22     typedef V Value;
23     typedef KT KeyTraits;
24     typedef VT ValueTraits;
25     typedef quint32 Hash;
26     typedef Leapfrog<ConcurrentMap> Details;
27 
28 private:
29     Atomic<typename Details::Table*> m_root;
30     QSBR m_gc;
31 
32 public:
m_root(Details::Table::create (capacity))33     ConcurrentMap(quint64 capacity = Details::InitialSize) : m_root(Details::Table::create(capacity))
34     {
35     }
36 
~ConcurrentMap()37     ~ConcurrentMap()
38     {
39         typename Details::Table* table = m_root.loadNonatomic();
40         table->destroy();
41         m_gc.flush();
42     }
43 
getGC()44     QSBR &getGC()
45     {
46         return m_gc;
47     }
48 
migrationInProcess()49     bool migrationInProcess()
50     {
51         return quint64(m_root.loadNonatomic()->jobCoordinator.loadConsume()) > 1;
52     }
53 
54     // publishTableMigration() is called by exactly one thread from Details::TableMigration::run()
55     // after all the threads participating in the migration have completed their work.
publishTableMigration(typename Details::TableMigration * migration)56     void publishTableMigration(typename Details::TableMigration* migration)
57     {
58         m_root.store(migration->m_destination, Release);
59         // Caller will GC the TableMigration and the source table.
60     }
61 
62     // A Mutator represents a known cell in the hash table.
63     // It's meant for manipulations within a temporary function scope.
64     // Obviously you must not call QSBR::Update while holding a Mutator.
65     // Any operation that modifies the table (exchangeValue, eraseValue)
66     // may be forced to follow a redirected cell, which changes the Mutator itself.
67     // Note that even if the Mutator was constructed from an existing cell,
68     // exchangeValue() can still trigger a resize if the existing cell was previously marked deleted,
69     // or if another thread deletes the key between the two steps.
70     class Mutator
71     {
72     private:
73         friend class ConcurrentMap;
74 
75         ConcurrentMap& m_map;
76         typename Details::Table* m_table;
77         typename Details::Cell* m_cell;
78         Value m_value;
79 
80         // Constructor: Find existing cell
Mutator(ConcurrentMap & map,Key key,bool)81         Mutator(ConcurrentMap& map, Key key, bool) : m_map(map), m_value(Value(ValueTraits::NullValue))
82         {
83             Hash hash = KeyTraits::hash(key);
84             for (;;) {
85                 m_table = m_map.m_root.load(Consume);
86                 m_cell = Details::find(hash, m_table);
87                 if (!m_cell) {
88                     return;
89                 }
90 
91                 Value value = m_cell->value.load(Consume);
92                 if (value != Value(ValueTraits::Redirect)) {
93                     // Found an existing value
94                     m_value = value;
95                     return;
96                 }
97                 // We've encountered a Redirect value. Help finish the migration.
98                 m_table->jobCoordinator.participate();
99                 // Try again using the latest root.
100             }
101         }
102 
103         // Constructor: Insert or find cell
Mutator(ConcurrentMap & map,Key key)104         Mutator(ConcurrentMap& map, Key key) : m_map(map), m_value(Value(ValueTraits::NullValue))
105         {
106             Hash hash = KeyTraits::hash(key);
107             for (;;) {
108                 m_table = m_map.m_root.load(Consume);
109                 quint64 overflowIdx;
110                 switch (Details::insertOrFind(hash, m_table, m_cell, overflowIdx)) { // Modifies m_cell
111                 case Details::InsertResult_InsertedNew: {
112                     // We've inserted a new cell. Don't load m_cell->value.
113                     return;
114                 }
115                 case Details::InsertResult_AlreadyFound: {
116                     // The hash was already found in the table.
117                     Value value = m_cell->value.load(Consume);
118                     if (value == Value(ValueTraits::Redirect)) {
119                         // We've encountered a Redirect value.
120                         break; // Help finish the migration.
121                     }
122                     // Found an existing value
123                     m_value = value;
124                     return;
125                 }
126                 case Details::InsertResult_Overflow: {
127                     // Unlike ConcurrentMap_Linear, we don't need to keep track of & pass a "mustDouble" flag.
128                     // Passing overflowIdx is sufficient to prevent an infinite loop here.
129                     // It defines the start of the range of cells to check while estimating total cells in use.
130                     // After the first migration, deleted keys are purged, so if we hit this line during the
131                     // second loop iteration, every cell in the range will be in use, thus the estimate will be 100%.
132                     // (Concurrent deletes could result in further iterations, but it will eventually settle.)
133                     Details::beginTableMigration(m_map, m_table, overflowIdx);
134                     break;
135                 }
136                 }
137                 // A migration has been started (either by us, or another thread). Participate until it's complete.
138                 m_table->jobCoordinator.participate();
139                 // Try again using the latest root.
140             }
141         }
142 
143     public:
getValue()144         Value getValue() const
145         {
146             // Return previously loaded value. Don't load it again.
147             return Value(m_value);
148         }
149 
exchangeValue(Value desired)150         Value exchangeValue(Value desired)
151         {
152             for (;;) {
153                 Value oldValue = m_value;
154                 if (m_cell->value.compareExchangeStrong(m_value, desired, ConsumeRelease)) {
155                     // Exchange was successful. Return previous value.
156                     Value result = m_value;
157                     m_value = desired; // Leave the mutator in a valid state
158                     return result;
159                 }
160                 // The CAS failed and m_value has been updated with the latest value.
161                 if (m_value != Value(ValueTraits::Redirect)) {
162                     if (oldValue == Value(ValueTraits::NullValue) && m_value != Value(ValueTraits::NullValue)) {
163                         // racing write inserted new value
164                     }
165                     // There was a racing write (or erase) to this cell.
166                     // Pretend we exchanged with ourselves, and just let the racing write win.
167                     return desired;
168                 }
169 
170                 // We've encountered a Redirect value. Help finish the migration.
171                 Hash hash = m_cell->hash.load(Relaxed);
172                 for (;;) {
173                     // Help complete the migration.
174                     m_table->jobCoordinator.participate();
175                     // Try again in the new table.
176                     m_table = m_map.m_root.load(Consume);
177                     m_value = Value(ValueTraits::NullValue);
178                     quint64 overflowIdx;
179 
180                     switch (Details::insertOrFind(hash, m_table, m_cell, overflowIdx)) { // Modifies m_cell
181                     case Details::InsertResult_AlreadyFound:
182                         m_value = m_cell->value.load(Consume);
183                         if (m_value == Value(ValueTraits::Redirect)) {
184                             break;
185                         }
186                         goto breakOuter;
187                     case Details::InsertResult_InsertedNew:
188                         goto breakOuter;
189                     case Details::InsertResult_Overflow:
190                         Details::beginTableMigration(m_map, m_table, overflowIdx);
191                         break;
192                     }
193                     // We were redirected... again
194                 }
195             breakOuter:;
196                 // Try again in the new table.
197             }
198         }
199 
assignValue(Value desired)200         void assignValue(Value desired)
201         {
202             exchangeValue(desired);
203         }
204 
eraseValue()205         Value eraseValue()
206         {
207             for (;;) {
208                 if (m_value == Value(ValueTraits::NullValue)) {
209                     return Value(m_value);
210                 }
211 
212                 if (m_cell->value.compareExchangeStrong(m_value, Value(ValueTraits::NullValue), Consume)) {
213                     // Exchange was successful and a non-NULL value was erased and returned by reference in m_value.
214                     Value result = m_value;
215                     m_value = Value(ValueTraits::NullValue); // Leave the mutator in a valid state
216                     return result;
217                 }
218 
219                 // The CAS failed and m_value has been updated with the latest value.
220                 if (m_value != Value(ValueTraits::Redirect)) {
221                     // There was a racing write (or erase) to this cell.
222                     // Pretend we erased nothing, and just let the racing write win.
223                     return Value(ValueTraits::NullValue);
224                 }
225 
226                 // We've been redirected to a new table.
227                 Hash hash = m_cell->hash.load(Relaxed); // Re-fetch hash
228                 for (;;) {
229                     // Help complete the migration.
230                     m_table->jobCoordinator.participate();
231                     // Try again in the new table.
232                     m_table = m_map.m_root.load(Consume);
233                     m_cell = Details::find(hash, m_table);
234                     if (!m_cell) {
235                         m_value = Value(ValueTraits::NullValue);
236                         return m_value;
237                     }
238 
239                     m_value = m_cell->value.load(Relaxed);
240                     if (m_value != Value(ValueTraits::Redirect)) {
241                         break;
242                     }
243                 }
244             }
245         }
246     };
247 
insertOrFind(Key key)248     Mutator insertOrFind(Key key)
249     {
250         return Mutator(*this, key);
251     }
252 
find(Key key)253     Mutator find(Key key)
254     {
255         return Mutator(*this, key, false);
256     }
257 
258     // Lookup without creating a temporary Mutator.
get(Key key)259     Value get(Key key)
260     {
261         Hash hash = KeyTraits::hash(key);
262         for (;;) {
263             typename Details::Table* table = m_root.load(Consume);
264             typename Details::Cell* cell = Details::find(hash, table);
265             if (!cell) {
266                 return Value(ValueTraits::NullValue);
267             }
268 
269             Value value = cell->value.load(Consume);
270             if (value != Value(ValueTraits::Redirect)) {
271                 return value; // Found an existing value
272             }
273             // We've been redirected to a new table. Help with the migration.
274             table->jobCoordinator.participate();
275             // Try again in the new table.
276         }
277     }
278 
assign(Key key,Value desired)279     Value assign(Key key, Value desired)
280     {
281         Mutator iter(*this, key);
282         return iter.exchangeValue(desired);
283     }
284 
exchange(Key key,Value desired)285     Value exchange(Key key, Value desired)
286     {
287         Mutator iter(*this, key);
288         return iter.exchangeValue(desired);
289     }
290 
erase(Key key)291     Value erase(Key key)
292     {
293         Mutator iter(*this, key, false);
294         return iter.eraseValue();
295     }
296 
297     // The easiest way to implement an Iterator is to prevent all Redirects.
298     // The currrent Iterator does that by forbidding concurrent inserts.
299     // To make it work with concurrent inserts, we'd need a way to block TableMigrations.
300     class Iterator
301     {
302     private:
303         typename Details::Table* m_table;
304         quint64 m_idx;
305         Key m_hash;
306         Value m_value;
307 
308     public:
309         Iterator() = default;
Iterator(ConcurrentMap & map)310         Iterator(ConcurrentMap& map)
311         {
312             // Since we've forbidden concurrent inserts (for now), nonatomic would suffice here, but let's plan ahead:
313             m_table = map.m_root.load(Consume);
314             m_idx = -1;
315             next();
316         }
317 
setMap(ConcurrentMap & map)318         void setMap(ConcurrentMap& map)
319         {
320             m_table = map.m_root.load(Consume);
321             m_idx = -1;
322             next();
323         }
324 
next()325         void next()
326         {
327             while (++m_idx <= m_table->sizeMask) {
328                 // Index still inside range of table.
329                 typename Details::CellGroup* group = m_table->getCellGroups() + (m_idx >> 2);
330                 typename Details::Cell* cell = group->cells + (m_idx & 3);
331                 m_hash = cell->hash.load(Relaxed);
332 
333                 if (m_hash != KeyTraits::NullHash) {
334                     // Cell has been reserved.
335                     m_value = cell->value.load(Relaxed);
336                     if (m_value != Value(ValueTraits::NullValue))
337                         return; // Yield this cell.
338                 }
339             }
340             // That's the end of the map.
341             m_hash = KeyTraits::NullHash;
342             m_value = Value(ValueTraits::NullValue);
343         }
344 
isValid()345         bool isValid() const
346         {
347 #ifdef SANITY_CHECK
348             KIS_SAFE_ASSERT_RECOVER_RETURN_VALUE(m_value != Value(ValueTraits::Redirect), false);
349 #endif
350             return m_value != Value(ValueTraits::NullValue);
351         }
352 
getKey()353         Key getKey() const
354         {
355             // Since we've forbidden concurrent inserts (for now), nonatomic would suffice here, but let's plan ahead:
356             return KeyTraits::dehash(m_hash);
357         }
358 
getValue()359         Value getValue() const
360         {
361             return m_value;
362         }
363     };
364 };
365 
366 #endif // CONCURRENTMAP_LEAPFROG_H
367