1 /*
2 Bacula(R) - The Network Backup Solution
3
4 Copyright (C) 2000-2020 Kern Sibbald
5
6 The original author of Bacula is Kern Sibbald, with contributions
7 from many others, a complete list can be found in the file AUTHORS.
8
9 You may use this file and others of this release according to the
10 license defined in the LICENSE file, which includes the Affero General
11 Public License, v3.0 ("AGPLv3") and some additional permissions and
12 terms pursuant to its AGPLv3 Section 7.
13
14 This notice must be preserved when any source code is
15 conveyed and/or propagated.
16
17 Bacula(R) is a registered trademark of Kern Sibbald.
18 */
19 /*
20 * Bacula worker class. It permits creating a worker thread,
21 * then sending data via a fifo queue to it.
22 *
23 * Kern Sibbald, August 2014
24 *
25 */
26
27 #define LOCKMGR_COMPLIANT
28 #include "bacula.h"
29 #include "worker.h"
30
init(int fifo_size)31 int worker::init(int fifo_size)
32 {
33 int stat;
34
35 if ((stat = pthread_mutex_init(&mutex, NULL)) != 0) {
36 return stat;
37 }
38 if ((stat = pthread_mutex_init(&fmutex, NULL)) != 0) {
39 pthread_mutex_destroy(&mutex);
40 return stat;
41 }
42 if ((stat = pthread_cond_init(&full_wait, NULL)) != 0) {
43 pthread_mutex_destroy(&mutex);
44 pthread_mutex_destroy(&fmutex);
45 return stat;
46 }
47 if ((stat = pthread_cond_init(&empty_wait, NULL)) != 0) {
48 pthread_cond_destroy(&full_wait);
49 pthread_mutex_destroy(&mutex);
50 pthread_mutex_destroy(&fmutex);
51 return stat;
52 }
53 if ((stat = pthread_cond_init(&m_wait, NULL)) != 0) {
54 pthread_cond_destroy(&empty_wait);
55 pthread_cond_destroy(&full_wait);
56 pthread_mutex_destroy(&mutex);
57 pthread_mutex_destroy(&fmutex);
58 return stat;
59 }
60 valid = WORKER_VALID;
61 fifo = New(flist(fifo_size));
62 fpool = New(alist(fifo_size + 2, false));
63 worker_running = false;
64 set_wait_state();
65 return 0;
66 }
67
68 /*
69 * Handle cleanup when the lock is released.
70 */
worker_cleanup(void * arg)71 static void worker_cleanup(void *arg)
72 {
73 worker *wrk = (worker *)arg;
74 wrk->release_lock();
75 }
76
77
release_lock()78 void worker::release_lock()
79 {
80 pthread_mutex_unlock(&mutex);
81 }
82
83
set_wait_state()84 void worker::set_wait_state()
85 {
86 m_state = WORKER_WAIT;
87 }
88
set_run_state()89 void worker::set_run_state()
90 {
91 if (is_quit_state()) return;
92 m_state = WORKER_RUN;
93 if (worker_waiting) {
94 pthread_cond_signal(&m_wait);
95 }
96 }
97
set_quit_state()98 void worker::set_quit_state()
99 {
100 P(mutex);
101 m_state = WORKER_QUIT;
102 pthread_cond_signal(&m_wait);
103 pthread_cond_signal(&empty_wait);
104 V(mutex);
105 }
106
107
108 /* Empty the fifo putting in free pool */
discard_queue()109 void worker::discard_queue()
110 {
111 void *item;
112
113 P(mutex);
114 P(fmutex);
115 while ((item = fifo->dequeue())) {
116 fpool->push(item);
117 }
118 V(fmutex);
119 V(mutex);
120 }
121
122 /*
123 * Destroy a read/write lock
124 *
125 * Returns: 0 on success
126 * errno on failure
127 */
destroy()128 int worker::destroy()
129 {
130 int stat, stat1, stat2, stat3, stat4;
131 POOLMEM *item;
132
133 m_state = WORKER_QUIT;
134 pthread_cond_signal(&m_wait);
135 pthread_cond_signal(&empty_wait);
136
137 P(fmutex);
138 /* Release free pool */
139 while ((item = (POOLMEM *)fpool->pop())) {
140 free_pool_memory(item);
141 }
142 V(fmutex);
143 fpool->destroy();
144 free(fpool);
145
146 /* Release work queue */
147 while ((item = (POOLMEM *)fifo->dequeue())) {
148 free_pool_memory(item);
149 }
150 valid = 0;
151 worker_running = false;
152
153 fifo->destroy();
154 free(fifo);
155
156 stat = pthread_mutex_destroy(&mutex);
157 stat1 = pthread_mutex_destroy(&fmutex);
158 stat2 = pthread_cond_destroy(&full_wait);
159 stat3 = pthread_cond_destroy(&empty_wait);
160 stat4 = pthread_cond_destroy(&m_wait);
161 if (stat != 0) return stat;
162 if (stat1 != 0) return stat1;
163 if (stat2 != 0) return stat2;
164 if (stat3 != 0) return stat3;
165 if (stat4 != 0) return stat4;
166 return 0;
167 }
168
169
170 /* Start the worker thread */
start(void * (* auser_sub)(void *),void * auser_ctx)171 int worker::start(void *(*auser_sub)(void *), void *auser_ctx)
172 {
173 int stat;
174 int i;
175 if (valid != WORKER_VALID) {
176 return EINVAL;
177 }
178 user_sub = auser_sub;
179 user_ctx = auser_ctx;
180 if ((stat = pthread_create(&worker_id, NULL, user_sub, this) != 0)) {
181 return stat;
182 }
183 /* Wait for thread to start, but not too long */
184 for (i=0; i<100 && !is_running(); i++) {
185 bmicrosleep(0, 5000);
186 }
187 set_run_state();
188 return 0;
189 }
190
191 /* Wait for the worker thread to empty the queue */
wait_queue_empty()192 void worker::wait_queue_empty()
193 {
194 if (is_quit_state()) {
195 return;
196 }
197 P(mutex);
198 while (!empty() && !is_quit_state()) {
199 pthread_cond_wait(&empty_wait, &mutex);
200 }
201 V(mutex);
202 return;
203 }
204
205 /* Wait for the main thread to release us */
wait()206 void worker::wait()
207 {
208 P(mutex);
209 pthread_cleanup_push(worker_cleanup, (void *)this);
210 while (is_wait_state() && !is_quit_state()) {
211 worker_waiting = true;
212 pthread_cond_signal(&m_wait);
213 pthread_cond_wait(&m_wait, &mutex);
214 }
215 pthread_cleanup_pop(0);
216 worker_waiting = false;
217 V(mutex);
218 }
219
220 /* Stop the worker thread */
stop()221 int worker::stop()
222 {
223 if (valid != WORKER_VALID) {
224 return EINVAL;
225 }
226 m_state = WORKER_QUIT;
227 pthread_cond_signal(&m_wait);
228 pthread_cond_signal(&empty_wait);
229
230 if (!pthread_equal(worker_id, pthread_self())) {
231 pthread_kill(worker_id, SIGUSR2);
232 pthread_join(worker_id, NULL);
233 }
234 return 0;
235 }
236
237
238 /*
239 * Queue an item for the worker thread. Called by main thread.
240 */
queue(void * item)241 bool worker::queue(void *item)
242 {
243 bool was_empty = false;;
244
245 if (valid != WORKER_VALID || is_quit_state()) {
246 return EINVAL;
247 }
248 P(mutex);
249 done = false;
250 //pthread_cleanup_push(worker_cleanup, (void *)this);
251 while (full() && !is_quit_state()) {
252 pthread_cond_wait(&full_wait, &mutex);
253 }
254 //pthread_cleanup_pop(0);
255 /* Maybe this should be worker_running */
256 was_empty = empty();
257 if (!fifo->queue(item)) {
258 /* Since we waited for !full this cannot happen */
259 V(mutex);
260 ASSERT2(1, "Fifo queue failed.\n");
261 }
262 if (was_empty) {
263 pthread_cond_signal(&empty_wait);
264 }
265 m_state = WORKER_RUN;
266 if (worker_waiting) {
267 pthread_cond_signal(&m_wait);
268 }
269 V(mutex);
270 return 1;
271 }
272
273 /*
274 * Wait for work to complete
275 */
finish_work()276 void worker::finish_work()
277 {
278 P(mutex);
279 while (!empty() && !is_quit_state()) {
280 pthread_cond_wait(&empty_wait, &mutex);
281 }
282 done = true; /* Tell worker that work is done */
283 m_state = WORKER_WAIT; /* force worker into wait state */
284 V(mutex); /* pause for state transition */
285 if (waiting_on_empty) pthread_cond_signal(&empty_wait);
286 P(mutex);
287 /* Wait until worker in wait state */
288 while (!worker_waiting && !is_quit_state()) {
289 if (waiting_on_empty) pthread_cond_signal(&empty_wait);
290 pthread_cond_wait(&m_wait, &mutex);
291 }
292 V(mutex);
293 discard_queue();
294 }
295
296 /*
297 * Dequeue a work item. Called by worker thread.
298 */
dequeue()299 void *worker::dequeue()
300 {
301 bool was_full = false;;
302 void *item = NULL;
303
304 if (valid != WORKER_VALID || done || is_quit_state()) {
305 return NULL;
306 }
307 P(mutex);
308 //pthread_cleanup_push(worker_cleanup, (void *)this);
309 while (empty() && !done && !is_quit_state()) {
310 waiting_on_empty = true;
311 pthread_cond_wait(&empty_wait, &mutex);
312 }
313 waiting_on_empty = false;
314 //pthread_cleanup_pop(0);
315 was_full = full();
316 item = fifo->dequeue();
317 if (was_full) {
318 pthread_cond_signal(&full_wait);
319 }
320 if (empty()) {
321 pthread_cond_signal(&empty_wait);
322 }
323 V(mutex);
324 return item;
325 }
326
327 /*
328 * Pop a free buffer from the list, if one exists.
329 * Called by main thread to get a free buffer.
330 * If none exists (NULL returned), it must allocate
331 * one.
332 */
pop_free_buffer()333 void *worker::pop_free_buffer()
334 {
335 void *free_buf;
336
337 P(fmutex);
338 free_buf = fpool->pop();
339 V(fmutex);
340 return free_buf;
341 }
342
343 /*
344 * Once a work item (buffer) has been processed by the
345 * worker thread, it will put it on the free buffer list
346 * (fpool).
347 */
push_free_buffer(void * buf)348 void worker::push_free_buffer(void *buf)
349 {
350 P(fmutex);
351 fpool->push(buf);
352 V(fmutex);
353 }
354
355
356 //=================================================
357
358 #ifdef TEST_PROGRAM
359
worker_prog(void * wctx)360 void *worker_prog(void *wctx)
361 {
362 POOLMEM *buf;
363 worker *wrk = (worker *)wctx;
364
365 wrk->set_running();
366
367 while (!wrk->is_quit_state()) {
368 if (wrk->is_wait_state()) {
369 wrk->wait();
370 continue;
371 }
372 buf = (POOLMEM *)wrk->dequeue();
373 if (!buf) {
374 printf("worker: got null stop\n");
375 return NULL;
376 }
377 printf("ctx=%lld worker: %s\n", (long long int)wrk->get_ctx(), buf);
378 wrk->push_free_buffer(buf);
379 }
380 printf("worker: asked to stop");
381 return NULL;
382 }
383
main(int argc,char * argv[])384 int main(int argc, char *argv[])
385 {
386 POOLMEM *buf;
387 int i;
388 worker *wrk;
389 void *ctx;
390
391 wrk = New(worker(10));
392 ctx = (void *)1;
393 wrk->start(worker_prog, ctx);
394
395 for (i=1; i<=40; i++) {
396 buf = (POOLMEM *)wrk->pop_free_buffer();
397 if (!buf) {
398 buf = get_pool_memory(PM_BSOCK);
399 printf("Alloc %p\n", buf);
400 }
401 sprintf(buf, "This is item %d", i);
402 wrk->queue(buf);
403 //printf("back from queue %d\n", i);
404 }
405 wrk->wait_queue_empty();
406 wrk->set_wait_state();
407 printf("======\n");
408 for (i=1; i<=5; i++) {
409 buf = (POOLMEM *)wrk->pop_free_buffer();
410 if (!buf) {
411 buf = get_pool_memory(PM_BSOCK);
412 printf("Alloc %p\n", buf);
413 }
414 sprintf(buf, "This is item %d", i);
415 wrk->queue(buf);
416 //printf("back from queue %d\n", i);
417 }
418 wrk->set_run_state();
419 for (i=6; i<=40; i++) {
420 buf = (POOLMEM *)wrk->pop_free_buffer();
421 if (!buf) {
422 buf = get_pool_memory(PM_BSOCK);
423 printf("Alloc %p\n", buf);
424 }
425 sprintf(buf, "This is item %d", i);
426 wrk->queue(buf);
427 //printf("back from queue %d\n", i);
428 }
429 wrk->wait_queue_empty();
430 wrk->stop();
431 wrk->destroy();
432 free(wrk);
433
434 close_memory_pool();
435 sm_dump(false); /* test program */
436 }
437 #endif
438