1 /******************************************************
2 Copyright (c) 2011-2017 Percona LLC and/or its affiliates.
3
4 The xbstream format writer implementation.
5
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; version 2 of the License.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
18
19 *******************************************************/
20
21 #include <my_global.h>
22 #include <my_base.h>
23 #include <zlib.h>
24 #include "common.h"
25 #include "xbstream.h"
26 #include "crc_glue.h"
27
/* Size in bytes of the per-file staging buffer. A write that is smaller than
the space still free in that buffer is appended to it, so that several small
writes leave the stream as one chunk. */
#define XB_STREAM_MIN_CHUNK_SIZE (10 * 1024 * 1024)
30
/* Writer-side state of one xbstream; each file handle opened on the stream
keeps a pointer to it. */
struct xb_wstream_struct {
	/* Held while a chunk header and its payload are handed to the write
	callback */
	pthread_mutex_t	mutex;
};
34
35 struct xb_wstream_file_struct {
36 xb_wstream_t *stream;
37 char *path;
38 size_t path_len;
39 char chunk[XB_STREAM_MIN_CHUNK_SIZE];
40 char *chunk_ptr;
41 size_t chunk_free;
42 my_off_t offset;
43 void *userdata;
44 xb_stream_write_callback *write;
45 };
46
47 static int xb_stream_flush(xb_wstream_file_t *file);
48 static int xb_stream_write_chunk(xb_wstream_file_t *file,
49 const void *buf, size_t len);
50 static int xb_stream_write_eof(xb_wstream_file_t *file);
51
52 static
53 ssize_t
xb_stream_default_write_callback(xb_wstream_file_t * file,void * userdata,const void * buf,size_t len)54 xb_stream_default_write_callback(xb_wstream_file_t *file __attribute__((unused)),
55 void *userdata __attribute__((unused)),
56 const void *buf, size_t len)
57 {
58 if (my_write(my_fileno(stdout), (const uchar *)buf, len, MYF(MY_WME | MY_NABP)))
59 return -1;
60 return len;
61 }
62
63 xb_wstream_t *
xb_stream_write_new(void)64 xb_stream_write_new(void)
65 {
66 xb_wstream_t *stream;
67
68 stream = (xb_wstream_t *) my_malloc(sizeof(xb_wstream_t), MYF(MY_FAE));
69 pthread_mutex_init(&stream->mutex, NULL);
70
71 return stream;;
72 }
73
74 xb_wstream_file_t *
xb_stream_write_open(xb_wstream_t * stream,const char * path,MY_STAT * mystat,void * userdata,xb_stream_write_callback * onwrite)75 xb_stream_write_open(xb_wstream_t *stream, const char *path,
76 MY_STAT *mystat __attribute__((unused)),
77 void *userdata,
78 xb_stream_write_callback *onwrite)
79 {
80 xb_wstream_file_t *file;
81 size_t path_len;
82
83 path_len = strlen(path);
84
85 if (path_len > FN_REFLEN) {
86 msg("xb_stream_write_open(): file path is too long.");
87 return NULL;
88 }
89
90 file = (xb_wstream_file_t *) my_malloc(sizeof(xb_wstream_file_t) +
91 path_len + 1, MYF(MY_FAE));
92
93 file->path = (char *) (file + 1);
94 #ifdef _WIN32
95 /* Normalize path on Windows, so we can restore elsewhere.*/
96 {
97 int i;
98 for (i = 0; ; i++) {
99 file->path[i] = (path[i] == '\\') ? '/' : path[i];
100 if (!path[i])
101 break;
102 }
103 }
104 #else
105 memcpy(file->path, path, path_len + 1);
106 #endif
107 file->path_len = path_len;
108
109 file->stream = stream;
110 file->offset = 0;
111 file->chunk_ptr = file->chunk;
112 file->chunk_free = XB_STREAM_MIN_CHUNK_SIZE;
113 if (onwrite) {
114 #ifdef __WIN__
115 setmode(fileno(stdout), _O_BINARY);
116 #endif
117 file->userdata = userdata;
118 file->write = onwrite;
119 } else {
120 file->userdata = NULL;
121 file->write = xb_stream_default_write_callback;
122 }
123
124 return file;
125 }
126
127 int
xb_stream_write_data(xb_wstream_file_t * file,const void * buf,size_t len)128 xb_stream_write_data(xb_wstream_file_t *file, const void *buf, size_t len)
129 {
130 if (len < file->chunk_free) {
131 memcpy(file->chunk_ptr, buf, len);
132 file->chunk_ptr += len;
133 file->chunk_free -= len;
134
135 return 0;
136 }
137
138 if (xb_stream_flush(file))
139 return 1;
140
141 return xb_stream_write_chunk(file, buf, len);
142 }
143
144 int
xb_stream_write_close(xb_wstream_file_t * file)145 xb_stream_write_close(xb_wstream_file_t *file)
146 {
147 if (xb_stream_flush(file) ||
148 xb_stream_write_eof(file)) {
149 my_free(file);
150 return 1;
151 }
152
153 my_free(file);
154
155 return 0;
156 }
157
158 int
xb_stream_write_done(xb_wstream_t * stream)159 xb_stream_write_done(xb_wstream_t *stream)
160 {
161 pthread_mutex_destroy(&stream->mutex);
162
163 my_free(stream);
164
165 return 0;
166 }
167
168 static
169 int
xb_stream_flush(xb_wstream_file_t * file)170 xb_stream_flush(xb_wstream_file_t *file)
171 {
172 if (file->chunk_ptr == file->chunk) {
173 return 0;
174 }
175
176 if (xb_stream_write_chunk(file, file->chunk,
177 file->chunk_ptr - file->chunk)) {
178 return 1;
179 }
180
181 file->chunk_ptr = file->chunk;
182 file->chunk_free = XB_STREAM_MIN_CHUNK_SIZE;
183
184 return 0;
185 }
186
187 static
188 int
xb_stream_write_chunk(xb_wstream_file_t * file,const void * buf,size_t len)189 xb_stream_write_chunk(xb_wstream_file_t *file, const void *buf, size_t len)
190 {
191 /* Chunk magic + flags + chunk type + path_len + path + len + offset +
192 checksum */
193 uchar tmpbuf[sizeof(XB_STREAM_CHUNK_MAGIC) - 1 + 1 + 1 + 4 +
194 FN_REFLEN + 8 + 8 + 4];
195 uchar *ptr;
196 xb_wstream_t *stream = file->stream;
197 ulong checksum;
198
199 /* Write xbstream header */
200 ptr = tmpbuf;
201
202 /* Chunk magic */
203 memcpy(ptr, XB_STREAM_CHUNK_MAGIC, sizeof(XB_STREAM_CHUNK_MAGIC) - 1);
204 ptr += sizeof(XB_STREAM_CHUNK_MAGIC) - 1;
205
206 *ptr++ = 0; /* Chunk flags */
207
208 *ptr++ = (uchar) XB_CHUNK_TYPE_PAYLOAD; /* Chunk type */
209
210 int4store(ptr, file->path_len); /* Path length */
211 ptr += 4;
212
213 memcpy(ptr, file->path, file->path_len); /* Path */
214 ptr += file->path_len;
215
216 int8store(ptr, len); /* Payload length */
217 ptr += 8;
218
219 checksum = crc32_iso3309(0, (const uchar *)buf, (uint)len); /* checksum */
220
221 pthread_mutex_lock(&stream->mutex);
222
223 int8store(ptr, file->offset); /* Payload offset */
224 ptr += 8;
225
226 int4store(ptr, checksum);
227 ptr += 4;
228
229 xb_ad(ptr <= tmpbuf + sizeof(tmpbuf));
230
231 if (file->write(file, file->userdata, tmpbuf, ptr-tmpbuf) == -1)
232 goto err;
233
234
235 if (file->write(file, file->userdata, buf, len) == -1) /* Payload */
236 goto err;
237
238 file->offset+= len;
239
240 pthread_mutex_unlock(&stream->mutex);
241
242 return 0;
243
244 err:
245
246 pthread_mutex_unlock(&stream->mutex);
247
248 return 1;
249 }
250
251 static
252 int
xb_stream_write_eof(xb_wstream_file_t * file)253 xb_stream_write_eof(xb_wstream_file_t *file)
254 {
255 /* Chunk magic + flags + chunk type + path_len + path */
256 uchar tmpbuf[sizeof(XB_STREAM_CHUNK_MAGIC) - 1 + 1 + 1 + 4 +
257 FN_REFLEN];
258 uchar *ptr;
259 xb_wstream_t *stream = file->stream;
260
261 pthread_mutex_lock(&stream->mutex);
262
263 /* Write xbstream header */
264 ptr = tmpbuf;
265
266 /* Chunk magic */
267 memcpy(ptr, XB_STREAM_CHUNK_MAGIC, sizeof(XB_STREAM_CHUNK_MAGIC) - 1);
268 ptr += sizeof(XB_STREAM_CHUNK_MAGIC) - 1;
269
270 *ptr++ = 0; /* Chunk flags */
271
272 *ptr++ = (uchar) XB_CHUNK_TYPE_EOF; /* Chunk type */
273
274 int4store(ptr, file->path_len); /* Path length */
275 ptr += 4;
276
277 memcpy(ptr, file->path, file->path_len); /* Path */
278 ptr += file->path_len;
279
280 xb_ad(ptr <= tmpbuf + sizeof(tmpbuf));
281
282 if (file->write(file, file->userdata, tmpbuf,
283 (ulonglong) (ptr - tmpbuf)) == -1)
284 goto err;
285
286 pthread_mutex_unlock(&stream->mutex);
287
288 return 0;
289 err:
290
291 pthread_mutex_unlock(&stream->mutex);
292
293 return 1;
294 }
295