1 /* 2 * Copyright (c) Facebook, Inc. and its affiliates. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 /** 18 * Higher performance (up to 10x) atomic increment using thread caching. 19 * 20 * @author Spencer Ahrens (sahrens) 21 */ 22 23 #pragma once 24 25 #include <atomic> 26 27 #include <folly/Likely.h> 28 #include <folly/ThreadLocal.h> 29 30 namespace folly { 31 32 // Note that readFull requires holding a lock and iterating through all of the 33 // thread local objects with the same Tag, so if you have a lot of 34 // ThreadCachedInt's you should considering breaking up the Tag space even 35 // further. 36 template <class IntT, class Tag = IntT> 37 class ThreadCachedInt { 38 struct IntCache; 39 40 public: 41 explicit ThreadCachedInt(IntT initialVal = 0, uint32_t cacheSize = 1000) target_(initialVal)42 : target_(initialVal), cacheSize_(cacheSize) {} 43 44 ThreadCachedInt(const ThreadCachedInt&) = delete; 45 ThreadCachedInt& operator=(const ThreadCachedInt&) = delete; 46 increment(IntT inc)47 void increment(IntT inc) { 48 auto cache = cache_.get(); 49 if (UNLIKELY(cache == nullptr)) { 50 cache = new IntCache(*this); 51 cache_.reset(cache); 52 } 53 cache->increment(inc); 54 } 55 56 // Quickly grabs the current value which may not include some cached 57 // increments. readFast()58 IntT readFast() const { return target_.load(std::memory_order_relaxed); } 59 60 // Reads the current value plus all the cached increments. Requires grabbing 61 // a lock, so this is significantly slower than readFast(). readFull()62 IntT readFull() const { 63 // This could race with thread destruction and so the access lock should be 64 // acquired before reading the current value 65 const auto accessor = cache_.accessAllThreads(); 66 IntT ret = readFast(); 67 for (const auto& cache : accessor) { 68 if (!cache.reset_.load(std::memory_order_acquire)) { 69 ret += cache.val_.load(std::memory_order_relaxed); 70 } 71 } 72 return ret; 73 } 74 75 // Quickly reads and resets current value (doesn't reset cached increments). readFastAndReset()76 IntT readFastAndReset() { 77 return target_.exchange(0, std::memory_order_release); 78 } 79 80 // This function is designed for accumulating into another counter, where you 81 // only want to count each increment once. It can still get the count a 82 // little off, however, but it should be much better than calling readFull() 83 // and set(0) sequentially. readFullAndReset()84 IntT readFullAndReset() { 85 // This could race with thread destruction and so the access lock should be 86 // acquired before reading the current value 87 auto accessor = cache_.accessAllThreads(); 88 IntT ret = readFastAndReset(); 89 for (auto& cache : accessor) { 90 if (!cache.reset_.load(std::memory_order_acquire)) { 91 ret += cache.val_.load(std::memory_order_relaxed); 92 cache.reset_.store(true, std::memory_order_release); 93 } 94 } 95 return ret; 96 } 97 setCacheSize(uint32_t newSize)98 void setCacheSize(uint32_t newSize) { 99 cacheSize_.store(newSize, std::memory_order_release); 100 } 101 getCacheSize()102 uint32_t getCacheSize() const { return cacheSize_.load(); } 103 104 ThreadCachedInt& operator+=(IntT inc) { 105 increment(inc); 106 return *this; 107 } 108 ThreadCachedInt& operator-=(IntT inc) { 109 increment(-inc); 110 return *this; 111 } 112 // pre-increment (we don't support post-increment) 113 ThreadCachedInt& operator++() { 114 increment(1); 115 return *this; 116 } 117 ThreadCachedInt& operator--() { 118 increment(IntT(-1)); 119 return *this; 120 } 121 122 // Thread-safe set function. 123 // This is a best effort implementation. In some edge cases, there could be 124 // data loss (missing counts) set(IntT newVal)125 void set(IntT newVal) { 126 for (auto& cache : cache_.accessAllThreads()) { 127 cache.reset_.store(true, std::memory_order_release); 128 } 129 target_.store(newVal, std::memory_order_release); 130 } 131 132 private: 133 std::atomic<IntT> target_; 134 std::atomic<uint32_t> cacheSize_; 135 ThreadLocalPtr<IntCache, Tag, AccessModeStrict> 136 cache_; // Must be last for dtor ordering 137 138 // This should only ever be modified by one thread 139 struct IntCache { 140 ThreadCachedInt* parent_; 141 mutable std::atomic<IntT> val_; 142 mutable uint32_t numUpdates_; 143 std::atomic<bool> reset_; 144 IntCacheIntCache145 explicit IntCache(ThreadCachedInt& parent) 146 : parent_(&parent), val_(0), numUpdates_(0), reset_(false) {} 147 incrementIntCache148 void increment(IntT inc) { 149 if (LIKELY(!reset_.load(std::memory_order_acquire))) { 150 // This thread is the only writer to val_, so it's fine do do 151 // a relaxed load and do the addition non-atomically. 152 val_.store( 153 val_.load(std::memory_order_relaxed) + inc, 154 std::memory_order_release); 155 } else { 156 val_.store(inc, std::memory_order_relaxed); 157 reset_.store(false, std::memory_order_release); 158 } 159 ++numUpdates_; 160 if (UNLIKELY( 161 numUpdates_ > 162 parent_->cacheSize_.load(std::memory_order_acquire))) { 163 flush(); 164 } 165 } 166 flushIntCache167 void flush() const { 168 parent_->target_.fetch_add(val_, std::memory_order_release); 169 val_.store(0, std::memory_order_release); 170 numUpdates_ = 0; 171 } 172 ~IntCacheIntCache173 ~IntCache() { flush(); } 174 }; 175 }; 176 177 } // namespace folly 178