1 /****************************************************** 2 Copyright (c) 2011-2017 Percona LLC and/or its affiliates. 3 4 The xbstream format writer implementation. 5 6 This program is free software; you can redistribute it and/or modify 7 it under the terms of the GNU General Public License as published by 8 the Free Software Foundation; version 2 of the License. 9 10 This program is distributed in the hope that it will be useful, 11 but WITHOUT ANY WARRANTY; without even the implied warranty of 12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13 GNU General Public License for more details. 14 15 You should have received a copy of the GNU General Public License 16 along with this program; if not, write to the Free Software 17 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA 18 19 *******************************************************/ 20 21 #include <my_global.h> 22 #include <my_base.h> 23 #include <zlib.h> 24 #include "common.h" 25 #include "xbstream.h" 26 #include "crc_glue.h" 27 28 /* Group writes smaller than this into a single chunk */ 29 #define XB_STREAM_MIN_CHUNK_SIZE (10 * 1024 * 1024) 30 31 struct xb_wstream_struct { 32 pthread_mutex_t mutex; 33 }; 34 35 struct xb_wstream_file_struct { 36 xb_wstream_t *stream; 37 char *path; 38 size_t path_len; 39 char chunk[XB_STREAM_MIN_CHUNK_SIZE]; 40 char *chunk_ptr; 41 size_t chunk_free; 42 my_off_t offset; 43 void *userdata; 44 xb_stream_write_callback *write; 45 }; 46 47 static int xb_stream_flush(xb_wstream_file_t *file); 48 static int xb_stream_write_chunk(xb_wstream_file_t *file, 49 const void *buf, size_t len); 50 static int xb_stream_write_eof(xb_wstream_file_t *file); 51 52 static 53 ssize_t 54 xb_stream_default_write_callback(xb_wstream_file_t *file __attribute__((unused)), 55 void *userdata __attribute__((unused)), 56 const void *buf, size_t len) 57 { 58 if (my_write(my_fileno(stdout), (const uchar *)buf, len, MYF(MY_WME | MY_NABP))) 59 return -1; 60 return len; 61 } 62 63 xb_wstream_t * 64 xb_stream_write_new(void) 65 { 66 xb_wstream_t *stream; 67 68 stream = (xb_wstream_t *) my_malloc(sizeof(xb_wstream_t), MYF(MY_FAE)); 69 pthread_mutex_init(&stream->mutex, NULL); 70 71 return stream;; 72 } 73 74 xb_wstream_file_t * 75 xb_stream_write_open(xb_wstream_t *stream, const char *path, 76 MY_STAT *mystat __attribute__((unused)), 77 void *userdata, 78 xb_stream_write_callback *onwrite) 79 { 80 xb_wstream_file_t *file; 81 size_t path_len; 82 83 path_len = strlen(path); 84 85 if (path_len > FN_REFLEN) { 86 msg("xb_stream_write_open(): file path is too long."); 87 return NULL; 88 } 89 90 file = (xb_wstream_file_t *) my_malloc(sizeof(xb_wstream_file_t) + 91 path_len + 1, MYF(MY_FAE)); 92 93 file->path = (char *) (file + 1); 94 #ifdef _WIN32 95 /* Normalize path on Windows, so we can restore elsewhere.*/ 96 { main(int argc,char ** argv)97 int i; 98 for (i = 0; ; i++) { 99 file->path[i] = (path[i] == '\\') ? '/' : path[i]; 100 if (!path[i]) 101 break; 102 } 103 } 104 #else 105 memcpy(file->path, path, path_len + 1); 106 #endif 107 file->path_len = path_len; 108 109 file->stream = stream; 110 file->offset = 0; 111 file->chunk_ptr = file->chunk; 112 file->chunk_free = XB_STREAM_MIN_CHUNK_SIZE; 113 if (onwrite) { 114 #ifdef __WIN__ 115 setmode(fileno(stdout), _O_BINARY); 116 #endif 117 file->userdata = userdata; 118 file->write = onwrite; 119 } else { 120 file->userdata = NULL; 121 file->write = xb_stream_default_write_callback; 122 } 123 124 return file; 125 } 126 127 int 128 xb_stream_write_data(xb_wstream_file_t *file, const void *buf, size_t len) 129 { 130 if (len < file->chunk_free) { 131 memcpy(file->chunk_ptr, buf, len); 132 file->chunk_ptr += len; 133 file->chunk_free -= len; 134 135 return 0; 136 } 137 138 if (xb_stream_flush(file)) get_options(int * argc,char *** argv)139 return 1; 140 141 return xb_stream_write_chunk(file, buf, len); 142 } 143 144 int 145 xb_stream_write_close(xb_wstream_file_t *file) 146 { 147 if (xb_stream_flush(file) || 148 xb_stream_write_eof(file)) { 149 my_free(file); 150 return 1; 151 } 152 print_version(void)153 my_free(file); 154 155 return 0; 156 } 157 158 int 159 xb_stream_write_done(xb_wstream_t *stream) 160 { usage(void)161 pthread_mutex_destroy(&stream->mutex); 162 163 my_free(stream); 164 165 return 0; 166 } 167 168 static 169 int 170 xb_stream_flush(xb_wstream_file_t *file) 171 { 172 if (file->chunk_ptr == file->chunk) { 173 return 0; 174 } 175 176 if (xb_stream_write_chunk(file, file->chunk, 177 file->chunk_ptr - file->chunk)) { 178 return 1; 179 } 180 181 file->chunk_ptr = file->chunk; 182 file->chunk_free = XB_STREAM_MIN_CHUNK_SIZE; set_run_mode(run_mode_t mode)183 184 return 0; 185 } 186 187 static 188 int 189 xb_stream_write_chunk(xb_wstream_file_t *file, const void *buf, size_t len) 190 { 191 /* Chunk magic + flags + chunk type + path_len + path + len + offset + 192 checksum */ 193 uchar tmpbuf[sizeof(XB_STREAM_CHUNK_MAGIC) - 1 + 1 + 1 + 4 + 194 FN_REFLEN + 8 + 8 + 4]; 195 uchar *ptr; 196 xb_wstream_t *stream = file->stream; get_one_option(int optid,const struct my_option * opt,char * argument)197 ulong checksum; 198 199 /* Write xbstream header */ 200 ptr = tmpbuf; 201 202 /* Chunk magic */ 203 memcpy(ptr, XB_STREAM_CHUNK_MAGIC, sizeof(XB_STREAM_CHUNK_MAGIC) - 1); 204 ptr += sizeof(XB_STREAM_CHUNK_MAGIC) - 1; 205 206 *ptr++ = 0; /* Chunk flags */ 207 208 *ptr++ = (uchar) XB_CHUNK_TYPE_PAYLOAD; /* Chunk type */ 209 210 int4store(ptr, file->path_len); /* Path length */ 211 ptr += 4; 212 213 memcpy(ptr, file->path, file->path_len); /* Path */ 214 ptr += file->path_len; 215 216 int8store(ptr, len); /* Payload length */ 217 ptr += 8; 218 219 checksum = crc32_iso3309(0, (const uchar *)buf, (uint)len); /* checksum */ 220 stream_one_file(File file,xb_wstream_file_t * xbfile)221 pthread_mutex_lock(&stream->mutex); 222 223 int8store(ptr, file->offset); /* Payload offset */ 224 ptr += 8; 225 226 int4store(ptr, checksum); 227 ptr += 4; 228 229 xb_ad(ptr <= tmpbuf + sizeof(tmpbuf)); 230 231 if (file->write(file, file->userdata, tmpbuf, ptr-tmpbuf) == -1) 232 goto err; 233 234 235 if (file->write(file, file->userdata, buf, len) == -1) /* Payload */ 236 goto err; 237 238 file->offset+= len; 239 240 pthread_mutex_unlock(&stream->mutex); 241 242 return 0; 243 244 err: 245 246 pthread_mutex_unlock(&stream->mutex); 247 248 return 1; 249 } 250 251 static 252 int 253 xb_stream_write_eof(xb_wstream_file_t *file) 254 { 255 /* Chunk magic + flags + chunk type + path_len + path */ 256 uchar tmpbuf[sizeof(XB_STREAM_CHUNK_MAGIC) - 1 + 1 + 1 + 4 + mode_create(int argc,char ** argv)257 FN_REFLEN]; 258 uchar *ptr; 259 xb_wstream_t *stream = file->stream; 260 261 pthread_mutex_lock(&stream->mutex); 262 263 /* Write xbstream header */ 264 ptr = tmpbuf; 265 266 /* Chunk magic */ 267 memcpy(ptr, XB_STREAM_CHUNK_MAGIC, sizeof(XB_STREAM_CHUNK_MAGIC) - 1); 268 ptr += sizeof(XB_STREAM_CHUNK_MAGIC) - 1; 269 270 *ptr++ = 0; /* Chunk flags */ 271 272 *ptr++ = (uchar) XB_CHUNK_TYPE_EOF; /* Chunk type */ 273 274 int4store(ptr, file->path_len); /* Path length */ 275 ptr += 4; 276 277 memcpy(ptr, file->path, file->path_len); /* Path */ 278 ptr += file->path_len; 279 280 xb_ad(ptr <= tmpbuf + sizeof(tmpbuf)); 281 282 if (file->write(file, file->userdata, tmpbuf, 283 (ulonglong) (ptr - tmpbuf)) == -1) 284 goto err; 285 286 pthread_mutex_unlock(&stream->mutex); 287 288 return 0; 289 err: 290 291 pthread_mutex_unlock(&stream->mutex); 292 293 return 1; 294 } 295