1 /* 2 * Multifd zlib compression implementation 3 * 4 * Copyright (c) 2020 Red Hat Inc 5 * 6 * Authors: 7 * Juan Quintela <quintela@redhat.com> 8 * 9 * This work is licensed under the terms of the GNU GPL, version 2 or later. 10 * See the COPYING file in the top-level directory. 11 */ 12 13 #include "qemu/osdep.h" 14 #include <zlib.h> 15 #include "qemu/rcu.h" 16 #include "exec/target_page.h" 17 #include "qapi/error.h" 18 #include "migration.h" 19 #include "trace.h" 20 #include "multifd.h" 21 22 struct zlib_data { 23 /* stream for compression */ 24 z_stream zs; 25 /* compressed buffer */ 26 uint8_t *zbuff; 27 /* size of compressed buffer */ 28 uint32_t zbuff_len; 29 }; 30 31 /* Multifd zlib compression */ 32 33 /** 34 * zlib_send_setup: setup send side 35 * 36 * Setup each channel with zlib compression. 37 * 38 * Returns 0 for success or -1 for error 39 * 40 * @p: Params for the channel that we are using 41 * @errp: pointer to an error 42 */ 43 static int zlib_send_setup(MultiFDSendParams *p, Error **errp) 44 { 45 struct zlib_data *z = g_malloc0(sizeof(struct zlib_data)); 46 z_stream *zs = &z->zs; 47 48 zs->zalloc = Z_NULL; 49 zs->zfree = Z_NULL; 50 zs->opaque = Z_NULL; 51 if (deflateInit(zs, migrate_multifd_zlib_level()) != Z_OK) { 52 g_free(z); 53 error_setg(errp, "multifd %d: deflate init failed", p->id); 54 return -1; 55 } 56 /* To be safe, we reserve twice the size of the packet */ 57 z->zbuff_len = MULTIFD_PACKET_SIZE * 2; 58 z->zbuff = g_try_malloc(z->zbuff_len); 59 if (!z->zbuff) { 60 deflateEnd(&z->zs); 61 g_free(z); 62 error_setg(errp, "multifd %d: out of memory for zbuff", p->id); 63 return -1; 64 } 65 p->data = z; 66 return 0; 67 } 68 69 /** 70 * zlib_send_cleanup: cleanup send side 71 * 72 * Close the channel and return memory. 73 * 74 * @p: Params for the channel that we are using 75 * @errp: pointer to an error 76 */ 77 static void zlib_send_cleanup(MultiFDSendParams *p, Error **errp) 78 { 79 struct zlib_data *z = p->data; 80 81 deflateEnd(&z->zs); 82 g_free(z->zbuff); 83 z->zbuff = NULL; 84 g_free(p->data); 85 p->data = NULL; 86 } 87 88 /** 89 * zlib_send_prepare: prepare date to be able to send 90 * 91 * Create a compressed buffer with all the pages that we are going to 92 * send. 93 * 94 * Returns 0 for success or -1 for error 95 * 96 * @p: Params for the channel that we are using 97 * @errp: pointer to an error 98 */ 99 static int zlib_send_prepare(MultiFDSendParams *p, Error **errp) 100 { 101 struct iovec *iov = p->pages->iov; 102 struct zlib_data *z = p->data; 103 z_stream *zs = &z->zs; 104 uint32_t out_size = 0; 105 int ret; 106 uint32_t i; 107 108 for (i = 0; i < p->pages->num; i++) { 109 uint32_t available = z->zbuff_len - out_size; 110 int flush = Z_NO_FLUSH; 111 112 if (i == p->pages->num - 1) { 113 flush = Z_SYNC_FLUSH; 114 } 115 116 zs->avail_in = iov[i].iov_len; 117 zs->next_in = iov[i].iov_base; 118 119 zs->avail_out = available; 120 zs->next_out = z->zbuff + out_size; 121 122 /* 123 * Welcome to deflate semantics 124 * 125 * We need to loop while: 126 * - return is Z_OK 127 * - there are stuff to be compressed 128 * - there are output space free 129 */ 130 do { 131 ret = deflate(zs, flush); 132 } while (ret == Z_OK && zs->avail_in && zs->avail_out); 133 if (ret == Z_OK && zs->avail_in) { 134 error_setg(errp, "multifd %d: deflate failed to compress all input", 135 p->id); 136 return -1; 137 } 138 if (ret != Z_OK) { 139 error_setg(errp, "multifd %d: deflate returned %d instead of Z_OK", 140 p->id, ret); 141 return -1; 142 } 143 out_size += available - zs->avail_out; 144 } 145 p->next_packet_size = out_size; 146 p->flags |= MULTIFD_FLAG_ZLIB; 147 148 return 0; 149 } 150 151 /** 152 * zlib_send_write: do the actual write of the data 153 * 154 * Do the actual write of the comprresed buffer. 155 * 156 * Returns 0 for success or -1 for error 157 * 158 * @p: Params for the channel that we are using 159 * @used: number of pages used 160 * @errp: pointer to an error 161 */ 162 static int zlib_send_write(MultiFDSendParams *p, uint32_t used, Error **errp) 163 { 164 struct zlib_data *z = p->data; 165 166 return qio_channel_write_all(p->c, (void *)z->zbuff, p->next_packet_size, 167 errp); 168 } 169 170 /** 171 * zlib_recv_setup: setup receive side 172 * 173 * Create the compressed channel and buffer. 174 * 175 * Returns 0 for success or -1 for error 176 * 177 * @p: Params for the channel that we are using 178 * @errp: pointer to an error 179 */ 180 static int zlib_recv_setup(MultiFDRecvParams *p, Error **errp) 181 { 182 struct zlib_data *z = g_malloc0(sizeof(struct zlib_data)); 183 z_stream *zs = &z->zs; 184 185 p->data = z; 186 zs->zalloc = Z_NULL; 187 zs->zfree = Z_NULL; 188 zs->opaque = Z_NULL; 189 zs->avail_in = 0; 190 zs->next_in = Z_NULL; 191 if (inflateInit(zs) != Z_OK) { 192 error_setg(errp, "multifd %d: inflate init failed", p->id); 193 return -1; 194 } 195 /* To be safe, we reserve twice the size of the packet */ 196 z->zbuff_len = MULTIFD_PACKET_SIZE * 2; 197 z->zbuff = g_try_malloc(z->zbuff_len); 198 if (!z->zbuff) { 199 inflateEnd(zs); 200 error_setg(errp, "multifd %d: out of memory for zbuff", p->id); 201 return -1; 202 } 203 return 0; 204 } 205 206 /** 207 * zlib_recv_cleanup: setup receive side 208 * 209 * For no compression this function does nothing. 210 * 211 * @p: Params for the channel that we are using 212 */ 213 static void zlib_recv_cleanup(MultiFDRecvParams *p) 214 { 215 struct zlib_data *z = p->data; 216 217 inflateEnd(&z->zs); 218 g_free(z->zbuff); 219 z->zbuff = NULL; 220 g_free(p->data); 221 p->data = NULL; 222 } 223 224 /** 225 * zlib_recv_pages: read the data from the channel into actual pages 226 * 227 * Read the compressed buffer, and uncompress it into the actual 228 * pages. 229 * 230 * Returns 0 for success or -1 for error 231 * 232 * @p: Params for the channel that we are using 233 * @errp: pointer to an error 234 */ 235 static int zlib_recv_pages(MultiFDRecvParams *p, Error **errp) 236 { 237 struct zlib_data *z = p->data; 238 z_stream *zs = &z->zs; 239 uint32_t in_size = p->next_packet_size; 240 /* we measure the change of total_out */ 241 uint32_t out_size = zs->total_out; 242 uint32_t expected_size = p->pages->num * qemu_target_page_size(); 243 uint32_t flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK; 244 int ret; 245 int i; 246 247 if (flags != MULTIFD_FLAG_ZLIB) { 248 error_setg(errp, "multifd %d: flags received %x flags expected %x", 249 p->id, flags, MULTIFD_FLAG_ZLIB); 250 return -1; 251 } 252 ret = qio_channel_read_all(p->c, (void *)z->zbuff, in_size, errp); 253 254 if (ret != 0) { 255 return ret; 256 } 257 258 zs->avail_in = in_size; 259 zs->next_in = z->zbuff; 260 261 for (i = 0; i < p->pages->num; i++) { 262 struct iovec *iov = &p->pages->iov[i]; 263 int flush = Z_NO_FLUSH; 264 unsigned long start = zs->total_out; 265 266 if (i == p->pages->num - 1) { 267 flush = Z_SYNC_FLUSH; 268 } 269 270 zs->avail_out = iov->iov_len; 271 zs->next_out = iov->iov_base; 272 273 /* 274 * Welcome to inflate semantics 275 * 276 * We need to loop while: 277 * - return is Z_OK 278 * - there are input available 279 * - we haven't completed a full page 280 */ 281 do { 282 ret = inflate(zs, flush); 283 } while (ret == Z_OK && zs->avail_in 284 && (zs->total_out - start) < iov->iov_len); 285 if (ret == Z_OK && (zs->total_out - start) < iov->iov_len) { 286 error_setg(errp, "multifd %d: inflate generated too few output", 287 p->id); 288 return -1; 289 } 290 if (ret != Z_OK) { 291 error_setg(errp, "multifd %d: inflate returned %d instead of Z_OK", 292 p->id, ret); 293 return -1; 294 } 295 } 296 out_size = zs->total_out - out_size; 297 if (out_size != expected_size) { 298 error_setg(errp, "multifd %d: packet size received %d size expected %d", 299 p->id, out_size, expected_size); 300 return -1; 301 } 302 return 0; 303 } 304 305 static MultiFDMethods multifd_zlib_ops = { 306 .send_setup = zlib_send_setup, 307 .send_cleanup = zlib_send_cleanup, 308 .send_prepare = zlib_send_prepare, 309 .send_write = zlib_send_write, 310 .recv_setup = zlib_recv_setup, 311 .recv_cleanup = zlib_recv_cleanup, 312 .recv_pages = zlib_recv_pages 313 }; 314 315 static void multifd_zlib_register(void) 316 { 317 multifd_register_ops(MULTIFD_COMPRESSION_ZLIB, &multifd_zlib_ops); 318 } 319 320 migration_init(multifd_zlib_register); 321