1 /*
2  * Copyright (c) Facebook, Inc. and its affiliates.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 /**
18  * Higher performance (up to 10x) atomic increment using thread caching.
19  *
20  * @author Spencer Ahrens (sahrens)
21  */
22 
23 #pragma once
24 
25 #include <atomic>
26 
27 #include <folly/Likely.h>
28 #include <folly/ThreadLocal.h>
29 
30 namespace folly {
31 
32 // Note that readFull requires holding a lock and iterating through all of the
33 // thread local objects with the same Tag, so if you have a lot of
34 // ThreadCachedInt's you should considering breaking up the Tag space even
35 // further.
36 template <class IntT, class Tag = IntT>
37 class ThreadCachedInt {
38   struct IntCache;
39 
40  public:
41   explicit ThreadCachedInt(IntT initialVal = 0, uint32_t cacheSize = 1000)
target_(initialVal)42       : target_(initialVal), cacheSize_(cacheSize) {}
43 
44   ThreadCachedInt(const ThreadCachedInt&) = delete;
45   ThreadCachedInt& operator=(const ThreadCachedInt&) = delete;
46 
increment(IntT inc)47   void increment(IntT inc) {
48     auto cache = cache_.get();
49     if (UNLIKELY(cache == nullptr)) {
50       cache = new IntCache(*this);
51       cache_.reset(cache);
52     }
53     cache->increment(inc);
54   }
55 
56   // Quickly grabs the current value which may not include some cached
57   // increments.
readFast()58   IntT readFast() const { return target_.load(std::memory_order_relaxed); }
59 
60   // Reads the current value plus all the cached increments.  Requires grabbing
61   // a lock, so this is significantly slower than readFast().
readFull()62   IntT readFull() const {
63     // This could race with thread destruction and so the access lock should be
64     // acquired before reading the current value
65     const auto accessor = cache_.accessAllThreads();
66     IntT ret = readFast();
67     for (const auto& cache : accessor) {
68       if (!cache.reset_.load(std::memory_order_acquire)) {
69         ret += cache.val_.load(std::memory_order_relaxed);
70       }
71     }
72     return ret;
73   }
74 
75   // Quickly reads and resets current value (doesn't reset cached increments).
readFastAndReset()76   IntT readFastAndReset() {
77     return target_.exchange(0, std::memory_order_release);
78   }
79 
80   // This function is designed for accumulating into another counter, where you
81   // only want to count each increment once.  It can still get the count a
82   // little off, however, but it should be much better than calling readFull()
83   // and set(0) sequentially.
readFullAndReset()84   IntT readFullAndReset() {
85     // This could race with thread destruction and so the access lock should be
86     // acquired before reading the current value
87     auto accessor = cache_.accessAllThreads();
88     IntT ret = readFastAndReset();
89     for (auto& cache : accessor) {
90       if (!cache.reset_.load(std::memory_order_acquire)) {
91         ret += cache.val_.load(std::memory_order_relaxed);
92         cache.reset_.store(true, std::memory_order_release);
93       }
94     }
95     return ret;
96   }
97 
setCacheSize(uint32_t newSize)98   void setCacheSize(uint32_t newSize) {
99     cacheSize_.store(newSize, std::memory_order_release);
100   }
101 
getCacheSize()102   uint32_t getCacheSize() const { return cacheSize_.load(); }
103 
104   ThreadCachedInt& operator+=(IntT inc) {
105     increment(inc);
106     return *this;
107   }
108   ThreadCachedInt& operator-=(IntT inc) {
109     increment(-inc);
110     return *this;
111   }
112   // pre-increment (we don't support post-increment)
113   ThreadCachedInt& operator++() {
114     increment(1);
115     return *this;
116   }
117   ThreadCachedInt& operator--() {
118     increment(IntT(-1));
119     return *this;
120   }
121 
122   // Thread-safe set function.
123   // This is a best effort implementation. In some edge cases, there could be
124   // data loss (missing counts)
set(IntT newVal)125   void set(IntT newVal) {
126     for (auto& cache : cache_.accessAllThreads()) {
127       cache.reset_.store(true, std::memory_order_release);
128     }
129     target_.store(newVal, std::memory_order_release);
130   }
131 
132  private:
133   std::atomic<IntT> target_;
134   std::atomic<uint32_t> cacheSize_;
135   ThreadLocalPtr<IntCache, Tag, AccessModeStrict>
136       cache_; // Must be last for dtor ordering
137 
138   // This should only ever be modified by one thread
139   struct IntCache {
140     ThreadCachedInt* parent_;
141     mutable std::atomic<IntT> val_;
142     mutable uint32_t numUpdates_;
143     std::atomic<bool> reset_;
144 
IntCacheIntCache145     explicit IntCache(ThreadCachedInt& parent)
146         : parent_(&parent), val_(0), numUpdates_(0), reset_(false) {}
147 
incrementIntCache148     void increment(IntT inc) {
149       if (LIKELY(!reset_.load(std::memory_order_acquire))) {
150         // This thread is the only writer to val_, so it's fine do do
151         // a relaxed load and do the addition non-atomically.
152         val_.store(
153             val_.load(std::memory_order_relaxed) + inc,
154             std::memory_order_release);
155       } else {
156         val_.store(inc, std::memory_order_relaxed);
157         reset_.store(false, std::memory_order_release);
158       }
159       ++numUpdates_;
160       if (UNLIKELY(
161               numUpdates_ >
162               parent_->cacheSize_.load(std::memory_order_acquire))) {
163         flush();
164       }
165     }
166 
flushIntCache167     void flush() const {
168       parent_->target_.fetch_add(val_, std::memory_order_release);
169       val_.store(0, std::memory_order_release);
170       numUpdates_ = 0;
171     }
172 
~IntCacheIntCache173     ~IntCache() { flush(); }
174   };
175 };
176 
177 } // namespace folly
178