1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 /* -*- mode: C; c-basic-offset: 4 -*- */
4 #ident "$Id$"
5 /*======
6 This file is part of TokuDB
7 
8 
9 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
10 
11     TokuDB is free software: you can redistribute it and/or modify
12     it under the terms of the GNU General Public License, version 2,
13     as published by the Free Software Foundation.
14 
15     TokuDB is distributed in the hope that it will be useful,
16     but WITHOUT ANY WARRANTY; without even the implied warranty of
17     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18     GNU General Public License for more details.
19 
20     You should have received a copy of the GNU General Public License
21     along with TokuDB.  If not, see <http://www.gnu.org/licenses/>.
22 
23 ======= */
24 
25 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
26 
27 #include "tokudb_background.h"
28 #include "tokudb_sysvars.h"
29 
30 namespace tokudb {
31 namespace background {
32 
33 
34 std::atomic<uint64_t> job_manager_t::job_t::_next_id(1);
35 
36 job_manager_t::job_t::job_t(bool user_scheduled) :
37     _running(false),
38     _cancelled(false),
39     _id(_next_id++),
40     _user_scheduled(user_scheduled),
41     _scheduled_time(::time(0)),
42     _started_time(0) {
43 }
44 job_manager_t::job_t::~job_t() {
45 }
46 void* job_manager_t::operator new(size_t sz) {
47     return tokudb::memory::malloc(sz, MYF(MY_WME|MY_ZEROFILL|MY_FAE));
48 }
// Class-specific deallocator matching operator new above: hands the block
// back to the TokuDB memory wrapper.
void job_manager_t::operator delete(void* p) {
    tokudb::memory::free(p);
}
52 job_manager_t::job_manager_t() :
53     _sem(0, 65535),
54     _shutdown(false) {
55 }
56 job_manager_t::~job_manager_t() {
57 }
// Starts the background worker thread. The thread enters through the static
// trampoline thread_func with this manager as its argument. A non-zero
// return from the thread start is treated as fatal.
void job_manager_t::initialize() {
    int r = _thread.start(thread_func, this);
    assert_always(r == 0);
}
62 void job_manager_t::destroy() {
63     assert_always(!_shutdown);
64     assert_always(_foreground_jobs.size() == 0);
65     _shutdown = true;
66     _sem.set_interrupt();
67 
68     while (_background_jobs.size()) {
69         mutex_t_lock(_mutex);
70         job_t* job = _background_jobs.front();
71         if (!job->cancelled())
72             cancel(job);
73         _background_jobs.pop_front();
74         delete job;
75         mutex_t_unlock(_mutex);
76     }
77 
78     void* result;
79     int r = _thread.join(&result);
80     assert_always(r == 0);
81 }
82 bool job_manager_t::run_job(job_t* newjob, bool background) {
83     bool ret = false;
84     const char* jobkey = newjob->key();
85 
86     mutex_t_lock(_mutex);
87     assert_always(!_shutdown);
88 
89     for (jobs_t::iterator it = _background_jobs.begin();
90          it != _background_jobs.end();
91          it++) {
92         job_t* job = *it;
93         if (!job->cancelled() && strcmp(job->key(), jobkey) == 0) {
94             // if this is a foreground job being run and
95             // there is an existing background job of the same type
96             // and it is not running yet, we can cancel the background job
97             // and just run this one in the foreground, might have different
98             // params, but that is up to the user to figure out.
99             if (!background && !job->running()) {
100                 job->cancel();
101             } else {
102                 // can't schedule or run another job on the same key
103                 goto cleanup;
104             }
105         }
106     }
107     for (jobs_t::iterator it = _foreground_jobs.begin();
108          it != _foreground_jobs.end();
109          it++) {
110         job_t* job = *it;
111         if (strcmp(job->key(), jobkey) == 0) {
112             // can't schedule or run another job on the same key
113             // as an existing foreground job
114             goto cleanup;
115         }
116     }
117 
118     if (background) {
119         _background_jobs.push_back(newjob);
120         _sem.signal();
121         ret = true;
122     } else {
123         _foreground_jobs.push_back(newjob);
124 
125         run(newjob);
126 
127         for (jobs_t::iterator it = _foreground_jobs.begin();
128              it != _foreground_jobs.end();
129              it++) {
130             job_t* job = *it;
131             if (job == newjob) {
132                 _foreground_jobs.erase(it);
133                 delete job;
134                 break;
135             }
136         }
137         ret = true;
138     }
139 
140 cleanup:
141     mutex_t_unlock(_mutex);
142     return ret;
143 }
144 bool job_manager_t::cancel_job(const char* key) {
145     bool ret = false;
146     mutex_t_lock(_mutex);
147 
148     for (jobs_t::iterator it = _background_jobs.begin();
149          it != _background_jobs.end();
150          it++) {
151         job_t* job = *it;
152 
153         if (!job->cancelled() && strcmp(job->key(), key) == 0) {
154             cancel(job);
155             ret = true;
156         }
157     }
158 
159     mutex_t_unlock(_mutex);
160     return ret;
161 }
162 void job_manager_t::iterate_jobs(pfn_iterate_t callback, void* extra) const {
163     mutex_t_lock(_mutex);
164 
165     for (jobs_t::const_iterator it = _background_jobs.begin();
166          it != _background_jobs.end();
167          it++) {
168         job_t* job = *it;
169         if (!job->cancelled()) {
170             callback(job, extra);
171         }
172     }
173 
174     mutex_t_unlock(_mutex);
175 }
176 void* job_manager_t::thread_func(void* v) {
177     return ((tokudb::background::job_manager_t*)v)->real_thread_func();
178 }
179 void* job_manager_t::real_thread_func() {
180     while (_shutdown == false) {
181         tokudb::thread::semaphore_t::E_WAIT res = _sem.wait();
182         if (res == tokudb::thread::semaphore_t::E_INTERRUPTED || _shutdown) {
183                 break;
184         } else if (res == tokudb::thread::semaphore_t::E_SIGNALLED) {
185 #if defined(TOKUDB_DEBUG)
186             if (TOKUDB_UNLIKELY(
187                     tokudb::sysvars::debug_pause_background_job_manager)) {
188                 _sem.signal();
189                 tokudb::time::sleep_microsec(250000);
190                 continue;
191             }
192 #endif  // defined(TOKUDB_DEBUG)
193 
194             mutex_t_lock(_mutex);
195             assert_debug(_background_jobs.size() > 0);
196             job_t* job = _background_jobs.front();
197             run(job);
198             _background_jobs.pop_front();
199             mutex_t_unlock(_mutex);
200             delete job;
201         }
202     }
203     return NULL;
204 }
205 void job_manager_t::run(job_t* job) {
206     assert_debug(_mutex.is_owned_by_me());
207     if (!job->cancelled()) {
208         mutex_t_unlock(_mutex);
209         // do job
210         job->run();
211         // done job
212         mutex_t_lock(_mutex);
213     }
214     if (!job->cancelled()) {
215         job->destroy();
216     }
217 }
// Cancels job. The caller must hold the manager mutex (checked in debug
// builds) and must not pass a job that is already cancelled (always checked).
void job_manager_t::cancel(job_t* job) {
    assert_debug(_mutex.is_owned_by_me());
    assert_always(!job->cancelled());
    job->cancel();
}
// The single job manager instance for this namespace. NULL until
// initialize() creates it and again after destroy() has torn it down.
job_manager_t* _job_manager = NULL;
224 
225 bool initialize() {
226     assert_always(_job_manager == NULL);
227     _job_manager = new job_manager_t;
228     _job_manager->initialize();
229     return true;
230 }
231 bool destroy() {
232     _job_manager->destroy();
233     delete _job_manager;
234     _job_manager = NULL;
235     return true;
236 }
237 } // namespace background
238 } // namespace tokudb
239