1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 /* -*- mode: C; c-basic-offset: 4 -*- */
4 #ident "$Id$"
5 /*======
6 This file is part of TokuDB
7 
8 
9 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
10 
11     TokuDBis is free software: you can redistribute it and/or modify
12     it under the terms of the GNU General Public License, version 2,
13     as published by the Free Software Foundation.
14 
15     TokuDB is distributed in the hope that it will be useful,
16     but WITHOUT ANY WARRANTY; without even the implied warranty of
17     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18     GNU General Public License for more details.
19 
20     You should have received a copy of the GNU General Public License
21     along with TokuDB.  If not, see <http://www.gnu.org/licenses/>.
22 
23 ======= */
24 
25 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
26 
27 #ifndef _TOKUDB_SYNC_H
28 #define _TOKUDB_SYNC_H
29 
30 #include "hatoku_defines.h"
31 #include "tokudb_debug.h"
32 #include "tokudb_time.h"
33 
34 namespace tokudb {
35 namespace thread {
36 
37 extern const pfs_key_t pfs_not_instrumented;
38 
39 uint my_tid(void);
40 
41 // Your basic mutex
42 class mutex_t {
43 public:
44     explicit mutex_t(pfs_key_t key);
mutex_t(void)45     mutex_t(void) : mutex_t(pfs_not_instrumented) {}
46     ~mutex_t(void);
47 
48     void reinit(pfs_key_t key);
49     void lock(
50 #if defined(SAFE_MUTEX) || defined(HAVE_PSI_MUTEX_INTERFACE)
51         const char* src_file,
52         uint src_line
53 #endif  // SAFE_MUTEX || HAVE_PSI_MUTEX_INTERFACE
54         );
55     void unlock(
56 #if defined(SAFE_MUTEX)
57         const char* src_file,
58         uint src_line
59 #endif  // SAFE_MUTEX
60         );
61 #ifdef TOKUDB_DEBUG
62     bool is_owned_by_me(void) const;
63 #endif
64 private:
65     static pthread_t _null_owner;
66     mysql_mutex_t _mutex;
67 #ifdef TOKUDB_DEBUG
68     uint _owners;
69     pthread_t _owner;
70 #endif
71 };
72 
73 // Simple read write lock
74 class rwlock_t {
75 public:
76     explicit rwlock_t(pfs_key_t key);
rwlock_t(void)77     rwlock_t(void) : rwlock_t(pfs_not_instrumented) {}
78     ~rwlock_t(void);
79 
80     void lock_read(
81 #ifdef HAVE_PSI_RWLOCK_INTERFACE
82         const char* src_file,
83         uint src_line
84 #endif  // HAVE_PSI_RWLOCK_INTERFACE
85         );
86     void lock_write(
87 #ifdef HAVE_PSI_RWLOCK_INTERFACE
88         const char* src_file,
89         uint src_line
90 #endif  // HAVE_PSI_RWLOCK_INTERFACE
91         );
92     void unlock(void);
93 
94 private:
95     rwlock_t(const rwlock_t&);
96     rwlock_t& operator=(const rwlock_t&);
97 
98     mysql_rwlock_t _rwlock;
99 };
100 
101 // Simple event signal/wait class
102 class event_t {
103 public:
104     // create_signalled - create the event in a signalled state
105     // manual_reset - create an event that must be manually reset
106     // after signaling
107     event_t(
108         bool create_signalled = false,
109         bool manual_reset = false);
110     ~event_t(void);
111 
112     // wait for the event to become signalled
113     void wait(void);
114 
115     // signal the event
116     void signal(void);
117 
118     // pulse the event (signal and free exactly one waiter)
119     void pulse(void);
120 
121     // is the event currently signalled
122     bool signalled(void);
123 
124     // unsignal/clear the event
125     void reset(void);
126 
127 private:
128     event_t(const event_t&);
129     event_t& operator=(const event_t&);
130 
131     pthread_mutex_t		_mutex;
132     pthread_cond_t		_cond;
133     bool			_signalled;
134     bool			_pulsed;
135     bool			_manual_reset;
136 };
137 
138 // Semaphore signal/wait class
139 class semaphore_t {
140 public:
141     // initial_count - the initial signal count of the semaphore
142     // max_count - the maximum signal count for the semaphore.
143     semaphore_t(int initial_count, int max_count);
144     ~semaphore_t(void);
145 
146     enum E_WAIT {
147         E_SIGNALLED = 0,
148         E_INTERRUPTED = 1,
149         E_TIMEDOUT = 2
150     };
151 
152     // wait for the semaphore to become signalled
153     E_WAIT wait(void);
154 
155     // signal the semaphore to increase the count
156     // return true if signalled, false if ignored due to count
157     bool signal(void);
158 
159     // what is the semaphore signal count
160     int signalled(void);
161 
162     // unsignal a signalled semaphore
163     void reset(void);
164 
165     // set to interrupt any waiters, as long as is set,
166     // waiters will return immediately with E_INTERRUPTED.
167     // the semaphore signal count and tracking will continue
168     // accepting signals and leave the signalled state intact
169     void set_interrupt(void);
170     void clear_interrupt(void);
171 
172 private:
173     semaphore_t(const semaphore_t&);
174     semaphore_t& operator=(const semaphore_t&);
175 
176     pthread_mutex_t		_mutex;
177     pthread_cond_t		_cond;
178     bool			_interrupted;
179     int			_signalled;
180     int			_initial_count;
181     int			_max_count;
182 };
183 
184 // Thread class
185 class thread_t {
186 public:
187     thread_t(void);
188     ~thread_t(void);
189 
190     int start(void* (*pfn)(void*), void* arg);
191     int join(void** value_ptr);
192     int detach(void);
193 
194 private:
195     pthread_t   _thread;
196 };
197 
my_tid(void)198 inline uint my_tid(void) { return (uint)toku_os_gettid(); }
199 
mutex_t(pfs_key_t key)200 inline mutex_t::mutex_t(pfs_key_t key) {
201 #ifdef TOKUDB_DEBUG
202     _owners = 0;
203     _owner = _null_owner;
204 #endif
205     int  r MY_ATTRIBUTE((unused)) = mysql_mutex_init(key, &_mutex, MY_MUTEX_INIT_FAST);
206     assert_debug(r == 0);
207 }
~mutex_t()208 inline mutex_t::~mutex_t() {
209 #ifdef TOKUDB_DEBUG
210     assert_debug(_owners == 0);
211 #endif
212     int  r MY_ATTRIBUTE((unused)) = mysql_mutex_destroy(&_mutex);
213     assert_debug(r == 0);
214 }
reinit(pfs_key_t key)215 inline void mutex_t::reinit(pfs_key_t key) {
216 #ifdef TOKUDB_DEBUG
217     assert_debug(_owners == 0);
218 #endif
219     int  r MY_ATTRIBUTE((unused));
220     r = mysql_mutex_destroy(&_mutex);
221     assert_debug(r == 0);
222     r = mysql_mutex_init(key, &_mutex, MY_MUTEX_INIT_FAST);
223     assert_debug(r == 0);
224 }
lock(const char * src_file,uint src_line)225 inline void mutex_t::lock(
226 #if defined(SAFE_MUTEX) || defined(HAVE_PSI_MUTEX_INTERFACE)
227     const char* src_file,
228     uint src_line
229 #endif  // SAFE_MUTEX || HAVE_PSI_MUTEX_INTERFACE
230     ) {
231     assert_debug(is_owned_by_me() == false);
232     int r MY_ATTRIBUTE((unused)) = inline_mysql_mutex_lock(&_mutex
233 #if defined(SAFE_MUTEX) || defined(HAVE_PSI_MUTEX_INTERFACE)
234                                     ,
235                                     src_file,
236                                     src_line
237 #endif  // SAFE_MUTEX || HAVE_PSI_MUTEX_INTERFACE
238                                     );
239     assert_debug(r == 0);
240 #ifdef TOKUDB_DEBUG
241     _owners++;
242     _owner = pthread_self();
243 #endif
244 }
unlock(const char * src_file,uint src_line)245 inline void mutex_t::unlock(
246 #if defined(SAFE_MUTEX)
247     const char* src_file,
248     uint src_line
249 #endif  // SAFE_MUTEX
250     ) {
251 #ifdef TOKUDB_DEBUG
252     assert_debug(_owners > 0);
253     assert_debug(is_owned_by_me());
254     _owners--;
255     _owner = _null_owner;
256 #endif
257     int r MY_ATTRIBUTE((unused)) = inline_mysql_mutex_unlock(&_mutex
258 #if defined(SAFE_MUTEX)
259                                       ,
260                                       src_file,
261                                       src_line
262 #endif  // SAFE_MUTEX
263                                       );
264     assert_debug(r == 0);
265 }
266 #ifdef TOKUDB_DEBUG
is_owned_by_me(void)267 inline bool mutex_t::is_owned_by_me(void) const {
268     return pthread_equal(pthread_self(), _owner) != 0 ? true : false;
269 }
270 #endif
271 
rwlock_t(pfs_key_t key)272 inline rwlock_t::rwlock_t(pfs_key_t key) {
273     int r MY_ATTRIBUTE((unused)) = mysql_rwlock_init(key, &_rwlock);
274     assert_debug(r == 0);
275 }
~rwlock_t()276 inline rwlock_t::~rwlock_t() {
277     int r MY_ATTRIBUTE((unused)) = mysql_rwlock_destroy(&_rwlock);
278     assert_debug(r == 0);
279 }
lock_read(const char * src_file,uint src_line)280 inline void rwlock_t::lock_read(
281 #ifdef HAVE_PSI_RWLOCK_INTERFACE
282     const char* src_file,
283     uint src_line
284 #endif  // HAVE_PSI_RWLOCK_INTERFACE
285     ) {
286     int r;
287     while ((r = inline_mysql_rwlock_rdlock(&_rwlock
288 #ifdef HAVE_PSI_RWLOCK_INTERFACE
289                                            ,
290                                            src_file,
291                                            src_line
292 #endif  // HAVE_PSI_RWLOCK_INTERFACE
293                                            )) != 0) {
294         if (r == EBUSY || r == EAGAIN) {
295             time::sleep_microsec(1000);
296             continue;
297         }
298         break;
299     }
300     assert_debug(r == 0);
301 }
lock_write(const char * src_file,uint src_line)302 inline void rwlock_t::lock_write(
303 #ifdef HAVE_PSI_RWLOCK_INTERFACE
304     const char* src_file,
305     uint src_line
306 #endif  // HAVE_PSI_RWLOCK_INTERFACE
307     ) {
308     int r;
309     while ((r = inline_mysql_rwlock_wrlock(&_rwlock
310 #ifdef HAVE_PSI_RWLOCK_INTERFACE
311                                            ,
312                                            src_file,
313                                            src_line
314 #endif  // HAVE_PSI_RWLOCK_INTERFACE
315                                            )) != 0) {
316         if (r == EBUSY || r == EAGAIN) {
317             time::sleep_microsec(1000);
318             continue;
319         }
320         break;
321     }
322     assert_debug(r == 0);
323 }
unlock(void)324 inline void rwlock_t::unlock(void) {
325     int r MY_ATTRIBUTE((unused)) = mysql_rwlock_unlock(&_rwlock);
326     assert_debug(r == 0);
327 }
rwlock_t(const rwlock_t &)328 inline rwlock_t::rwlock_t(const rwlock_t&) {}
329 inline rwlock_t& rwlock_t::operator=(const rwlock_t&) {
330     return *this;
331 }
332 
333 
event_t(bool create_signalled,bool manual_reset)334 inline event_t::event_t(bool create_signalled, bool manual_reset) :
335     _manual_reset(manual_reset) {
336 
337     int r MY_ATTRIBUTE((unused)) = pthread_mutex_init(&_mutex, NULL);
338     assert_debug(r == 0);
339     r = pthread_cond_init(&_cond, NULL);
340     assert_debug(r == 0);
341     if (create_signalled) {
342         _signalled = true;
343     } else {
344         _signalled = false;
345     }
346     _pulsed = false;
347 }
~event_t(void)348 inline event_t::~event_t(void) {
349     int r MY_ATTRIBUTE((unused)) = pthread_mutex_destroy(&_mutex);
350     assert_debug(r == 0);
351     r = pthread_cond_destroy(&_cond);
352     assert_debug(r == 0);
353 }
wait(void)354 inline void event_t::wait(void) {
355     int r MY_ATTRIBUTE((unused)) = pthread_mutex_lock(&_mutex);
356     assert_debug(r == 0);
357     while (_signalled == false && _pulsed == false) {
358         r = pthread_cond_wait(&_cond, &_mutex);
359         assert_debug(r == 0);
360     }
361     if (_manual_reset == false)
362         _signalled = false;
363     if (_pulsed)
364         _pulsed = false;
365     r = pthread_mutex_unlock(&_mutex);
366     assert_debug(r == 0);
367     return;
368 }
signal(void)369 inline void event_t::signal(void) {
370     int r MY_ATTRIBUTE((unused)) = pthread_mutex_lock(&_mutex);
371     assert_debug(r == 0);
372     _signalled = true;
373     if (_manual_reset) {
374         r = pthread_cond_broadcast(&_cond);
375         assert_debug(r == 0);
376     } else {
377         r = pthread_cond_signal(&_cond);
378         assert_debug(r == 0);
379     }
380     r = pthread_mutex_unlock(&_mutex);
381     assert_debug(r == 0);
382 }
pulse(void)383 inline void event_t::pulse(void) {
384     int r MY_ATTRIBUTE((unused)) = pthread_mutex_lock(&_mutex);
385     assert_debug(r == 0);
386     _pulsed = true;
387     r = pthread_cond_signal(&_cond);
388     assert_debug(r == 0);
389     r = pthread_mutex_unlock(&_mutex);
390     assert_debug(r == 0);
391 }
signalled(void)392 inline bool event_t::signalled(void) {
393     bool ret = false;
394     int r MY_ATTRIBUTE((unused)) = pthread_mutex_lock(&_mutex);
395     assert_debug(r == 0);
396     ret = _signalled;
397     r = pthread_mutex_unlock(&_mutex);
398     assert_debug(r == 0);
399     return ret;
400 }
reset(void)401 inline void event_t::reset(void) {
402     int r MY_ATTRIBUTE((unused)) = pthread_mutex_lock(&_mutex);
403     assert_debug(r == 0);
404     _signalled = false;
405     _pulsed = false;
406     r = pthread_mutex_unlock(&_mutex);
407     assert_debug(r == 0);
408     return;
409 }
event_t(const event_t &)410 inline event_t::event_t(const event_t&) {
411 }
412 inline event_t& event_t::operator=(const event_t&) {
413     return *this;
414 }
415 
416 
semaphore_t(int initial_count,int max_count)417 inline semaphore_t::semaphore_t(
418     int initial_count,
419     int max_count) :
420     _interrupted(false),
421     _initial_count(initial_count),
422     _max_count(max_count) {
423 
424     int r MY_ATTRIBUTE((unused)) = pthread_mutex_init(&_mutex, NULL);
425     assert_debug(r == 0);
426     r = pthread_cond_init(&_cond, NULL);
427     assert_debug(r == 0);
428     _signalled = _initial_count;
429 }
~semaphore_t(void)430 inline semaphore_t::~semaphore_t(void) {
431     int r MY_ATTRIBUTE((unused)) = pthread_mutex_destroy(&_mutex);
432     assert_debug(r == 0);
433     r = pthread_cond_destroy(&_cond);
434     assert_debug(r == 0);
435 }
wait(void)436 inline semaphore_t::E_WAIT semaphore_t::wait(void) {
437     E_WAIT ret;
438     int r MY_ATTRIBUTE((unused)) = pthread_mutex_lock(&_mutex);
439     assert_debug(r == 0);
440     while (_signalled == 0 && _interrupted == false) {
441         r = pthread_cond_wait(&_cond, &_mutex);
442         assert_debug(r == 0);
443     }
444     if (_interrupted) {
445         ret = E_INTERRUPTED;
446     } else {
447         _signalled--;
448         ret = E_SIGNALLED;
449     }
450     r = pthread_mutex_unlock(&_mutex);
451     assert_debug(r == 0);
452     return ret;
453 }
signal(void)454 inline bool semaphore_t::signal(void) {
455     bool ret = false;
456     int r MY_ATTRIBUTE((unused)) = pthread_mutex_lock(&_mutex);
457     assert_debug(r == 0);
458     if (_signalled < _max_count) {
459         _signalled++;
460         ret = true;
461     }
462     r = pthread_cond_signal(&_cond);
463     assert_debug(r == 0);
464     r = pthread_mutex_unlock(&_mutex);
465     assert_debug(r == 0);
466     return ret;
467 }
signalled(void)468 inline int semaphore_t::signalled(void) {
469     int ret = 0;
470     int r MY_ATTRIBUTE((unused)) = pthread_mutex_lock(&_mutex);
471     assert_debug(r == 0);
472     ret = _signalled;
473     r = pthread_mutex_unlock(&_mutex);
474     assert_debug(r == 0);
475     return ret;
476 }
reset(void)477 inline void semaphore_t::reset(void) {
478     int r MY_ATTRIBUTE((unused)) = pthread_mutex_lock(&_mutex);
479     assert_debug(r == 0);
480     _signalled = 0;
481     r = pthread_mutex_unlock(&_mutex);
482     assert_debug(r == 0);
483     return;
484 }
set_interrupt(void)485 inline void semaphore_t::set_interrupt(void) {
486     int r MY_ATTRIBUTE((unused)) = pthread_mutex_lock(&_mutex);
487     assert_debug(r == 0);
488     _interrupted = true;
489     r = pthread_cond_broadcast(&_cond);
490     assert_debug(r == 0);
491     r = pthread_mutex_unlock(&_mutex);
492     assert_debug(r == 0);
493 }
clear_interrupt(void)494 inline void semaphore_t::clear_interrupt(void) {
495     int r MY_ATTRIBUTE((unused)) = pthread_mutex_lock(&_mutex);
496     assert_debug(r == 0);
497     _interrupted = false;
498     r = pthread_mutex_unlock(&_mutex);
499     assert_debug(r == 0);
500 }
semaphore_t(const semaphore_t &)501 inline semaphore_t::semaphore_t(const semaphore_t&) {
502 }
503 inline semaphore_t& semaphore_t::operator=(const semaphore_t&) {
504     return *this;
505 }
506 
507 
thread_t(void)508 inline thread_t::thread_t(void) : _thread(0) {
509 }
~thread_t(void)510 inline thread_t::~thread_t(void) {
511 }
start(void * (* pfn)(void *),void * arg)512 inline int thread_t::start(void*(*pfn)(void*), void* arg) {
513     return pthread_create(&_thread, NULL, pfn, arg);
514 }
join(void ** value_ptr)515 inline int thread_t::join(void** value_ptr) {
516     return pthread_join(_thread, value_ptr);
517 }
detach(void)518 inline int thread_t::detach(void) {
519     return pthread_detach(_thread);
520 }
521 
522 } // namespace thread
523 } // namespace tokudb
524 
525 
526 #endif // _TOKUDB_SYNC_H
527