1 /******************************************************
2 Copyright (c) 2011-2013 Percona LLC and/or its affiliates.
3 
4 Compressing datasink implementation for XtraBackup.
5 
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; version 2 of the License.
9 
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 GNU General Public License for more details.
14 
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1335  USA
18 
19 *******************************************************/
20 
21 #include <my_global.h>
22 #include <mysql_version.h>
23 #include <my_base.h>
24 #include <quicklz.h>
25 #include <zlib.h>
26 #include "common.h"
27 #include "datasink.h"
28 
29 #define COMPRESS_CHUNK_SIZE ((size_t) (xtrabackup_compress_chunk_size))
30 #define MY_QLZ_COMPRESS_OVERHEAD 400
31 
32 typedef struct {
33 	pthread_t		id;
34 	uint			num;
35 	pthread_mutex_t 	ctrl_mutex;
36 	pthread_cond_t		ctrl_cond;
37 	pthread_mutex_t		data_mutex;
38 	pthread_cond_t  	data_cond;
39 	my_bool			started;
40 	my_bool			data_avail;
41 	my_bool			cancelled;
42 	const char 		*from;
43 	size_t			from_len;
44 	char			*to;
45 	size_t			to_len;
46 	qlz_state_compress	state;
47 	ulong			adler;
48 } comp_thread_ctxt_t;
49 
50 typedef struct {
51 	comp_thread_ctxt_t	*threads;
52 	uint			nthreads;
53 } ds_compress_ctxt_t;
54 
55 typedef struct {
56 	ds_file_t		*dest_file;
57 	ds_compress_ctxt_t	*comp_ctxt;
58 	size_t			bytes_processed;
59 } ds_compress_file_t;
60 
61 /* Compression options */
62 extern char		*xtrabackup_compress_alg;
63 extern uint		xtrabackup_compress_threads;
64 extern ulonglong	xtrabackup_compress_chunk_size;
65 
66 static ds_ctxt_t *compress_init(const char *root);
67 static ds_file_t *compress_open(ds_ctxt_t *ctxt, const char *path,
68 				MY_STAT *mystat);
69 static int compress_write(ds_file_t *file, const uchar *buf, size_t len);
70 static int compress_close(ds_file_t *file);
71 static void compress_deinit(ds_ctxt_t *ctxt);
72 
73 datasink_t datasink_compress = {
74 	&compress_init,
75 	&compress_open,
76 	&compress_write,
77 	&compress_close,
78 	&dummy_remove,
79 	&compress_deinit
80 };
81 
82 static inline int write_uint32_le(ds_file_t *file, ulong n);
83 static inline int write_uint64_le(ds_file_t *file, ulonglong n);
84 
85 static comp_thread_ctxt_t *create_worker_threads(uint n);
86 static void destroy_worker_threads(comp_thread_ctxt_t *threads, uint n);
87 static void *compress_worker_thread_func(void *arg);
88 
89 static
90 ds_ctxt_t *
compress_init(const char * root)91 compress_init(const char *root)
92 {
93 	ds_ctxt_t		*ctxt;
94 	ds_compress_ctxt_t	*compress_ctxt;
95 	comp_thread_ctxt_t	*threads;
96 
97 	/* Create and initialize the worker threads */
98 	threads = create_worker_threads(xtrabackup_compress_threads);
99 	if (threads == NULL) {
100 		msg("compress: failed to create worker threads.");
101 		return NULL;
102 	}
103 
104 	ctxt = (ds_ctxt_t *) my_malloc(sizeof(ds_ctxt_t) +
105 				       sizeof(ds_compress_ctxt_t),
106 				       MYF(MY_FAE));
107 
108 	compress_ctxt = (ds_compress_ctxt_t *) (ctxt + 1);
109 	compress_ctxt->threads = threads;
110 	compress_ctxt->nthreads = xtrabackup_compress_threads;
111 
112 	ctxt->ptr = compress_ctxt;
113 	ctxt->root = my_strdup(root, MYF(MY_FAE));
114 
115 	return ctxt;
116 }
117 
118 static
119 ds_file_t *
compress_open(ds_ctxt_t * ctxt,const char * path,MY_STAT * mystat)120 compress_open(ds_ctxt_t *ctxt, const char *path, MY_STAT *mystat)
121 {
122 	ds_compress_ctxt_t	*comp_ctxt;
123 	ds_ctxt_t		*dest_ctxt;
124  	ds_file_t		*dest_file;
125 	char			new_name[FN_REFLEN];
126 	size_t			name_len;
127 	ds_file_t		*file;
128 	ds_compress_file_t	*comp_file;
129 
130 	xb_ad(ctxt->pipe_ctxt != NULL);
131 	dest_ctxt = ctxt->pipe_ctxt;
132 
133 	comp_ctxt = (ds_compress_ctxt_t *) ctxt->ptr;
134 
135 	/* Append the .qp extension to the filename */
136 	fn_format(new_name, path, "", ".qp", MYF(MY_APPEND_EXT));
137 
138 	dest_file = ds_open(dest_ctxt, new_name, mystat);
139 	if (dest_file == NULL) {
140 		return NULL;
141 	}
142 
143 	/* Write the qpress archive header */
144 	if (ds_write(dest_file, "qpress10", 8) ||
145 	    write_uint64_le(dest_file, COMPRESS_CHUNK_SIZE)) {
146 		goto err;
147 	}
148 
149 	/* We are going to create a one-file "flat" (i.e. with no
150 	subdirectories) archive. So strip the directory part from the path and
151 	remove the '.qp' suffix. */
152 	fn_format(new_name, path, "", "", MYF(MY_REPLACE_DIR));
153 
154 	/* Write the qpress file header */
155 	name_len = strlen(new_name);
156 	if (ds_write(dest_file, "F", 1) ||
157 	    write_uint32_le(dest_file, (uint)name_len) ||
158 	    /* we want to write the terminating \0 as well */
159 	    ds_write(dest_file, new_name, name_len + 1)) {
160 		goto err;
161 	}
162 
163 	file = (ds_file_t *) my_malloc(sizeof(ds_file_t) +
164 				       sizeof(ds_compress_file_t),
165 				       MYF(MY_FAE));
166 	comp_file = (ds_compress_file_t *) (file + 1);
167 	comp_file->dest_file = dest_file;
168 	comp_file->comp_ctxt = comp_ctxt;
169 	comp_file->bytes_processed = 0;
170 
171 	file->ptr = comp_file;
172 	file->path = dest_file->path;
173 
174 	return file;
175 
176 err:
177 	ds_close(dest_file);
178 	return NULL;
179 }
180 
181 static
182 int
compress_write(ds_file_t * file,const uchar * buf,size_t len)183 compress_write(ds_file_t *file, const uchar *buf, size_t len)
184 {
185 	ds_compress_file_t	*comp_file;
186 	ds_compress_ctxt_t	*comp_ctxt;
187 	comp_thread_ctxt_t	*threads;
188 	comp_thread_ctxt_t	*thd;
189 	uint			nthreads;
190 	uint			i;
191 	const char		*ptr;
192 	ds_file_t		*dest_file;
193 
194 	comp_file = (ds_compress_file_t *) file->ptr;
195 	comp_ctxt = comp_file->comp_ctxt;
196 	dest_file = comp_file->dest_file;
197 
198 	threads = comp_ctxt->threads;
199 	nthreads = comp_ctxt->nthreads;
200 
201 	ptr = (const char *) buf;
202 	while (len > 0) {
203 		uint max_thread;
204 
205 		/* Send data to worker threads for compression */
206 		for (i = 0; i < nthreads; i++) {
207 			size_t chunk_len;
208 
209 			thd = threads + i;
210 
211 			pthread_mutex_lock(&thd->ctrl_mutex);
212 
213 			chunk_len = (len > COMPRESS_CHUNK_SIZE) ?
214 				COMPRESS_CHUNK_SIZE : len;
215 			thd->from = ptr;
216 			thd->from_len = chunk_len;
217 
218 			pthread_mutex_lock(&thd->data_mutex);
219 			thd->data_avail = TRUE;
220 			pthread_cond_signal(&thd->data_cond);
221 			pthread_mutex_unlock(&thd->data_mutex);
222 
223 			len -= chunk_len;
224 			if (len == 0) {
225 				break;
226 			}
227 			ptr += chunk_len;
228 		}
229 
230 		max_thread = (i < nthreads) ? i :  nthreads - 1;
231 
232 		/* Reap and stream the compressed data */
233 		for (i = 0; i <= max_thread; i++) {
234 			thd = threads + i;
235 
236 			pthread_mutex_lock(&thd->data_mutex);
237 			while (thd->data_avail == TRUE) {
238 				pthread_cond_wait(&thd->data_cond,
239 						  &thd->data_mutex);
240 			}
241 
242 			xb_a(threads[i].to_len > 0);
243 
244 			if (ds_write(dest_file, "NEWBNEWB", 8) ||
245 			    write_uint64_le(dest_file,
246 					    comp_file->bytes_processed)) {
247 				msg("compress: write to the destination stream "
248 				    "failed.");
249 				return 1;
250 			}
251 
252 			comp_file->bytes_processed += threads[i].from_len;
253 
254 			if (write_uint32_le(dest_file, threads[i].adler) ||
255 			    ds_write(dest_file, threads[i].to,
256 					   threads[i].to_len)) {
257 				msg("compress: write to the destination stream "
258 				    "failed.");
259 				return 1;
260 			}
261 
262 			pthread_mutex_unlock(&threads[i].data_mutex);
263 			pthread_mutex_unlock(&threads[i].ctrl_mutex);
264 		}
265 	}
266 
267 	return 0;
268 }
269 
270 static
271 int
compress_close(ds_file_t * file)272 compress_close(ds_file_t *file)
273 {
274 	ds_compress_file_t	*comp_file;
275 	ds_file_t		*dest_file;
276 	int			rc;
277 
278 	comp_file = (ds_compress_file_t *) file->ptr;
279 	dest_file = comp_file->dest_file;
280 
281 	/* Write the qpress file trailer */
282 	ds_write(dest_file, "ENDSENDS", 8);
283 
284 	/* Supposedly the number of written bytes should be written as a
285 	"recovery information" in the file trailer, but in reality qpress
286 	always writes 8 zeros here. Let's do the same */
287 
288 	write_uint64_le(dest_file, 0);
289 
290 	rc = ds_close(dest_file);
291 
292 	my_free(file);
293 
294 	return rc;
295 }
296 
297 static
298 void
compress_deinit(ds_ctxt_t * ctxt)299 compress_deinit(ds_ctxt_t *ctxt)
300 {
301 	ds_compress_ctxt_t 	*comp_ctxt;
302 
303 	xb_ad(ctxt->pipe_ctxt != NULL);
304 
305 	comp_ctxt = (ds_compress_ctxt_t *) ctxt->ptr;;
306 
307 	destroy_worker_threads(comp_ctxt->threads, comp_ctxt->nthreads);
308 
309 	my_free(ctxt->root);
310 	my_free(ctxt);
311 }
312 
313 static inline
314 int
write_uint32_le(ds_file_t * file,ulong n)315 write_uint32_le(ds_file_t *file, ulong n)
316 {
317 	char tmp[4];
318 
319 	int4store(tmp, n);
320 	return ds_write(file, tmp, sizeof(tmp));
321 }
322 
323 static inline
324 int
write_uint64_le(ds_file_t * file,ulonglong n)325 write_uint64_le(ds_file_t *file, ulonglong n)
326 {
327 	char tmp[8];
328 
329 	int8store(tmp, n);
330 	return ds_write(file, tmp, sizeof(tmp));
331 }
332 
333 static
334 comp_thread_ctxt_t *
create_worker_threads(uint n)335 create_worker_threads(uint n)
336 {
337 	comp_thread_ctxt_t	*threads;
338 	uint 			i;
339 
340 	threads = (comp_thread_ctxt_t *)
341 		my_malloc(sizeof(comp_thread_ctxt_t) * n, MYF(MY_FAE));
342 
343 	for (i = 0; i < n; i++) {
344 		comp_thread_ctxt_t *thd = threads + i;
345 
346 		thd->num = i + 1;
347 		thd->started = FALSE;
348 		thd->cancelled = FALSE;
349 		thd->data_avail = FALSE;
350 
351 		thd->to = (char *) my_malloc(COMPRESS_CHUNK_SIZE +
352 						   MY_QLZ_COMPRESS_OVERHEAD,
353 						   MYF(MY_FAE));
354 
355 		/* Initialize the control mutex and condition var */
356 		if (pthread_mutex_init(&thd->ctrl_mutex, NULL) ||
357 		    pthread_cond_init(&thd->ctrl_cond, NULL)) {
358 			goto err;
359 		}
360 
361 		/* Initialize and data mutex and condition var */
362 		if (pthread_mutex_init(&thd->data_mutex, NULL) ||
363 		    pthread_cond_init(&thd->data_cond, NULL)) {
364 			goto err;
365 		}
366 
367 		pthread_mutex_lock(&thd->ctrl_mutex);
368 
369 		if (pthread_create(&thd->id, NULL, compress_worker_thread_func,
370 				   thd)) {
371 			msg("compress: pthread_create() failed: "
372 			    "errno = %d", errno);
373 			pthread_mutex_unlock(&thd->ctrl_mutex);
374 			goto err;
375 		}
376 	}
377 
378 	/* Wait for the threads to start */
379 	for (i = 0; i < n; i++) {
380 		comp_thread_ctxt_t *thd = threads + i;
381 
382 		while (thd->started == FALSE)
383 			pthread_cond_wait(&thd->ctrl_cond, &thd->ctrl_mutex);
384 		pthread_mutex_unlock(&thd->ctrl_mutex);
385 	}
386 
387 	return threads;
388 
389 err:
390 	while (i > 0) {
391 		comp_thread_ctxt_t *thd;
392 		i--;
393 		thd = threads + i;
394 		pthread_mutex_unlock(&thd->ctrl_mutex);
395 	}
396 
397 	my_free(threads);
398 	return NULL;
399 }
400 
401 static
402 void
destroy_worker_threads(comp_thread_ctxt_t * threads,uint n)403 destroy_worker_threads(comp_thread_ctxt_t *threads, uint n)
404 {
405 	uint i;
406 
407 	for (i = 0; i < n; i++) {
408 		comp_thread_ctxt_t *thd = threads + i;
409 
410 		pthread_mutex_lock(&thd->data_mutex);
411 		threads[i].cancelled = TRUE;
412 		pthread_cond_signal(&thd->data_cond);
413 		pthread_mutex_unlock(&thd->data_mutex);
414 
415 		pthread_join(thd->id, NULL);
416 
417 		pthread_cond_destroy(&thd->data_cond);
418 		pthread_mutex_destroy(&thd->data_mutex);
419 		pthread_cond_destroy(&thd->ctrl_cond);
420 		pthread_mutex_destroy(&thd->ctrl_mutex);
421 
422 		my_free(thd->to);
423 	}
424 
425 	my_free(threads);
426 }
427 
428 static
429 void *
compress_worker_thread_func(void * arg)430 compress_worker_thread_func(void *arg)
431 {
432 	comp_thread_ctxt_t *thd = (comp_thread_ctxt_t *) arg;
433 
434 	pthread_mutex_lock(&thd->ctrl_mutex);
435 
436 	pthread_mutex_lock(&thd->data_mutex);
437 
438 	thd->started = TRUE;
439 	pthread_cond_signal(&thd->ctrl_cond);
440 
441 	pthread_mutex_unlock(&thd->ctrl_mutex);
442 
443 	while (1) {
444 		thd->data_avail = FALSE;
445 		pthread_cond_signal(&thd->data_cond);
446 
447 		while (!thd->data_avail && !thd->cancelled) {
448 			pthread_cond_wait(&thd->data_cond, &thd->data_mutex);
449 		}
450 
451 		if (thd->cancelled)
452 			break;
453 
454 		thd->to_len = qlz_compress(thd->from, thd->to, thd->from_len,
455 					   &thd->state);
456 
457 		/* qpress uses 0x00010000 as the initial value, but its own
458 		Adler-32 implementation treats the value differently:
459 		  1. higher order bits are the sum of all bytes in the sequence
460 		  2. lower order bits are the sum of resulting values at every
461 		     step.
462 		So it's the other way around as compared to zlib's adler32().
463 		That's why  0x00000001 is being passed here to be compatible
464 		with qpress implementation. */
465 
466 		thd->adler = adler32(0x00000001, (uchar *) thd->to,
467 				     (uInt)thd->to_len);
468 	}
469 
470 	pthread_mutex_unlock(&thd->data_mutex);
471 
472 	return NULL;
473 }
474