1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 /* -*- mode: C; c-basic-offset: 4 -*- */
4 #ident "$Id$"
5 /*======
6 This file is part of TokuDB
7
8
9 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
10
11 TokuDBis is free software: you can redistribute it and/or modify
12 it under the terms of the GNU General Public License, version 2,
13 as published by the Free Software Foundation.
14
15 TokuDB is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with TokuDB. If not, see <http://www.gnu.org/licenses/>.
22
23 ======= */
24
25 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
26
27 #ifndef _TOKUDB_SYNC_H
28 #define _TOKUDB_SYNC_H
29
30 #include "hatoku_defines.h"
31 #include "tokudb_debug.h"
32 #include "tokudb_time.h"
33
34 namespace tokudb {
35 namespace thread {
36
37 extern const pfs_key_t pfs_not_instrumented;
38
39 uint my_tid(void);
40
41 // Your basic mutex
42 class mutex_t {
43 public:
44 explicit mutex_t(pfs_key_t key);
mutex_t(void)45 mutex_t(void) : mutex_t(pfs_not_instrumented) {}
46 ~mutex_t(void);
47
48 void reinit(pfs_key_t key);
49 void lock(
50 #if defined(SAFE_MUTEX) || defined(HAVE_PSI_MUTEX_INTERFACE)
51 const char* src_file,
52 uint src_line
53 #endif // SAFE_MUTEX || HAVE_PSI_MUTEX_INTERFACE
54 );
55 void unlock(
56 #if defined(SAFE_MUTEX)
57 const char* src_file,
58 uint src_line
59 #endif // SAFE_MUTEX
60 );
61 #ifdef TOKUDB_DEBUG
62 bool is_owned_by_me(void) const;
63 #endif
64 private:
65 static pthread_t _null_owner;
66 mysql_mutex_t _mutex;
67 #ifdef TOKUDB_DEBUG
68 uint _owners;
69 pthread_t _owner;
70 #endif
71 };
72
73 // Simple read write lock
74 class rwlock_t {
75 public:
76 explicit rwlock_t(pfs_key_t key);
rwlock_t(void)77 rwlock_t(void) : rwlock_t(pfs_not_instrumented) {}
78 ~rwlock_t(void);
79
80 void lock_read(
81 #ifdef HAVE_PSI_RWLOCK_INTERFACE
82 const char* src_file,
83 uint src_line
84 #endif // HAVE_PSI_RWLOCK_INTERFACE
85 );
86 void lock_write(
87 #ifdef HAVE_PSI_RWLOCK_INTERFACE
88 const char* src_file,
89 uint src_line
90 #endif // HAVE_PSI_RWLOCK_INTERFACE
91 );
92 void unlock(void);
93
94 private:
95 rwlock_t(const rwlock_t&);
96 rwlock_t& operator=(const rwlock_t&);
97
98 mysql_rwlock_t _rwlock;
99 };
100
101 // Simple event signal/wait class
102 class event_t {
103 public:
104 // create_signalled - create the event in a signalled state
105 // manual_reset - create an event that must be manually reset
106 // after signaling
107 event_t(
108 bool create_signalled = false,
109 bool manual_reset = false);
110 ~event_t(void);
111
112 // wait for the event to become signalled
113 void wait(void);
114
115 // signal the event
116 void signal(void);
117
118 // pulse the event (signal and free exactly one waiter)
119 void pulse(void);
120
121 // is the event currently signalled
122 bool signalled(void);
123
124 // unsignal/clear the event
125 void reset(void);
126
127 private:
128 event_t(const event_t&);
129 event_t& operator=(const event_t&);
130
131 pthread_mutex_t _mutex;
132 pthread_cond_t _cond;
133 bool _signalled;
134 bool _pulsed;
135 bool _manual_reset;
136 };
137
138 // Semaphore signal/wait class
139 class semaphore_t {
140 public:
141 // initial_count - the initial signal count of the semaphore
142 // max_count - the maximum signal count for the semaphore.
143 semaphore_t(int initial_count, int max_count);
144 ~semaphore_t(void);
145
146 enum E_WAIT {
147 E_SIGNALLED = 0,
148 E_INTERRUPTED = 1,
149 E_TIMEDOUT = 2
150 };
151
152 // wait for the semaphore to become signalled
153 E_WAIT wait(void);
154
155 // signal the semaphore to increase the count
156 // return true if signalled, false if ignored due to count
157 bool signal(void);
158
159 // what is the semaphore signal count
160 int signalled(void);
161
162 // unsignal a signalled semaphore
163 void reset(void);
164
165 // set to interrupt any waiters, as long as is set,
166 // waiters will return immediately with E_INTERRUPTED.
167 // the semaphore signal count and tracking will continue
168 // accepting signals and leave the signalled state intact
169 void set_interrupt(void);
170 void clear_interrupt(void);
171
172 private:
173 semaphore_t(const semaphore_t&);
174 semaphore_t& operator=(const semaphore_t&);
175
176 pthread_mutex_t _mutex;
177 pthread_cond_t _cond;
178 bool _interrupted;
179 int _signalled;
180 int _initial_count;
181 int _max_count;
182 };
183
184 // Thread class
185 class thread_t {
186 public:
187 thread_t(void);
188 ~thread_t(void);
189
190 int start(void* (*pfn)(void*), void* arg);
191 int join(void** value_ptr);
192 int detach(void);
193
194 private:
195 pthread_t _thread;
196 };
197
my_tid(void)198 inline uint my_tid(void) { return (uint)toku_os_gettid(); }
199
mutex_t(pfs_key_t key)200 inline mutex_t::mutex_t(pfs_key_t key) {
201 #ifdef TOKUDB_DEBUG
202 _owners = 0;
203 _owner = _null_owner;
204 #endif
205 int r MY_ATTRIBUTE((unused)) = mysql_mutex_init(key, &_mutex, MY_MUTEX_INIT_FAST);
206 assert_debug(r == 0);
207 }
~mutex_t()208 inline mutex_t::~mutex_t() {
209 #ifdef TOKUDB_DEBUG
210 assert_debug(_owners == 0);
211 #endif
212 int r MY_ATTRIBUTE((unused)) = mysql_mutex_destroy(&_mutex);
213 assert_debug(r == 0);
214 }
reinit(pfs_key_t key)215 inline void mutex_t::reinit(pfs_key_t key) {
216 #ifdef TOKUDB_DEBUG
217 assert_debug(_owners == 0);
218 #endif
219 int r MY_ATTRIBUTE((unused));
220 r = mysql_mutex_destroy(&_mutex);
221 assert_debug(r == 0);
222 r = mysql_mutex_init(key, &_mutex, MY_MUTEX_INIT_FAST);
223 assert_debug(r == 0);
224 }
lock(const char * src_file,uint src_line)225 inline void mutex_t::lock(
226 #if defined(SAFE_MUTEX) || defined(HAVE_PSI_MUTEX_INTERFACE)
227 const char* src_file,
228 uint src_line
229 #endif // SAFE_MUTEX || HAVE_PSI_MUTEX_INTERFACE
230 ) {
231 assert_debug(is_owned_by_me() == false);
232 int r MY_ATTRIBUTE((unused)) = inline_mysql_mutex_lock(&_mutex
233 #if defined(SAFE_MUTEX) || defined(HAVE_PSI_MUTEX_INTERFACE)
234 ,
235 src_file,
236 src_line
237 #endif // SAFE_MUTEX || HAVE_PSI_MUTEX_INTERFACE
238 );
239 assert_debug(r == 0);
240 #ifdef TOKUDB_DEBUG
241 _owners++;
242 _owner = pthread_self();
243 #endif
244 }
unlock(const char * src_file,uint src_line)245 inline void mutex_t::unlock(
246 #if defined(SAFE_MUTEX)
247 const char* src_file,
248 uint src_line
249 #endif // SAFE_MUTEX
250 ) {
251 #ifdef TOKUDB_DEBUG
252 assert_debug(_owners > 0);
253 assert_debug(is_owned_by_me());
254 _owners--;
255 _owner = _null_owner;
256 #endif
257 int r MY_ATTRIBUTE((unused)) = inline_mysql_mutex_unlock(&_mutex
258 #if defined(SAFE_MUTEX)
259 ,
260 src_file,
261 src_line
262 #endif // SAFE_MUTEX
263 );
264 assert_debug(r == 0);
265 }
266 #ifdef TOKUDB_DEBUG
is_owned_by_me(void)267 inline bool mutex_t::is_owned_by_me(void) const {
268 return pthread_equal(pthread_self(), _owner) != 0 ? true : false;
269 }
270 #endif
271
rwlock_t(pfs_key_t key)272 inline rwlock_t::rwlock_t(pfs_key_t key) {
273 int r MY_ATTRIBUTE((unused)) = mysql_rwlock_init(key, &_rwlock);
274 assert_debug(r == 0);
275 }
~rwlock_t()276 inline rwlock_t::~rwlock_t() {
277 int r MY_ATTRIBUTE((unused)) = mysql_rwlock_destroy(&_rwlock);
278 assert_debug(r == 0);
279 }
lock_read(const char * src_file,uint src_line)280 inline void rwlock_t::lock_read(
281 #ifdef HAVE_PSI_RWLOCK_INTERFACE
282 const char* src_file,
283 uint src_line
284 #endif // HAVE_PSI_RWLOCK_INTERFACE
285 ) {
286 int r;
287 while ((r = inline_mysql_rwlock_rdlock(&_rwlock
288 #ifdef HAVE_PSI_RWLOCK_INTERFACE
289 ,
290 src_file,
291 src_line
292 #endif // HAVE_PSI_RWLOCK_INTERFACE
293 )) != 0) {
294 if (r == EBUSY || r == EAGAIN) {
295 time::sleep_microsec(1000);
296 continue;
297 }
298 break;
299 }
300 assert_debug(r == 0);
301 }
lock_write(const char * src_file,uint src_line)302 inline void rwlock_t::lock_write(
303 #ifdef HAVE_PSI_RWLOCK_INTERFACE
304 const char* src_file,
305 uint src_line
306 #endif // HAVE_PSI_RWLOCK_INTERFACE
307 ) {
308 int r;
309 while ((r = inline_mysql_rwlock_wrlock(&_rwlock
310 #ifdef HAVE_PSI_RWLOCK_INTERFACE
311 ,
312 src_file,
313 src_line
314 #endif // HAVE_PSI_RWLOCK_INTERFACE
315 )) != 0) {
316 if (r == EBUSY || r == EAGAIN) {
317 time::sleep_microsec(1000);
318 continue;
319 }
320 break;
321 }
322 assert_debug(r == 0);
323 }
unlock(void)324 inline void rwlock_t::unlock(void) {
325 int r MY_ATTRIBUTE((unused)) = mysql_rwlock_unlock(&_rwlock);
326 assert_debug(r == 0);
327 }
rwlock_t(const rwlock_t &)328 inline rwlock_t::rwlock_t(const rwlock_t&) {}
329 inline rwlock_t& rwlock_t::operator=(const rwlock_t&) {
330 return *this;
331 }
332
333
event_t(bool create_signalled,bool manual_reset)334 inline event_t::event_t(bool create_signalled, bool manual_reset) :
335 _manual_reset(manual_reset) {
336
337 int r MY_ATTRIBUTE((unused)) = pthread_mutex_init(&_mutex, NULL);
338 assert_debug(r == 0);
339 r = pthread_cond_init(&_cond, NULL);
340 assert_debug(r == 0);
341 if (create_signalled) {
342 _signalled = true;
343 } else {
344 _signalled = false;
345 }
346 _pulsed = false;
347 }
~event_t(void)348 inline event_t::~event_t(void) {
349 int r MY_ATTRIBUTE((unused)) = pthread_mutex_destroy(&_mutex);
350 assert_debug(r == 0);
351 r = pthread_cond_destroy(&_cond);
352 assert_debug(r == 0);
353 }
wait(void)354 inline void event_t::wait(void) {
355 int r MY_ATTRIBUTE((unused)) = pthread_mutex_lock(&_mutex);
356 assert_debug(r == 0);
357 while (_signalled == false && _pulsed == false) {
358 r = pthread_cond_wait(&_cond, &_mutex);
359 assert_debug(r == 0);
360 }
361 if (_manual_reset == false)
362 _signalled = false;
363 if (_pulsed)
364 _pulsed = false;
365 r = pthread_mutex_unlock(&_mutex);
366 assert_debug(r == 0);
367 return;
368 }
signal(void)369 inline void event_t::signal(void) {
370 int r MY_ATTRIBUTE((unused)) = pthread_mutex_lock(&_mutex);
371 assert_debug(r == 0);
372 _signalled = true;
373 if (_manual_reset) {
374 r = pthread_cond_broadcast(&_cond);
375 assert_debug(r == 0);
376 } else {
377 r = pthread_cond_signal(&_cond);
378 assert_debug(r == 0);
379 }
380 r = pthread_mutex_unlock(&_mutex);
381 assert_debug(r == 0);
382 }
pulse(void)383 inline void event_t::pulse(void) {
384 int r MY_ATTRIBUTE((unused)) = pthread_mutex_lock(&_mutex);
385 assert_debug(r == 0);
386 _pulsed = true;
387 r = pthread_cond_signal(&_cond);
388 assert_debug(r == 0);
389 r = pthread_mutex_unlock(&_mutex);
390 assert_debug(r == 0);
391 }
signalled(void)392 inline bool event_t::signalled(void) {
393 bool ret = false;
394 int r MY_ATTRIBUTE((unused)) = pthread_mutex_lock(&_mutex);
395 assert_debug(r == 0);
396 ret = _signalled;
397 r = pthread_mutex_unlock(&_mutex);
398 assert_debug(r == 0);
399 return ret;
400 }
reset(void)401 inline void event_t::reset(void) {
402 int r MY_ATTRIBUTE((unused)) = pthread_mutex_lock(&_mutex);
403 assert_debug(r == 0);
404 _signalled = false;
405 _pulsed = false;
406 r = pthread_mutex_unlock(&_mutex);
407 assert_debug(r == 0);
408 return;
409 }
event_t(const event_t &)410 inline event_t::event_t(const event_t&) {
411 }
412 inline event_t& event_t::operator=(const event_t&) {
413 return *this;
414 }
415
416
semaphore_t(int initial_count,int max_count)417 inline semaphore_t::semaphore_t(
418 int initial_count,
419 int max_count) :
420 _interrupted(false),
421 _initial_count(initial_count),
422 _max_count(max_count) {
423
424 int r MY_ATTRIBUTE((unused)) = pthread_mutex_init(&_mutex, NULL);
425 assert_debug(r == 0);
426 r = pthread_cond_init(&_cond, NULL);
427 assert_debug(r == 0);
428 _signalled = _initial_count;
429 }
~semaphore_t(void)430 inline semaphore_t::~semaphore_t(void) {
431 int r MY_ATTRIBUTE((unused)) = pthread_mutex_destroy(&_mutex);
432 assert_debug(r == 0);
433 r = pthread_cond_destroy(&_cond);
434 assert_debug(r == 0);
435 }
wait(void)436 inline semaphore_t::E_WAIT semaphore_t::wait(void) {
437 E_WAIT ret;
438 int r MY_ATTRIBUTE((unused)) = pthread_mutex_lock(&_mutex);
439 assert_debug(r == 0);
440 while (_signalled == 0 && _interrupted == false) {
441 r = pthread_cond_wait(&_cond, &_mutex);
442 assert_debug(r == 0);
443 }
444 if (_interrupted) {
445 ret = E_INTERRUPTED;
446 } else {
447 _signalled--;
448 ret = E_SIGNALLED;
449 }
450 r = pthread_mutex_unlock(&_mutex);
451 assert_debug(r == 0);
452 return ret;
453 }
signal(void)454 inline bool semaphore_t::signal(void) {
455 bool ret = false;
456 int r MY_ATTRIBUTE((unused)) = pthread_mutex_lock(&_mutex);
457 assert_debug(r == 0);
458 if (_signalled < _max_count) {
459 _signalled++;
460 ret = true;
461 }
462 r = pthread_cond_signal(&_cond);
463 assert_debug(r == 0);
464 r = pthread_mutex_unlock(&_mutex);
465 assert_debug(r == 0);
466 return ret;
467 }
signalled(void)468 inline int semaphore_t::signalled(void) {
469 int ret = 0;
470 int r MY_ATTRIBUTE((unused)) = pthread_mutex_lock(&_mutex);
471 assert_debug(r == 0);
472 ret = _signalled;
473 r = pthread_mutex_unlock(&_mutex);
474 assert_debug(r == 0);
475 return ret;
476 }
reset(void)477 inline void semaphore_t::reset(void) {
478 int r MY_ATTRIBUTE((unused)) = pthread_mutex_lock(&_mutex);
479 assert_debug(r == 0);
480 _signalled = 0;
481 r = pthread_mutex_unlock(&_mutex);
482 assert_debug(r == 0);
483 return;
484 }
set_interrupt(void)485 inline void semaphore_t::set_interrupt(void) {
486 int r MY_ATTRIBUTE((unused)) = pthread_mutex_lock(&_mutex);
487 assert_debug(r == 0);
488 _interrupted = true;
489 r = pthread_cond_broadcast(&_cond);
490 assert_debug(r == 0);
491 r = pthread_mutex_unlock(&_mutex);
492 assert_debug(r == 0);
493 }
clear_interrupt(void)494 inline void semaphore_t::clear_interrupt(void) {
495 int r MY_ATTRIBUTE((unused)) = pthread_mutex_lock(&_mutex);
496 assert_debug(r == 0);
497 _interrupted = false;
498 r = pthread_mutex_unlock(&_mutex);
499 assert_debug(r == 0);
500 }
semaphore_t(const semaphore_t &)501 inline semaphore_t::semaphore_t(const semaphore_t&) {
502 }
503 inline semaphore_t& semaphore_t::operator=(const semaphore_t&) {
504 return *this;
505 }
506
507
thread_t(void)508 inline thread_t::thread_t(void) : _thread(0) {
509 }
~thread_t(void)510 inline thread_t::~thread_t(void) {
511 }
start(void * (* pfn)(void *),void * arg)512 inline int thread_t::start(void*(*pfn)(void*), void* arg) {
513 return pthread_create(&_thread, NULL, pfn, arg);
514 }
join(void ** value_ptr)515 inline int thread_t::join(void** value_ptr) {
516 return pthread_join(_thread, value_ptr);
517 }
detach(void)518 inline int thread_t::detach(void) {
519 return pthread_detach(_thread);
520 }
521
522 } // namespace thread
523 } // namespace tokudb
524
525
526 #endif // _TOKUDB_SYNC_H
527