1 /*
2    BAREOS® - Backup Archiving REcovery Open Sourced
3 
4    Copyright (C) 2014-2019 Bareos GmbH & Co. KG
5 
6    This program is Free Software; you can redistribute it and/or
7    modify it under the terms of version three of the GNU Affero General Public
8    License as published by the Free Software Foundation and included
9    in the file LICENSE.vadp.
10 
11    This program is distributed in the hope that it will be useful, but
12    WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14    Affero General Public License for more details.
15 
16    You should have received a copy of the GNU Affero General Public License
17    along with this program; if not, write to the Free Software
18    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19    02110-1301, USA.
20 */
21 /*
22  * Marco van Wieringen, August 2014.
23  */
24 
25 /*
26  * Copy thread used for producer/consumer problem with pthreads.
27  */
28 #include <stdio.h>
29 #include <stdint.h>
30 #include <string.h>
31 #include <fcntl.h>
32 #include <unistd.h>
33 #include <malloc.h>
34 
35 #include <pthread.h>
36 #include "copy_thread.h"
37 
38 static CP_THREAD_CTX* cp_thread;
39 
40 /*
41  * Copy thread cancel handler.
42  */
copy_cleanup_thread(void * data)43 static void copy_cleanup_thread(void* data)
44 {
45   CP_THREAD_CTX* context = (CP_THREAD_CTX*)data;
46 
47   pthread_mutex_unlock(&context->lock);
48 }
49 
50 /*
51  * Actual copy thread that copies data.
52  */
copy_thread(void * data)53 static void* copy_thread(void* data)
54 {
55   CP_THREAD_SAVE_DATA* save_data;
56   CP_THREAD_CTX* context = (CP_THREAD_CTX*)data;
57 
58   if (pthread_mutex_lock(&context->lock) != 0) { goto bail_out; }
59 
60   /*
61    * When we get canceled make sure we run the cleanup function.
62    */
63   pthread_cleanup_push(copy_cleanup_thread, data);
64 
65   while (1) {
66     size_t cnt;
67 
68     /*
69      * Wait for the moment we are supposed to start.
70      * We are signalled by the restore thread.
71      */
72     pthread_cond_wait(&context->start, &context->lock);
73     context->started = true;
74 
75     pthread_mutex_unlock(&context->lock);
76 
77     /*
78      * Dequeue an item from the circular buffer.
79      */
80     save_data = (CP_THREAD_SAVE_DATA*)context->cb->dequeue();
81 
82     while (save_data) {
83       cnt = cp_thread->output_function(save_data->sector_offset,
84                                        save_data->data_len, save_data->data);
85       if (cnt < save_data->data_len) { return NULL; }
86       save_data = (CP_THREAD_SAVE_DATA*)context->cb->dequeue();
87     }
88 
89     /*
90      * Need to synchronize the main thread and this one so the main thread
91      * cannot miss the conditional signal.
92      */
93     if (pthread_mutex_lock(&context->lock) != 0) { goto bail_out; }
94 
95     /*
96      * Signal the main thread we flushed the data.
97      */
98     pthread_cond_signal(&context->flush);
99 
100     context->started = false;
101     context->flushed = true;
102   }
103 
104   pthread_cleanup_pop(1);
105 
106 bail_out:
107   return NULL;
108 }
109 
110 /*
111  * Create a copy thread.
112  */
setup_copy_thread(IO_FUNCTION * input_function,IO_FUNCTION * output_function)113 bool setup_copy_thread(IO_FUNCTION* input_function,
114                        IO_FUNCTION* output_function)
115 {
116   int nr_save_elements;
117   CP_THREAD_CTX* new_context;
118 
119   new_context = (CP_THREAD_CTX*)malloc(sizeof(CP_THREAD_CTX));
120   new_context->started = false;
121   new_context->flushed = false;
122   new_context->cb = new circbuf;
123 
124   nr_save_elements = new_context->cb->capacity();
125   new_context->save_data = (CP_THREAD_SAVE_DATA*)malloc(
126       nr_save_elements * sizeof(CP_THREAD_SAVE_DATA));
127   memset(new_context->save_data, 0,
128          nr_save_elements * sizeof(CP_THREAD_SAVE_DATA));
129   new_context->nr_save_elements = nr_save_elements;
130 
131   new_context->input_function = input_function;
132   new_context->output_function = output_function;
133 
134   if (pthread_mutex_init(&new_context->lock, NULL) != 0) { goto bail_out; }
135 
136   if (pthread_cond_init(&new_context->start, NULL) != 0) {
137     pthread_mutex_destroy(&new_context->lock);
138     goto bail_out;
139   }
140 
141   if (pthread_create(&new_context->thread_id, NULL, copy_thread,
142                      (void*)new_context) != 0) {
143     pthread_cond_destroy(&new_context->start);
144     pthread_mutex_destroy(&new_context->lock);
145     goto bail_out;
146   }
147 
148   cp_thread = new_context;
149   return true;
150 
151 bail_out:
152   free(new_context->save_data);
153   delete new_context->cb;
154   free(new_context);
155 
156   return false;
157 }
158 
159 /*
160  * Read a new piece of data via the input_function and put it onto the circular
161  * buffer.
162  */
send_to_copy_thread(size_t sector_offset,size_t nbyte)163 bool send_to_copy_thread(size_t sector_offset, size_t nbyte)
164 {
165   int slotnr;
166   circbuf* cb = cp_thread->cb;
167   CP_THREAD_SAVE_DATA* save_data;
168 
169   /*
170    * Find out which next slot will be used on the Circular Buffer.
171    * The method will block when the circular buffer is full until a slot is
172    * available.
173    */
174   slotnr = cb->next_slot();
175   save_data = &cp_thread->save_data[slotnr];
176 
177   /*
178    * If this is the first time we use this slot we need to allocate some memory.
179    */
180   if (!save_data->data) { save_data->data = malloc(nbyte + 1); }
181 
182   save_data->data_len =
183       cp_thread->input_function(sector_offset, nbyte, save_data->data);
184   if (save_data->data_len < 0) { return false; }
185   save_data->sector_offset = sector_offset;
186 
187   cb->enqueue(save_data);
188 
189   /*
190    * Signal the copy thread its time to start if it didn't start yet.
191    */
192   if (!cp_thread->started) { pthread_cond_signal(&cp_thread->start); }
193 
194   return true;
195 }
196 
197 /*
198  * Flush the copy thread.
199  */
flush_copy_thread()200 void flush_copy_thread()
201 {
202   CP_THREAD_CTX* context = cp_thread;
203 
204   if (pthread_mutex_lock(&context->lock) != 0) { return; }
205 
206   /*
207    * In essence the flush should work in one shot but be a bit more
208    * conservative.
209    */
210   while (!context->flushed) {
211     /*
212      * Tell the copy thread to flush out all data.
213      */
214     context->cb->flush();
215 
216     /*
217      * Wait for the copy thread to say it flushed the data out.
218      */
219     pthread_cond_wait(&context->flush, &context->lock);
220   }
221 
222   context->flushed = false;
223 
224   pthread_mutex_unlock(&context->lock);
225 }
226 
227 /*
228  * Cleanup all data allocated for the copy thread.
229  */
cleanup_copy_thread()230 void cleanup_copy_thread()
231 {
232   int slotnr;
233 
234   /*
235    * Stop the copy thread.
236    */
237   if (!pthread_equal(cp_thread->thread_id, pthread_self())) {
238     pthread_cancel(cp_thread->thread_id);
239     pthread_join(cp_thread->thread_id, NULL);
240   }
241 
242   /*
243    * Free all data allocated along the way.
244    */
245   for (slotnr = 0; slotnr < cp_thread->nr_save_elements; slotnr++) {
246     if (cp_thread->save_data[slotnr].data) {
247       free(cp_thread->save_data[slotnr].data);
248     }
249   }
250   free(cp_thread->save_data);
251   delete cp_thread->cb;
252   free(cp_thread);
253   cp_thread = NULL;
254 }
255