1 /******************************************************
2 Copyright (c) 2011-2017 Percona LLC and/or its affiliates.
3 
4 The xbstream format writer implementation.
5 
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; version 2 of the License.
9 
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 GNU General Public License for more details.
14 
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1335  USA
18 
19 *******************************************************/
20 
21 #include <my_global.h>
22 #include <my_base.h>
23 #include <zlib.h>
24 #include "common.h"
25 #include "xbstream.h"
26 
/* Size in bytes (10 MiB) of the per-file staging buffer.  Writes smaller than
the free space left in that buffer are accumulated there and emitted later as
a single chunk; a write that does not fit causes the buffer to be flushed and
is then emitted as a chunk of its own. */
#define XB_STREAM_MIN_CHUNK_SIZE (10 * 1024 * 1024)
29 
/* Stream-wide writer state, shared by every file opened on the stream. */
struct xb_wstream_struct {
	/* Held while a chunk header and its payload are handed to the write
	callback; presumably this keeps chunks of different files from
	interleaving in the output -- confirm against callers. */
	pthread_mutex_t	mutex;
};
33 
/* Per-file writer state.  The path string is stored in the same allocation,
immediately after this structure. */
struct xb_wstream_file_struct {
	/* Stream this file was opened on */
	xb_wstream_t	*stream;
	/* NUL-terminated file name written into every chunk header */
	char		*path;
	/* Length of path, not counting the terminating NUL */
	size_t		path_len;
	/* Staging buffer for small writes */
	char		chunk[XB_STREAM_MIN_CHUNK_SIZE];
	/* Next unused byte in chunk */
	char		*chunk_ptr;
	/* Number of unused bytes left in chunk */
	size_t		chunk_free;
	/* Total payload bytes written so far; stored in each chunk header as
	the payload offset */
	my_off_t	offset;
	/* Opaque pointer passed to the write callback */
	void		*userdata;
	/* Callback that receives every header and payload buffer */
	xb_stream_write_callback *write;
};
45 
/* Internal helpers; each returns 0 on success and 1 on failure. */
static int xb_stream_flush(xb_wstream_file_t *file);
static int xb_stream_write_chunk(xb_wstream_file_t *file,
				 const void *buf, size_t len);
static int xb_stream_write_eof(xb_wstream_file_t *file);
50 
51 static
52 ssize_t
xb_stream_default_write_callback(xb_wstream_file_t * file,void * userdata,const void * buf,size_t len)53 xb_stream_default_write_callback(xb_wstream_file_t *file __attribute__((unused)),
54 				 void *userdata __attribute__((unused)),
55 				 const void *buf, size_t len)
56 {
57 	if (my_write(my_fileno(stdout), (const uchar *)buf, len, MYF(MY_WME | MY_NABP)))
58 		return -1;
59 	return len;
60 }
61 
62 xb_wstream_t *
xb_stream_write_new(void)63 xb_stream_write_new(void)
64 {
65 	xb_wstream_t	*stream;
66 
67 	stream = (xb_wstream_t *) my_malloc(PSI_NOT_INSTRUMENTED, sizeof(xb_wstream_t), MYF(MY_FAE));
68 	pthread_mutex_init(&stream->mutex, NULL);
69 
70 	return stream;;
71 }
72 
73 xb_wstream_file_t *
xb_stream_write_open(xb_wstream_t * stream,const char * path,MY_STAT * mystat,void * userdata,xb_stream_write_callback * onwrite)74 xb_stream_write_open(xb_wstream_t *stream, const char *path,
75 		     MY_STAT *mystat __attribute__((unused)),
76 		     void *userdata,
77 		     xb_stream_write_callback *onwrite)
78 {
79 	xb_wstream_file_t	*file;
80 	size_t			path_len;
81 
82 	path_len = strlen(path);
83 
84 	if (path_len > FN_REFLEN) {
85 		msg("xb_stream_write_open(): file path is too long.");
86 		return NULL;
87 	}
88 
89 	file = (xb_wstream_file_t *) my_malloc(PSI_NOT_INSTRUMENTED, sizeof(xb_wstream_file_t) +
90 					       path_len + 1, MYF(MY_FAE));
91 
92 	file->path = (char *) (file + 1);
93 #ifdef _WIN32
94 	/* Normalize path on Windows, so we can restore elsewhere.*/
95 	{
96 		int i;
97 		for (i = 0; ; i++) {
98 			file->path[i] = (path[i] == '\\') ? '/' : path[i];
99 			if (!path[i])
100 				break;
101 		}
102 	}
103 #else
104 	memcpy(file->path, path, path_len + 1);
105 #endif
106 	file->path_len = path_len;
107 
108 	file->stream = stream;
109 	file->offset = 0;
110 	file->chunk_ptr = file->chunk;
111 	file->chunk_free = XB_STREAM_MIN_CHUNK_SIZE;
112 	if (onwrite) {
113 #ifdef __WIN__
114 		setmode(fileno(stdout), _O_BINARY);
115 #endif
116 		file->userdata = userdata;
117 		file->write = onwrite;
118 	} else {
119 		file->userdata = NULL;
120 		file->write = xb_stream_default_write_callback;
121 	}
122 
123 	return file;
124 }
125 
126 int
xb_stream_write_data(xb_wstream_file_t * file,const void * buf,size_t len)127 xb_stream_write_data(xb_wstream_file_t *file, const void *buf, size_t len)
128 {
129 	if (len < file->chunk_free) {
130 		memcpy(file->chunk_ptr, buf, len);
131 		file->chunk_ptr += len;
132 		file->chunk_free -= len;
133 
134 		return 0;
135 	}
136 
137 	if (xb_stream_flush(file))
138 		return 1;
139 
140 	return xb_stream_write_chunk(file, buf, len);
141 }
142 
143 int
xb_stream_write_close(xb_wstream_file_t * file)144 xb_stream_write_close(xb_wstream_file_t *file)
145 {
146 	if (xb_stream_flush(file) ||
147 	    xb_stream_write_eof(file)) {
148 		my_free(file);
149 		return 1;
150 	}
151 
152 	my_free(file);
153 
154 	return 0;
155 }
156 
157 int
xb_stream_write_done(xb_wstream_t * stream)158 xb_stream_write_done(xb_wstream_t *stream)
159 {
160 	pthread_mutex_destroy(&stream->mutex);
161 
162 	my_free(stream);
163 
164 	return 0;
165 }
166 
167 static
168 int
xb_stream_flush(xb_wstream_file_t * file)169 xb_stream_flush(xb_wstream_file_t *file)
170 {
171 	if (file->chunk_ptr == file->chunk) {
172 		return 0;
173 	}
174 
175 	if (xb_stream_write_chunk(file, file->chunk,
176 				  file->chunk_ptr - file->chunk)) {
177 		return 1;
178 	}
179 
180 	file->chunk_ptr = file->chunk;
181 	file->chunk_free = XB_STREAM_MIN_CHUNK_SIZE;
182 
183 	return 0;
184 }
185 
186 static
187 int
xb_stream_write_chunk(xb_wstream_file_t * file,const void * buf,size_t len)188 xb_stream_write_chunk(xb_wstream_file_t *file, const void *buf, size_t len)
189 {
190 	/* Chunk magic + flags + chunk type + path_len + path + len + offset +
191 	checksum */
192 	uchar		tmpbuf[sizeof(XB_STREAM_CHUNK_MAGIC) - 1 + 1 + 1 + 4 +
193 			       FN_REFLEN + 8 + 8 + 4];
194 	uchar		*ptr;
195 	xb_wstream_t	*stream = file->stream;
196 	ulong		checksum;
197 
198 	/* Write xbstream header */
199 	ptr = tmpbuf;
200 
201 	/* Chunk magic */
202 	memcpy(ptr, XB_STREAM_CHUNK_MAGIC, sizeof(XB_STREAM_CHUNK_MAGIC) - 1);
203 	ptr += sizeof(XB_STREAM_CHUNK_MAGIC) - 1;
204 
205 	*ptr++ = 0;                              /* Chunk flags */
206 
207 	*ptr++ = (uchar) XB_CHUNK_TYPE_PAYLOAD;  /* Chunk type */
208 
209 	int4store(ptr, file->path_len);          /* Path length */
210 	ptr += 4;
211 
212 	memcpy(ptr, file->path, file->path_len); /* Path */
213 	ptr += file->path_len;
214 
215 	int8store(ptr, len);                     /* Payload length */
216 	ptr += 8;
217 
218 	checksum = my_checksum(0, buf, len);
219 
220 	pthread_mutex_lock(&stream->mutex);
221 
222 	int8store(ptr, file->offset);            /* Payload offset */
223 	ptr += 8;
224 
225 	int4store(ptr, checksum);
226 	ptr += 4;
227 
228 	xb_ad(ptr <= tmpbuf + sizeof(tmpbuf));
229 
230 	if (file->write(file, file->userdata, tmpbuf, ptr-tmpbuf) == -1)
231 		goto err;
232 
233 
234 	if (file->write(file, file->userdata, buf, len) == -1) /* Payload */
235 		goto err;
236 
237 	file->offset+= len;
238 
239 	pthread_mutex_unlock(&stream->mutex);
240 
241 	return 0;
242 
243 err:
244 
245 	pthread_mutex_unlock(&stream->mutex);
246 
247 	return 1;
248 }
249 
250 static
251 int
xb_stream_write_eof(xb_wstream_file_t * file)252 xb_stream_write_eof(xb_wstream_file_t *file)
253 {
254 	/* Chunk magic + flags + chunk type + path_len + path */
255 	uchar		tmpbuf[sizeof(XB_STREAM_CHUNK_MAGIC) - 1 + 1 + 1 + 4 +
256 			       FN_REFLEN];
257 	uchar		*ptr;
258 	xb_wstream_t	*stream = file->stream;
259 
260 	pthread_mutex_lock(&stream->mutex);
261 
262 	/* Write xbstream header */
263 	ptr = tmpbuf;
264 
265 	/* Chunk magic */
266 	memcpy(ptr, XB_STREAM_CHUNK_MAGIC, sizeof(XB_STREAM_CHUNK_MAGIC) - 1);
267 	ptr += sizeof(XB_STREAM_CHUNK_MAGIC) - 1;
268 
269 	*ptr++ = 0;                              /* Chunk flags */
270 
271 	*ptr++ = (uchar) XB_CHUNK_TYPE_EOF;      /* Chunk type */
272 
273 	int4store(ptr, file->path_len);          /* Path length */
274 	ptr += 4;
275 
276 	memcpy(ptr, file->path, file->path_len); /* Path */
277 	ptr += file->path_len;
278 
279 	xb_ad(ptr <= tmpbuf + sizeof(tmpbuf));
280 
281 	if (file->write(file, file->userdata, tmpbuf,
282 			(ulonglong) (ptr - tmpbuf)) == -1)
283 		goto err;
284 
285 	pthread_mutex_unlock(&stream->mutex);
286 
287 	return 0;
288 err:
289 
290 	pthread_mutex_unlock(&stream->mutex);
291 
292 	return 1;
293 }
294