1 /*------------------------------------------------------------------------ 2 Junction: Concurrent data structures in C++ 3 Copyright (c) 2016 Jeff Preshing 4 Distributed under the Simplified BSD License. 5 Original location: https://github.com/preshing/junction 6 This software is distributed WITHOUT ANY WARRANTY; without even the 7 implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 8 See the LICENSE file for more information. 9 ------------------------------------------------------------------------*/ 10 11 #ifndef CONCURRENTMAP_H 12 #define CONCURRENTMAP_H 13 14 #include "leapfrog.h" 15 #include "qsbr.h" 16 17 template <typename K, typename V, class KT = DefaultKeyTraits<K>, class VT = DefaultValueTraits<V> > 18 class ConcurrentMap 19 { 20 public: 21 typedef K Key; 22 typedef V Value; 23 typedef KT KeyTraits; 24 typedef VT ValueTraits; 25 typedef quint32 Hash; 26 typedef Leapfrog<ConcurrentMap> Details; 27 28 private: 29 Atomic<typename Details::Table*> m_root; 30 QSBR m_gc; 31 32 public: m_root(Details::Table::create (capacity))33 ConcurrentMap(quint64 capacity = Details::InitialSize) : m_root(Details::Table::create(capacity)) 34 { 35 } 36 ~ConcurrentMap()37 ~ConcurrentMap() 38 { 39 typename Details::Table* table = m_root.loadNonatomic(); 40 table->destroy(); 41 m_gc.flush(); 42 } 43 getGC()44 QSBR &getGC() 45 { 46 return m_gc; 47 } 48 migrationInProcess()49 bool migrationInProcess() 50 { 51 return quint64(m_root.loadNonatomic()->jobCoordinator.loadConsume()) > 1; 52 } 53 54 // publishTableMigration() is called by exactly one thread from Details::TableMigration::run() 55 // after all the threads participating in the migration have completed their work. publishTableMigration(typename Details::TableMigration * migration)56 void publishTableMigration(typename Details::TableMigration* migration) 57 { 58 m_root.store(migration->m_destination, Release); 59 // Caller will GC the TableMigration and the source table. 60 } 61 62 // A Mutator represents a known cell in the hash table. 63 // It's meant for manipulations within a temporary function scope. 64 // Obviously you must not call QSBR::Update while holding a Mutator. 65 // Any operation that modifies the table (exchangeValue, eraseValue) 66 // may be forced to follow a redirected cell, which changes the Mutator itself. 67 // Note that even if the Mutator was constructed from an existing cell, 68 // exchangeValue() can still trigger a resize if the existing cell was previously marked deleted, 69 // or if another thread deletes the key between the two steps. 70 class Mutator 71 { 72 private: 73 friend class ConcurrentMap; 74 75 ConcurrentMap& m_map; 76 typename Details::Table* m_table; 77 typename Details::Cell* m_cell; 78 Value m_value; 79 80 // Constructor: Find existing cell Mutator(ConcurrentMap & map,Key key,bool)81 Mutator(ConcurrentMap& map, Key key, bool) : m_map(map), m_value(Value(ValueTraits::NullValue)) 82 { 83 Hash hash = KeyTraits::hash(key); 84 for (;;) { 85 m_table = m_map.m_root.load(Consume); 86 m_cell = Details::find(hash, m_table); 87 if (!m_cell) { 88 return; 89 } 90 91 Value value = m_cell->value.load(Consume); 92 if (value != Value(ValueTraits::Redirect)) { 93 // Found an existing value 94 m_value = value; 95 return; 96 } 97 // We've encountered a Redirect value. Help finish the migration. 98 m_table->jobCoordinator.participate(); 99 // Try again using the latest root. 100 } 101 } 102 103 // Constructor: Insert or find cell Mutator(ConcurrentMap & map,Key key)104 Mutator(ConcurrentMap& map, Key key) : m_map(map), m_value(Value(ValueTraits::NullValue)) 105 { 106 Hash hash = KeyTraits::hash(key); 107 for (;;) { 108 m_table = m_map.m_root.load(Consume); 109 quint64 overflowIdx; 110 switch (Details::insertOrFind(hash, m_table, m_cell, overflowIdx)) { // Modifies m_cell 111 case Details::InsertResult_InsertedNew: { 112 // We've inserted a new cell. Don't load m_cell->value. 113 return; 114 } 115 case Details::InsertResult_AlreadyFound: { 116 // The hash was already found in the table. 117 Value value = m_cell->value.load(Consume); 118 if (value == Value(ValueTraits::Redirect)) { 119 // We've encountered a Redirect value. 120 break; // Help finish the migration. 121 } 122 // Found an existing value 123 m_value = value; 124 return; 125 } 126 case Details::InsertResult_Overflow: { 127 // Unlike ConcurrentMap_Linear, we don't need to keep track of & pass a "mustDouble" flag. 128 // Passing overflowIdx is sufficient to prevent an infinite loop here. 129 // It defines the start of the range of cells to check while estimating total cells in use. 130 // After the first migration, deleted keys are purged, so if we hit this line during the 131 // second loop iteration, every cell in the range will be in use, thus the estimate will be 100%. 132 // (Concurrent deletes could result in further iterations, but it will eventually settle.) 133 Details::beginTableMigration(m_map, m_table, overflowIdx); 134 break; 135 } 136 } 137 // A migration has been started (either by us, or another thread). Participate until it's complete. 138 m_table->jobCoordinator.participate(); 139 // Try again using the latest root. 140 } 141 } 142 143 public: getValue()144 Value getValue() const 145 { 146 // Return previously loaded value. Don't load it again. 147 return Value(m_value); 148 } 149 exchangeValue(Value desired)150 Value exchangeValue(Value desired) 151 { 152 for (;;) { 153 Value oldValue = m_value; 154 if (m_cell->value.compareExchangeStrong(m_value, desired, ConsumeRelease)) { 155 // Exchange was successful. Return previous value. 156 Value result = m_value; 157 m_value = desired; // Leave the mutator in a valid state 158 return result; 159 } 160 // The CAS failed and m_value has been updated with the latest value. 161 if (m_value != Value(ValueTraits::Redirect)) { 162 if (oldValue == Value(ValueTraits::NullValue) && m_value != Value(ValueTraits::NullValue)) { 163 // racing write inserted new value 164 } 165 // There was a racing write (or erase) to this cell. 166 // Pretend we exchanged with ourselves, and just let the racing write win. 167 return desired; 168 } 169 170 // We've encountered a Redirect value. Help finish the migration. 171 Hash hash = m_cell->hash.load(Relaxed); 172 for (;;) { 173 // Help complete the migration. 174 m_table->jobCoordinator.participate(); 175 // Try again in the new table. 176 m_table = m_map.m_root.load(Consume); 177 m_value = Value(ValueTraits::NullValue); 178 quint64 overflowIdx; 179 180 switch (Details::insertOrFind(hash, m_table, m_cell, overflowIdx)) { // Modifies m_cell 181 case Details::InsertResult_AlreadyFound: 182 m_value = m_cell->value.load(Consume); 183 if (m_value == Value(ValueTraits::Redirect)) { 184 break; 185 } 186 goto breakOuter; 187 case Details::InsertResult_InsertedNew: 188 goto breakOuter; 189 case Details::InsertResult_Overflow: 190 Details::beginTableMigration(m_map, m_table, overflowIdx); 191 break; 192 } 193 // We were redirected... again 194 } 195 breakOuter:; 196 // Try again in the new table. 197 } 198 } 199 assignValue(Value desired)200 void assignValue(Value desired) 201 { 202 exchangeValue(desired); 203 } 204 eraseValue()205 Value eraseValue() 206 { 207 for (;;) { 208 if (m_value == Value(ValueTraits::NullValue)) { 209 return Value(m_value); 210 } 211 212 if (m_cell->value.compareExchangeStrong(m_value, Value(ValueTraits::NullValue), Consume)) { 213 // Exchange was successful and a non-NULL value was erased and returned by reference in m_value. 214 Value result = m_value; 215 m_value = Value(ValueTraits::NullValue); // Leave the mutator in a valid state 216 return result; 217 } 218 219 // The CAS failed and m_value has been updated with the latest value. 220 if (m_value != Value(ValueTraits::Redirect)) { 221 // There was a racing write (or erase) to this cell. 222 // Pretend we erased nothing, and just let the racing write win. 223 return Value(ValueTraits::NullValue); 224 } 225 226 // We've been redirected to a new table. 227 Hash hash = m_cell->hash.load(Relaxed); // Re-fetch hash 228 for (;;) { 229 // Help complete the migration. 230 m_table->jobCoordinator.participate(); 231 // Try again in the new table. 232 m_table = m_map.m_root.load(Consume); 233 m_cell = Details::find(hash, m_table); 234 if (!m_cell) { 235 m_value = Value(ValueTraits::NullValue); 236 return m_value; 237 } 238 239 m_value = m_cell->value.load(Relaxed); 240 if (m_value != Value(ValueTraits::Redirect)) { 241 break; 242 } 243 } 244 } 245 } 246 }; 247 insertOrFind(Key key)248 Mutator insertOrFind(Key key) 249 { 250 return Mutator(*this, key); 251 } 252 find(Key key)253 Mutator find(Key key) 254 { 255 return Mutator(*this, key, false); 256 } 257 258 // Lookup without creating a temporary Mutator. get(Key key)259 Value get(Key key) 260 { 261 Hash hash = KeyTraits::hash(key); 262 for (;;) { 263 typename Details::Table* table = m_root.load(Consume); 264 typename Details::Cell* cell = Details::find(hash, table); 265 if (!cell) { 266 return Value(ValueTraits::NullValue); 267 } 268 269 Value value = cell->value.load(Consume); 270 if (value != Value(ValueTraits::Redirect)) { 271 return value; // Found an existing value 272 } 273 // We've been redirected to a new table. Help with the migration. 274 table->jobCoordinator.participate(); 275 // Try again in the new table. 276 } 277 } 278 assign(Key key,Value desired)279 Value assign(Key key, Value desired) 280 { 281 Mutator iter(*this, key); 282 return iter.exchangeValue(desired); 283 } 284 exchange(Key key,Value desired)285 Value exchange(Key key, Value desired) 286 { 287 Mutator iter(*this, key); 288 return iter.exchangeValue(desired); 289 } 290 erase(Key key)291 Value erase(Key key) 292 { 293 Mutator iter(*this, key, false); 294 return iter.eraseValue(); 295 } 296 297 // The easiest way to implement an Iterator is to prevent all Redirects. 298 // The currrent Iterator does that by forbidding concurrent inserts. 299 // To make it work with concurrent inserts, we'd need a way to block TableMigrations. 300 class Iterator 301 { 302 private: 303 typename Details::Table* m_table; 304 quint64 m_idx; 305 Key m_hash; 306 Value m_value; 307 308 public: 309 Iterator() = default; Iterator(ConcurrentMap & map)310 Iterator(ConcurrentMap& map) 311 { 312 // Since we've forbidden concurrent inserts (for now), nonatomic would suffice here, but let's plan ahead: 313 m_table = map.m_root.load(Consume); 314 m_idx = -1; 315 next(); 316 } 317 setMap(ConcurrentMap & map)318 void setMap(ConcurrentMap& map) 319 { 320 m_table = map.m_root.load(Consume); 321 m_idx = -1; 322 next(); 323 } 324 next()325 void next() 326 { 327 while (++m_idx <= m_table->sizeMask) { 328 // Index still inside range of table. 329 typename Details::CellGroup* group = m_table->getCellGroups() + (m_idx >> 2); 330 typename Details::Cell* cell = group->cells + (m_idx & 3); 331 m_hash = cell->hash.load(Relaxed); 332 333 if (m_hash != KeyTraits::NullHash) { 334 // Cell has been reserved. 335 m_value = cell->value.load(Relaxed); 336 if (m_value != Value(ValueTraits::NullValue)) 337 return; // Yield this cell. 338 } 339 } 340 // That's the end of the map. 341 m_hash = KeyTraits::NullHash; 342 m_value = Value(ValueTraits::NullValue); 343 } 344 isValid()345 bool isValid() const 346 { 347 #ifdef SANITY_CHECK 348 KIS_SAFE_ASSERT_RECOVER_RETURN_VALUE(m_value != Value(ValueTraits::Redirect), false); 349 #endif 350 return m_value != Value(ValueTraits::NullValue); 351 } 352 getKey()353 Key getKey() const 354 { 355 // Since we've forbidden concurrent inserts (for now), nonatomic would suffice here, but let's plan ahead: 356 return KeyTraits::dehash(m_hash); 357 } 358 getValue()359 Value getValue() const 360 { 361 return m_value; 362 } 363 }; 364 }; 365 366 #endif // CONCURRENTMAP_LEAPFROG_H 367