1 /******************************************************
2 Copyright (c) 2012 Percona LLC and/or its affiliates.
3
4 tmpfile datasink for XtraBackup.
5
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; version 2 of the License.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
18
19 *******************************************************/
20
21 /* Do all writes to temporary files first, then pipe them to the specified
22 datasink in a serialized way in deinit(). */
23
24 #include <my_global.h>
25 #include <my_base.h>
26 #include "common.h"
27 #include "datasink.h"
28
29 typedef struct {
30 pthread_mutex_t mutex;
31 LIST *file_list;
32 } ds_tmpfile_ctxt_t;
33
34 typedef struct {
35 LIST list;
36 File fd;
37 char *orig_path;
38 MY_STAT mystat;
39 ds_file_t *file;
40 } ds_tmp_file_t;
41
42 static ds_ctxt_t *tmpfile_init(const char *root);
43 static ds_file_t *tmpfile_open(ds_ctxt_t *ctxt, const char *path,
44 MY_STAT *mystat);
45 static int tmpfile_write(ds_file_t *file, const uchar *buf, size_t len);
46 static int tmpfile_close(ds_file_t *file);
47 static void tmpfile_deinit(ds_ctxt_t *ctxt);
48
49 datasink_t datasink_tmpfile = {
50 &tmpfile_init,
51 &tmpfile_open,
52 &tmpfile_write,
53 &tmpfile_close,
54 &dummy_remove,
55 &tmpfile_deinit
56 };
57
58
59 static ds_ctxt_t *
tmpfile_init(const char * root)60 tmpfile_init(const char *root)
61 {
62 ds_ctxt_t *ctxt;
63 ds_tmpfile_ctxt_t *tmpfile_ctxt;
64
65 ctxt = (ds_ctxt_t *)my_malloc(sizeof(ds_ctxt_t) + sizeof(ds_tmpfile_ctxt_t),
66 MYF(MY_FAE));
67 tmpfile_ctxt = (ds_tmpfile_ctxt_t *) (ctxt + 1);
68 tmpfile_ctxt->file_list = NULL;
69 if (pthread_mutex_init(&tmpfile_ctxt->mutex, NULL)) {
70
71 my_free(ctxt);
72 return NULL;
73 }
74
75 ctxt->ptr = tmpfile_ctxt;
76 ctxt->root = my_strdup(root, MYF(MY_FAE));
77
78 return ctxt;
79 }
80
81 static ds_file_t *
tmpfile_open(ds_ctxt_t * ctxt,const char * path,MY_STAT * mystat)82 tmpfile_open(ds_ctxt_t *ctxt, const char *path,
83 MY_STAT *mystat)
84 {
85 ds_tmpfile_ctxt_t *tmpfile_ctxt;
86 char tmp_path[FN_REFLEN];
87 ds_tmp_file_t *tmp_file;
88 ds_file_t *file;
89 size_t path_len;
90 File fd;
91
92 /* Create a temporary file in tmpdir. The file will be automatically
93 removed on close. Code copied from mysql_tmpfile(). */
94 fd = create_temp_file(tmp_path,xtrabackup_tmpdir,
95 "xbtemp", O_BINARY | O_SEQUENTIAL,
96 MYF(MY_WME | MY_TEMPORARY));
97
98 if (fd < 0) {
99 return NULL;
100 }
101
102 path_len = strlen(path) + 1; /* terminating '\0' */
103
104 file = (ds_file_t *) my_malloc(sizeof(ds_file_t) +
105 sizeof(ds_tmp_file_t) + path_len,
106 MYF(MY_FAE));
107
108 tmp_file = (ds_tmp_file_t *) (file + 1);
109 tmp_file->file = file;
110 memcpy(&tmp_file->mystat, mystat, sizeof(MY_STAT));
111 /* Save a copy of 'path', since it may not be accessible later */
112 tmp_file->orig_path = (char *) tmp_file + sizeof(ds_tmp_file_t);
113
114 tmp_file->fd = fd;
115 memcpy(tmp_file->orig_path, path, path_len);
116
117 /* Store the real temporary file name in file->path */
118 file->path = my_strdup(tmp_path, MYF(MY_FAE));
119 file->ptr = tmp_file;
120
121 /* Store the file object in the list to be piped later */
122 tmpfile_ctxt = (ds_tmpfile_ctxt_t *) ctxt->ptr;
123 tmp_file->list.data = tmp_file;
124
125 pthread_mutex_lock(&tmpfile_ctxt->mutex);
126 tmpfile_ctxt->file_list = list_add(tmpfile_ctxt->file_list,
127 &tmp_file->list);
128 pthread_mutex_unlock(&tmpfile_ctxt->mutex);
129
130 return file;
131 }
132
133 static int
tmpfile_write(ds_file_t * file,const uchar * buf,size_t len)134 tmpfile_write(ds_file_t *file, const uchar *buf, size_t len)
135 {
136 File fd = ((ds_tmp_file_t *) file->ptr)->fd;
137
138 if (!my_write(fd, buf, len, MYF(MY_WME | MY_NABP))) {
139 posix_fadvise(fd, 0, 0, POSIX_FADV_DONTNEED);
140 return 0;
141 }
142
143 return 1;
144 }
145
146 static int
tmpfile_close(ds_file_t * file)147 tmpfile_close(ds_file_t *file)
148 {
149 /* Do nothing -- we will close (and thus remove) the file after piping
150 it to the destination datasink in tmpfile_deinit(). */
151
152 my_free(file->path);
153
154 return 0;
155 }
156
157 static void
tmpfile_deinit(ds_ctxt_t * ctxt)158 tmpfile_deinit(ds_ctxt_t *ctxt)
159 {
160 LIST *list;
161 ds_tmpfile_ctxt_t *tmpfile_ctxt;
162 MY_STAT mystat;
163 ds_tmp_file_t *tmp_file;
164 ds_file_t *dst_file;
165 ds_ctxt_t *pipe_ctxt;
166 void *buf = NULL;
167 const size_t buf_size = 10 * 1024 * 1024;
168 size_t bytes;
169 size_t offset;
170
171 pipe_ctxt = ctxt->pipe_ctxt;
172 xb_a(pipe_ctxt != NULL);
173
174 buf = my_malloc(buf_size, MYF(MY_FAE));
175
176 tmpfile_ctxt = (ds_tmpfile_ctxt_t *) ctxt->ptr;
177 list = tmpfile_ctxt->file_list;
178
179 /* Walk the files in the order they have been added */
180 list = list_reverse(list);
181 while (list != NULL) {
182 tmp_file = (ds_tmp_file_t *)list->data;
183 /* Stat the file to replace size and mtime on the original
184 * mystat struct */
185 if (my_fstat(tmp_file->fd, &mystat, MYF(0))) {
186 die("my_fstat() failed.");
187 }
188 tmp_file->mystat.st_size = mystat.st_size;
189 tmp_file->mystat.st_mtime = mystat.st_mtime;
190
191 dst_file = ds_open(pipe_ctxt, tmp_file->orig_path,
192 &tmp_file->mystat);
193 if (dst_file == NULL) {
194 die("could not stream a temporary file to "
195 "'%s'", tmp_file->orig_path);
196 }
197
198 /* copy to the destination datasink */
199 posix_fadvise(tmp_file->fd, 0, 0, POSIX_FADV_SEQUENTIAL);
200 if (my_seek(tmp_file->fd, 0, SEEK_SET, MYF(0)) ==
201 MY_FILEPOS_ERROR) {
202 die("my_seek() failed for '%s', errno = %d.",
203 tmp_file->file->path, my_errno);
204 }
205 offset = 0;
206 while ((bytes = my_read(tmp_file->fd, (unsigned char *)buf, buf_size,
207 MYF(MY_WME))) > 0) {
208 posix_fadvise(tmp_file->fd, offset, buf_size, POSIX_FADV_DONTNEED);
209 offset += buf_size;
210 if (ds_write(dst_file, buf, bytes)) {
211 die("cannot write to stream for '%s'.",
212 tmp_file->orig_path);
213 }
214 }
215 if (bytes == (size_t) -1) {
216 die("my_read failed for %s", tmp_file->orig_path);
217 }
218
219 my_close(tmp_file->fd, MYF(MY_WME));
220 ds_close(dst_file);
221
222 list = list_rest(list);
223 my_free(tmp_file->file);
224 }
225
226 pthread_mutex_destroy(&tmpfile_ctxt->mutex);
227
228 my_free(buf);
229 my_free(ctxt->root);
230 my_free(ctxt);
231 }
232