/******************************************************
Copyright (c) 2011-2013 Percona LLC and/or its affiliates.

Compressing datasink implementation for XtraBackup.

This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA

*******************************************************/
20
21 #include <my_global.h>
22 #include <mysql_version.h>
23 #include <my_base.h>
24 #include <quicklz.h>
25 #include <zlib.h>
26 #include "common.h"
27 #include "datasink.h"
28
/* Size of one uncompressed chunk handed to a worker thread, taken from the
xtrabackup_compress_chunk_size option and converted to size_t. */
#define COMPRESS_CHUNK_SIZE ((size_t) (xtrabackup_compress_chunk_size))

/* Extra bytes allocated on top of COMPRESS_CHUNK_SIZE for each worker's
output buffer. */
#define MY_QLZ_COMPRESS_OVERHEAD 400
31
32 typedef struct {
33 pthread_t id;
34 uint num;
35 pthread_mutex_t ctrl_mutex;
36 pthread_cond_t ctrl_cond;
37 pthread_mutex_t data_mutex;
38 pthread_cond_t data_cond;
39 my_bool started;
40 my_bool data_avail;
41 my_bool cancelled;
42 const char *from;
43 size_t from_len;
44 char *to;
45 size_t to_len;
46 qlz_state_compress state;
47 ulong adler;
48 } comp_thread_ctxt_t;
49
50 typedef struct {
51 comp_thread_ctxt_t *threads;
52 uint nthreads;
53 } ds_compress_ctxt_t;
54
55 typedef struct {
56 ds_file_t *dest_file;
57 ds_compress_ctxt_t *comp_ctxt;
58 size_t bytes_processed;
59 } ds_compress_file_t;
60
61 /* Compression options */
62 extern char *xtrabackup_compress_alg;
63 extern uint xtrabackup_compress_threads;
64 extern ulonglong xtrabackup_compress_chunk_size;
65
66 static ds_ctxt_t *compress_init(const char *root);
67 static ds_file_t *compress_open(ds_ctxt_t *ctxt, const char *path,
68 MY_STAT *mystat);
69 static int compress_write(ds_file_t *file, const uchar *buf, size_t len);
70 static int compress_close(ds_file_t *file);
71 static void compress_deinit(ds_ctxt_t *ctxt);
72
73 datasink_t datasink_compress = {
74 &compress_init,
75 &compress_open,
76 &compress_write,
77 &compress_close,
78 &dummy_remove,
79 &compress_deinit
80 };
81
82 static inline int write_uint32_le(ds_file_t *file, ulong n);
83 static inline int write_uint64_le(ds_file_t *file, ulonglong n);
84
85 static comp_thread_ctxt_t *create_worker_threads(uint n);
86 static void destroy_worker_threads(comp_thread_ctxt_t *threads, uint n);
87 static void *compress_worker_thread_func(void *arg);
88
89 static
90 ds_ctxt_t *
compress_init(const char * root)91 compress_init(const char *root)
92 {
93 ds_ctxt_t *ctxt;
94 ds_compress_ctxt_t *compress_ctxt;
95 comp_thread_ctxt_t *threads;
96
97 /* Create and initialize the worker threads */
98 threads = create_worker_threads(xtrabackup_compress_threads);
99 if (threads == NULL) {
100 msg("compress: failed to create worker threads.");
101 return NULL;
102 }
103
104 ctxt = (ds_ctxt_t *) my_malloc(sizeof(ds_ctxt_t) +
105 sizeof(ds_compress_ctxt_t),
106 MYF(MY_FAE));
107
108 compress_ctxt = (ds_compress_ctxt_t *) (ctxt + 1);
109 compress_ctxt->threads = threads;
110 compress_ctxt->nthreads = xtrabackup_compress_threads;
111
112 ctxt->ptr = compress_ctxt;
113 ctxt->root = my_strdup(root, MYF(MY_FAE));
114
115 return ctxt;
116 }
117
118 static
119 ds_file_t *
compress_open(ds_ctxt_t * ctxt,const char * path,MY_STAT * mystat)120 compress_open(ds_ctxt_t *ctxt, const char *path, MY_STAT *mystat)
121 {
122 ds_compress_ctxt_t *comp_ctxt;
123 ds_ctxt_t *dest_ctxt;
124 ds_file_t *dest_file;
125 char new_name[FN_REFLEN];
126 size_t name_len;
127 ds_file_t *file;
128 ds_compress_file_t *comp_file;
129
130 xb_ad(ctxt->pipe_ctxt != NULL);
131 dest_ctxt = ctxt->pipe_ctxt;
132
133 comp_ctxt = (ds_compress_ctxt_t *) ctxt->ptr;
134
135 /* Append the .qp extension to the filename */
136 fn_format(new_name, path, "", ".qp", MYF(MY_APPEND_EXT));
137
138 dest_file = ds_open(dest_ctxt, new_name, mystat);
139 if (dest_file == NULL) {
140 return NULL;
141 }
142
143 /* Write the qpress archive header */
144 if (ds_write(dest_file, "qpress10", 8) ||
145 write_uint64_le(dest_file, COMPRESS_CHUNK_SIZE)) {
146 goto err;
147 }
148
149 /* We are going to create a one-file "flat" (i.e. with no
150 subdirectories) archive. So strip the directory part from the path and
151 remove the '.qp' suffix. */
152 fn_format(new_name, path, "", "", MYF(MY_REPLACE_DIR));
153
154 /* Write the qpress file header */
155 name_len = strlen(new_name);
156 if (ds_write(dest_file, "F", 1) ||
157 write_uint32_le(dest_file, (uint)name_len) ||
158 /* we want to write the terminating \0 as well */
159 ds_write(dest_file, new_name, name_len + 1)) {
160 goto err;
161 }
162
163 file = (ds_file_t *) my_malloc(sizeof(ds_file_t) +
164 sizeof(ds_compress_file_t),
165 MYF(MY_FAE));
166 comp_file = (ds_compress_file_t *) (file + 1);
167 comp_file->dest_file = dest_file;
168 comp_file->comp_ctxt = comp_ctxt;
169 comp_file->bytes_processed = 0;
170
171 file->ptr = comp_file;
172 file->path = dest_file->path;
173
174 return file;
175
176 err:
177 ds_close(dest_file);
178 return NULL;
179 }
180
181 static
182 int
compress_write(ds_file_t * file,const uchar * buf,size_t len)183 compress_write(ds_file_t *file, const uchar *buf, size_t len)
184 {
185 ds_compress_file_t *comp_file;
186 ds_compress_ctxt_t *comp_ctxt;
187 comp_thread_ctxt_t *threads;
188 comp_thread_ctxt_t *thd;
189 uint nthreads;
190 uint i;
191 const char *ptr;
192 ds_file_t *dest_file;
193
194 comp_file = (ds_compress_file_t *) file->ptr;
195 comp_ctxt = comp_file->comp_ctxt;
196 dest_file = comp_file->dest_file;
197
198 threads = comp_ctxt->threads;
199 nthreads = comp_ctxt->nthreads;
200
201 ptr = (const char *) buf;
202 while (len > 0) {
203 uint max_thread;
204
205 /* Send data to worker threads for compression */
206 for (i = 0; i < nthreads; i++) {
207 size_t chunk_len;
208
209 thd = threads + i;
210
211 pthread_mutex_lock(&thd->ctrl_mutex);
212
213 chunk_len = (len > COMPRESS_CHUNK_SIZE) ?
214 COMPRESS_CHUNK_SIZE : len;
215 thd->from = ptr;
216 thd->from_len = chunk_len;
217
218 pthread_mutex_lock(&thd->data_mutex);
219 thd->data_avail = TRUE;
220 pthread_cond_signal(&thd->data_cond);
221 pthread_mutex_unlock(&thd->data_mutex);
222
223 len -= chunk_len;
224 if (len == 0) {
225 break;
226 }
227 ptr += chunk_len;
228 }
229
230 max_thread = (i < nthreads) ? i : nthreads - 1;
231
232 /* Reap and stream the compressed data */
233 for (i = 0; i <= max_thread; i++) {
234 thd = threads + i;
235
236 pthread_mutex_lock(&thd->data_mutex);
237 while (thd->data_avail == TRUE) {
238 pthread_cond_wait(&thd->data_cond,
239 &thd->data_mutex);
240 }
241
242 xb_a(threads[i].to_len > 0);
243
244 if (ds_write(dest_file, "NEWBNEWB", 8) ||
245 write_uint64_le(dest_file,
246 comp_file->bytes_processed)) {
247 msg("compress: write to the destination stream "
248 "failed.");
249 return 1;
250 }
251
252 comp_file->bytes_processed += threads[i].from_len;
253
254 if (write_uint32_le(dest_file, threads[i].adler) ||
255 ds_write(dest_file, threads[i].to,
256 threads[i].to_len)) {
257 msg("compress: write to the destination stream "
258 "failed.");
259 return 1;
260 }
261
262 pthread_mutex_unlock(&threads[i].data_mutex);
263 pthread_mutex_unlock(&threads[i].ctrl_mutex);
264 }
265 }
266
267 return 0;
268 }
269
270 static
271 int
compress_close(ds_file_t * file)272 compress_close(ds_file_t *file)
273 {
274 ds_compress_file_t *comp_file;
275 ds_file_t *dest_file;
276 int rc;
277
278 comp_file = (ds_compress_file_t *) file->ptr;
279 dest_file = comp_file->dest_file;
280
281 /* Write the qpress file trailer */
282 ds_write(dest_file, "ENDSENDS", 8);
283
284 /* Supposedly the number of written bytes should be written as a
285 "recovery information" in the file trailer, but in reality qpress
286 always writes 8 zeros here. Let's do the same */
287
288 write_uint64_le(dest_file, 0);
289
290 rc = ds_close(dest_file);
291
292 my_free(file);
293
294 return rc;
295 }
296
297 static
298 void
compress_deinit(ds_ctxt_t * ctxt)299 compress_deinit(ds_ctxt_t *ctxt)
300 {
301 ds_compress_ctxt_t *comp_ctxt;
302
303 xb_ad(ctxt->pipe_ctxt != NULL);
304
305 comp_ctxt = (ds_compress_ctxt_t *) ctxt->ptr;;
306
307 destroy_worker_threads(comp_ctxt->threads, comp_ctxt->nthreads);
308
309 my_free(ctxt->root);
310 my_free(ctxt);
311 }
312
313 static inline
314 int
write_uint32_le(ds_file_t * file,ulong n)315 write_uint32_le(ds_file_t *file, ulong n)
316 {
317 char tmp[4];
318
319 int4store(tmp, n);
320 return ds_write(file, tmp, sizeof(tmp));
321 }
322
323 static inline
324 int
write_uint64_le(ds_file_t * file,ulonglong n)325 write_uint64_le(ds_file_t *file, ulonglong n)
326 {
327 char tmp[8];
328
329 int8store(tmp, n);
330 return ds_write(file, tmp, sizeof(tmp));
331 }
332
333 static
334 comp_thread_ctxt_t *
create_worker_threads(uint n)335 create_worker_threads(uint n)
336 {
337 comp_thread_ctxt_t *threads;
338 uint i;
339
340 threads = (comp_thread_ctxt_t *)
341 my_malloc(sizeof(comp_thread_ctxt_t) * n, MYF(MY_FAE));
342
343 for (i = 0; i < n; i++) {
344 comp_thread_ctxt_t *thd = threads + i;
345
346 thd->num = i + 1;
347 thd->started = FALSE;
348 thd->cancelled = FALSE;
349 thd->data_avail = FALSE;
350
351 thd->to = (char *) my_malloc(COMPRESS_CHUNK_SIZE +
352 MY_QLZ_COMPRESS_OVERHEAD,
353 MYF(MY_FAE));
354
355 /* Initialize the control mutex and condition var */
356 if (pthread_mutex_init(&thd->ctrl_mutex, NULL) ||
357 pthread_cond_init(&thd->ctrl_cond, NULL)) {
358 goto err;
359 }
360
361 /* Initialize and data mutex and condition var */
362 if (pthread_mutex_init(&thd->data_mutex, NULL) ||
363 pthread_cond_init(&thd->data_cond, NULL)) {
364 goto err;
365 }
366
367 pthread_mutex_lock(&thd->ctrl_mutex);
368
369 if (pthread_create(&thd->id, NULL, compress_worker_thread_func,
370 thd)) {
371 msg("compress: pthread_create() failed: "
372 "errno = %d", errno);
373 pthread_mutex_unlock(&thd->ctrl_mutex);
374 goto err;
375 }
376 }
377
378 /* Wait for the threads to start */
379 for (i = 0; i < n; i++) {
380 comp_thread_ctxt_t *thd = threads + i;
381
382 while (thd->started == FALSE)
383 pthread_cond_wait(&thd->ctrl_cond, &thd->ctrl_mutex);
384 pthread_mutex_unlock(&thd->ctrl_mutex);
385 }
386
387 return threads;
388
389 err:
390 while (i > 0) {
391 comp_thread_ctxt_t *thd;
392 i--;
393 thd = threads + i;
394 pthread_mutex_unlock(&thd->ctrl_mutex);
395 }
396
397 my_free(threads);
398 return NULL;
399 }
400
401 static
402 void
destroy_worker_threads(comp_thread_ctxt_t * threads,uint n)403 destroy_worker_threads(comp_thread_ctxt_t *threads, uint n)
404 {
405 uint i;
406
407 for (i = 0; i < n; i++) {
408 comp_thread_ctxt_t *thd = threads + i;
409
410 pthread_mutex_lock(&thd->data_mutex);
411 threads[i].cancelled = TRUE;
412 pthread_cond_signal(&thd->data_cond);
413 pthread_mutex_unlock(&thd->data_mutex);
414
415 pthread_join(thd->id, NULL);
416
417 pthread_cond_destroy(&thd->data_cond);
418 pthread_mutex_destroy(&thd->data_mutex);
419 pthread_cond_destroy(&thd->ctrl_cond);
420 pthread_mutex_destroy(&thd->ctrl_mutex);
421
422 my_free(thd->to);
423 }
424
425 my_free(threads);
426 }
427
428 static
429 void *
compress_worker_thread_func(void * arg)430 compress_worker_thread_func(void *arg)
431 {
432 comp_thread_ctxt_t *thd = (comp_thread_ctxt_t *) arg;
433
434 pthread_mutex_lock(&thd->ctrl_mutex);
435
436 pthread_mutex_lock(&thd->data_mutex);
437
438 thd->started = TRUE;
439 pthread_cond_signal(&thd->ctrl_cond);
440
441 pthread_mutex_unlock(&thd->ctrl_mutex);
442
443 while (1) {
444 thd->data_avail = FALSE;
445 pthread_cond_signal(&thd->data_cond);
446
447 while (!thd->data_avail && !thd->cancelled) {
448 pthread_cond_wait(&thd->data_cond, &thd->data_mutex);
449 }
450
451 if (thd->cancelled)
452 break;
453
454 thd->to_len = qlz_compress(thd->from, thd->to, thd->from_len,
455 &thd->state);
456
457 /* qpress uses 0x00010000 as the initial value, but its own
458 Adler-32 implementation treats the value differently:
459 1. higher order bits are the sum of all bytes in the sequence
460 2. lower order bits are the sum of resulting values at every
461 step.
462 So it's the other way around as compared to zlib's adler32().
463 That's why 0x00000001 is being passed here to be compatible
464 with qpress implementation. */
465
466 thd->adler = adler32(0x00000001, (uchar *) thd->to,
467 (uInt)thd->to_len);
468 }
469
470 pthread_mutex_unlock(&thd->data_mutex);
471
472 return NULL;
473 }
474