/*****************************************************************************
Copyright (c) 2020 MariaDB Corporation.

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation; version 2 of the License.

This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along with
this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA

*****************************************************************************/

/*
The group commit synchronization used in log_write_up_to()
works as follows.

For simplicity, let's consider only the write operation; synchronization of
the flush operation works the same way.

Rules of the game

A thread enters log_write_up_to() with the lsn of the current transaction
(a rough pseudocode sketch of these rules follows the list).
1. If the last written lsn is greater than or equal to the wait lsn (another
   thread already wrote the log buffer), then there is no need to do anything.
2. If no other thread is currently writing, write the log buffer,
   and update the last written lsn.
3. Otherwise, wait, and go to step 1.
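
In rough pseudocode (the names below are illustrative only; they are not
actual functions of this file):

  write_up_to(lsn):
    loop:
      if last_written_lsn >= lsn:      // rule 1: nothing to do
        return
      if no write is in progress:      // rule 2: become the writer
        write the log buffer
        update last_written_lsn
        return
      wait until woken by the writer   // rule 3, then re-check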

Synchronization can be done in different ways, e.g.

a) A simple mutex locking the entire check-and-write operation.
Has the disadvantage that threads that could continue after the
last written lsn is updated still have to wait for the mutex.

b) A spinlock, with periodic checks of the last written lsn.
Fixes a), but burns CPU unnecessarily.

c) A mutex / condition variable combo.

The condition variable notifies (broadcasts to) all waiters whenever
the last written lsn is changed.

Has the disadvantage of many spurious wakeups, stress on the OS scheduler,
and mutex contention. (A minimal sketch of this alternative follows this
comment.)

d) Something else.
Make use of the waiter's lsn parameter, and only wake up the "right"
waiting threads.

We chose d). Even though the implementation is more complicated than the
alternatives, due to the need to maintain a list of waiters, it provides
the best performance.

See the group_commit_lock implementation for details.

Note that if the write operation is very fast, a) or b) can be fine
alternatives.
*/
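
/*
  For illustration only: a minimal sketch of alternative c) above. The names
  below are invented for this example and are not used anywhere in the
  server; error handling and the actual write path are omitted.

    std::mutex mtx;
    std::condition_variable cv;
    lsn_t written_lsn= 0;               // protected by mtx

    // Waiter side: block until the log was written at least up to lsn.
    void wait_for_write(lsn_t lsn)
    {
      std::unique_lock<std::mutex> lk(mtx);
      while (written_lsn < lsn)
        cv.wait(lk);                    // every notify_all() wakes all
                                        // waiters, including those whose
                                        // lsn was not written yet
    }

    // Writer side: called after the log buffer was written up to end_lsn.
    void report_written(lsn_t end_lsn)
    {
      {
        std::lock_guard<std::mutex> lk(mtx);
        written_lsn= end_lsn;
      }
      cv.notify_all();
    }

  This is correct, but every update broadcasts to all waiters, which is the
  source of the spurious wakeups and scheduler pressure mentioned above.
*/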
#ifdef _WIN32
#include <windows.h>
#endif

#ifdef __linux__
#include <linux/futex.h>
#include <sys/syscall.h>
#endif

#include <atomic>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <my_cpu.h>

#include <log0types.h>
#include "log0sync.h"
#include <mysql/service_thd_wait.h>
/**
  Helper class, used in the group commit lock.

  A binary semaphore, or (the same thing) an auto-reset event.
  It has a state (signalled or not), and provides two operations:
  wait() and wake().

  The implementation uses efficient locking primitives on Linux and Windows,
  or a mutex/condition variable combo elsewhere. A short usage sketch follows
  the implementation below.
*/

class binary_semaphore
{
public:
  /** Wait until the semaphore becomes signalled, and atomically reset the
  state to non-signalled. */
  void wait();
  /** Signal the semaphore. */
  void wake();

private:
#if defined(__linux__) || defined (_WIN32)
  std::atomic<int> m_signalled;
  static constexpr std::memory_order mem_order= std::memory_order_acq_rel;
public:
  binary_semaphore() :m_signalled(0) {}
#else
  std::mutex m_mtx{};
  std::condition_variable m_cv{};
  bool m_signalled = false;
#endif
};

#if defined (__linux__) || defined (_WIN32)
void binary_semaphore::wait()
{
  for (;;)
  {
    /* Atomically consume the signal, if it is set. */
    if (m_signalled.exchange(0, mem_order) == 1)
    {
      break;
    }
    /* Not signalled; block until wake() sets m_signalled. Spurious wakeups
    are handled by re-checking in the loop. */
#ifdef _WIN32
    int zero = 0;
    WaitOnAddress(&m_signalled, &zero, sizeof(m_signalled), INFINITE);
#else
    syscall(SYS_futex, &m_signalled, FUTEX_WAIT_PRIVATE, 0, NULL, NULL, 0);
#endif
  }
}

void binary_semaphore::wake()
{
  /* Issue the (relatively expensive) wakeup call only if the semaphore was
  not already signalled. */
  if (m_signalled.exchange(1, mem_order) == 0)
  {
#ifdef _WIN32
    WakeByAddressSingle(&m_signalled);
#else
    syscall(SYS_futex, &m_signalled, FUTEX_WAKE_PRIVATE, 1, NULL, NULL, 0);
#endif
  }
}
#else
void binary_semaphore::wait()
{
  std::unique_lock<std::mutex> lk(m_mtx);
  while (!m_signalled)
    m_cv.wait(lk);
  m_signalled = false;
}
void binary_semaphore::wake()
{
  std::unique_lock<std::mutex> lk(m_mtx);
  m_signalled = true;
  m_cv.notify_one();
}
#endif
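
/*
  Usage sketch (illustrative only, not code used elsewhere in this file):

    binary_semaphore sem;

    // Thread A: block until signalled; the signal is consumed (auto-reset),
    // so a subsequent wait() blocks again until the next wake().
    sem.wait();

    // Thread B: wake (at most) one waiter. A wake() that happens before the
    // wait() is not lost; the next wait() returns immediately.
    sem.wake();
*/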

/* A per-thread helper structure, used in the group commit lock below. */
struct group_commit_waiter_t
{
  lsn_t m_value;
  binary_semaphore m_sema;
  group_commit_waiter_t* m_next;
  group_commit_waiter_t() :m_value(), m_sema(), m_next() {}
};

group_commit_lock::group_commit_lock() :
  m_mtx(), m_value(0), m_pending_value(0), m_lock(false), m_waiters_list()
{
}

group_commit_lock::value_type group_commit_lock::value() const
{
  return m_value.load(std::memory_order::memory_order_relaxed);
}

group_commit_lock::value_type group_commit_lock::pending() const
{
  return m_pending_value.load(std::memory_order::memory_order_relaxed);
}

void group_commit_lock::set_pending(group_commit_lock::value_type num)
{
  ut_a(num >= value());
  m_pending_value.store(num, std::memory_order::memory_order_relaxed);
}

const unsigned int MAX_SPINS = 1; /** max spins in acquire */

/* One thread_local waiter suffices, because a thread blocks inside acquire()
on at most one group_commit_lock at a time. */
thread_local group_commit_waiter_t thread_local_waiter;

group_commit_lock::lock_return_code group_commit_lock::acquire(value_type num)
{
  unsigned int spins = MAX_SPINS;

  for(;;)
  {
    if (num <= value())
    {
      /* No need to wait.*/
      return lock_return_code::EXPIRED;
    }

    if(spins-- == 0)
      break;
    if (num > pending())
    {
      /* Longer wait expected (longer than currently running operation),
        don't spin.*/
      break;
    }
    ut_delay(1);
  }

  thread_local_waiter.m_value = num;
  std::unique_lock<std::mutex> lk(m_mtx, std::defer_lock);
  while (num > value())
  {
    lk.lock();

    /* Re-read current value after acquiring the lock*/
    if (num <= value())
    {
      return lock_return_code::EXPIRED;
    }

    if (!m_lock)
    {
      /* Take the lock, become group commit leader.*/
      m_lock = true;
#ifndef DBUG_OFF
      m_owner_id = std::this_thread::get_id();
#endif
      return lock_return_code::ACQUIRED;
    }

    /* Add yourself to waiters list.*/
    thread_local_waiter.m_next = m_waiters_list;
    m_waiters_list = &thread_local_waiter;
    lk.unlock();

    /* Sleep until woken in release().*/
    thd_wait_begin(0, THD_WAIT_GROUP_COMMIT);
    thread_local_waiter.m_sema.wait();
    thd_wait_end(0);

  }
  return lock_return_code::EXPIRED;
}

void group_commit_lock::release(value_type num)
{
  std::unique_lock<std::mutex> lk(m_mtx);
  m_lock = false;

  /* Update current value. */
  ut_a(num >= value());
  m_value.store(num, std::memory_order_relaxed);

  /*
    Wake waiters for value <= current value.
    Wake one more waiter, who will become the group commit lead.
  */
  group_commit_waiter_t* cur, * prev, * next;
  group_commit_waiter_t* wakeup_list = nullptr;
  int extra_wake = 0;

  for (prev= nullptr, cur= m_waiters_list; cur; cur= next)
  {
    next= cur->m_next;
    if (cur->m_value <= num || extra_wake++ == 0)
    {
      /* Move current waiter to wakeup_list*/

      if (!prev)
      {
        /* Remove from the start of the list.*/
        m_waiters_list = next;
      }
      else
      {
        /* Remove from the middle of the list.*/
        prev->m_next= cur->m_next;
      }

      /* Append entry to the wakeup list.*/
      cur->m_next = wakeup_list;
      wakeup_list = cur;
    }
    else
    {
      prev= cur;
    }
  }
  lk.unlock();

  for (cur= wakeup_list; cur; cur= next)
  {
    next= cur->m_next;
    cur->m_sema.wake();
  }
}
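
/*
  Usage sketch (illustrative only; the real caller is log_write_up_to(), and
  the helpers marked below are hypothetical):

    group_commit_lock write_lock;

    void write_up_to(lsn_t lsn)
    {
      if (write_lock.acquire(lsn) ==
          group_commit_lock::lock_return_code::ACQUIRED)
      {
        // This thread became the group commit leader: it writes the log
        // buffer on behalf of itself and the other waiters.
        write_lock.set_pending(buffered_lsn());   // hypothetical helper
        lsn_t end_lsn= write_log_buffer();        // hypothetical helper
        write_lock.release(end_lsn);
      }
      // Otherwise acquire() returned EXPIRED: another leader already wrote
      // the log at least up to lsn, so there is nothing left to do.
    }
*/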

#ifndef DBUG_OFF
bool group_commit_lock::is_owner()
{
  return m_lock && std::this_thread::get_id() == m_owner_id;
}
#endif