/*
 * Copyright (c) Facebook, Inc. and its affiliates.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#pragma once

#include <atomic>
#include <cstdint>

#include <folly/Function.h>
#include <folly/Portability.h>
#include <folly/ThreadLocal.h>
#include <folly/detail/Futex.h>
#include <folly/synchronization/AsymmetricMemoryBarrier.h>

// This is unlike folly::ThreadCachedInt in that the full value is never
// rounded up globally and cached; it only supports readFull().
//
// folly/experimental/TLRefCount is similar, but does not support
// waitForZero(), and is not resettable.
//
// Note that the RCU implementation is completely abstracted from the
// counter implementation; an rseq implementation can be dropped in
// if the kernel supports it.
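//
// Rough usage sketch (illustrative only: `MyTag`, `epoch`, and the
// reader/writer split below are assumptions of this sketch, not part of
// the header):
//
//   folly::detail::ThreadCachedInts<MyTag> counts;
//
//   // Reader, for an epoch in {0, 1} chosen by the synchronization scheme:
//   counts.increment(epoch);    // enter the read-side section
//   /* ... read-side critical section ... */
//   counts.decrement(epoch);    // leave the read-side section
//
//   // Writer: block until every reader that entered `epoch` has left.
//   counts.waitForZero(epoch);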

namespace folly {

namespace detail {

// Use memory_order_seq_cst for accesses to increments/decrements if we're
// running under TSAN, because TSAN ignores barriers completely.
constexpr std::memory_order kCounterMemoryOrder =
    kIsSanitizeThread ? std::memory_order_seq_cst : std::memory_order_relaxed;

template <typename Tag>
class ThreadCachedInts {
  std::atomic<int64_t> orphan_inc_[2] = {};
  std::atomic<int64_t> orphan_dec_[2] = {};
  folly::detail::Futex<> waiting_{0};

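  // Per-thread counter block. Allocated lazily on this thread's first
  // increment()/decrement() and owned by the ThreadLocalPtr below.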
  class Integer {
   public:
    ThreadCachedInts* ints_;
    constexpr Integer(ThreadCachedInts* ints) noexcept
        : ints_(ints), inc_{}, dec_{}, cache_(ints->int_cache_) {}
    std::atomic<int64_t> inc_[2];
    std::atomic<int64_t> dec_[2];
    Integer*& cache_; // reference to the cached ptr
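    // On thread exit, fold this thread's counts into the shared orphan_
    // counters so readFull() still observes them, then wake any waiter.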
    ~Integer() noexcept {
      // Increment counts must be set before decrement counts
      ints_->orphan_inc_[0].fetch_add(
          inc_[0].load(kCounterMemoryOrder), kCounterMemoryOrder);
      ints_->orphan_inc_[1].fetch_add(
          inc_[1].load(kCounterMemoryOrder), kCounterMemoryOrder);
      folly::asymmetricLightBarrier(); // B
      ints_->orphan_dec_[0].fetch_add(
          dec_[0].load(kCounterMemoryOrder), kCounterMemoryOrder);
      ints_->orphan_dec_[1].fetch_add(
          dec_[1].load(kCounterMemoryOrder), kCounterMemoryOrder);
      ints_->waiting_.store(0, std::memory_order_release);
      detail::futexWake(&ints_->waiting_);
      // Reset cache_ in the destructor so we can handle delete/recreate of
      // the thread-local object.
      cache_ = nullptr;
    }
  };
  folly::ThreadLocalPtr<Integer, Tag> cs_;

  // Cache the int pointer in a threadlocal.
  static thread_local Integer* int_cache_;

  void init() {
    auto ret = new Integer(this);
    cs_.reset(ret);
    int_cache_ = ret;
  }

 public:
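  // Bump this thread's increment count for `epoch`. A plain load + store
  // (rather than an atomic RMW) is sufficient because each Integer is only
  // ever written by its owning thread.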
  FOLLY_ALWAYS_INLINE void increment(uint8_t epoch) {
    if (!int_cache_) {
      init();
    }

    auto& c = int_cache_->inc_[epoch];
    auto val = c.load(std::memory_order_relaxed);
    c.store(val + 1, kCounterMemoryOrder);

    folly::asymmetricLightBarrier(); // A
  }

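  // Bump this thread's decrement count for `epoch`, then wake a
  // waitForZero() waiter if one has registered itself in waiting_.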
  FOLLY_ALWAYS_INLINE void decrement(uint8_t epoch) {
    folly::asymmetricLightBarrier(); // B
    if (!int_cache_) {
      init();
    }

    auto& c = int_cache_->dec_[epoch];
    auto val = c.load(std::memory_order_relaxed);
    c.store(val + 1, kCounterMemoryOrder);

    folly::asymmetricLightBarrier(); // C
    if (waiting_.load(std::memory_order_acquire)) {
      waiting_.store(0, std::memory_order_release);
      detail::futexWake(&waiting_);
    }
  }

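  // Returns increments minus decrements for `epoch`, summed across all live
  // threads plus the orphan counts flushed by threads that have exited.
  // Decrement counts are read before increment counts so that a decrement is
  // never observed without its matching increment (see barrier B).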
  int64_t readFull(uint8_t epoch) {
    int64_t full = -orphan_dec_[epoch].load(kCounterMemoryOrder);

    // Matches A - ensure all threads have seen the new value of version,
    // *and* that we see current values of the counters in readFull()
    //
    // Note that in lock_shared, if a reader is currently between the
    // version load and the counter increment, they may update the wrong
    // epoch.  However, this is ok - they started concurrently *after*
    // any callbacks that will run, and therefore it is safe to run
    // the callbacks.
    folly::asymmetricHeavyBarrier();
    for (auto& i : cs_.accessAllThreads()) {
      full -= i.dec_[epoch].load(kCounterMemoryOrder);
    }

    // Matches B - ensure that all increments are seen if decrements
    // are seen. This is necessary because increment and decrement
    // are allowed to happen on different threads.
    folly::asymmetricHeavyBarrier();

    auto accessor = cs_.accessAllThreads();
    for (auto& i : accessor) {
      full += i.inc_[epoch].load(kCounterMemoryOrder);
    }

    // orphan counts are read behind the accessAllThreads lock
    return full + orphan_inc_[epoch].load(kCounterMemoryOrder);
  }

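  // Block until readFull(phase) reaches zero, sleeping on the waiting_ futex
  // between checks; decrement() and the Integer destructor issue the
  // matching futexWake.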
  void waitForZero(uint8_t phase) {
    // Try reading before futex sleeping.
    if (readFull(phase) == 0) {
      return;
    }

    while (true) {
      waiting_.store(1, std::memory_order_release);
      // Matches C.  Ensure either decrement sees waiting_,
      // or we see their decrement and can safely sleep.
      folly::asymmetricHeavyBarrier();
      if (readFull(phase) == 0) {
        break;
      }
      detail::futexWait(&waiting_, 1);
    }
    waiting_.store(0, std::memory_order_relaxed);
  }

  // We are guaranteed to be called while the StaticMeta lock is still
  // held because of ordering in AtForkList.  We can therefore safely
  // touch orphan_ and clear out all counts.
  void resetAfterFork() {
    if (int_cache_) {
      int_cache_->dec_[0].store(0, kCounterMemoryOrder);
      int_cache_->dec_[1].store(0, kCounterMemoryOrder);
      int_cache_->inc_[0].store(0, kCounterMemoryOrder);
      int_cache_->inc_[1].store(0, kCounterMemoryOrder);
    }
    orphan_inc_[0].store(0, kCounterMemoryOrder);
    orphan_inc_[1].store(0, kCounterMemoryOrder);
    orphan_dec_[0].store(0, kCounterMemoryOrder);
    orphan_dec_[1].store(0, kCounterMemoryOrder);
  }
};

template <typename Tag>
thread_local typename detail::ThreadCachedInts<Tag>::Integer*
    detail::ThreadCachedInts<Tag>::int_cache_ = nullptr;

} // namespace detail
} // namespace folly