#nim c -t:-march=i686 --cpu:amd64 --threads:on -d:release lockfreehash.nim

import math, hashes

#------------------------------------------------------------------------------
## Memory Utility Functions

proc newHeap*[T](): ptr T =
  ## Returns a pointer to zero-filled storage for one `T`, obtained from
  ## `alloc0`.
  result = cast[ptr T](alloc0(sizeof(T)))

proc copyNew*[T](x: var T): ptr T =
  ## Returns a pointer to a fresh `alloc` block that holds a byte-for-byte
  ## copy of `x`.
  let
    size = sizeof(T)
    mem = alloc(size)
  copyMem(mem, x.addr, size)
  result = cast[ptr T](mem)

proc copyTo*[T](val: var T, dest: int) =
  ## Copies the `sizeof(T)` bytes of `val` to the address held in `dest`.
  # `dest` is an address stored in an integer, so it is reinterpreted with
  # `cast` (the same way `+` below turns an integer back into a pointer)
  # instead of being run through a type conversion.
  copyMem(cast[pointer](dest), val.addr, sizeof(T))

proc allocType*[T](): pointer = alloc(sizeof(T))
  ## Returns uninitialised `alloc` storage big enough for one `T`.

proc newShared*[T](): ptr T =
  ## Returns a pointer to zero-filled storage for one `T`, obtained from
  ## `allocShared0`.
  result = cast[ptr T](allocShared0(sizeof(T)))

proc copyShared*[T](x: var T): ptr T =
  ## Returns a pointer to a fresh `allocShared` block that holds a
  ## byte-for-byte copy of `x`.
  let
    size = sizeof(T)
    mem = allocShared(size)
  copyMem(mem, x.addr, size)
  result = cast[ptr T](mem)

#------------------------------------------------------------------------------
## Pointer arithmetic

proc `+`*(p: pointer, i: int): pointer {.inline.} =
  ## Returns `p` advanced by `i` bytes.
  cast[pointer](cast[int](p) + i)

const
  minTableSize = 8      # lower bound applied to every table's slot count
  reProbeLimit = 12     # probe budget used by setVal/getVal before giving up
  minCopyWork = 4096    # upper bound on slots claimed per helpCopy call
  intSize = sizeof(int) # size of one machine word in bytes



when sizeof(int) == 4: # 32bit
  type
    Raw = range[0..1073741823]
      ## The range of uint values that can be stored directly in a value slot
      ## when on a 32 bit platform
elif sizeof(int) == 8: # 64bit
  type
    Raw = range[0'i64..4611686018427387903'i64]
      ## The range of uint values that can be stored directly in a value slot
      ## when on a 64 bit platform
else:
  {.error: "unsupported platform".}

type
  Entry = tuple
    ## One slot of the table: a key word and a value word.
    key: int
    value: int

  EntryArr = ptr array[0..10_000_000, Entry]
    ## View over a slot array; the bound is nominal, the real length is kept
    ## in the owning table's `len` field.

  PConcTable[K, V] = ptr object {.pure.}
    len: int               # number of slots in `data`
    used: int              # slots whose key word has been claimed
    active: int            # counter adjusted by setVal on value changes
    copyIdx: int           # next slot index handed out to a copying thread
    copyDone: int          # slots already copied into `next`
    next: PConcTable[K, V] # larger table being filled, or nil
    data: EntryArr         # the slot array

# Forward declaration: copySlot calls setVal before its body is defined.
proc setVal[K, V](table: var PConcTable[K, V], key: int, val: int,
  expVal: int, match: bool): int

#------------------------------------------------------------------------------

# Create a new table
proc newLFTable*[K, V](size: int = minTableSize): PConcTable[K, V] =
  ## Allocates an empty table in shared memory and returns it.
  ##
  ## `size` is rounded up to a power of two and raised to at least
  ## `minTableSize`; slot indices are produced by masking with `len - 1`,
  ## which only works for power-of-two lengths.
  let dataLen = max(nextPowerOfTwo(size), minTableSize)
  # Size the header from the object type itself instead of a hand-counted
  # number of words, so the allocation follows any change to the layout.
  result = cast[PConcTable[K, V]](allocShared0(sizeof(result[])))
  result.len = dataLen
  result.used = 0
  result.active = 0
  result.copyIdx = 0
  result.copyDone = 0
  result.next = nil
  result.data = cast[EntryArr](allocShared0(dataLen * sizeof(Entry)))

#------------------------------------------------------------------------------

# Delete a table
proc deleteConcTable[K, V](tbl: PConcTable[K, V]) =
  ## Releases the slot array and the table header. A nil table is ignored.
  ## Keys and values referenced from the slots are not released here.
  if tbl == nil:
    return
  deallocShared(tbl.data)
  deallocShared(tbl)

#------------------------------------------------------------------------------

proc `[]`[K, V](table: var PConcTable[K, V], i: int): var Entry {.inline.} =
  ## Returns slot `i` of the table's slot array as a mutable location.
  table.data[i]

#------------------------------------------------------------------------------
# State flags stored in ptr
#
# Every key/value word keeps two flag bits in its lowest positions:
#   bit 0 = prime, bit 1 = tomb.  The payload occupies the remaining bits.

proc pack[T](x: T): int {.inline.} =
  ## Shifts a raw value left by two bits so the flag bits are free.
  result = (cast[int](x) shl 2)

# Pop the flags off returning a 4 byte aligned ptr to our Key or Val
proc pop(x: int): int {.inline.} =
  ## Clears the two flag bits of `x`.
  # Written as `not 0b11` so the mask is a full machine word on every
  # platform without relying on sign extension of a 32-bit literal.
  result = x and (not 0b11)

# Pop the raw value off of our Key or Val
proc popRaw(x: int): int {.inline.} =
  ## Inverse of `pack`: drops the flag bits by shifting right by two.
  result = x shr 2

# Pop the flags off returning a 4 byte aligned ptr to our Key or Val
proc popPtr[V](x: int): ptr V {.inline.} =
  ## Clears the flag bits of `x` and reinterprets the rest as `ptr V`.
  result = cast[ptr V](pop(x))

# Ghost (sentinel)
# K or V is no longer valid use new table
#
# All payload bits set, both flag bits clear.  Derived from the word size so
# the sentinel has the same shape on 32 and 64 bit builds.
const Ghost = not 0b11
proc isGhost(x: int): bool {.inline.} =
  ## True when `x` is exactly the `Ghost` sentinel.
  result = x == Ghost

# Tombstone
# applied to V = K is dead
proc isTomb(x: int): bool {.inline.} =
  ## True when the tomb flag (bit 1) is set.
  result = (x and 0x00000002) != 0

proc setTomb(x: int): int {.inline.} =
  ## Returns `x` with the tomb flag (bit 1) set.
  result = x or 0x00000002

# Prime
# K or V is in new table copied from old
proc isPrime(x: int): bool {.inline.} =
  ## True when the prime flag (bit 0) is set.
  result = (x and 0x00000001) != 0

proc setPrime(x: int): int {.inline.} =
  ## Returns `x` with the prime flag (bit 0) set.
  result = x or 0x00000001

#------------------------------------------------------------------------------

##This is for i32 only need to override for i64
proc hashInt(x: int): int {.inline.} =
  ## Mixes the low 32 bits of `x` with a multiply/xor-shift sequence and
  ## returns the mixed 32-bit value as an `int`.
  # Truncate explicitly: a plain `uint32(x)` conversion is range checked on
  # compilers that check unsigned conversions and would reject keys that do
  # not fit in 32 bits when `int` is 64 bits wide.
  var h = cast[uint32](x) #shr 2'u32
  h = h xor (h shr 16'u32)
  h *= 0x85ebca6b'u32
  h = h xor (h shr 13'u32)
  h *= 0xc2b2ae35'u32
  h = h xor (h shr 16'u32)
  result = int(h)

#------------------------------------------------------------------------------

proc resize[K, V](self: PConcTable[K, V]): PConcTable[K, V] =
  ## Makes sure `self.next` points at a table twice as long as `self` and
  ## returns that table.
  ##
  ## If another thread installs its table first, the one allocated here is
  ## deleted and the winner's table is returned.
  var next = atomic_load_n(self.next.addr, ATOMIC_RELAXED)
  if next != nil:
    # A new table already exists, copy in progress
    return next
  var
    oldLen = atomic_load_n(self.len.addr, ATOMIC_RELAXED)
    newTable = newLFTable[K, V](oldLen*2)
    # On failure the CAS writes the table that is actually installed back
    # into `next`.
    success = atomic_compare_exchange_n(self.next.addr, next.addr, newTable,
      false, ATOMIC_RELAXED, ATOMIC_RELAXED)
  if not success:
    echo("someone beat us to it! delete table we just created and return his " &
      $cast[int](next))
    deleteConcTable(newTable)
    return next
  else:
    echo("Created New Table! " & $cast[int](newTable) & " Size = " &
      $newTable.len)
    return newTable


#------------------------------------------------------------------------------
#proc keyEQ[K](key1: ptr K, key2: ptr K): bool {.inline.} =
proc keyEQ[K](key1: int, key2: int): bool {.inline.} =
  ## Compares two key words.
  ##
  ## For `Raw` keys the words are compared directly.  Otherwise both words
  ## are stripped of their flag bits and treated as `ptr K`; they match when
  ## both are non-nil and either point at the same address or at equal
  ## values.
  result = false
  when K is Raw:
    if key1 == key2:
      result = true
  else:
    var
      p1 = popPtr[K](key1)
      p2 = popPtr[K](key2)
    if p1 != nil and p2 != nil:
      if cast[int](p1) == cast[int](p2):
        return true
      if p1[] == p2[]:
        return true

#------------------------------------------------------------------------------

#proc tableFull(self: var PConcTable[K,V]) : bool {.inline.} =


#------------------------------------------------------------------------------

proc copySlot[K, V](idx: int, oldTbl: var PConcTable[K, V],
  newTbl: var PConcTable[K, V]): bool =
  ## Moves slot `idx` of `oldTbl` into `newTbl`.
  ##
  ## Returns true when this call accounts for the slot: either the slot held
  ## no live value and was closed here, or its value was stored into
  ## `newTbl` by this call.  Returns false when the slot was already closed
  ## or the value had already been stored by someone else.
  var
    oldVal = 0
    oldKey = 0
    ok = false
  result = false
  # Block the key so no other threads waste time here.  A failed CAS loads
  # the current key into `oldKey`, so the retry tombs whatever is there.
  while not ok:
    ok = atomic_compare_exchange_n(oldTbl[idx].key.addr, oldKey.addr,
      setTomb(oldKey), false, ATOMIC_RELAXED, ATOMIC_RELAXED)
  # Prevent new values from appearing in the old table by priming
  oldVal = atomic_load_n(oldTbl[idx].value.addr, ATOMIC_RELAXED)
  while not isPrime(oldVal):
    # Empty or dead values are closed outright (tomb + prime); live values
    # are only primed so they can still be copied below.
    var box = if oldVal == 0 or isTomb(oldVal): oldVal.setTomb.setPrime
              else: oldVal.setPrime
    if atomic_compare_exchange_n(oldTbl[idx].value.addr, oldVal.addr,
      box, false, ATOMIC_RELAXED, ATOMIC_RELAXED):
      if isPrime(box) and isTomb(box):
        return true
      oldVal = box
      break
  if isPrime(oldVal) and isTomb(oldVal):
    # The slot was already closed.
    #when not (K is Raw):
    #  deallocShared(popPtr[K](oldKey))
    return false
  if isTomb(oldVal):
    echo("oldVal is Tomb!!!, should not happen")
  if pop(oldVal) != 0:
    # Store only if the new table's slot is still empty (expVal 0, match).
    result = setVal(newTbl, pop(oldKey), pop(oldVal), 0, true) == 0
  # Our copy is done so we disable the old slot.  `ok` is still true from
  # the key-blocking loop above and must be cleared, otherwise this loop is
  # skipped and the slot is never marked tomb + prime.
  ok = false
  while not ok:
    ok = atomic_compare_exchange_n(oldTbl[idx].value.addr, oldVal.addr,
      oldVal.setTomb.setPrime, false, ATOMIC_RELAXED, ATOMIC_RELAXED)

#------------------------------------------------------------------------------

proc promote[K, V](table: var PConcTable[K, V]) =
  ## Replaces `table`'s slot array, length and usage count with those of
  ## `table.next`, resets the copy counters, clears `next`, and frees the
  ## old slot array and the old `next` header.
  let
    nextTbl = atomic_load_n(table.next.addr, ATOMIC_RELAXED)
    oldData = table.data
  var
    newData = atomic_load_n(nextTbl.data.addr, ATOMIC_RELAXED)
    newLen = atomic_load_n(nextTbl.len.addr, ATOMIC_RELAXED)
    newUsed = atomic_load_n(nextTbl.used.addr, ATOMIC_RELAXED)

  atomic_store_n(table.data.addr, newData, ATOMIC_RELAXED)
  atomic_store_n(table.len.addr, newLen, ATOMIC_RELAXED)
  atomic_store_n(table.used.addr, newUsed, ATOMIC_RELAXED)
  atomic_store_n(table.copyIdx.addr, 0, ATOMIC_RELAXED)
  atomic_store_n(table.copyDone.addr, 0, ATOMIC_RELAXED)
  atomic_store_n(table.next.addr, nil, ATOMIC_RELAXED)
  # Free only after the pointers have been replaced, so the table never
  # publishes a pointer to memory that has already been released.
  # NOTE(review): threads that loaded the old pointers earlier can still be
  # using them; nothing here waits for such readers.
  deallocShared(oldData)
  deallocShared(nextTbl)
  echo("new table swapped!")

#------------------------------------------------------------------------------

proc checkAndPromote[K, V](table: var PConcTable[K, V], workDone: int): bool =
  ## Adds `workDone` to the table's `copyDone` counter and promotes the
  ## table once the counter has reached the table length.
  ##
  ## Returns true when this call performed the promotion.
  # NOTE(review): nothing stops two threads from both observing the finished
  # count and both calling promote - confirm whether that can happen.
  var
    oldLen = atomic_load_n(table.len.addr, ATOMIC_RELAXED)
    copyDone = atomic_load_n(table.copyDone.addr, ATOMIC_RELAXED)
    ok: bool
  result = false
  if workDone > 0:
    # A failed CAS refreshes `copyDone`, so the sum is recomputed on retry.
    while not ok:
      ok = atomic_compare_exchange_n(table.copyDone.addr, copyDone.addr,
        copyDone + workDone, false, ATOMIC_RELAXED, ATOMIC_RELAXED)
  # If the copy is done we can promote this table
  if copyDone + workDone >= oldLen:
    # Swap new data
    table.promote
    result = true

#------------------------------------------------------------------------------

proc copySlotAndCheck[K, V](table: var PConcTable[K, V], idx: int):
  PConcTable[K, V] =
  ## Copies slot `idx` into `table.next` (when there is one) and reports the
  ## table the caller should continue with: `table` itself if this call
  ## promoted it, otherwise the value of `table.next` read on entry.
  var
    newTable = cast[PConcTable[K, V]](atomic_load_n(table.next.addr,
      ATOMIC_RELAXED))
  result = newTable
  if newTable != nil and copySlot(idx, table, newTable):
    if checkAndPromote(table, 1): return table


#------------------------------------------------------------------------------

proc helpCopy[K, V](table: var PConcTable[K, V]): PConcTable[K, V] =
  ## Claims a run of up to `minCopyWork` slots and copies them into
  ## `table.next`.
  ##
  ## Returns `table` if this call promoted it, otherwise the value of
  ## `table.next` read on entry (nil when no copy is in progress).
  var
    newTable = cast[PConcTable[K, V]](atomic_load_n(table.next.addr,
      ATOMIC_RELAXED))
  result = newTable
  if newTable != nil:
    var
      oldLen = atomic_load_n(table.len.addr, ATOMIC_RELAXED)
      copyDone = atomic_load_n(table.copyDone.addr, ATOMIC_RELAXED)
      copyIdx = 0
      work = min(oldLen, minCopyWork)
      #panicStart = -1
      workDone = 0
    if copyDone < oldLen:
      # Reserve [copyIdx, copyIdx + work) by advancing the shared cursor; a
      # failed CAS loads the current cursor into `copyIdx`.
      var ok: bool
      while not ok:
        ok = atomic_compare_exchange_n(table.copyIdx.addr, copyIdx.addr,
          copyIdx + work, false, ATOMIC_RELAXED, ATOMIC_RELAXED)
      for i in 0..work-1:
        # The cursor only grows, so wrap it back into the slot range.
        var idx = (copyIdx + i) and (oldLen - 1)
        if copySlot(idx, table, newTable):
          workDone += 1
      if workDone > 0:
        if checkAndPromote(table, workDone): return table
    # In case a thread finished all the work then got stalled before promotion
    if checkAndPromote(table, 0): return table



#------------------------------------------------------------------------------

proc setVal[K, V](table: var PConcTable[K, V], key: int, val: int,
  expVal: int, match: bool): int =
  ## Stores the value word `val` under the key word `key`.
  ##
  ## When `match` is true the store only happens if the slot's current value
  ## equals `expVal`; otherwise the current value is returned unchanged.
  ## While a larger table exists (or has to be created) the slot is copied
  ## and the call continues in that table.
  ##
  ## Returns the value word that was in the slot before the store, or the
  ## current value when nothing was stored.
  when K is Raw:
    var idx = hashInt(key)
  else:
    var idx = popPtr[K](key)[].hash
  var
    nextTable: PConcTable[K, V]
    probes = 1
  # spin until we find a key slot or build and jump to next table
  while true:
    idx = idx and (table.len - 1)
    var
      probedKey = 0
      # Claims the slot if its key word is still 0; otherwise `probedKey`
      # receives the key that is stored there.
      openKey = atomic_compare_exchange_n(table[idx].key.addr, probedKey.addr,
        key, false, ATOMIC_RELAXED, ATOMIC_RELAXED)
    if openKey:
      if val.isTomb:
        # val was tomb, bail, no reason to set an open slot to tomb
        return val
      # increment used slots
      discard atomic_add_fetch(table.used.addr, 1, ATOMIC_RELAXED)
      break # We found an open slot
    if keyEQ[K](probedKey, key):
      break # We found a matching slot
    if (not(expVal != 0 and match)) and (probes >= reProbeLimit or key.isTomb):
      if key.isTomb: echo("Key is Tombstone")
      # create next bigger table
      nextTable = resize(table)
      # help do some copying
      nextTable = helpCopy(table)
      # now setVal in the new table instead
      return setVal(nextTable, key, val, expVal, match)
    else:
      idx += 1
      probes += 1
  # Done spinning for a new slot
  var oldVal = atomic_load_n(table[idx].value.addr, ATOMIC_RELAXED)
  if val == oldVal:
    # this val is already in the slot
    return oldVal
  nextTable = atomic_load_n(table.next.addr, ATOMIC_SEQ_CST)
  if nextTable == nil and
    ((oldVal == 0 and
    (probes >= reProbeLimit or table.used / table.len > 0.8)) or
    (isPrime(oldVal))):
    if table.used / table.len > 0.8: echo("resize because usage ratio = " &
      $(table.used / table.len))
    if isPrime(oldVal):
      echo("old val isPrime, should be a rare mem ordering event")
    nextTable = resize(table)
  if nextTable != nil:
    # tomb old slot then set in new table
    nextTable = copySlotAndCheck(table, idx)
    return setVal(nextTable, key, val, expVal, match)
  # Finally ready to add new val to table
  while true:
    if match and oldVal != expVal:
      return oldVal
    # A failed CAS refreshes `oldVal` with the slot's current value.
    if atomic_compare_exchange_n(table[idx].value.addr, oldVal.addr,
      val, false, ATOMIC_RELEASE, ATOMIC_RELAXED):
      if expVal != 0:
        if (oldVal == 0 or isTomb(oldVal)) and not isTomb(val):
          discard atomic_add_fetch(table.active.addr, 1, ATOMIC_RELAXED)
        elif not (oldVal == 0 or isTomb(oldVal)) and isTomb(val):
          discard atomic_add_fetch(table.active.addr, -1, ATOMIC_RELAXED)
      if oldVal == 0 and expVal != 0:
        return setTomb(oldVal)
      else: return oldVal
    if isPrime(oldVal):
      # The slot was primed under us: finish its copy and retry in the
      # table that now owns it.
      nextTable = copySlotAndCheck(table, idx)
      return setVal(nextTable, key, val, expVal, match)

#------------------------------------------------------------------------------

proc getVal[K, V](table: var PConcTable[K, V], key: int): int =
  ## Looks up the key word `key` and returns its value word, or 0 when the
  ## key is not found or its value is a tombstone.
  ##
  ## A primed value means the slot is being moved: the slot is copied and
  ## the lookup continues in the table returned by `copySlotAndCheck`.
  when K is Raw:
    var idx = hashInt(key)
  else:
    var idx = popPtr[K](key)[].hash
  var
    probes = 0
    val: int
  while true:
    idx = idx and (table.len - 1)
    var probedKey = atomic_load_n(table[idx].key.addr, ATOMIC_SEQ_CST)
    if keyEQ[K](probedKey, key):
      val = atomic_load_n(table[idx].value.addr, ATOMIC_ACQUIRE)
      if not isPrime(val):
        if isTomb(val):
          # val was tomb but not prime
          return 0
        else:
          return val
      else:
        var newTable = copySlotAndCheck(table, idx)
        return getVal(newTable, key)
    else:
      if probes >= reProbeLimit*4 or key.isTomb:
        # Load the pending table here; without this load the branch below
        # could never be taken and a key that only exists in the new table
        # would be reported as missing.
        var newTable = atomic_load_n(table.next.addr, ATOMIC_ACQUIRE)
        if newTable == nil:
          # too many probes and no new table
          return 0
        else:
          newTable = helpCopy(table)
          if newTable == nil:
            # `next` was cleared in the meantime, which promote does after
            # moving the new slot array into `table` itself.
            newTable = table
          return getVal(newTable, key)
      idx += 1
      probes += 1

#------------------------------------------------------------------------------

#proc set*(table: var PConcTable[Raw,Raw], key: Raw, val: Raw) =
#  discard setVal(table, pack(key), pack(key), 0, false)

#proc set*[V](table: var PConcTable[Raw,V], key: Raw, val: ptr V) =
#  discard setVal(table, pack(key), cast[int](val), 0, false)

proc set*[K, V](table: var PConcTable[K, V], key: var K, val: var V) =
  ## Stores `val` under `key`.
  ##
  ## `Raw` keys and values are packed into the slot word directly; any other
  ## type is copied into shared memory and the slot holds the copy's
  ## address.  When a non-`Raw` value is replaced, the previous copy is
  ## freed.
  # NOTE(review): a non-Raw key is copied on every call; when the key is
  # already present the new copy does not appear to be released - confirm.
  when not (K is Raw):
    var newKey = cast[int](copyShared(key))
  else:
    var newKey = pack(key)
  when not (V is Raw):
    var newVal = cast[int](copyShared(val))
  else:
    var newVal = pack(val)
  var oldPtr = pop(setVal(table, newKey, newVal, 0, false))
  when not (V is Raw):
    if newVal != oldPtr and oldPtr != 0:
      deallocShared(cast[ptr V](oldPtr))



proc get*[K, V](table: var PConcTable[K, V], key: var K): V =
  ## Returns the value stored under `key`.
  ##
  ## When the key is not found the zero value of `V` is returned.  A
  ## non-`Raw` key is looked up through the address of the caller's `key`.
  when not (K is Raw):
    let keyWord = cast[int](key.addr)
  else:
    let keyWord = pack(key)
  let found = getVal(table, keyWord)
  when not (V is Raw):
    # getVal yields 0 for a missing key; dereferencing that would read
    # through nil, so leave `result` zero-initialised instead.
    let stored = popPtr[V](found)
    if stored != nil:
      result = stored[]
  else:
    result = popRaw(found)











#proc `[]`[K,V](table: var PConcTable[K,V], key: K): PEntry[K,V] {.inline.} =
#  getVal(table, key)

#proc `[]=`[K,V](table: var PConcTable[K,V], key: K, val: V): PEntry[K,V] {.inline.} =
#  setVal(table, key, val)



#Tests ----------------------------
# Threaded self-test, compiled only for the main module when `testing` is
# not defined: fill the table, let every worker bump each entry once, then
# count the entries whose contents are not what was expected.
when not defined(testing) and isMainModule:
  import locks, times, mersenne

  const
    numTests = 100000
    numThreads = 10

  type
    TestObj = tuple
      thr: int
      f0: int
      f1: int

    Data = tuple[k: string, v: TestObj]
    PDataArr = array[0..numTests-1, Data]
    Dict = PConcTable[string, TestObj]

  var
    thr: array[0..numThreads-1, Thread[Dict]]

    table = newLFTable[string, TestObj](8)
    rand = newMersenneTwister(2525)

  proc createSampleData(len: int): PDataArr =
    ## Fills the first `len` entries with key "mark<n>" and a value whose
    ## `f0` is n (n counted from 1); `thr` and `f1` start at 0.
    #result = cast[PDataArr](allocShared0(sizeof(Data)*numTests))
    for pos in 0..len-1:
      let ordinal = pos + 1
      result[pos].k = "mark" & $ordinal
      result[pos].v.thr = 0
      result[pos].v.f0 = ordinal
      result[pos].v.f1 = 0

  proc threadProc(tp: Dict) {.thread.} =
    ## Worker body: for every key, read the entry, stamp it with this
    ## thread's id, increment `f1` and write it back; then print the CPU
    ## time the loop took.
    let started = cpuTime()
    for n in 1..numTests:
      var name = "mark" & $(n)
      var entry = table.get(name)
      entry.thr = cast[int](myThreadID[pointer]())
      entry.f1 = entry.f1 + 1
      table.set(name, entry)
    let t = cpuTime() - started
    echo t

  var testData = createSampleData(numTests)

  # Seed the table with every sample entry.
  for pos in 0..high(testData):
    table.set(testData[pos].k, testData[pos].v)

  # Start all workers, then wait for them to finish.
  for worker in 0..high(thr):
    createThread(thr[worker], threadProc, table)

  joinThreads(thr)

  var fails = 0

  # Each entry should keep its original f0 and have been bumped once per
  # worker.
  for pos in 0..high(testData):
    var got = table.get(testData[pos].k)
    if not (got.f0 == pos + 1 and got.f1 == numThreads):
      fails += 1
      echo(got)

  echo("Failed read or write = ", fails)


  #for i in 1..numTests:
  #  echo(i, " = ", hashInt(i) and 8191)

  deleteConcTable(table)