1 /*
2 BAREOS® - Backup Archiving REcovery Open Sourced
3
4 Copyright (C) 2014-2019 Bareos GmbH & Co. KG
5
6 This program is Free Software; you can redistribute it and/or
7 modify it under the terms of version three of the GNU Affero General Public
8 License as published by the Free Software Foundation and included
9 in the file LICENSE.vadp.
10
11 This program is distributed in the hope that it will be useful, but
12 WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 Affero General Public License for more details.
15
16 You should have received a copy of the GNU Affero General Public License
17 along with this program; if not, write to the Free Software
18 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 02110-1301, USA.
20 */
21 /*
22 * Marco van Wieringen, August 2014.
23 */
24
25 /*
26 * Copy thread used for producer/consumer problem with pthreads.
27 */
#include <stdio.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include <fcntl.h>
#include <unistd.h>
#include <malloc.h>

#include <pthread.h>
#include "copy_thread.h"
37
38 static CP_THREAD_CTX* cp_thread;
39
40 /*
41 * Copy thread cancel handler.
42 */
copy_cleanup_thread(void * data)43 static void copy_cleanup_thread(void* data)
44 {
45 CP_THREAD_CTX* context = (CP_THREAD_CTX*)data;
46
47 pthread_mutex_unlock(&context->lock);
48 }
49
50 /*
51 * Actual copy thread that copies data.
52 */
copy_thread(void * data)53 static void* copy_thread(void* data)
54 {
55 CP_THREAD_SAVE_DATA* save_data;
56 CP_THREAD_CTX* context = (CP_THREAD_CTX*)data;
57
58 if (pthread_mutex_lock(&context->lock) != 0) { goto bail_out; }
59
60 /*
61 * When we get canceled make sure we run the cleanup function.
62 */
63 pthread_cleanup_push(copy_cleanup_thread, data);
64
65 while (1) {
66 size_t cnt;
67
68 /*
69 * Wait for the moment we are supposed to start.
70 * We are signalled by the restore thread.
71 */
72 pthread_cond_wait(&context->start, &context->lock);
73 context->started = true;
74
75 pthread_mutex_unlock(&context->lock);
76
77 /*
78 * Dequeue an item from the circular buffer.
79 */
80 save_data = (CP_THREAD_SAVE_DATA*)context->cb->dequeue();
81
82 while (save_data) {
83 cnt = cp_thread->output_function(save_data->sector_offset,
84 save_data->data_len, save_data->data);
85 if (cnt < save_data->data_len) { return NULL; }
86 save_data = (CP_THREAD_SAVE_DATA*)context->cb->dequeue();
87 }
88
89 /*
90 * Need to synchronize the main thread and this one so the main thread
91 * cannot miss the conditional signal.
92 */
93 if (pthread_mutex_lock(&context->lock) != 0) { goto bail_out; }
94
95 /*
96 * Signal the main thread we flushed the data.
97 */
98 pthread_cond_signal(&context->flush);
99
100 context->started = false;
101 context->flushed = true;
102 }
103
104 pthread_cleanup_pop(1);
105
106 bail_out:
107 return NULL;
108 }
109
110 /*
111 * Create a copy thread.
112 */
setup_copy_thread(IO_FUNCTION * input_function,IO_FUNCTION * output_function)113 bool setup_copy_thread(IO_FUNCTION* input_function,
114 IO_FUNCTION* output_function)
115 {
116 int nr_save_elements;
117 CP_THREAD_CTX* new_context;
118
119 new_context = (CP_THREAD_CTX*)malloc(sizeof(CP_THREAD_CTX));
120 new_context->started = false;
121 new_context->flushed = false;
122 new_context->cb = new circbuf;
123
124 nr_save_elements = new_context->cb->capacity();
125 new_context->save_data = (CP_THREAD_SAVE_DATA*)malloc(
126 nr_save_elements * sizeof(CP_THREAD_SAVE_DATA));
127 memset(new_context->save_data, 0,
128 nr_save_elements * sizeof(CP_THREAD_SAVE_DATA));
129 new_context->nr_save_elements = nr_save_elements;
130
131 new_context->input_function = input_function;
132 new_context->output_function = output_function;
133
134 if (pthread_mutex_init(&new_context->lock, NULL) != 0) { goto bail_out; }
135
136 if (pthread_cond_init(&new_context->start, NULL) != 0) {
137 pthread_mutex_destroy(&new_context->lock);
138 goto bail_out;
139 }
140
141 if (pthread_create(&new_context->thread_id, NULL, copy_thread,
142 (void*)new_context) != 0) {
143 pthread_cond_destroy(&new_context->start);
144 pthread_mutex_destroy(&new_context->lock);
145 goto bail_out;
146 }
147
148 cp_thread = new_context;
149 return true;
150
151 bail_out:
152 free(new_context->save_data);
153 delete new_context->cb;
154 free(new_context);
155
156 return false;
157 }
158
159 /*
160 * Read a new piece of data via the input_function and put it onto the circular
161 * buffer.
162 */
send_to_copy_thread(size_t sector_offset,size_t nbyte)163 bool send_to_copy_thread(size_t sector_offset, size_t nbyte)
164 {
165 int slotnr;
166 circbuf* cb = cp_thread->cb;
167 CP_THREAD_SAVE_DATA* save_data;
168
169 /*
170 * Find out which next slot will be used on the Circular Buffer.
171 * The method will block when the circular buffer is full until a slot is
172 * available.
173 */
174 slotnr = cb->next_slot();
175 save_data = &cp_thread->save_data[slotnr];
176
177 /*
178 * If this is the first time we use this slot we need to allocate some memory.
179 */
180 if (!save_data->data) { save_data->data = malloc(nbyte + 1); }
181
182 save_data->data_len =
183 cp_thread->input_function(sector_offset, nbyte, save_data->data);
184 if (save_data->data_len < 0) { return false; }
185 save_data->sector_offset = sector_offset;
186
187 cb->enqueue(save_data);
188
189 /*
190 * Signal the copy thread its time to start if it didn't start yet.
191 */
192 if (!cp_thread->started) { pthread_cond_signal(&cp_thread->start); }
193
194 return true;
195 }
196
197 /*
198 * Flush the copy thread.
199 */
flush_copy_thread()200 void flush_copy_thread()
201 {
202 CP_THREAD_CTX* context = cp_thread;
203
204 if (pthread_mutex_lock(&context->lock) != 0) { return; }
205
206 /*
207 * In essence the flush should work in one shot but be a bit more
208 * conservative.
209 */
210 while (!context->flushed) {
211 /*
212 * Tell the copy thread to flush out all data.
213 */
214 context->cb->flush();
215
216 /*
217 * Wait for the copy thread to say it flushed the data out.
218 */
219 pthread_cond_wait(&context->flush, &context->lock);
220 }
221
222 context->flushed = false;
223
224 pthread_mutex_unlock(&context->lock);
225 }
226
227 /*
228 * Cleanup all data allocated for the copy thread.
229 */
cleanup_copy_thread()230 void cleanup_copy_thread()
231 {
232 int slotnr;
233
234 /*
235 * Stop the copy thread.
236 */
237 if (!pthread_equal(cp_thread->thread_id, pthread_self())) {
238 pthread_cancel(cp_thread->thread_id);
239 pthread_join(cp_thread->thread_id, NULL);
240 }
241
242 /*
243 * Free all data allocated along the way.
244 */
245 for (slotnr = 0; slotnr < cp_thread->nr_save_elements; slotnr++) {
246 if (cp_thread->save_data[slotnr].data) {
247 free(cp_thread->save_data[slotnr].data);
248 }
249 }
250 free(cp_thread->save_data);
251 delete cp_thread->cb;
252 free(cp_thread);
253 cp_thread = NULL;
254 }
255