1 /******************************************************
2 Copyright (c) 2011-2017 Percona LLC and/or its affiliates.
3 
4 The xbstream format writer implementation.
5 
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; version 2 of the License.
9 
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 GNU General Public License for more details.
14 
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1335  USA
18 
19 *******************************************************/
20 
21 #include <my_global.h>
22 #include <my_base.h>
23 #include <zlib.h>
24 #include "common.h"
25 #include "xbstream.h"
26 #include "crc_glue.h"
27 
/* Size of the per-file staging buffer. Writes smaller than the free space
left in that buffer are accumulated there and later emitted together as a
single chunk. */
#define XB_STREAM_MIN_CHUNK_SIZE (10 * 1024 * 1024)
30 
/* Stream writer state referenced by every file opened on the stream. */
struct xb_wstream_struct {
	/* Held while a chunk header (and its payload, if any) is handed to
	the write callback */
	pthread_mutex_t	mutex;
};
34 
/* State of one file being written to a stream. */
struct xb_wstream_file_struct {
	/* Stream this file belongs to; its mutex is taken around chunk output */
	xb_wstream_t	*stream;
	/* File path; points just past this structure, inside the same
	allocation */
	char		*path;
	/* Length of path, not counting the terminating NUL */
	size_t		path_len;
	/* Staging buffer where small writes are collected */
	char		chunk[XB_STREAM_MIN_CHUNK_SIZE];
	/* Next unused byte in chunk */
	char		*chunk_ptr;
	/* Number of unused bytes remaining in chunk */
	size_t		chunk_free;
	/* Payload bytes emitted so far; stored in each payload chunk header */
	my_off_t	offset;
	/* Opaque pointer passed through to the write callback */
	void		*userdata;
	/* Callback that receives every chunk header and payload */
	xb_stream_write_callback *write;
};
46 
/* Forward declarations of the file-local helpers defined further below. */

/* Emits whatever is currently held in the file's staging buffer. */
static int xb_stream_flush(xb_wstream_file_t *file);
/* Emits one payload chunk (header followed by len bytes from buf). */
static int xb_stream_write_chunk(xb_wstream_file_t *file,
				 const void *buf, size_t len);
/* Emits the end-of-file chunk for the file. */
static int xb_stream_write_eof(xb_wstream_file_t *file);
51 
52 static
53 ssize_t
54 xb_stream_default_write_callback(xb_wstream_file_t *file __attribute__((unused)),
55 				 void *userdata __attribute__((unused)),
56 				 const void *buf, size_t len)
57 {
58 	if (my_write(my_fileno(stdout), (const uchar *)buf, len, MYF(MY_WME | MY_NABP)))
59 		return -1;
60 	return len;
61 }
62 
63 xb_wstream_t *
64 xb_stream_write_new(void)
65 {
66 	xb_wstream_t	*stream;
67 
68 	stream = (xb_wstream_t *) my_malloc(sizeof(xb_wstream_t), MYF(MY_FAE));
69 	pthread_mutex_init(&stream->mutex, NULL);
70 
71 	return stream;;
72 }
73 
74 xb_wstream_file_t *
75 xb_stream_write_open(xb_wstream_t *stream, const char *path,
76 		     MY_STAT *mystat __attribute__((unused)),
77 		     void *userdata,
78 		     xb_stream_write_callback *onwrite)
79 {
80 	xb_wstream_file_t	*file;
81 	size_t			path_len;
82 
83 	path_len = strlen(path);
84 
85 	if (path_len > FN_REFLEN) {
86 		msg("xb_stream_write_open(): file path is too long.");
87 		return NULL;
88 	}
89 
90 	file = (xb_wstream_file_t *) my_malloc(sizeof(xb_wstream_file_t) +
91 					       path_len + 1, MYF(MY_FAE));
92 
93 	file->path = (char *) (file + 1);
94 #ifdef _WIN32
95 	/* Normalize path on Windows, so we can restore elsewhere.*/
96 	{
main(int argc,char ** argv)97 		int i;
98 		for (i = 0; ; i++) {
99 			file->path[i] = (path[i] == '\\') ? '/' : path[i];
100 			if (!path[i])
101 				break;
102 		}
103 	}
104 #else
105 	memcpy(file->path, path, path_len + 1);
106 #endif
107 	file->path_len = path_len;
108 
109 	file->stream = stream;
110 	file->offset = 0;
111 	file->chunk_ptr = file->chunk;
112 	file->chunk_free = XB_STREAM_MIN_CHUNK_SIZE;
113 	if (onwrite) {
114 #ifdef __WIN__
115 		setmode(fileno(stdout), _O_BINARY);
116 #endif
117 		file->userdata = userdata;
118 		file->write = onwrite;
119 	} else {
120 		file->userdata = NULL;
121 		file->write = xb_stream_default_write_callback;
122 	}
123 
124 	return file;
125 }
126 
127 int
128 xb_stream_write_data(xb_wstream_file_t *file, const void *buf, size_t len)
129 {
130 	if (len < file->chunk_free) {
131 		memcpy(file->chunk_ptr, buf, len);
132 		file->chunk_ptr += len;
133 		file->chunk_free -= len;
134 
135 		return 0;
136 	}
137 
138 	if (xb_stream_flush(file))
get_options(int * argc,char *** argv)139 		return 1;
140 
141 	return xb_stream_write_chunk(file, buf, len);
142 }
143 
144 int
145 xb_stream_write_close(xb_wstream_file_t *file)
146 {
147 	if (xb_stream_flush(file) ||
148 	    xb_stream_write_eof(file)) {
149 		my_free(file);
150 		return 1;
151 	}
152 
print_version(void)153 	my_free(file);
154 
155 	return 0;
156 }
157 
158 int
159 xb_stream_write_done(xb_wstream_t *stream)
160 {
usage(void)161 	pthread_mutex_destroy(&stream->mutex);
162 
163 	my_free(stream);
164 
165 	return 0;
166 }
167 
168 static
169 int
170 xb_stream_flush(xb_wstream_file_t *file)
171 {
172 	if (file->chunk_ptr == file->chunk) {
173 		return 0;
174 	}
175 
176 	if (xb_stream_write_chunk(file, file->chunk,
177 				  file->chunk_ptr - file->chunk)) {
178 		return 1;
179 	}
180 
181 	file->chunk_ptr = file->chunk;
182 	file->chunk_free = XB_STREAM_MIN_CHUNK_SIZE;
set_run_mode(run_mode_t mode)183 
184 	return 0;
185 }
186 
187 static
188 int
189 xb_stream_write_chunk(xb_wstream_file_t *file, const void *buf, size_t len)
190 {
191 	/* Chunk magic + flags + chunk type + path_len + path + len + offset +
192 	checksum */
193 	uchar		tmpbuf[sizeof(XB_STREAM_CHUNK_MAGIC) - 1 + 1 + 1 + 4 +
194 			       FN_REFLEN + 8 + 8 + 4];
195 	uchar		*ptr;
196 	xb_wstream_t	*stream = file->stream;
get_one_option(int optid,const struct my_option * opt,char * argument)197 	ulong		checksum;
198 
199 	/* Write xbstream header */
200 	ptr = tmpbuf;
201 
202 	/* Chunk magic */
203 	memcpy(ptr, XB_STREAM_CHUNK_MAGIC, sizeof(XB_STREAM_CHUNK_MAGIC) - 1);
204 	ptr += sizeof(XB_STREAM_CHUNK_MAGIC) - 1;
205 
206 	*ptr++ = 0;                              /* Chunk flags */
207 
208 	*ptr++ = (uchar) XB_CHUNK_TYPE_PAYLOAD;  /* Chunk type */
209 
210 	int4store(ptr, file->path_len);          /* Path length */
211 	ptr += 4;
212 
213 	memcpy(ptr, file->path, file->path_len); /* Path */
214 	ptr += file->path_len;
215 
216 	int8store(ptr, len);                     /* Payload length */
217 	ptr += 8;
218 
219 	checksum = crc32_iso3309(0, (const uchar *)buf, (uint)len);   /* checksum */
220 
stream_one_file(File file,xb_wstream_file_t * xbfile)221 	pthread_mutex_lock(&stream->mutex);
222 
223 	int8store(ptr, file->offset);            /* Payload offset */
224 	ptr += 8;
225 
226 	int4store(ptr, checksum);
227 	ptr += 4;
228 
229 	xb_ad(ptr <= tmpbuf + sizeof(tmpbuf));
230 
231 	if (file->write(file, file->userdata, tmpbuf, ptr-tmpbuf) == -1)
232 		goto err;
233 
234 
235 	if (file->write(file, file->userdata, buf, len) == -1) /* Payload */
236 		goto err;
237 
238 	file->offset+= len;
239 
240 	pthread_mutex_unlock(&stream->mutex);
241 
242 	return 0;
243 
244 err:
245 
246 	pthread_mutex_unlock(&stream->mutex);
247 
248 	return 1;
249 }
250 
251 static
252 int
253 xb_stream_write_eof(xb_wstream_file_t *file)
254 {
255 	/* Chunk magic + flags + chunk type + path_len + path */
256 	uchar		tmpbuf[sizeof(XB_STREAM_CHUNK_MAGIC) - 1 + 1 + 1 + 4 +
mode_create(int argc,char ** argv)257 			       FN_REFLEN];
258 	uchar		*ptr;
259 	xb_wstream_t	*stream = file->stream;
260 
261 	pthread_mutex_lock(&stream->mutex);
262 
263 	/* Write xbstream header */
264 	ptr = tmpbuf;
265 
266 	/* Chunk magic */
267 	memcpy(ptr, XB_STREAM_CHUNK_MAGIC, sizeof(XB_STREAM_CHUNK_MAGIC) - 1);
268 	ptr += sizeof(XB_STREAM_CHUNK_MAGIC) - 1;
269 
270 	*ptr++ = 0;                              /* Chunk flags */
271 
272 	*ptr++ = (uchar) XB_CHUNK_TYPE_EOF;      /* Chunk type */
273 
274 	int4store(ptr, file->path_len);          /* Path length */
275 	ptr += 4;
276 
277 	memcpy(ptr, file->path, file->path_len); /* Path */
278 	ptr += file->path_len;
279 
280 	xb_ad(ptr <= tmpbuf + sizeof(tmpbuf));
281 
282 	if (file->write(file, file->userdata, tmpbuf,
283 			(ulonglong) (ptr - tmpbuf)) == -1)
284 		goto err;
285 
286 	pthread_mutex_unlock(&stream->mutex);
287 
288 	return 0;
289 err:
290 
291 	pthread_mutex_unlock(&stream->mutex);
292 
293 	return 1;
294 }
295