1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 /* -*- mode: C; c-basic-offset: 4 -*- */
4 #ident "$Id$"
5 /*======
6 This file is part of TokuDB
7
8
9 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
10
11 TokuDB is free software: you can redistribute it and/or modify
12 it under the terms of the GNU General Public License, version 2,
13 as published by the Free Software Foundation.
14
15 TokuDB is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with TokuDB. If not, see <http://www.gnu.org/licenses/>.
22
23 ======= */
24
25 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
26
27 #include "tokudb_background.h"
28 #include "tokudb_sysvars.h"
29
30 namespace tokudb {
31 namespace background {
32
33
34 std::atomic<uint64_t> job_manager_t::job_t::_next_id(1);
35
job_t(bool user_scheduled)36 job_manager_t::job_t::job_t(bool user_scheduled) :
37 _running(false),
38 _cancelled(false),
39 _id(_next_id++),
40 _user_scheduled(user_scheduled),
41 _scheduled_time(::time(0)),
42 _started_time(0) {
43 }
~job_t()44 job_manager_t::job_t::~job_t() {
45 }
operator new(size_t sz)46 void* job_manager_t::operator new(size_t sz) {
47 return tokudb::memory::malloc(sz, MYF(MY_WME|MY_ZEROFILL|MY_FAE));
48 }
operator delete(void * p)49 void job_manager_t::operator delete(void* p) {
50 tokudb::memory::free(p);
51 }
job_manager_t()52 job_manager_t::job_manager_t() :
53 _sem(0, 65535),
54 _shutdown(false) {
55 }
~job_manager_t()56 job_manager_t::~job_manager_t() {
57 }
initialize()58 void job_manager_t::initialize() {
59 int r = _thread.start(thread_func, this);
60 assert_always(r == 0);
61 }
destroy()62 void job_manager_t::destroy() {
63 assert_always(!_shutdown);
64 assert_always(_foreground_jobs.size() == 0);
65 _shutdown = true;
66 _sem.set_interrupt();
67
68 while (_background_jobs.size()) {
69 mutex_t_lock(_mutex);
70 job_t* job = _background_jobs.front();
71 if (!job->cancelled())
72 cancel(job);
73 _background_jobs.pop_front();
74 delete job;
75 mutex_t_unlock(_mutex);
76 }
77
78 void* result;
79 int r = _thread.join(&result);
80 assert_always(r == 0);
81 }
run_job(job_t * newjob,bool background)82 bool job_manager_t::run_job(job_t* newjob, bool background) {
83 bool ret = false;
84 const char* jobkey = newjob->key();
85
86 mutex_t_lock(_mutex);
87 assert_always(!_shutdown);
88
89 for (jobs_t::iterator it = _background_jobs.begin();
90 it != _background_jobs.end();
91 it++) {
92 job_t* job = *it;
93 if (!job->cancelled() && strcmp(job->key(), jobkey) == 0) {
94 // if this is a foreground job being run and
95 // there is an existing background job of the same type
96 // and it is not running yet, we can cancel the background job
97 // and just run this one in the foreground, might have different
98 // params, but that is up to the user to figure out.
99 if (!background && !job->running()) {
100 job->cancel();
101 } else {
102 // can't schedule or run another job on the same key
103 goto cleanup;
104 }
105 }
106 }
107 for (jobs_t::iterator it = _foreground_jobs.begin();
108 it != _foreground_jobs.end();
109 it++) {
110 job_t* job = *it;
111 if (strcmp(job->key(), jobkey) == 0) {
112 // can't schedule or run another job on the same key
113 // as an existing foreground job
114 goto cleanup;
115 }
116 }
117
118 if (background) {
119 _background_jobs.push_back(newjob);
120 _sem.signal();
121 ret = true;
122 } else {
123 _foreground_jobs.push_back(newjob);
124
125 run(newjob);
126
127 for (jobs_t::iterator it = _foreground_jobs.begin();
128 it != _foreground_jobs.end();
129 it++) {
130 job_t* job = *it;
131 if (job == newjob) {
132 _foreground_jobs.erase(it);
133 delete job;
134 break;
135 }
136 }
137 ret = true;
138 }
139
140 cleanup:
141 mutex_t_unlock(_mutex);
142 return ret;
143 }
cancel_job(const char * key)144 bool job_manager_t::cancel_job(const char* key) {
145 bool ret = false;
146 mutex_t_lock(_mutex);
147
148 for (jobs_t::iterator it = _background_jobs.begin();
149 it != _background_jobs.end();
150 it++) {
151 job_t* job = *it;
152
153 if (!job->cancelled() && strcmp(job->key(), key) == 0) {
154 cancel(job);
155 ret = true;
156 }
157 }
158
159 mutex_t_unlock(_mutex);
160 return ret;
161 }
iterate_jobs(pfn_iterate_t callback,void * extra) const162 void job_manager_t::iterate_jobs(pfn_iterate_t callback, void* extra) const {
163 mutex_t_lock(_mutex);
164
165 for (jobs_t::const_iterator it = _background_jobs.begin();
166 it != _background_jobs.end();
167 it++) {
168 job_t* job = *it;
169 if (!job->cancelled()) {
170 callback(job, extra);
171 }
172 }
173
174 mutex_t_unlock(_mutex);
175 }
thread_func(void * v)176 void* job_manager_t::thread_func(void* v) {
177 return ((tokudb::background::job_manager_t*)v)->real_thread_func();
178 }
real_thread_func()179 void* job_manager_t::real_thread_func() {
180 while (_shutdown == false) {
181 tokudb::thread::semaphore_t::E_WAIT res = _sem.wait();
182 if (res == tokudb::thread::semaphore_t::E_INTERRUPTED || _shutdown) {
183 break;
184 } else if (res == tokudb::thread::semaphore_t::E_SIGNALLED) {
185 #if defined(TOKUDB_DEBUG)
186 if (TOKUDB_UNLIKELY(
187 tokudb::sysvars::debug_pause_background_job_manager)) {
188 _sem.signal();
189 tokudb::time::sleep_microsec(250000);
190 continue;
191 }
192 #endif // defined(TOKUDB_DEBUG)
193
194 mutex_t_lock(_mutex);
195 assert_debug(_background_jobs.size() > 0);
196 job_t* job = _background_jobs.front();
197 run(job);
198 _background_jobs.pop_front();
199 mutex_t_unlock(_mutex);
200 delete job;
201 }
202 }
203 return NULL;
204 }
run(job_t * job)205 void job_manager_t::run(job_t* job) {
206 assert_debug(_mutex.is_owned_by_me());
207 if (!job->cancelled()) {
208 mutex_t_unlock(_mutex);
209 // do job
210 job->run();
211 // done job
212 mutex_t_lock(_mutex);
213 }
214 if (!job->cancelled()) {
215 job->destroy();
216 }
217 }
// Cancels a single job by forwarding to job->cancel().
//
// The caller must hold the manager mutex (checked in debug builds) and the
// job must not have been cancelled before (always checked, fatal otherwise).
void job_manager_t::cancel(job_t* job) {
    assert_debug(_mutex.is_owned_by_me());
    assert_always(!job->cancelled());
    job->cancel();
}
223 job_manager_t* _job_manager = NULL;
224
initialize()225 bool initialize() {
226 assert_always(_job_manager == NULL);
227 _job_manager = new job_manager_t;
228 _job_manager->initialize();
229 return true;
230 }
destroy()231 bool destroy() {
232 _job_manager->destroy();
233 delete _job_manager;
234 _job_manager = NULL;
235 return true;
236 }
237 } // namespace background
238 } // namespace tokudb
239