1 /*****************************************************************************
2 Copyright (c) 2020 MariaDB Corporation.
3
4 This program is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free Software
6 Foundation; version 2 of the License.
7
8 This program is distributed in the hope that it will be useful, but WITHOUT
9 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
10 FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
11
12 You should have received a copy of the GNU General Public License along with
13 this program; if not, write to the Free Software Foundation, Inc.,
14 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
15
16 *****************************************************************************/
17
18 /*
19 The group commit synchronization used in log_write_up_to()
20 works as follows
21
For simplicity, let's consider only the write operation; synchronization of
the flush operation works the same way.
24
25 Rules of the game
26
A thread enters log_write_up_to() with the lsn of the current transaction.
1. If the last written lsn is greater than or equal to the wait lsn (another
thread already wrote the log buffer), then there is no need to do anything.
2. If no other thread is currently writing, write the log buffer,
and update the last written lsn.
3. Otherwise, wait, and go to step 1.
33
Synchronization can be done in different ways, e.g.

a) A simple mutex locking the entire check-and-write operation.
Disadvantage: threads that could continue as soon as the last written
lsn is updated still have to wait.

b) A spinlock, with periodic checks of the last written lsn.
Fixes a) but burns CPU unnecessarily.
42
43 c) Mutex / condition variable combo.
44
The condition variable notifies (broadcasts to) all waiters whenever
the last written lsn changes.

The disadvantage is many spurious wakeups, stress on the OS scheduler,
and mutex contention.
50
51 d) Something else.
Make use of the waiter's lsn parameter, and only wake up the "right" waiting
threads.

We chose d). Even though the implementation is more complicated than the
alternatives, due to the need to maintain a list of waiters, it provides the
best performance.
57
58 See group_commit_lock implementation for details.
59
Note that if the write operation is very fast, a) or b) can be a fine alternative.
61 */
62 #ifdef _WIN32
63 #include <windows.h>
64 #endif
65
66 #ifdef __linux__
67 #include <linux/futex.h>
68 #include <sys/syscall.h>
69 #endif
70
71 #include <atomic>
72 #include <thread>
73 #include <mutex>
74 #include <condition_variable>
75 #include <my_cpu.h>
76
77 #include <log0types.h>
78 #include "log0sync.h"
79 #include <mysql/service_thd_wait.h>
/**
  Binary semaphore (equivalently, an auto-reset event) used by
  group_commit_lock to park and wake individual threads.

  The semaphore is either signalled or not signalled. wait() blocks until
  it is signalled and then consumes the signal; wake() sets the signal and
  releases at most one blocked thread.

  Linux sleeps on a futex and Windows uses WaitOnAddress() /
  WakeByAddressSingle(); every other platform falls back to a
  std::mutex / std::condition_variable pair.
*/
class binary_semaphore
{
public:
  /** Block until the semaphore is signalled, then atomically return it
  to the non-signalled state. */
  void wait();
  /** Put the semaphore into the signalled state. */
  void wake();

#if defined(__linux__) || defined (_WIN32)
  binary_semaphore() :m_signalled(0) {}

private:
  /** 1 while signalled, 0 otherwise. This word is also the address the
  futex / WaitOnAddress() sleep is keyed on. */
  std::atomic<int> m_signalled;
  /** Ordering used for every exchange on m_signalled. */
  static constexpr std::memory_order mem_order= std::memory_order_acq_rel;
#else
private:
  std::mutex m_mtx{};
  std::condition_variable m_cv{};
  /** true while signalled; protected by m_mtx. */
  bool m_signalled = false;
#endif
};

#if defined (__linux__) || defined (_WIN32)
void binary_semaphore::wait()
{
  /* Try to consume the signal; if it was not set, sleep for as long as the
  word still reads 0 and then try again. Spurious or early returns from
  the sleep call simply go around the loop once more. */
  while (m_signalled.exchange(0, mem_order) != 1)
  {
#ifdef _WIN32
    int zero = 0;
    WaitOnAddress(&m_signalled, &zero, sizeof(m_signalled), INFINITE);
#else
    syscall(SYS_futex, &m_signalled, FUTEX_WAIT_PRIVATE, 0, NULL, NULL, 0);
#endif
  }
}

void binary_semaphore::wake()
{
  /* Only the 0 -> 1 transition has to wake a sleeper; if the word was
  already 1, the earlier wake() that set it is responsible for the wakeup. */
  const int previous= m_signalled.exchange(1, mem_order);
  if (previous != 0)
    return;
#ifdef _WIN32
  WakeByAddressSingle(&m_signalled);
#else
  syscall(SYS_futex, &m_signalled, FUTEX_WAKE_PRIVATE, 1, NULL, NULL, 0);
#endif
}
#else
void binary_semaphore::wait()
{
  std::unique_lock<std::mutex> lk(m_mtx);
  /* The predicate form re-checks m_signalled after every wakeup. */
  m_cv.wait(lk, [this] { return m_signalled; });
  m_signalled = false;
}

void binary_semaphore::wake()
{
  std::lock_guard<std::mutex> guard(m_mtx);
  m_signalled = true;
  m_cv.notify_one();
}
#endif
157
158 /* A thread helper structure, used in group commit lock below*/
159 struct group_commit_waiter_t
160 {
161 lsn_t m_value;
162 binary_semaphore m_sema;
163 group_commit_waiter_t* m_next;
group_commit_waiter_tgroup_commit_waiter_t164 group_commit_waiter_t() :m_value(), m_sema(), m_next() {}
165 };
166
group_commit_lock()167 group_commit_lock::group_commit_lock() :
168 m_mtx(), m_value(0), m_pending_value(0), m_lock(false), m_waiters_list()
169 {
170 }
171
value() const172 group_commit_lock::value_type group_commit_lock::value() const
173 {
174 return m_value.load(std::memory_order::memory_order_relaxed);
175 }
176
pending() const177 group_commit_lock::value_type group_commit_lock::pending() const
178 {
179 return m_pending_value.load(std::memory_order::memory_order_relaxed);
180 }
181
set_pending(group_commit_lock::value_type num)182 void group_commit_lock::set_pending(group_commit_lock::value_type num)
183 {
184 ut_a(num >= value());
185 m_pending_value.store(num, std::memory_order::memory_order_relaxed);
186 }
187
188 const unsigned int MAX_SPINS = 1; /** max spins in acquire */
189 thread_local group_commit_waiter_t thread_local_waiter;
190
/**
  Wait until value() reaches num, or become the group commit leader.

  The caller first checks value() without the mutex, spinning with
  ut_delay() at most MAX_SPINS times and only while num does not exceed
  pending(). It then takes m_mtx and either sets m_lock itself or links its
  thread_local_waiter into m_waiters_list and sleeps on its semaphore until
  release() wakes it.

  @param num  the value the caller needs value() to reach
  @return lock_return_code::EXPIRED   value() >= num; nothing left to do
  @return lock_return_code::ACQUIRED  the caller set m_lock and is now the
                                      leader; release() is what clears m_lock
*/
group_commit_lock::lock_return_code group_commit_lock::acquire(value_type num)
{
  unsigned int spins = MAX_SPINS;

  /* Optimistic phase: no mutex, only relaxed reads of value()/pending(). */
  for(;;)
  {
    if (num <= value())
    {
      /* No need to wait.*/
      return lock_return_code::EXPIRED;
    }

    if(spins-- == 0)
      break;
    if (num > pending())
    {
      /* Longer wait expected (longer than currently running operation),
      don't spin.*/
      break;
    }
    ut_delay(1);
  }

  /* This thread's record is not on m_waiters_list at this point (it is only
  linked below, under m_mtx, and release() unlinks a record before waking
  it), so m_value can be written without holding m_mtx. */
  thread_local_waiter.m_value = num;
  std::unique_lock<std::mutex> lk(m_mtx, std::defer_lock);
  while (num > value())
  {
    lk.lock();

    /* Re-read current value after acquiring the lock*/
    if (num <= value())
    {
      /* Returning with lk held is fine: its destructor unlocks m_mtx. */
      return lock_return_code::EXPIRED;
    }

    if (!m_lock)
    {
      /* Take the lock, become group commit leader.*/
      m_lock = true;
#ifndef DBUG_OFF
      m_owner_id = std::this_thread::get_id();
#endif
      return lock_return_code::ACQUIRED;
    }

    /* Add yourself to waiters list (pushed at the head, so the list is in
    LIFO order).*/
    thread_local_waiter.m_next = m_waiters_list;
    m_waiters_list = &thread_local_waiter;
    lk.unlock();

    /* Sleep until woken in release(). A wakeup means either value()
    reached num, or this thread was picked as the one extra waiter that
    should retry taking the lock; the loop condition and the checks above
    handle both cases.*/
    thd_wait_begin(0,THD_WAIT_GROUP_COMMIT);
    thread_local_waiter.m_sema.wait();
    thd_wait_end(0);

  }
  return lock_return_code::EXPIRED;
}
249
/**
  Release the lock, publish num as the new value() and wake waiters.

  Every waiter whose requested value is <= num is woken. In addition, the
  first waiter in list order whose requested value is still > num is woken
  too, so that it can retry acquire() and possibly become the next group
  commit leader.

  @param num  the new value; ut_a() enforces num >= value()
*/
void group_commit_lock::release(value_type num)
{
  std::unique_lock<std::mutex> lk(m_mtx);
  m_lock = false;

  /* Update current value. */
  ut_a(num >= value());
  m_value.store(num, std::memory_order_relaxed);

  /*
    Wake waiters for value <= current value.
    Wake one more waiter, who may become the group commit lead.
  */
  group_commit_waiter_t* cur, * prev, * next;
  group_commit_waiter_t* wakeup_list = nullptr;
  /* Counts unsatisfied waiters seen so far; only the first one
  (extra_wake == 0 before the increment) is selected for wakeup. */
  int extra_wake = 0;

  /* Unlink the selected waiters from m_waiters_list and push them onto the
  local wakeup_list, all while m_mtx is held. */
  for (prev= nullptr, cur= m_waiters_list; cur; cur= next)
  {
    next= cur->m_next;
    if (cur->m_value <= num || extra_wake++ == 0)
    {
      /* Move current waiter to wakeup_list*/

      if (!prev)
      {
        /* Remove from the start of the list.*/
        m_waiters_list = next;
      }
      else
      {
        /* Remove from the middle of the list.*/
        prev->m_next= cur->m_next;
      }

      /* Append entry to the wakeup list.*/
      cur->m_next = wakeup_list;
      wakeup_list = cur;
    }
    else
    {
      prev= cur;
    }
  }
  lk.unlock();

  /* Wake the selected threads after releasing m_mtx. m_next is read before
  wake() because a woken thread may loop in acquire() and overwrite its
  m_next while re-adding itself to m_waiters_list. */
  for (cur= wakeup_list; cur; cur= next)
  {
    next= cur->m_next;
    cur->m_sema.wake();
  }
}
302
303 #ifndef DBUG_OFF
is_owner()304 bool group_commit_lock::is_owner()
305 {
306 return m_lock && std::this_thread::get_id() == m_owner_id;
307 }
308 #endif
309
310