1 /******************************************************
2 Copyright (c) 2012 Percona LLC and/or its affiliates.
3 
4 tmpfile datasink for XtraBackup.
5 
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; version 2 of the License.
9 
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 GNU General Public License for more details.
14 
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1335  USA
18 
19 *******************************************************/
20 
21 /* Do all writes to temporary files first, then pipe them to the specified
22 datasink in a serialized way in deinit(). */
23 
24 #include <my_global.h>
25 #include <my_base.h>
26 #include "common.h"
27 #include "datasink.h"
28 
29 typedef struct {
30 	pthread_mutex_t	 mutex;
31 	LIST		*file_list;
32 } ds_tmpfile_ctxt_t;
33 
34 typedef struct {
35 	LIST		 list;
36 	File		 fd;
37 	char		*orig_path;
38 	MY_STAT		 mystat;
39 	ds_file_t	*file;
40 } ds_tmp_file_t;
41 
42 static ds_ctxt_t *tmpfile_init(const char *root);
43 static ds_file_t *tmpfile_open(ds_ctxt_t *ctxt, const char *path,
44 			       MY_STAT *mystat);
45 static int tmpfile_write(ds_file_t *file, const uchar *buf, size_t len);
46 static int tmpfile_close(ds_file_t *file);
47 static void tmpfile_deinit(ds_ctxt_t *ctxt);
48 
49 datasink_t datasink_tmpfile = {
50 	&tmpfile_init,
51 	&tmpfile_open,
52 	&tmpfile_write,
53 	&tmpfile_close,
54 	&dummy_remove,
55 	&tmpfile_deinit
56 };
57 
58 
59 static ds_ctxt_t *
tmpfile_init(const char * root)60 tmpfile_init(const char *root)
61 {
62 	ds_ctxt_t		*ctxt;
63 	ds_tmpfile_ctxt_t	*tmpfile_ctxt;
64 
65 	ctxt = (ds_ctxt_t *)my_malloc(sizeof(ds_ctxt_t) + sizeof(ds_tmpfile_ctxt_t),
66 			 MYF(MY_FAE));
67 	tmpfile_ctxt = (ds_tmpfile_ctxt_t *) (ctxt + 1);
68 	tmpfile_ctxt->file_list = NULL;
69 	if (pthread_mutex_init(&tmpfile_ctxt->mutex, NULL)) {
70 
71 		my_free(ctxt);
72 		return NULL;
73 	}
74 
75 	ctxt->ptr = tmpfile_ctxt;
76 	ctxt->root = my_strdup(root, MYF(MY_FAE));
77 
78 	return ctxt;
79 }
80 
81 static ds_file_t *
tmpfile_open(ds_ctxt_t * ctxt,const char * path,MY_STAT * mystat)82 tmpfile_open(ds_ctxt_t *ctxt, const char *path,
83 			       MY_STAT *mystat)
84 {
85 	ds_tmpfile_ctxt_t	*tmpfile_ctxt;
86 	char			 tmp_path[FN_REFLEN];
87 	ds_tmp_file_t		*tmp_file;
88 	ds_file_t		*file;
89 	size_t			 path_len;
90 	File			 fd;
91 
92 	/* Create a temporary file in tmpdir. The file will be automatically
93 	removed on close. Code copied from mysql_tmpfile(). */
94 	fd = create_temp_file(tmp_path,xtrabackup_tmpdir,
95 			      "xbtemp", O_BINARY | O_SEQUENTIAL,
96 			      MYF(MY_WME | MY_TEMPORARY));
97 
98 	if (fd < 0) {
99 		return NULL;
100 	}
101 
102 	path_len = strlen(path) + 1; /* terminating '\0' */
103 
104 	file = (ds_file_t *) my_malloc(sizeof(ds_file_t) +
105 				       sizeof(ds_tmp_file_t) + path_len,
106 				       MYF(MY_FAE));
107 
108 	tmp_file = (ds_tmp_file_t *) (file + 1);
109 	tmp_file->file = file;
110 	memcpy(&tmp_file->mystat, mystat, sizeof(MY_STAT));
111 	/* Save a copy of 'path', since it may not be accessible later */
112 	tmp_file->orig_path = (char *) tmp_file + sizeof(ds_tmp_file_t);
113 
114 	tmp_file->fd = fd;
115 	memcpy(tmp_file->orig_path, path, path_len);
116 
117 	/* Store the real temporary file name in file->path */
118 	file->path = my_strdup(tmp_path, MYF(MY_FAE));
119 	file->ptr = tmp_file;
120 
121 	/* Store the file object in the list to be piped later */
122 	tmpfile_ctxt = (ds_tmpfile_ctxt_t *) ctxt->ptr;
123 	tmp_file->list.data = tmp_file;
124 
125 	pthread_mutex_lock(&tmpfile_ctxt->mutex);
126 	tmpfile_ctxt->file_list = list_add(tmpfile_ctxt->file_list,
127 					   &tmp_file->list);
128 	pthread_mutex_unlock(&tmpfile_ctxt->mutex);
129 
130 	return file;
131 }
132 
133 static int
tmpfile_write(ds_file_t * file,const uchar * buf,size_t len)134 tmpfile_write(ds_file_t *file, const uchar *buf, size_t len)
135 {
136 	File fd = ((ds_tmp_file_t *) file->ptr)->fd;
137 
138 	if (!my_write(fd, buf, len, MYF(MY_WME | MY_NABP))) {
139 		posix_fadvise(fd, 0, 0, POSIX_FADV_DONTNEED);
140 		return 0;
141 	}
142 
143 	return 1;
144 }
145 
146 static int
tmpfile_close(ds_file_t * file)147 tmpfile_close(ds_file_t *file)
148 {
149 	/* Do nothing -- we will close (and thus remove) the file after piping
150 	it to the destination datasink in tmpfile_deinit(). */
151 
152 	my_free(file->path);
153 
154 	return 0;
155 }
156 
157 static void
tmpfile_deinit(ds_ctxt_t * ctxt)158 tmpfile_deinit(ds_ctxt_t *ctxt)
159 {
160 	LIST			*list;
161 	ds_tmpfile_ctxt_t	*tmpfile_ctxt;
162 	MY_STAT			 mystat;
163 	ds_tmp_file_t 		*tmp_file;
164 	ds_file_t		*dst_file;
165 	ds_ctxt_t		*pipe_ctxt;
166 	void			*buf = NULL;
167 	const size_t		 buf_size = 10 * 1024 * 1024;
168 	size_t			 bytes;
169 	size_t			 offset;
170 
171 	pipe_ctxt = ctxt->pipe_ctxt;
172 	xb_a(pipe_ctxt != NULL);
173 
174 	buf = my_malloc(buf_size, MYF(MY_FAE));
175 
176 	tmpfile_ctxt = (ds_tmpfile_ctxt_t *) ctxt->ptr;
177 	list = tmpfile_ctxt->file_list;
178 
179 	/* Walk the files in the order they have been added */
180 	list = list_reverse(list);
181 	while (list != NULL) {
182 		tmp_file = (ds_tmp_file_t *)list->data;
183 		/* Stat the file to replace size and mtime on the original
184 		* mystat struct */
185 		if (my_fstat(tmp_file->fd, &mystat, MYF(0))) {
186 			die("my_fstat() failed.");
187 		}
188 		tmp_file->mystat.st_size = mystat.st_size;
189 		tmp_file->mystat.st_mtime = mystat.st_mtime;
190 
191 		dst_file = ds_open(pipe_ctxt, tmp_file->orig_path,
192 				   &tmp_file->mystat);
193 		if (dst_file == NULL) {
194 			die("could not stream a temporary file to "
195 			    "'%s'", tmp_file->orig_path);
196 		}
197 
198 		/* copy to the destination datasink */
199 		posix_fadvise(tmp_file->fd, 0, 0, POSIX_FADV_SEQUENTIAL);
200 		if (my_seek(tmp_file->fd, 0, SEEK_SET, MYF(0)) ==
201 		    MY_FILEPOS_ERROR) {
202 			die("my_seek() failed for '%s', errno = %d.",
203 			    tmp_file->file->path, my_errno);
204 		}
205 		offset = 0;
206 		while ((bytes = my_read(tmp_file->fd, (unsigned char *)buf, buf_size,
207 					MYF(MY_WME))) > 0) {
208 			posix_fadvise(tmp_file->fd, offset, buf_size, POSIX_FADV_DONTNEED);
209 			offset += buf_size;
210 			if (ds_write(dst_file, buf, bytes)) {
211 				die("cannot write to stream for '%s'.",
212 				    tmp_file->orig_path);
213 			}
214 		}
215 		if (bytes == (size_t) -1) {
216 			die("my_read failed for %s", tmp_file->orig_path);
217 		}
218 
219 		my_close(tmp_file->fd, MYF(MY_WME));
220 		ds_close(dst_file);
221 
222 		list = list_rest(list);
223 		my_free(tmp_file->file);
224 	}
225 
226 	pthread_mutex_destroy(&tmpfile_ctxt->mutex);
227 
228 	my_free(buf);
229 	my_free(ctxt->root);
230 	my_free(ctxt);
231 }
232