/*
 * RDMA protocol and interfaces
 *
 * Copyright IBM, Corp. 2010-2013
 *
 * Authors:
 *  Michael R. Hines <mrhines@us.ibm.com>
 *  Jiuxing Liu <jl@us.ibm.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or
 * later. See the COPYING file in the top-level directory.
 *
 */
#include "qemu-common.h"
#include "migration/migration.h"
#include "migration/qemu-file.h"
#include "exec/cpu-common.h"
#include "qemu/error-report.h"
#include "qemu/main-loop.h"
#include "qemu/sockets.h"
#include "qemu/bitmap.h"
#include "block/coroutine.h"
#include <stdio.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
#include <arpa/inet.h>
#include <string.h>
#include <rdma/rdma_cma.h>
#include "trace.h"

/*
 * Print an error on both the Monitor and the Log file.
 */
#define ERROR(errp, fmt, ...) \
    do { \
        fprintf(stderr, "RDMA ERROR: " fmt "\n", ## __VA_ARGS__); \
        if (errp && (*(errp) == NULL)) { \
            error_setg(errp, "RDMA ERROR: " fmt, ## __VA_ARGS__); \
        } \
    } while (0)

#define RDMA_RESOLVE_TIMEOUT_MS 10000

/* Do not merge data if larger than this. */
#define RDMA_MERGE_MAX (2 * 1024 * 1024)
#define RDMA_SIGNALED_SEND_MAX (RDMA_MERGE_MAX / 4096)

#define RDMA_REG_CHUNK_SHIFT 20 /* 1 MB */

/*
 * This is only for non-live state being migrated.
 * Instead of RDMA_WRITE messages, we use RDMA_SEND
 * messages for that state, which requires a different
 * delivery design than main memory.
 */
#define RDMA_SEND_INCREMENT 32768

/*
 * Maximum size infiniband SEND message
 */
#define RDMA_CONTROL_MAX_BUFFER (512 * 1024)
#define RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE 4096

#define RDMA_CONTROL_VERSION_CURRENT 1
/*
 * Capabilities for negotiation.
 */
#define RDMA_CAPABILITY_PIN_ALL 0x01

/*
 * Add the other flags above to this list of known capabilities
 * as they are introduced.
 */
static uint32_t known_capabilities = RDMA_CAPABILITY_PIN_ALL;

#define CHECK_ERROR_STATE() \
    do { \
        if (rdma->error_state) { \
            if (!rdma->error_reported) { \
                error_report("RDMA is in an error state waiting migration" \
                                " to abort!"); \
                rdma->error_reported = 1; \
            } \
            return rdma->error_state; \
        } \
    } while (0);

/*
 * A work request ID is 64-bits and we split up these bits
 * into 3 parts:
 *
 * bits 0-15 : type of control message, 2^16
 * bits 16-29: ram block index, 2^14
 * bits 30-63: ram block chunk number, 2^34
 *
 * The last two bit ranges are only used for RDMA writes,
 * in order to track their completion and potentially
 * also track unregistration status of the message.
 */
#define RDMA_WRID_TYPE_SHIFT  0UL
#define RDMA_WRID_BLOCK_SHIFT 16UL
#define RDMA_WRID_CHUNK_SHIFT 30UL

#define RDMA_WRID_TYPE_MASK \
    ((1UL << RDMA_WRID_BLOCK_SHIFT) - 1UL)

#define RDMA_WRID_BLOCK_MASK \
    (~RDMA_WRID_TYPE_MASK & ((1UL << RDMA_WRID_CHUNK_SHIFT) - 1UL))

#define RDMA_WRID_CHUNK_MASK (~RDMA_WRID_BLOCK_MASK & ~RDMA_WRID_TYPE_MASK)
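
/*
 * Worked example of the layout above (illustrative only): an RDMA write
 * for ram block index 3, chunk 5 uses
 *
 *   wr_id = (5 << RDMA_WRID_CHUNK_SHIFT)
 *         | (3 << RDMA_WRID_BLOCK_SHIFT)
 *         | RDMA_WRID_RDMA_WRITE              == 0x140030001
 *
 * and the masks recover each field again, e.g.
 * (wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT == 5.
 */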

/*
 * RDMA migration protocol:
 * 1. RDMA Writes (data messages, i.e. RAM)
 * 2. IB Send/Recv (control channel messages)
 */
enum {
    RDMA_WRID_NONE = 0,
    RDMA_WRID_RDMA_WRITE = 1,
    RDMA_WRID_SEND_CONTROL = 2000,
    RDMA_WRID_RECV_CONTROL = 4000,
};

static const char *wrid_desc[] = {
    [RDMA_WRID_NONE] = "NONE",
    [RDMA_WRID_RDMA_WRITE] = "WRITE RDMA",
    [RDMA_WRID_SEND_CONTROL] = "CONTROL SEND",
    [RDMA_WRID_RECV_CONTROL] = "CONTROL RECV",
};

/*
 * Work request IDs for IB SEND messages only (not RDMA writes).
 * This is used by the migration protocol to transmit
 * control messages (such as device state and registration commands)
 *
 * We could use more WRs, but we have enough for now.
 */
enum {
    RDMA_WRID_READY = 0,
    RDMA_WRID_DATA,
    RDMA_WRID_CONTROL,
    RDMA_WRID_MAX,
};

/*
 * SEND/RECV IB Control Messages.
 */
enum {
    RDMA_CONTROL_NONE = 0,
    RDMA_CONTROL_ERROR,
    RDMA_CONTROL_READY,               /* ready to receive */
    RDMA_CONTROL_QEMU_FILE,           /* QEMUFile-transmitted bytes */
    RDMA_CONTROL_RAM_BLOCKS_REQUEST,  /* RAMBlock synchronization */
    RDMA_CONTROL_RAM_BLOCKS_RESULT,   /* RAMBlock synchronization */
    RDMA_CONTROL_COMPRESS,            /* page contains repeat values */
    RDMA_CONTROL_REGISTER_REQUEST,    /* dynamic page registration */
    RDMA_CONTROL_REGISTER_RESULT,     /* key to use after registration */
    RDMA_CONTROL_REGISTER_FINISHED,   /* current iteration finished */
    RDMA_CONTROL_UNREGISTER_REQUEST,  /* dynamic UN-registration */
    RDMA_CONTROL_UNREGISTER_FINISHED, /* unpinning finished */
};

static const char *control_desc[] = {
    [RDMA_CONTROL_NONE] = "NONE",
    [RDMA_CONTROL_ERROR] = "ERROR",
    [RDMA_CONTROL_READY] = "READY",
    [RDMA_CONTROL_QEMU_FILE] = "QEMU FILE",
    [RDMA_CONTROL_RAM_BLOCKS_REQUEST] = "RAM BLOCKS REQUEST",
    [RDMA_CONTROL_RAM_BLOCKS_RESULT] = "RAM BLOCKS RESULT",
    [RDMA_CONTROL_COMPRESS] = "COMPRESS",
    [RDMA_CONTROL_REGISTER_REQUEST] = "REGISTER REQUEST",
    [RDMA_CONTROL_REGISTER_RESULT] = "REGISTER RESULT",
    [RDMA_CONTROL_REGISTER_FINISHED] = "REGISTER FINISHED",
    [RDMA_CONTROL_UNREGISTER_REQUEST] = "UNREGISTER REQUEST",
    [RDMA_CONTROL_UNREGISTER_FINISHED] = "UNREGISTER FINISHED",
};

/*
 * Memory and MR structures used to represent an IB Send/Recv work request.
 * This is *not* used for RDMA writes, only IB Send/Recv.
 */
typedef struct {
    uint8_t  control[RDMA_CONTROL_MAX_BUFFER]; /* actual buffer to register */
    struct   ibv_mr *control_mr;               /* registration metadata */
    size_t   control_len;                      /* length of the message */
    uint8_t *control_curr;                     /* start of unconsumed bytes */
} RDMAWorkRequestData;

/*
 * Negotiate RDMA capabilities during connection-setup time.
 */
typedef struct {
    uint32_t version;
    uint32_t flags;
} RDMACapabilities;

static void caps_to_network(RDMACapabilities *cap)
{
    cap->version = htonl(cap->version);
    cap->flags = htonl(cap->flags);
}

static void network_to_caps(RDMACapabilities *cap)
{
    cap->version = ntohl(cap->version);
    cap->flags = ntohl(cap->flags);
}
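
/*
 * Illustrative note (not additional protocol machinery): the capability
 * structure above is exchanged during connection setup in network byte
 * order, so e.g. version 1 with pin-all requested travels as
 * version = htonl(RDMA_CONTROL_VERSION_CURRENT) and
 * flags = htonl(RDMA_CAPABILITY_PIN_ALL).
 */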

/*
 * Representation of a RAMBlock from an RDMA perspective.
 * This is not transmitted, only local.
 * This and subsequent structures cannot be linked lists
 * because we're using a single IB message to transmit
 * the information. It's small anyway, so a list is overkill.
 */
typedef struct RDMALocalBlock {
    uint8_t *local_host_addr;    /* local virtual address */
    uint64_t remote_host_addr;   /* remote virtual address */
    uint64_t offset;
    uint64_t length;
    struct ibv_mr **pmr;         /* MRs for chunk-level registration */
    struct ibv_mr *mr;           /* MR for non-chunk-level registration */
    uint32_t *remote_keys;       /* rkeys for chunk-level registration */
    uint32_t remote_rkey;        /* rkeys for non-chunk-level registration */
    int index;                   /* which block are we */
    bool is_ram_block;
    int nb_chunks;
    unsigned long *transit_bitmap;
    unsigned long *unregister_bitmap;
} RDMALocalBlock;

/*
 * Also represents a RAMblock, but only on the dest.
 * This gets transmitted by the dest during connection-time
 * to the source VM and then is used to populate the
 * corresponding RDMALocalBlock with
 * the information needed to perform the actual RDMA.
 */
typedef struct QEMU_PACKED RDMADestBlock {
    uint64_t remote_host_addr;
    uint64_t offset;
    uint64_t length;
    uint32_t remote_rkey;
    uint32_t padding;
} RDMADestBlock;

static uint64_t htonll(uint64_t v)
{
    union { uint32_t lv[2]; uint64_t llv; } u;
    u.lv[0] = htonl(v >> 32);
    u.lv[1] = htonl(v & 0xFFFFFFFFULL);
    return u.llv;
}

static uint64_t ntohll(uint64_t v)
{
    union { uint32_t lv[2]; uint64_t llv; } u;
    u.llv = v;
    return ((uint64_t)ntohl(u.lv[0]) << 32) | (uint64_t) ntohl(u.lv[1]);
}

static void dest_block_to_network(RDMADestBlock *db)
{
    db->remote_host_addr = htonll(db->remote_host_addr);
    db->offset = htonll(db->offset);
    db->length = htonll(db->length);
    db->remote_rkey = htonl(db->remote_rkey);
}

static void network_to_dest_block(RDMADestBlock *db)
{
    db->remote_host_addr = ntohll(db->remote_host_addr);
    db->offset = ntohll(db->offset);
    db->length = ntohll(db->length);
    db->remote_rkey = ntohl(db->remote_rkey);
}

/*
 * Virtual address of the above structures used for transmitting
 * the RAMBlock descriptions at connection-time.
 * This structure is *not* transmitted.
 */
typedef struct RDMALocalBlocks {
    int nb_blocks;
    bool init;                   /* main memory init complete */
    RDMALocalBlock *block;
} RDMALocalBlocks;

/*
 * Main data structure for RDMA state.
 * While there is only one copy of this structure being allocated right now,
 * this is the place where one would start if you wanted to consider
 * having more than one RDMA connection open at the same time.
 */
typedef struct RDMAContext {
    char *host;
    int port;

    RDMAWorkRequestData wr_data[RDMA_WRID_MAX];

    /*
     * This is used by *_exchange_send() to figure out whether or not
     * the initial "READY" message has already been received or not.
     * This is because other functions may potentially poll() and detect
     * the READY message before send() does, in which case we need to
     * know if it completed.
     */
    int control_ready_expected;

    /* number of outstanding writes */
    int nb_sent;

    /* store info about current buffer so that we can
       merge it with future sends */
    uint64_t current_addr;
    uint64_t current_length;
    /* index of ram block the current buffer belongs to */
    int current_index;
    /* index of the chunk in the current ram block */
    int current_chunk;

    bool pin_all;

    /*
     * infiniband-specific variables for opening the device
     * and maintaining connection state and so forth.
     *
     * cm_id also has ibv_context, rdma_event_channel, and ibv_qp in
     * cm_id->verbs, cm_id->channel, and cm_id->qp.
     */
    struct rdma_cm_id *cm_id;               /* connection manager ID */
    struct rdma_cm_id *listen_id;
    bool connected;

    struct ibv_context *verbs;
    struct rdma_event_channel *channel;
    struct ibv_qp *qp;                      /* queue pair */
    struct ibv_comp_channel *comp_channel;  /* completion channel */
    struct ibv_pd *pd;                      /* protection domain */
    struct ibv_cq *cq;                      /* completion queue */

    /*
     * If a previous write failed (perhaps because of a failed
     * memory registration), then do not attempt any future work
     * and remember the error state.
     */
    int error_state;
    int error_reported;

    /*
     * Description of ram blocks used throughout the code.
     */
    RDMALocalBlocks local_ram_blocks;
    RDMADestBlock *dest_blocks;

    /*
     * Migration on *destination* started.
     * Then use coroutine yield function.
     * Source runs in a thread, so we don't care.
     */
    int migration_started_on_destination;

    int total_registrations;
    int total_writes;

    int unregister_current, unregister_next;
    uint64_t unregistrations[RDMA_SIGNALED_SEND_MAX];

    GHashTable *blockmap;
} RDMAContext;

/*
 * Interface to the rest of the migration call stack.
 */
typedef struct QEMUFileRDMA {
    RDMAContext *rdma;
    size_t len;
    void *file;
} QEMUFileRDMA;

/*
 * Main structure for IB Send/Recv control messages.
 * This gets prepended at the beginning of every Send/Recv.
 */
typedef struct QEMU_PACKED {
    uint32_t len;     /* Total length of data portion */
    uint32_t type;    /* which control command to perform */
    uint32_t repeat;  /* number of commands in data portion of same type */
    uint32_t padding;
} RDMAControlHeader;

static void control_to_network(RDMAControlHeader *control)
{
    control->type = htonl(control->type);
    control->len = htonl(control->len);
    control->repeat = htonl(control->repeat);
}

static void network_to_control(RDMAControlHeader *control)
{
    control->type = ntohl(control->type);
    control->len = ntohl(control->len);
    control->repeat = ntohl(control->repeat);
}
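
/*
 * Illustrative wire layout (not additional protocol code): a
 * RDMA_CONTROL_QEMU_FILE message carrying 16 bytes of QEMUFile data is sent
 * as this 16-byte header with len = 16, type = RDMA_CONTROL_QEMU_FILE and
 * repeat = 1 (all in network byte order), immediately followed by the 16
 * data bytes in the same SEND buffer.
 */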

/*
 * Register a single Chunk.
 * Information sent by the source VM to inform the dest
 * to register a single chunk of memory before we can perform
 * the actual RDMA operation.
 */
typedef struct QEMU_PACKED {
    union QEMU_PACKED {
        uint64_t current_addr;  /* offset into the ramblock of the chunk */
        uint64_t chunk;         /* chunk to lookup if unregistering */
    } key;
    uint32_t current_index;     /* which ramblock the chunk belongs to */
    uint32_t padding;
    uint64_t chunks;            /* how many sequential chunks to register */
} RDMARegister;

static void register_to_network(RDMARegister *reg)
{
    reg->key.current_addr = htonll(reg->key.current_addr);
    reg->current_index = htonl(reg->current_index);
    reg->chunks = htonll(reg->chunks);
}

static void network_to_register(RDMARegister *reg)
{
    reg->key.current_addr = ntohll(reg->key.current_addr);
    reg->current_index = ntohl(reg->current_index);
    reg->chunks = ntohll(reg->chunks);
}

typedef struct QEMU_PACKED {
    uint32_t value;     /* if zero, we will madvise() */
    uint32_t block_idx; /* which ram block index */
    uint64_t offset;    /* where in the remote ramblock this chunk */
    uint64_t length;    /* length of the chunk */
} RDMACompress;

static void compress_to_network(RDMACompress *comp)
{
    comp->value = htonl(comp->value);
    comp->block_idx = htonl(comp->block_idx);
    comp->offset = htonll(comp->offset);
    comp->length = htonll(comp->length);
}

static void network_to_compress(RDMACompress *comp)
{
    comp->value = ntohl(comp->value);
    comp->block_idx = ntohl(comp->block_idx);
    comp->offset = ntohll(comp->offset);
    comp->length = ntohll(comp->length);
}

/*
 * The result of the dest's memory registration produces an "rkey"
 * which the source VM must reference in order to perform
 * the RDMA operation.
 */
typedef struct QEMU_PACKED {
    uint32_t rkey;
    uint32_t padding;
    uint64_t host_addr;
} RDMARegisterResult;

static void result_to_network(RDMARegisterResult *result)
{
    result->rkey = htonl(result->rkey);
    result->host_addr = htonll(result->host_addr);
};

static void network_to_result(RDMARegisterResult *result)
{
    result->rkey = ntohl(result->rkey);
    result->host_addr = ntohll(result->host_addr);
};

const char *print_wrid(int wrid);
static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
                                   uint8_t *data, RDMAControlHeader *resp,
                                   int *resp_idx,
                                   int (*callback)(RDMAContext *rdma));

static inline uint64_t ram_chunk_index(const uint8_t *start,
                                       const uint8_t *host)
{
    return ((uintptr_t) host - (uintptr_t) start) >> RDMA_REG_CHUNK_SHIFT;
}

static inline uint8_t *ram_chunk_start(const RDMALocalBlock *rdma_ram_block,
                                       uint64_t i)
{
    return (uint8_t *)(uintptr_t)(rdma_ram_block->local_host_addr +
                                  (i << RDMA_REG_CHUNK_SHIFT));
}

static inline uint8_t *ram_chunk_end(const RDMALocalBlock *rdma_ram_block,
                                     uint64_t i)
{
    uint8_t *result = ram_chunk_start(rdma_ram_block, i) +
                      (1UL << RDMA_REG_CHUNK_SHIFT);

    if (result > (rdma_ram_block->local_host_addr + rdma_ram_block->length)) {
        result = rdma_ram_block->local_host_addr + rdma_ram_block->length;
    }

    return result;
}
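
/*
 * Chunk math at a glance (illustrative): with RDMA_REG_CHUNK_SHIFT == 20,
 * chunks are 1 MB.  For a host address 2.5 MB into a block,
 * ram_chunk_index() returns 2, ram_chunk_start(block, 2) points 2 MB into
 * the block, and ram_chunk_end() is clamped to the end of the block for the
 * final, possibly partial, chunk.
 */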

static int rdma_add_block(RDMAContext *rdma, void *host_addr,
                          ram_addr_t block_offset, uint64_t length)
{
    RDMALocalBlocks *local = &rdma->local_ram_blocks;
    RDMALocalBlock *block = g_hash_table_lookup(rdma->blockmap,
                                                (void *)(uintptr_t)block_offset);
    RDMALocalBlock *old = local->block;

    assert(block == NULL);

    local->block = g_malloc0(sizeof(RDMALocalBlock) * (local->nb_blocks + 1));

    if (local->nb_blocks) {
        int x;

        for (x = 0; x < local->nb_blocks; x++) {
            g_hash_table_remove(rdma->blockmap,
                                (void *)(uintptr_t)old[x].offset);
            g_hash_table_insert(rdma->blockmap,
                                (void *)(uintptr_t)old[x].offset,
                                &local->block[x]);
        }
        memcpy(local->block, old, sizeof(RDMALocalBlock) * local->nb_blocks);
        g_free(old);
    }

    block = &local->block[local->nb_blocks];

    block->local_host_addr = host_addr;
    block->offset = block_offset;
    block->length = length;
    block->index = local->nb_blocks;
    block->nb_chunks = ram_chunk_index(host_addr, host_addr + length) + 1UL;
    block->transit_bitmap = bitmap_new(block->nb_chunks);
    bitmap_clear(block->transit_bitmap, 0, block->nb_chunks);
    block->unregister_bitmap = bitmap_new(block->nb_chunks);
    bitmap_clear(block->unregister_bitmap, 0, block->nb_chunks);
    block->remote_keys = g_malloc0(block->nb_chunks * sizeof(uint32_t));

    block->is_ram_block = local->init ? false : true;

    g_hash_table_insert(rdma->blockmap, (void *) block_offset, block);

    trace_rdma_add_block(local->nb_blocks, (uintptr_t) block->local_host_addr,
                         block->offset, block->length,
                         (uintptr_t) (block->local_host_addr + block->length),
                         BITS_TO_LONGS(block->nb_chunks) *
                             sizeof(unsigned long) * 8,
                         block->nb_chunks);

    local->nb_blocks++;

    return 0;
}

/*
 * Memory regions need to be registered with the device and queue pairs set up
 * in advance before the migration starts. This tells us where the RAM blocks
 * are so that we can register them individually.
 */
static int qemu_rdma_init_one_block(const char *block_name, void *host_addr,
    ram_addr_t block_offset, ram_addr_t length, void *opaque)
{
    return rdma_add_block(opaque, host_addr, block_offset, length);
}

/*
 * Identify the RAMBlocks and their quantity. They will be used to
 * identify chunk boundaries inside each RAMBlock and also be referenced
 * during dynamic page registration.
 */
static int qemu_rdma_init_ram_blocks(RDMAContext *rdma)
{
    RDMALocalBlocks *local = &rdma->local_ram_blocks;

    assert(rdma->blockmap == NULL);
    rdma->blockmap = g_hash_table_new(g_direct_hash, g_direct_equal);
    memset(local, 0, sizeof *local);
    qemu_ram_foreach_block(qemu_rdma_init_one_block, rdma);
    trace_qemu_rdma_init_ram_blocks(local->nb_blocks);
    rdma->dest_blocks = (RDMADestBlock *) g_malloc0(sizeof(RDMADestBlock) *
                        rdma->local_ram_blocks.nb_blocks);
    local->init = true;
    return 0;
}

static int rdma_delete_block(RDMAContext *rdma, ram_addr_t block_offset)
{
    RDMALocalBlocks *local = &rdma->local_ram_blocks;
    RDMALocalBlock *block = g_hash_table_lookup(rdma->blockmap,
                                                (void *) block_offset);
    RDMALocalBlock *old = local->block;
    int x;

    assert(block);

    if (block->pmr) {
        int j;

        for (j = 0; j < block->nb_chunks; j++) {
            if (!block->pmr[j]) {
                continue;
            }
            ibv_dereg_mr(block->pmr[j]);
            rdma->total_registrations--;
        }
        g_free(block->pmr);
        block->pmr = NULL;
    }

    if (block->mr) {
        ibv_dereg_mr(block->mr);
        rdma->total_registrations--;
        block->mr = NULL;
    }

    g_free(block->transit_bitmap);
    block->transit_bitmap = NULL;

    g_free(block->unregister_bitmap);
    block->unregister_bitmap = NULL;

    g_free(block->remote_keys);
    block->remote_keys = NULL;

    for (x = 0; x < local->nb_blocks; x++) {
        g_hash_table_remove(rdma->blockmap, (void *)(uintptr_t)old[x].offset);
    }

    if (local->nb_blocks > 1) {

        local->block = g_malloc0(sizeof(RDMALocalBlock) *
                                    (local->nb_blocks - 1));

        if (block->index) {
            memcpy(local->block, old, sizeof(RDMALocalBlock) * block->index);
        }

        if (block->index < (local->nb_blocks - 1)) {
            memcpy(local->block + block->index, old + (block->index + 1),
                   sizeof(RDMALocalBlock) *
                       (local->nb_blocks - (block->index + 1)));
        }
    } else {
        assert(block == local->block);
        local->block = NULL;
    }

    trace_rdma_delete_block(local->nb_blocks,
                            (uintptr_t)block->local_host_addr,
                            block->offset, block->length,
                            (uintptr_t)(block->local_host_addr + block->length),
                            BITS_TO_LONGS(block->nb_chunks) *
                                sizeof(unsigned long) * 8, block->nb_chunks);

    g_free(old);

    local->nb_blocks--;

    if (local->nb_blocks) {
        for (x = 0; x < local->nb_blocks; x++) {
            g_hash_table_insert(rdma->blockmap,
                                (void *)(uintptr_t)local->block[x].offset,
                                &local->block[x]);
        }
    }

    return 0;
}

/*
 * Put in the log file which RDMA device was opened and the details
 * associated with that device.
 */
static void qemu_rdma_dump_id(const char *who, struct ibv_context *verbs)
{
    struct ibv_port_attr port;

    if (ibv_query_port(verbs, 1, &port)) {
        error_report("Failed to query port information");
        return;
    }

    printf("%s RDMA Device opened: kernel name %s "
           "uverbs device name %s, "
           "infiniband_verbs class device path %s, "
           "infiniband class device path %s, "
           "transport: (%d) %s\n",
                who,
                verbs->device->name,
                verbs->device->dev_name,
                verbs->device->dev_path,
                verbs->device->ibdev_path,
                port.link_layer,
                (port.link_layer == IBV_LINK_LAYER_INFINIBAND) ? "Infiniband" :
                 ((port.link_layer == IBV_LINK_LAYER_ETHERNET)
                    ? "Ethernet" : "Unknown"));
}
"Ethernet" : "Unknown")); 711 } 712 713 /* 714 * Put in the log file the RDMA gid addressing information, 715 * useful for folks who have trouble understanding the 716 * RDMA device hierarchy in the kernel. 717 */ 718 static void qemu_rdma_dump_gid(const char *who, struct rdma_cm_id *id) 719 { 720 char sgid[33]; 721 char dgid[33]; 722 inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.sgid, sgid, sizeof sgid); 723 inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.dgid, dgid, sizeof dgid); 724 trace_qemu_rdma_dump_gid(who, sgid, dgid); 725 } 726 727 /* 728 * As of now, IPv6 over RoCE / iWARP is not supported by linux. 729 * We will try the next addrinfo struct, and fail if there are 730 * no other valid addresses to bind against. 731 * 732 * If user is listening on '[::]', then we will not have a opened a device 733 * yet and have no way of verifying if the device is RoCE or not. 734 * 735 * In this case, the source VM will throw an error for ALL types of 736 * connections (both IPv4 and IPv6) if the destination machine does not have 737 * a regular infiniband network available for use. 738 * 739 * The only way to guarantee that an error is thrown for broken kernels is 740 * for the management software to choose a *specific* interface at bind time 741 * and validate what time of hardware it is. 742 * 743 * Unfortunately, this puts the user in a fix: 744 * 745 * If the source VM connects with an IPv4 address without knowing that the 746 * destination has bound to '[::]' the migration will unconditionally fail 747 * unless the management software is explicitly listening on the the IPv4 748 * address while using a RoCE-based device. 749 * 750 * If the source VM connects with an IPv6 address, then we're OK because we can 751 * throw an error on the source (and similarly on the destination). 752 * 753 * But in mixed environments, this will be broken for a while until it is fixed 754 * inside linux. 755 * 756 * We do provide a *tiny* bit of help in this function: We can list all of the 757 * devices in the system and check to see if all the devices are RoCE or 758 * Infiniband. 759 * 760 * If we detect that we have a *pure* RoCE environment, then we can safely 761 * thrown an error even if the management software has specified '[::]' as the 762 * bind address. 763 * 764 * However, if there is are multiple hetergeneous devices, then we cannot make 765 * this assumption and the user just has to be sure they know what they are 766 * doing. 767 * 768 * Patches are being reviewed on linux-rdma. 769 */ 770 static int qemu_rdma_broken_ipv6_kernel(Error **errp, struct ibv_context *verbs) 771 { 772 struct ibv_port_attr port_attr; 773 774 /* This bug only exists in linux, to our knowledge. */ 775 #ifdef CONFIG_LINUX 776 777 /* 778 * Verbs are only NULL if management has bound to '[::]'. 779 * 780 * Let's iterate through all the devices and see if there any pure IB 781 * devices (non-ethernet). 782 * 783 * If not, then we can safely proceed with the migration. 784 * Otherwise, there are no guarantees until the bug is fixed in linux. 
     */
    if (!verbs) {
        int num_devices, x;
        struct ibv_device ** dev_list = ibv_get_device_list(&num_devices);
        bool roce_found = false;
        bool ib_found = false;

        for (x = 0; x < num_devices; x++) {
            verbs = ibv_open_device(dev_list[x]);
            if (!verbs) {
                if (errno == EPERM) {
                    continue;
                } else {
                    return -EINVAL;
                }
            }

            if (ibv_query_port(verbs, 1, &port_attr)) {
                ibv_close_device(verbs);
                ERROR(errp, "Could not query initial IB port");
                return -EINVAL;
            }

            if (port_attr.link_layer == IBV_LINK_LAYER_INFINIBAND) {
                ib_found = true;
            } else if (port_attr.link_layer == IBV_LINK_LAYER_ETHERNET) {
                roce_found = true;
            }

            ibv_close_device(verbs);

        }

        if (roce_found) {
            if (ib_found) {
                fprintf(stderr, "WARN: migrations may fail:"
                                " IPv6 over RoCE / iWARP in linux"
                                " is broken. But since you appear to have a"
                                " mixed RoCE / IB environment, be sure to only"
                                " migrate over the IB fabric until the kernel "
                                " fixes the bug.\n");
            } else {
                ERROR(errp, "You only have RoCE / iWARP devices in your systems"
                            " and your management software has specified '[::]'"
                            ", but IPv6 over RoCE / iWARP is not supported in Linux.");
                return -ENONET;
            }
        }

        return 0;
    }

    /*
     * If we have a verbs context, that means that something other than '[::]'
     * was used by the management software for binding. In which case we can
     * actually warn the user about a potentially broken kernel.
     */

    /* IB ports start with 1, not 0 */
    if (ibv_query_port(verbs, 1, &port_attr)) {
        ERROR(errp, "Could not query initial IB port");
        return -EINVAL;
    }

    if (port_attr.link_layer == IBV_LINK_LAYER_ETHERNET) {
        ERROR(errp, "Linux kernel's RoCE / iWARP does not support IPv6 "
                    "(but patches on linux-rdma in progress)");
        return -ENONET;
    }

#endif

    return 0;
}
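
/*
 * Orientation note for the function below: rdma_resolve_addr() and
 * rdma_resolve_route() are asynchronous, so each call is followed by a wait
 * on the event channel for RDMA_CM_EVENT_ADDR_RESOLVED and
 * RDMA_CM_EVENT_ROUTE_RESOLVED respectively; any other event aborts setup.
 */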

/*
 * Figure out which RDMA device corresponds to the requested IP hostname.
 * Also create the initial connection manager identifiers for opening
 * the connection.
 */
static int qemu_rdma_resolve_host(RDMAContext *rdma, Error **errp)
{
    int ret;
    struct rdma_addrinfo *res;
    char port_str[16];
    struct rdma_cm_event *cm_event;
    char ip[40] = "unknown";
    struct rdma_addrinfo *e;

    if (rdma->host == NULL || !strcmp(rdma->host, "")) {
        ERROR(errp, "RDMA hostname has not been set");
        return -EINVAL;
    }

    /* create CM channel */
    rdma->channel = rdma_create_event_channel();
    if (!rdma->channel) {
        ERROR(errp, "could not create CM channel");
        return -EINVAL;
    }

    /* create CM id */
    ret = rdma_create_id(rdma->channel, &rdma->cm_id, NULL, RDMA_PS_TCP);
    if (ret) {
        ERROR(errp, "could not create channel id");
        goto err_resolve_create_id;
    }

    snprintf(port_str, 16, "%d", rdma->port);
    port_str[15] = '\0';

    ret = rdma_getaddrinfo(rdma->host, port_str, NULL, &res);
    if (ret < 0) {
        ERROR(errp, "could not rdma_getaddrinfo address %s", rdma->host);
        goto err_resolve_get_addr;
    }

    for (e = res; e != NULL; e = e->ai_next) {
        inet_ntop(e->ai_family,
            &((struct sockaddr_in *) e->ai_dst_addr)->sin_addr, ip, sizeof ip);
        trace_qemu_rdma_resolve_host_trying(rdma->host, ip);

        ret = rdma_resolve_addr(rdma->cm_id, NULL, e->ai_dst_addr,
                RDMA_RESOLVE_TIMEOUT_MS);
        if (!ret) {
            if (e->ai_family == AF_INET6) {
                ret = qemu_rdma_broken_ipv6_kernel(errp, rdma->cm_id->verbs);
                if (ret) {
                    continue;
                }
            }
            goto route;
        }
    }

    ERROR(errp, "could not resolve address %s", rdma->host);
    goto err_resolve_get_addr;

route:
    qemu_rdma_dump_gid("source_resolve_addr", rdma->cm_id);

    ret = rdma_get_cm_event(rdma->channel, &cm_event);
    if (ret) {
        ERROR(errp, "could not perform event_addr_resolved");
        goto err_resolve_get_addr;
    }

    if (cm_event->event != RDMA_CM_EVENT_ADDR_RESOLVED) {
        ERROR(errp, "result not equal to event_addr_resolved %s",
                rdma_event_str(cm_event->event));
        perror("rdma_resolve_addr");
        rdma_ack_cm_event(cm_event);
        ret = -EINVAL;
        goto err_resolve_get_addr;
    }
    rdma_ack_cm_event(cm_event);

    /* resolve route */
    ret = rdma_resolve_route(rdma->cm_id, RDMA_RESOLVE_TIMEOUT_MS);
    if (ret) {
        ERROR(errp, "could not resolve rdma route");
        goto err_resolve_get_addr;
    }

    ret = rdma_get_cm_event(rdma->channel, &cm_event);
    if (ret) {
        ERROR(errp, "could not perform event_route_resolved");
        goto err_resolve_get_addr;
    }
    if (cm_event->event != RDMA_CM_EVENT_ROUTE_RESOLVED) {
        ERROR(errp, "result not equal to event_route_resolved: %s",
                rdma_event_str(cm_event->event));
        rdma_ack_cm_event(cm_event);
        ret = -EINVAL;
        goto err_resolve_get_addr;
    }
    rdma_ack_cm_event(cm_event);
    rdma->verbs = rdma->cm_id->verbs;
    qemu_rdma_dump_id("source_resolve_host", rdma->cm_id->verbs);
    qemu_rdma_dump_gid("source_resolve_host", rdma->cm_id);
    return 0;

err_resolve_get_addr:
    rdma_destroy_id(rdma->cm_id);
    rdma->cm_id = NULL;
err_resolve_create_id:
    rdma_destroy_event_channel(rdma->channel);
    rdma->channel = NULL;
    return ret;
}

/*
 * Create protection domain and completion queues
 */
static int qemu_rdma_alloc_pd_cq(RDMAContext *rdma)
{
    /* allocate pd */
    rdma->pd = ibv_alloc_pd(rdma->verbs);
    if (!rdma->pd) {
        error_report("failed to allocate protection domain");
        return -1;
    }

    /* create completion channel */
    rdma->comp_channel = ibv_create_comp_channel(rdma->verbs);
    if (!rdma->comp_channel) {
        error_report("failed to allocate completion channel");
        goto err_alloc_pd_cq;
    }

    /*
     * Completion queue can be filled by both read and write work requests,
     * so must reflect the sum of both possible queue sizes.
     */
    rdma->cq = ibv_create_cq(rdma->verbs, (RDMA_SIGNALED_SEND_MAX * 3),
            NULL, rdma->comp_channel, 0);
    if (!rdma->cq) {
        error_report("failed to allocate completion queue");
        goto err_alloc_pd_cq;
    }

    return 0;

err_alloc_pd_cq:
    if (rdma->pd) {
        ibv_dealloc_pd(rdma->pd);
    }
    if (rdma->comp_channel) {
        ibv_destroy_comp_channel(rdma->comp_channel);
    }
    rdma->pd = NULL;
    rdma->comp_channel = NULL;
    return -1;

}

/*
 * Create queue pairs.
 */
static int qemu_rdma_alloc_qp(RDMAContext *rdma)
{
    struct ibv_qp_init_attr attr = { 0 };
    int ret;

    attr.cap.max_send_wr = RDMA_SIGNALED_SEND_MAX;
    attr.cap.max_recv_wr = 3;
    attr.cap.max_send_sge = 1;
    attr.cap.max_recv_sge = 1;
    attr.send_cq = rdma->cq;
    attr.recv_cq = rdma->cq;
    attr.qp_type = IBV_QPT_RC;

    ret = rdma_create_qp(rdma->cm_id, rdma->pd, &attr);
    if (ret) {
        return -1;
    }

    rdma->qp = rdma->cm_id->qp;
    return 0;
}

static int qemu_rdma_reg_whole_ram_blocks(RDMAContext *rdma)
{
    int i;
    RDMALocalBlocks *local = &rdma->local_ram_blocks;

    for (i = 0; i < local->nb_blocks; i++) {
        local->block[i].mr =
            ibv_reg_mr(rdma->pd,
                    local->block[i].local_host_addr,
                    local->block[i].length,
                    IBV_ACCESS_LOCAL_WRITE |
                    IBV_ACCESS_REMOTE_WRITE
                    );
        if (!local->block[i].mr) {
            perror("Failed to register local dest ram block!\n");
            break;
        }
        rdma->total_registrations++;
    }

    if (i >= local->nb_blocks) {
        return 0;
    }

    for (i--; i >= 0; i--) {
        ibv_dereg_mr(local->block[i].mr);
        rdma->total_registrations--;
    }

    return -1;

}

/*
 * Find the ram block that corresponds to the page requested to be
 * transmitted by QEMU.
 *
 * Once the block is found, also identify which 'chunk' within that
 * block that the page belongs to.
 *
 * This search cannot fail or the migration will fail.
 */
static int qemu_rdma_search_ram_block(RDMAContext *rdma,
                                      uintptr_t block_offset,
                                      uint64_t offset,
                                      uint64_t length,
                                      uint64_t *block_index,
                                      uint64_t *chunk_index)
{
    uint64_t current_addr = block_offset + offset;
    RDMALocalBlock *block = g_hash_table_lookup(rdma->blockmap,
                                                (void *) block_offset);
    assert(block);
    assert(current_addr >= block->offset);
    assert((current_addr + length) <= (block->offset + block->length));

    *block_index = block->index;
    *chunk_index = ram_chunk_index(block->local_host_addr,
                block->local_host_addr + (current_addr - block->offset));

    return 0;
}
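
/*
 * Worked example (illustrative): for a page starting 0x280000 bytes (2.5 MB)
 * into its RAMBlock, qemu_rdma_search_ram_block() reports the block's index
 * and chunk_index == 2, since 0x280000 >> RDMA_REG_CHUNK_SHIFT == 2.
 */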

/*
 * Register a chunk with IB. If the chunk was already registered
 * previously, then skip.
 *
 * Also return the keys associated with the registration needed
 * to perform the actual RDMA operation.
 */
static int qemu_rdma_register_and_get_keys(RDMAContext *rdma,
        RDMALocalBlock *block, uintptr_t host_addr,
        uint32_t *lkey, uint32_t *rkey, int chunk,
        uint8_t *chunk_start, uint8_t *chunk_end)
{
    if (block->mr) {
        if (lkey) {
            *lkey = block->mr->lkey;
        }
        if (rkey) {
            *rkey = block->mr->rkey;
        }
        return 0;
    }

    /* allocate memory to store chunk MRs */
    if (!block->pmr) {
        block->pmr = g_malloc0(block->nb_chunks * sizeof(struct ibv_mr *));
    }

    /*
     * If 'rkey', then we're the destination, so grant access to the source.
     *
     * If 'lkey', then we're the source VM, so grant access only to ourselves.
     */
    if (!block->pmr[chunk]) {
        uint64_t len = chunk_end - chunk_start;

        trace_qemu_rdma_register_and_get_keys(len, chunk_start);

        block->pmr[chunk] = ibv_reg_mr(rdma->pd,
                chunk_start, len,
                (rkey ? (IBV_ACCESS_LOCAL_WRITE |
                        IBV_ACCESS_REMOTE_WRITE) : 0));

        if (!block->pmr[chunk]) {
            perror("Failed to register chunk!");
            fprintf(stderr, "Chunk details: block: %d chunk index %d"
                            " start %" PRIuPTR " end %" PRIuPTR
                            " host %" PRIuPTR
                            " local %" PRIuPTR " registrations: %d\n",
                            block->index, chunk, (uintptr_t)chunk_start,
                            (uintptr_t)chunk_end, host_addr,
                            (uintptr_t)block->local_host_addr,
                            rdma->total_registrations);
            return -1;
        }
        rdma->total_registrations++;
    }

    if (lkey) {
        *lkey = block->pmr[chunk]->lkey;
    }
    if (rkey) {
        *rkey = block->pmr[chunk]->rkey;
    }
    return 0;
}

/*
 * Register (at connection time) the memory used for control
 * channel messages.
 */
static int qemu_rdma_reg_control(RDMAContext *rdma, int idx)
{
    rdma->wr_data[idx].control_mr = ibv_reg_mr(rdma->pd,
            rdma->wr_data[idx].control, RDMA_CONTROL_MAX_BUFFER,
            IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
    if (rdma->wr_data[idx].control_mr) {
        rdma->total_registrations++;
        return 0;
    }
    error_report("qemu_rdma_reg_control failed");
    return -1;
}

const char *print_wrid(int wrid)
{
    if (wrid >= RDMA_WRID_RECV_CONTROL) {
        return wrid_desc[RDMA_WRID_RECV_CONTROL];
    }
    return wrid_desc[wrid];
}

/*
 * RDMA requires memory registration (mlock/pinning), but this is not good for
 * overcommitment.
 *
 * In preparation for the future where LRU information or workload-specific
 * writable working set memory access behavior is available to QEMU
 * it would be nice to have in place the ability to UN-register/UN-pin
 * particular memory regions from the RDMA hardware when it is determined that
 * those regions of memory will likely not be accessed again in the near
 * future.
 *
 * While we do not yet have such information right now, the following
 * compile-time option allows us to perform a non-optimized version of this
 * behavior.
 *
 * By uncommenting this option, you will cause *all* RDMA transfers to be
 * unregistered immediately after the transfer completes on both sides of the
 * connection. This has no effect in 'rdma-pin-all' mode, only regular mode.
 *
 * This will have a terrible impact on migration performance, so until future
 * workload information or LRU information is available, do not attempt to use
 * this feature except for basic testing.
 */
//#define RDMA_UNREGISTRATION_EXAMPLE

/*
 * Perform a non-optimized memory unregistration after every transfer
 * for demonstration purposes, only if pin-all is not requested.
 *
 * Potential optimizations:
 * 1. Start a new thread to run this function continuously
        - for bit clearing
        - and for receipt of unregister messages
 * 2. Use an LRU.
 * 3. Use workload hints.
 */
static int qemu_rdma_unregister_waiting(RDMAContext *rdma)
{
    while (rdma->unregistrations[rdma->unregister_current]) {
        int ret;
        uint64_t wr_id = rdma->unregistrations[rdma->unregister_current];
        uint64_t chunk =
            (wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT;
        uint64_t index =
            (wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
        RDMALocalBlock *block =
            &(rdma->local_ram_blocks.block[index]);
        RDMARegister reg = { .current_index = index };
        RDMAControlHeader resp = { .type = RDMA_CONTROL_UNREGISTER_FINISHED,
                                 };
        RDMAControlHeader head = { .len = sizeof(RDMARegister),
                                   .type = RDMA_CONTROL_UNREGISTER_REQUEST,
                                   .repeat = 1,
                                 };

        trace_qemu_rdma_unregister_waiting_proc(chunk,
                                                rdma->unregister_current);

        rdma->unregistrations[rdma->unregister_current] = 0;
        rdma->unregister_current++;

        if (rdma->unregister_current == RDMA_SIGNALED_SEND_MAX) {
            rdma->unregister_current = 0;
        }


        /*
         * Unregistration is speculative (because migration is single-threaded
         * and we cannot break the protocol's infiniband message ordering).
         * Thus, if the memory is currently being used for transmission,
         * then abort the attempt to unregister and try again
         * later the next time a completion is received for this memory.
         */
        clear_bit(chunk, block->unregister_bitmap);

        if (test_bit(chunk, block->transit_bitmap)) {
            trace_qemu_rdma_unregister_waiting_inflight(chunk);
            continue;
        }

        trace_qemu_rdma_unregister_waiting_send(chunk);

        ret = ibv_dereg_mr(block->pmr[chunk]);
        block->pmr[chunk] = NULL;
        block->remote_keys[chunk] = 0;

        if (ret != 0) {
            perror("unregistration chunk failed");
            return -ret;
        }
        rdma->total_registrations--;

        reg.key.chunk = chunk;
        register_to_network(&reg);
        ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg,
                                &resp, NULL, NULL);
        if (ret < 0) {
            return ret;
        }

        trace_qemu_rdma_unregister_waiting_complete(chunk);
    }

    return 0;
}

static uint64_t qemu_rdma_make_wrid(uint64_t wr_id, uint64_t index,
                                    uint64_t chunk)
{
    uint64_t result = wr_id & RDMA_WRID_TYPE_MASK;

    result |= (index << RDMA_WRID_BLOCK_SHIFT);
    result |= (chunk << RDMA_WRID_CHUNK_SHIFT);

    return result;
}

/*
 * Set bit for unregistration in the next iteration.
 * We cannot transmit right here, but will unpin later.
 */
static void qemu_rdma_signal_unregister(RDMAContext *rdma, uint64_t index,
                                        uint64_t chunk, uint64_t wr_id)
{
    if (rdma->unregistrations[rdma->unregister_next] != 0) {
        error_report("rdma migration: queue is full");
    } else {
        RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]);

        if (!test_and_set_bit(chunk, block->unregister_bitmap)) {
            trace_qemu_rdma_signal_unregister_append(chunk,
                                                     rdma->unregister_next);

            rdma->unregistrations[rdma->unregister_next++] =
                    qemu_rdma_make_wrid(wr_id, index, chunk);

            if (rdma->unregister_next == RDMA_SIGNALED_SEND_MAX) {
                rdma->unregister_next = 0;
            }
        } else {
            trace_qemu_rdma_signal_unregister_already(chunk);
        }
    }
}

/*
 * Poll the completion queue to see if a work request
 * (of any kind) has completed.
 * Return the work request ID that completed.
 */
static uint64_t qemu_rdma_poll(RDMAContext *rdma, uint64_t *wr_id_out,
                               uint32_t *byte_len)
{
    int ret;
    struct ibv_wc wc;
    uint64_t wr_id;

    ret = ibv_poll_cq(rdma->cq, 1, &wc);

    if (!ret) {
        *wr_id_out = RDMA_WRID_NONE;
        return 0;
    }

    if (ret < 0) {
        error_report("ibv_poll_cq return %d", ret);
        return ret;
    }

    wr_id = wc.wr_id & RDMA_WRID_TYPE_MASK;

    if (wc.status != IBV_WC_SUCCESS) {
        fprintf(stderr, "ibv_poll_cq wc.status=%d %s!\n",
                        wc.status, ibv_wc_status_str(wc.status));
        fprintf(stderr, "ibv_poll_cq wrid=%s!\n", wrid_desc[wr_id]);

        return -1;
    }

    if (rdma->control_ready_expected &&
        (wr_id >= RDMA_WRID_RECV_CONTROL)) {
        trace_qemu_rdma_poll_recv(wrid_desc[RDMA_WRID_RECV_CONTROL],
                  wr_id - RDMA_WRID_RECV_CONTROL, wr_id, rdma->nb_sent);
        rdma->control_ready_expected = 0;
    }

    if (wr_id == RDMA_WRID_RDMA_WRITE) {
        uint64_t chunk =
            (wc.wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT;
        uint64_t index =
            (wc.wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
        RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]);

        trace_qemu_rdma_poll_write(print_wrid(wr_id), wr_id, rdma->nb_sent,
                                   index, chunk, block->local_host_addr,
                                   (void *)(uintptr_t)block->remote_host_addr);

        clear_bit(chunk, block->transit_bitmap);

        if (rdma->nb_sent > 0) {
            rdma->nb_sent--;
        }

        if (!rdma->pin_all) {
            /*
             * FYI: If one wanted to signal a specific chunk to be unregistered
             * using LRU or workload-specific information, this is the function
             * you would call to do so. That chunk would then get asynchronously
             * unregistered later.
             */
#ifdef RDMA_UNREGISTRATION_EXAMPLE
            qemu_rdma_signal_unregister(rdma, index, chunk, wc.wr_id);
#endif
        }
    } else {
        trace_qemu_rdma_poll_other(print_wrid(wr_id), wr_id, rdma->nb_sent);
    }

    *wr_id_out = wc.wr_id;
    if (byte_len) {
        *byte_len = wc.byte_len;
    }

    return 0;
}

/*
 * Block until the next work request has completed.
 *
 * First poll to see if a work request has already completed,
 * otherwise block.
 *
 * If we encounter completed work requests for IDs other than
 * the one we're interested in, then that's generally an error.
 *
 * The only exception is actual RDMA Write completions. These
 * completions only need to be recorded, but do not actually
 * need further processing.
 */
static int qemu_rdma_block_for_wrid(RDMAContext *rdma, int wrid_requested,
                                    uint32_t *byte_len)
{
    int num_cq_events = 0, ret = 0;
    struct ibv_cq *cq;
    void *cq_ctx;
    uint64_t wr_id = RDMA_WRID_NONE, wr_id_in;

    if (ibv_req_notify_cq(rdma->cq, 0)) {
        return -1;
    }
    /* poll cq first */
    while (wr_id != wrid_requested) {
        ret = qemu_rdma_poll(rdma, &wr_id_in, byte_len);
        if (ret < 0) {
            return ret;
        }

        wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;

        if (wr_id == RDMA_WRID_NONE) {
            break;
        }
        if (wr_id != wrid_requested) {
            trace_qemu_rdma_block_for_wrid_miss(print_wrid(wrid_requested),
                       wrid_requested, print_wrid(wr_id), wr_id);
        }
    }

    if (wr_id == wrid_requested) {
        return 0;
    }

    while (1) {
        /*
         * Coroutine doesn't start until process_incoming_migration()
         * so don't yield unless we know we're running inside of a coroutine.
         */
        if (rdma->migration_started_on_destination) {
            yield_until_fd_readable(rdma->comp_channel->fd);
        }

        if (ibv_get_cq_event(rdma->comp_channel, &cq, &cq_ctx)) {
            perror("ibv_get_cq_event");
            goto err_block_for_wrid;
        }

        num_cq_events++;

        if (ibv_req_notify_cq(cq, 0)) {
            goto err_block_for_wrid;
        }

        while (wr_id != wrid_requested) {
            ret = qemu_rdma_poll(rdma, &wr_id_in, byte_len);
            if (ret < 0) {
                goto err_block_for_wrid;
            }

            wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;

            if (wr_id == RDMA_WRID_NONE) {
                break;
            }
            if (wr_id != wrid_requested) {
                trace_qemu_rdma_block_for_wrid_miss(print_wrid(wrid_requested),
                                   wrid_requested, print_wrid(wr_id), wr_id);
            }
        }

        if (wr_id == wrid_requested) {
            goto success_block_for_wrid;
        }
    }

success_block_for_wrid:
    if (num_cq_events) {
        ibv_ack_cq_events(cq, num_cq_events);
    }
    return 0;

err_block_for_wrid:
    if (num_cq_events) {
        ibv_ack_cq_events(cq, num_cq_events);
    }
    return ret;
}

/*
 * Post a SEND message work request for the control channel
 * containing some data and block until the post completes.
 */
static int qemu_rdma_post_send_control(RDMAContext *rdma, uint8_t *buf,
                                       RDMAControlHeader *head)
{
    int ret = 0;
    RDMAWorkRequestData *wr = &rdma->wr_data[RDMA_WRID_CONTROL];
    struct ibv_send_wr *bad_wr;
    struct ibv_sge sge = {
                           .addr = (uintptr_t)(wr->control),
                           .length = head->len + sizeof(RDMAControlHeader),
                           .lkey = wr->control_mr->lkey,
                         };
    struct ibv_send_wr send_wr = {
                                   .wr_id = RDMA_WRID_SEND_CONTROL,
                                   .opcode = IBV_WR_SEND,
                                   .send_flags = IBV_SEND_SIGNALED,
                                   .sg_list = &sge,
                                   .num_sge = 1,
                                };

    trace_qemu_rdma_post_send_control(control_desc[head->type]);

    /*
     * We don't actually need to do a memcpy() in here if we used
     * the "sge" properly, but since we're only sending control messages
     * (not RAM in a performance-critical path), then it's OK for now.
     *
     * The copy makes the RDMAControlHeader simpler to manipulate
     * for the time being.
     */
    assert(head->len <= RDMA_CONTROL_MAX_BUFFER - sizeof(*head));
    memcpy(wr->control, head, sizeof(RDMAControlHeader));
    control_to_network((void *) wr->control);

    if (buf) {
        memcpy(wr->control + sizeof(RDMAControlHeader), buf, head->len);
    }


    ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr);

    if (ret > 0) {
        error_report("Failed to use post IB SEND for control");
        return -ret;
    }

    ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_SEND_CONTROL, NULL);
    if (ret < 0) {
        error_report("rdma migration: send polling control error");
    }

    return ret;
}

/*
 * Post a RECV work request in anticipation of some future receipt
 * of data on the control channel.
 */
static int qemu_rdma_post_recv_control(RDMAContext *rdma, int idx)
{
    struct ibv_recv_wr *bad_wr;
    struct ibv_sge sge = {
                            .addr = (uintptr_t)(rdma->wr_data[idx].control),
                            .length = RDMA_CONTROL_MAX_BUFFER,
                            .lkey = rdma->wr_data[idx].control_mr->lkey,
                         };

    struct ibv_recv_wr recv_wr = {
                                    .wr_id = RDMA_WRID_RECV_CONTROL + idx,
                                    .sg_list = &sge,
                                    .num_sge = 1,
                                 };


    if (ibv_post_recv(rdma->qp, &recv_wr, &bad_wr)) {
        return -1;
    }

    return 0;
}

/*
 * Block and wait for a RECV control channel message to arrive.
 */
static int qemu_rdma_exchange_get_response(RDMAContext *rdma,
                RDMAControlHeader *head, int expecting, int idx)
{
    uint32_t byte_len;
    int ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RECV_CONTROL + idx,
                                       &byte_len);

    if (ret < 0) {
        error_report("rdma migration: recv polling control error!");
        return ret;
    }

    network_to_control((void *) rdma->wr_data[idx].control);
    memcpy(head, rdma->wr_data[idx].control, sizeof(RDMAControlHeader));

    trace_qemu_rdma_exchange_get_response_start(control_desc[expecting]);

    if (expecting == RDMA_CONTROL_NONE) {
        trace_qemu_rdma_exchange_get_response_none(control_desc[head->type],
                                                   head->type);
    } else if (head->type != expecting || head->type == RDMA_CONTROL_ERROR) {
        error_report("Was expecting a %s (%d) control message"
                ", but got: %s (%d), length: %d",
                control_desc[expecting], expecting,
                control_desc[head->type], head->type, head->len);
        return -EIO;
    }
    if (head->len > RDMA_CONTROL_MAX_BUFFER - sizeof(*head)) {
        error_report("too long length: %d", head->len);
        return -EINVAL;
    }
    if (sizeof(*head) + head->len != byte_len) {
        error_report("Malformed length: %d byte_len %d", head->len, byte_len);
        return -EINVAL;
    }

    return 0;
}

/*
 * When a RECV work request has completed, the work request's
 * buffer is pointed at the header.
 *
 * This will advance the pointer to the data portion
 * of the control message of the work request's buffer that
 * was populated after the work request finished.
 */
static void qemu_rdma_move_header(RDMAContext *rdma, int idx,
                                  RDMAControlHeader *head)
{
    rdma->wr_data[idx].control_len = head->len;
    rdma->wr_data[idx].control_curr =
        rdma->wr_data[idx].control + sizeof(RDMAControlHeader);
}
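
/*
 * Control-channel exchange at a glance (summary of the two functions below,
 * not additional protocol steps): the sender waits for the peer's READY,
 * re-posts a RECV for the next READY, then SENDs its message and, if a
 * response was requested, blocks for it; the receiver SENDs READY, blocks
 * for the message, and re-posts its own RECV before returning.
 */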

/*
 * This is an 'atomic' high-level operation to deliver a single, unified
 * control-channel message.
 *
 * Additionally, if the user is expecting some kind of reply to this message,
 * they can request a 'resp' response message be filled in by posting an
 * additional work request on behalf of the user and waiting for an additional
 * completion.
 *
 * The extra (optional) response is used during registration to keep us from
 * having to perform an *additional* exchange of messages just to provide a
 * response by instead piggy-backing on the acknowledgement.
 */
static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
                                   uint8_t *data, RDMAControlHeader *resp,
                                   int *resp_idx,
                                   int (*callback)(RDMAContext *rdma))
{
    int ret = 0;

    /*
     * Wait until the dest is ready before attempting to deliver the message
     * by waiting for a READY message.
     */
    if (rdma->control_ready_expected) {
        RDMAControlHeader resp;
        ret = qemu_rdma_exchange_get_response(rdma,
                                    &resp, RDMA_CONTROL_READY, RDMA_WRID_READY);
        if (ret < 0) {
            return ret;
        }
    }

    /*
     * If the user is expecting a response, post a WR in anticipation of it.
     */
    if (resp) {
        ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_DATA);
        if (ret) {
            error_report("rdma migration: error posting"
                    " extra control recv for anticipated result!");
            return ret;
        }
    }

    /*
     * Post a WR to replace the one we just consumed for the READY message.
     */
    ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
    if (ret) {
        error_report("rdma migration: error posting first control recv!");
        return ret;
    }

    /*
     * Deliver the control message that was requested.
     */
    ret = qemu_rdma_post_send_control(rdma, data, head);

    if (ret < 0) {
        error_report("Failed to send control buffer!");
        return ret;
    }

    /*
     * If we're expecting a response, block and wait for it.
     */
    if (resp) {
        if (callback) {
            trace_qemu_rdma_exchange_send_issue_callback();
            ret = callback(rdma);
            if (ret < 0) {
                return ret;
            }
        }

        trace_qemu_rdma_exchange_send_waiting(control_desc[resp->type]);
        ret = qemu_rdma_exchange_get_response(rdma, resp,
                                              resp->type, RDMA_WRID_DATA);

        if (ret < 0) {
            return ret;
        }

        qemu_rdma_move_header(rdma, RDMA_WRID_DATA, resp);
        if (resp_idx) {
            *resp_idx = RDMA_WRID_DATA;
        }
        trace_qemu_rdma_exchange_send_received(control_desc[resp->type]);
    }

    rdma->control_ready_expected = 1;

    return 0;
}

/*
 * This is an 'atomic' high-level operation to receive a single, unified
 * control-channel message.
 */
static int qemu_rdma_exchange_recv(RDMAContext *rdma, RDMAControlHeader *head,
                                   int expecting)
{
    RDMAControlHeader ready = {
                                .len = 0,
                                .type = RDMA_CONTROL_READY,
                                .repeat = 1,
                              };
    int ret;

    /*
     * Inform the source that we're ready to receive a message.
     */
    ret = qemu_rdma_post_send_control(rdma, NULL, &ready);

    if (ret < 0) {
        error_report("Failed to send control buffer!");
        return ret;
    }

    /*
     * Block and wait for the message.
     */
    ret = qemu_rdma_exchange_get_response(rdma, head,
                                          expecting, RDMA_WRID_READY);

    if (ret < 0) {
        return ret;
    }

    qemu_rdma_move_header(rdma, RDMA_WRID_READY, head);

    /*
     * Post a new RECV work request to replace the one we just consumed.
     */
    ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
    if (ret) {
        error_report("rdma migration: error posting second control recv!");
        return ret;
    }

    return 0;
}

/*
 * Write an actual chunk of memory using RDMA.
 *
 * If we're using dynamic registration on the dest-side, we have to
 * send a registration command first.
 */
static int qemu_rdma_write_one(QEMUFile *f, RDMAContext *rdma,
                               int current_index, uint64_t current_addr,
                               uint64_t length)
{
    struct ibv_sge sge;
    struct ibv_send_wr send_wr = { 0 };
    struct ibv_send_wr *bad_wr;
    int reg_result_idx, ret, count = 0;
    uint64_t chunk, chunks;
    uint8_t *chunk_start, *chunk_end;
    RDMALocalBlock *block = &(rdma->local_ram_blocks.block[current_index]);
    RDMARegister reg;
    RDMARegisterResult *reg_result;
    RDMAControlHeader resp = { .type = RDMA_CONTROL_REGISTER_RESULT };
    RDMAControlHeader head = { .len = sizeof(RDMARegister),
                               .type = RDMA_CONTROL_REGISTER_REQUEST,
                               .repeat = 1,
                             };

retry:
    sge.addr = (uintptr_t)(block->local_host_addr +
                            (current_addr - block->offset));
    sge.length = length;

    chunk = ram_chunk_index(block->local_host_addr,
                            (uint8_t *)(uintptr_t)sge.addr);
    chunk_start = ram_chunk_start(block, chunk);

    if (block->is_ram_block) {
        chunks = length / (1UL << RDMA_REG_CHUNK_SHIFT);

        if (chunks && ((length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) {
            chunks--;
        }
    } else {
        chunks = block->length / (1UL << RDMA_REG_CHUNK_SHIFT);

        if (chunks && ((block->length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) {
            chunks--;
        }
    }

    trace_qemu_rdma_write_one_top(chunks + 1,
                                  (chunks + 1) *
                                  (1UL << RDMA_REG_CHUNK_SHIFT) / 1024 / 1024);

    chunk_end = ram_chunk_end(block, chunk + chunks);

    if (!rdma->pin_all) {
#ifdef RDMA_UNREGISTRATION_EXAMPLE
        qemu_rdma_unregister_waiting(rdma);
#endif
    }

    while (test_bit(chunk, block->transit_bitmap)) {
        (void)count;
        trace_qemu_rdma_write_one_block(count++, current_index, chunk,
                sge.addr, length, rdma->nb_sent, block->nb_chunks);

        ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);

        if (ret < 0) {
            error_report("Failed to Wait for previous write to complete "
                    "block %d chunk %" PRIu64
                    " current %" PRIu64 " len %" PRIu64 " %d",
                    current_index, chunk, sge.addr, length, rdma->nb_sent);
            return ret;
        }
    }

    if (!rdma->pin_all || !block->is_ram_block) {
        if (!block->remote_keys[chunk]) {
            /*
             * This chunk has not yet been registered, so first check to see
             * if the entire chunk is zero. If so, tell the other side to
             * memset() + madvise() the entire chunk without RDMA.
1894 */ 1895 1896 if (can_use_buffer_find_nonzero_offset((void *)(uintptr_t)sge.addr, 1897 length) 1898 && buffer_find_nonzero_offset((void *)(uintptr_t)sge.addr, 1899 length) == length) { 1900 RDMACompress comp = { 1901 .offset = current_addr, 1902 .value = 0, 1903 .block_idx = current_index, 1904 .length = length, 1905 }; 1906 1907 head.len = sizeof(comp); 1908 head.type = RDMA_CONTROL_COMPRESS; 1909 1910 trace_qemu_rdma_write_one_zero(chunk, sge.length, 1911 current_index, current_addr); 1912 1913 compress_to_network(&comp); 1914 ret = qemu_rdma_exchange_send(rdma, &head, 1915 (uint8_t *) &comp, NULL, NULL, NULL); 1916 1917 if (ret < 0) { 1918 return -EIO; 1919 } 1920 1921 acct_update_position(f, sge.length, true); 1922 1923 return 1; 1924 } 1925 1926 /* 1927 * Otherwise, tell other side to register. 1928 */ 1929 reg.current_index = current_index; 1930 if (block->is_ram_block) { 1931 reg.key.current_addr = current_addr; 1932 } else { 1933 reg.key.chunk = chunk; 1934 } 1935 reg.chunks = chunks; 1936 1937 trace_qemu_rdma_write_one_sendreg(chunk, sge.length, current_index, 1938 current_addr); 1939 1940 register_to_network(&reg); 1941 ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg, 1942 &resp, &reg_result_idx, NULL); 1943 if (ret < 0) { 1944 return ret; 1945 } 1946 1947 /* try to overlap this single registration with the one we sent. */ 1948 if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr, 1949 &sge.lkey, NULL, chunk, 1950 chunk_start, chunk_end)) { 1951 error_report("cannot get lkey"); 1952 return -EINVAL; 1953 } 1954 1955 reg_result = (RDMARegisterResult *) 1956 rdma->wr_data[reg_result_idx].control_curr; 1957 1958 network_to_result(reg_result); 1959 1960 trace_qemu_rdma_write_one_recvregres(block->remote_keys[chunk], 1961 reg_result->rkey, chunk); 1962 1963 block->remote_keys[chunk] = reg_result->rkey; 1964 block->remote_host_addr = reg_result->host_addr; 1965 } else { 1966 /* already registered before */ 1967 if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr, 1968 &sge.lkey, NULL, chunk, 1969 chunk_start, chunk_end)) { 1970 error_report("cannot get lkey!"); 1971 return -EINVAL; 1972 } 1973 } 1974 1975 send_wr.wr.rdma.rkey = block->remote_keys[chunk]; 1976 } else { 1977 send_wr.wr.rdma.rkey = block->remote_rkey; 1978 1979 if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr, 1980 &sge.lkey, NULL, chunk, 1981 chunk_start, chunk_end)) { 1982 error_report("cannot get lkey!"); 1983 return -EINVAL; 1984 } 1985 } 1986 1987 /* 1988 * Encode the ram block index and chunk within this wrid. 1989 * We will use this information at the time of completion 1990 * to figure out which bitmap to check against and then which 1991 * chunk in the bitmap to look for. 1992 */ 1993 send_wr.wr_id = qemu_rdma_make_wrid(RDMA_WRID_RDMA_WRITE, 1994 current_index, chunk); 1995 1996 send_wr.opcode = IBV_WR_RDMA_WRITE; 1997 send_wr.send_flags = IBV_SEND_SIGNALED; 1998 send_wr.sg_list = &sge; 1999 send_wr.num_sge = 1; 2000 send_wr.wr.rdma.remote_addr = block->remote_host_addr + 2001 (current_addr - block->offset); 2002 2003 trace_qemu_rdma_write_one_post(chunk, sge.addr, send_wr.wr.rdma.remote_addr, 2004 sge.length); 2005 2006 /* 2007 * ibv_post_send() does not return negative error numbers, 2008 * per the specification they are positive - no idea why.
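 * (ENOMEM from the call below means the send queue is full; in that case we
 * wait for an outstanding RDMA write to complete and jump back to the
 * 'retry' label above.)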
2009 */ 2010 ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr); 2011 2012 if (ret == ENOMEM) { 2013 trace_qemu_rdma_write_one_queue_full(); 2014 ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL); 2015 if (ret < 0) { 2016 error_report("rdma migration: failed to make " 2017 "room in full send queue! %d", ret); 2018 return ret; 2019 } 2020 2021 goto retry; 2022 2023 } else if (ret > 0) { 2024 perror("rdma migration: post rdma write failed"); 2025 return -ret; 2026 } 2027 2028 set_bit(chunk, block->transit_bitmap); 2029 acct_update_position(f, sge.length, false); 2030 rdma->total_writes++; 2031 2032 return 0; 2033 } 2034 2035 /* 2036 * Push out any unwritten RDMA operations. 2037 * 2038 * We support sending out multiple chunks at the same time. 2039 * Not all of them need to get signaled in the completion queue. 2040 */ 2041 static int qemu_rdma_write_flush(QEMUFile *f, RDMAContext *rdma) 2042 { 2043 int ret; 2044 2045 if (!rdma->current_length) { 2046 return 0; 2047 } 2048 2049 ret = qemu_rdma_write_one(f, rdma, 2050 rdma->current_index, rdma->current_addr, rdma->current_length); 2051 2052 if (ret < 0) { 2053 return ret; 2054 } 2055 2056 if (ret == 0) { 2057 rdma->nb_sent++; 2058 trace_qemu_rdma_write_flush(rdma->nb_sent); 2059 } 2060 2061 rdma->current_length = 0; 2062 rdma->current_addr = 0; 2063 2064 return 0; 2065 } 2066 2067 static inline int qemu_rdma_buffer_mergable(RDMAContext *rdma, 2068 uint64_t offset, uint64_t len) 2069 { 2070 RDMALocalBlock *block; 2071 uint8_t *host_addr; 2072 uint8_t *chunk_end; 2073 2074 if (rdma->current_index < 0) { 2075 return 0; 2076 } 2077 2078 if (rdma->current_chunk < 0) { 2079 return 0; 2080 } 2081 2082 block = &(rdma->local_ram_blocks.block[rdma->current_index]); 2083 host_addr = block->local_host_addr + (offset - block->offset); 2084 chunk_end = ram_chunk_end(block, rdma->current_chunk); 2085 2086 if (rdma->current_length == 0) { 2087 return 0; 2088 } 2089 2090 /* 2091 * Only merge into chunk sequentially. 2092 */ 2093 if (offset != (rdma->current_addr + rdma->current_length)) { 2094 return 0; 2095 } 2096 2097 if (offset < block->offset) { 2098 return 0; 2099 } 2100 2101 if ((offset + len) > (block->offset + block->length)) { 2102 return 0; 2103 } 2104 2105 if ((host_addr + len) > chunk_end) { 2106 return 0; 2107 } 2108 2109 return 1; 2110 } 2111 2112 /* 2113 * We're not actually writing here, but doing three things: 2114 * 2115 * 1. Identify the chunk the buffer belongs to. 2116 * 2. If the chunk is full or the buffer doesn't belong to the current 2117 * chunk, then start a new chunk and flush() the old chunk. 2118 * 3. To keep the hardware busy, we also group chunks into batches 2119 * and only require that a batch gets acknowledged in the completion 2120 * queue instead of each individual chunk. 2121 */ 2122 static int qemu_rdma_write(QEMUFile *f, RDMAContext *rdma, 2123 uint64_t block_offset, uint64_t offset, 2124 uint64_t len) 2125 { 2126 uint64_t current_addr = block_offset + offset; 2127 uint64_t index = rdma->current_index; 2128 uint64_t chunk = rdma->current_chunk; 2129 int ret; 2130 2131 /* If we cannot merge it, we flush the current buffer first.
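 * (A buffer is only mergable when it is sequential with the current one and
 * stays within the same RAMBlock and the same chunk; see
 * qemu_rdma_buffer_mergable() above.)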
*/ 2132 if (!qemu_rdma_buffer_mergable(rdma, current_addr, len)) { 2133 ret = qemu_rdma_write_flush(f, rdma); 2134 if (ret) { 2135 return ret; 2136 } 2137 rdma->current_length = 0; 2138 rdma->current_addr = current_addr; 2139 2140 ret = qemu_rdma_search_ram_block(rdma, block_offset, 2141 offset, len, &index, &chunk); 2142 if (ret) { 2143 error_report("ram block search failed"); 2144 return ret; 2145 } 2146 rdma->current_index = index; 2147 rdma->current_chunk = chunk; 2148 } 2149 2150 /* merge it */ 2151 rdma->current_length += len; 2152 2153 /* flush it if buffer is too large */ 2154 if (rdma->current_length >= RDMA_MERGE_MAX) { 2155 return qemu_rdma_write_flush(f, rdma); 2156 } 2157 2158 return 0; 2159 } 2160 2161 static void qemu_rdma_cleanup(RDMAContext *rdma) 2162 { 2163 struct rdma_cm_event *cm_event; 2164 int ret, idx; 2165 2166 if (rdma->cm_id && rdma->connected) { 2167 if (rdma->error_state) { 2168 RDMAControlHeader head = { .len = 0, 2169 .type = RDMA_CONTROL_ERROR, 2170 .repeat = 1, 2171 }; 2172 error_report("Early error. Sending error."); 2173 qemu_rdma_post_send_control(rdma, NULL, &head); 2174 } 2175 2176 ret = rdma_disconnect(rdma->cm_id); 2177 if (!ret) { 2178 trace_qemu_rdma_cleanup_waiting_for_disconnect(); 2179 ret = rdma_get_cm_event(rdma->channel, &cm_event); 2180 if (!ret) { 2181 rdma_ack_cm_event(cm_event); 2182 } 2183 } 2184 trace_qemu_rdma_cleanup_disconnect(); 2185 rdma->connected = false; 2186 } 2187 2188 g_free(rdma->dest_blocks); 2189 rdma->dest_blocks = NULL; 2190 2191 for (idx = 0; idx < RDMA_WRID_MAX; idx++) { 2192 if (rdma->wr_data[idx].control_mr) { 2193 rdma->total_registrations--; 2194 ibv_dereg_mr(rdma->wr_data[idx].control_mr); 2195 } 2196 rdma->wr_data[idx].control_mr = NULL; 2197 } 2198 2199 if (rdma->local_ram_blocks.block) { 2200 while (rdma->local_ram_blocks.nb_blocks) { 2201 rdma_delete_block(rdma, rdma->local_ram_blocks.block->offset); 2202 } 2203 } 2204 2205 if (rdma->qp) { 2206 rdma_destroy_qp(rdma->cm_id); 2207 rdma->qp = NULL; 2208 } 2209 if (rdma->cq) { 2210 ibv_destroy_cq(rdma->cq); 2211 rdma->cq = NULL; 2212 } 2213 if (rdma->comp_channel) { 2214 ibv_destroy_comp_channel(rdma->comp_channel); 2215 rdma->comp_channel = NULL; 2216 } 2217 if (rdma->pd) { 2218 ibv_dealloc_pd(rdma->pd); 2219 rdma->pd = NULL; 2220 } 2221 if (rdma->cm_id) { 2222 rdma_destroy_id(rdma->cm_id); 2223 rdma->cm_id = NULL; 2224 } 2225 if (rdma->listen_id) { 2226 rdma_destroy_id(rdma->listen_id); 2227 rdma->listen_id = NULL; 2228 } 2229 if (rdma->channel) { 2230 rdma_destroy_event_channel(rdma->channel); 2231 rdma->channel = NULL; 2232 } 2233 g_free(rdma->host); 2234 rdma->host = NULL; 2235 } 2236 2237 2238 static int qemu_rdma_source_init(RDMAContext *rdma, Error **errp, bool pin_all) 2239 { 2240 int ret, idx; 2241 Error *local_err = NULL, **temp = &local_err; 2242 2243 /* 2244 * Will be validated against destination's actual capabilities 2245 * after the connect() completes. 2246 */ 2247 rdma->pin_all = pin_all; 2248 2249 ret = qemu_rdma_resolve_host(rdma, temp); 2250 if (ret) { 2251 goto err_rdma_source_init; 2252 } 2253 2254 ret = qemu_rdma_alloc_pd_cq(rdma); 2255 if (ret) { 2256 ERROR(temp, "rdma migration: error allocating pd and cq! Your mlock()" 2257 " limits may be too low. 
Please check $ ulimit -a # and " 2258 "search for 'ulimit -l' in the output"); 2259 goto err_rdma_source_init; 2260 } 2261 2262 ret = qemu_rdma_alloc_qp(rdma); 2263 if (ret) { 2264 ERROR(temp, "rdma migration: error allocating qp!"); 2265 goto err_rdma_source_init; 2266 } 2267 2268 ret = qemu_rdma_init_ram_blocks(rdma); 2269 if (ret) { 2270 ERROR(temp, "rdma migration: error initializing ram blocks!"); 2271 goto err_rdma_source_init; 2272 } 2273 2274 for (idx = 0; idx < RDMA_WRID_MAX; idx++) { 2275 ret = qemu_rdma_reg_control(rdma, idx); 2276 if (ret) { 2277 ERROR(temp, "rdma migration: error registering %d control!", 2278 idx); 2279 goto err_rdma_source_init; 2280 } 2281 } 2282 2283 return 0; 2284 2285 err_rdma_source_init: 2286 error_propagate(errp, local_err); 2287 qemu_rdma_cleanup(rdma); 2288 return -1; 2289 } 2290 2291 static int qemu_rdma_connect(RDMAContext *rdma, Error **errp) 2292 { 2293 RDMACapabilities cap = { 2294 .version = RDMA_CONTROL_VERSION_CURRENT, 2295 .flags = 0, 2296 }; 2297 struct rdma_conn_param conn_param = { .initiator_depth = 2, 2298 .retry_count = 5, 2299 .private_data = &cap, 2300 .private_data_len = sizeof(cap), 2301 }; 2302 struct rdma_cm_event *cm_event; 2303 int ret; 2304 2305 /* 2306 * Only negotiate the capability with destination if the user 2307 * on the source first requested the capability. 2308 */ 2309 if (rdma->pin_all) { 2310 trace_qemu_rdma_connect_pin_all_requested(); 2311 cap.flags |= RDMA_CAPABILITY_PIN_ALL; 2312 } 2313 2314 caps_to_network(&cap); 2315 2316 ret = rdma_connect(rdma->cm_id, &conn_param); 2317 if (ret) { 2318 perror("rdma_connect"); 2319 ERROR(errp, "connecting to destination!"); 2320 goto err_rdma_source_connect; 2321 } 2322 2323 ret = rdma_get_cm_event(rdma->channel, &cm_event); 2324 if (ret) { 2325 perror("rdma_get_cm_event after rdma_connect"); 2326 ERROR(errp, "connecting to destination!"); 2327 rdma_ack_cm_event(cm_event); 2328 goto err_rdma_source_connect; 2329 } 2330 2331 if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) { 2332 perror("rdma_get_cm_event != EVENT_ESTABLISHED after rdma_connect"); 2333 ERROR(errp, "connecting to destination!"); 2334 rdma_ack_cm_event(cm_event); 2335 goto err_rdma_source_connect; 2336 } 2337 rdma->connected = true; 2338 2339 memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap)); 2340 network_to_caps(&cap); 2341 2342 /* 2343 * Verify that the *requested* capabilities are supported by the destination 2344 * and disable them otherwise. 2345 */ 2346 if (rdma->pin_all && !(cap.flags & RDMA_CAPABILITY_PIN_ALL)) { 2347 ERROR(errp, "Server cannot support pinning all memory. 
" 2348 "Will register memory dynamically."); 2349 rdma->pin_all = false; 2350 } 2351 2352 trace_qemu_rdma_connect_pin_all_outcome(rdma->pin_all); 2353 2354 rdma_ack_cm_event(cm_event); 2355 2356 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY); 2357 if (ret) { 2358 ERROR(errp, "posting second control recv!"); 2359 goto err_rdma_source_connect; 2360 } 2361 2362 rdma->control_ready_expected = 1; 2363 rdma->nb_sent = 0; 2364 return 0; 2365 2366 err_rdma_source_connect: 2367 qemu_rdma_cleanup(rdma); 2368 return -1; 2369 } 2370 2371 static int qemu_rdma_dest_init(RDMAContext *rdma, Error **errp) 2372 { 2373 int ret, idx; 2374 struct rdma_cm_id *listen_id; 2375 char ip[40] = "unknown"; 2376 struct rdma_addrinfo *res, *e; 2377 char port_str[16]; 2378 2379 for (idx = 0; idx < RDMA_WRID_MAX; idx++) { 2380 rdma->wr_data[idx].control_len = 0; 2381 rdma->wr_data[idx].control_curr = NULL; 2382 } 2383 2384 if (!rdma->host || !rdma->host[0]) { 2385 ERROR(errp, "RDMA host is not set!"); 2386 rdma->error_state = -EINVAL; 2387 return -1; 2388 } 2389 /* create CM channel */ 2390 rdma->channel = rdma_create_event_channel(); 2391 if (!rdma->channel) { 2392 ERROR(errp, "could not create rdma event channel"); 2393 rdma->error_state = -EINVAL; 2394 return -1; 2395 } 2396 2397 /* create CM id */ 2398 ret = rdma_create_id(rdma->channel, &listen_id, NULL, RDMA_PS_TCP); 2399 if (ret) { 2400 ERROR(errp, "could not create cm_id!"); 2401 goto err_dest_init_create_listen_id; 2402 } 2403 2404 snprintf(port_str, 16, "%d", rdma->port); 2405 port_str[15] = '\0'; 2406 2407 ret = rdma_getaddrinfo(rdma->host, port_str, NULL, &res); 2408 if (ret < 0) { 2409 ERROR(errp, "could not rdma_getaddrinfo address %s", rdma->host); 2410 goto err_dest_init_bind_addr; 2411 } 2412 2413 for (e = res; e != NULL; e = e->ai_next) { 2414 inet_ntop(e->ai_family, 2415 &((struct sockaddr_in *) e->ai_dst_addr)->sin_addr, ip, sizeof ip); 2416 trace_qemu_rdma_dest_init_trying(rdma->host, ip); 2417 ret = rdma_bind_addr(listen_id, e->ai_dst_addr); 2418 if (ret) { 2419 continue; 2420 } 2421 if (e->ai_family == AF_INET6) { 2422 ret = qemu_rdma_broken_ipv6_kernel(errp, listen_id->verbs); 2423 if (ret) { 2424 continue; 2425 } 2426 } 2427 break; 2428 } 2429 2430 if (!e) { 2431 ERROR(errp, "Error: could not rdma_bind_addr!"); 2432 goto err_dest_init_bind_addr; 2433 } 2434 2435 rdma->listen_id = listen_id; 2436 qemu_rdma_dump_gid("dest_init", listen_id); 2437 return 0; 2438 2439 err_dest_init_bind_addr: 2440 rdma_destroy_id(listen_id); 2441 err_dest_init_create_listen_id: 2442 rdma_destroy_event_channel(rdma->channel); 2443 rdma->channel = NULL; 2444 rdma->error_state = ret; 2445 return ret; 2446 2447 } 2448 2449 static void *qemu_rdma_data_init(const char *host_port, Error **errp) 2450 { 2451 RDMAContext *rdma = NULL; 2452 InetSocketAddress *addr; 2453 2454 if (host_port) { 2455 rdma = g_malloc0(sizeof(RDMAContext)); 2456 rdma->current_index = -1; 2457 rdma->current_chunk = -1; 2458 2459 addr = inet_parse(host_port, NULL); 2460 if (addr != NULL) { 2461 rdma->port = atoi(addr->port); 2462 rdma->host = g_strdup(addr->host); 2463 } else { 2464 ERROR(errp, "bad RDMA migration address '%s'", host_port); 2465 g_free(rdma); 2466 rdma = NULL; 2467 } 2468 2469 qapi_free_InetSocketAddress(addr); 2470 } 2471 2472 return rdma; 2473 } 2474 2475 /* 2476 * QEMUFile interface to the control channel. 2477 * SEND messages for control only. 2478 * VM's ram is handled with regular RDMA messages. 
2479 */ 2480 static int qemu_rdma_put_buffer(void *opaque, const uint8_t *buf, 2481 int64_t pos, int size) 2482 { 2483 QEMUFileRDMA *r = opaque; 2484 QEMUFile *f = r->file; 2485 RDMAContext *rdma = r->rdma; 2486 size_t remaining = size; 2487 uint8_t * data = (void *) buf; 2488 int ret; 2489 2490 CHECK_ERROR_STATE(); 2491 2492 /* 2493 * Push out any writes that 2494 * we have queued up for the VM's ram. 2495 */ 2496 ret = qemu_rdma_write_flush(f, rdma); 2497 if (ret < 0) { 2498 rdma->error_state = ret; 2499 return ret; 2500 } 2501 2502 while (remaining) { 2503 RDMAControlHeader head; 2504 2505 r->len = MIN(remaining, RDMA_SEND_INCREMENT); 2506 remaining -= r->len; 2507 2508 head.len = r->len; 2509 head.type = RDMA_CONTROL_QEMU_FILE; 2510 2511 ret = qemu_rdma_exchange_send(rdma, &head, data, NULL, NULL, NULL); 2512 2513 if (ret < 0) { 2514 rdma->error_state = ret; 2515 return ret; 2516 } 2517 2518 data += r->len; 2519 } 2520 2521 return size; 2522 } 2523 2524 static size_t qemu_rdma_fill(RDMAContext *rdma, uint8_t *buf, 2525 int size, int idx) 2526 { 2527 size_t len = 0; 2528 2529 if (rdma->wr_data[idx].control_len) { 2530 trace_qemu_rdma_fill(rdma->wr_data[idx].control_len, size); 2531 2532 len = MIN(size, rdma->wr_data[idx].control_len); 2533 memcpy(buf, rdma->wr_data[idx].control_curr, len); 2534 rdma->wr_data[idx].control_curr += len; 2535 rdma->wr_data[idx].control_len -= len; 2536 } 2537 2538 return len; 2539 } 2540 2541 /* 2542 * QEMUFile interface to the control channel. 2543 * RDMA links don't use bytestreams, so we have to 2544 * return bytes to QEMUFile opportunistically. 2545 */ 2546 static int qemu_rdma_get_buffer(void *opaque, uint8_t *buf, 2547 int64_t pos, int size) 2548 { 2549 QEMUFileRDMA *r = opaque; 2550 RDMAContext *rdma = r->rdma; 2551 RDMAControlHeader head; 2552 int ret = 0; 2553 2554 CHECK_ERROR_STATE(); 2555 2556 /* 2557 * First, we hold on to the last SEND message we 2558 * were given and dish out the bytes until we run 2559 * out of bytes. 2560 */ 2561 r->len = qemu_rdma_fill(r->rdma, buf, size, 0); 2562 if (r->len) { 2563 return r->len; 2564 } 2565 2566 /* 2567 * Once we run out, we block and wait for another 2568 * SEND message to arrive. 2569 */ 2570 ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_QEMU_FILE); 2571 2572 if (ret < 0) { 2573 rdma->error_state = ret; 2574 return ret; 2575 } 2576 2577 /* 2578 * SEND was received with new bytes, now try again. 2579 */ 2580 return qemu_rdma_fill(r->rdma, buf, size, 0); 2581 } 2582 2583 /* 2584 * Block until all the outstanding chunks have been delivered by the hardware.
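 * (Flush any merged-but-unwritten buffer first, then wait on the completion
 * queue until nb_sent drops to zero, and finally process any pending
 * unregistrations.)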
2585 */ 2586 static int qemu_rdma_drain_cq(QEMUFile *f, RDMAContext *rdma) 2587 { 2588 int ret; 2589 2590 if (qemu_rdma_write_flush(f, rdma) < 0) { 2591 return -EIO; 2592 } 2593 2594 while (rdma->nb_sent) { 2595 ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL); 2596 if (ret < 0) { 2597 error_report("rdma migration: complete polling error!"); 2598 return -EIO; 2599 } 2600 } 2601 2602 qemu_rdma_unregister_waiting(rdma); 2603 2604 return 0; 2605 } 2606 2607 static int qemu_rdma_close(void *opaque) 2608 { 2609 trace_qemu_rdma_close(); 2610 QEMUFileRDMA *r = opaque; 2611 if (r->rdma) { 2612 qemu_rdma_cleanup(r->rdma); 2613 g_free(r->rdma); 2614 } 2615 g_free(r); 2616 return 0; 2617 } 2618 2619 /* 2620 * Parameters: 2621 * @offset == 0 : 2622 * This means that 'block_offset' is a full virtual address that does not 2623 * belong to a RAMBlock of the virtual machine and instead 2624 * represents a private malloc'd memory area that the caller wishes to 2625 * transfer. 2626 * 2627 * @offset != 0 : 2628 * Offset is an offset to be added to block_offset and used 2629 * to also lookup the corresponding RAMBlock. 2630 * 2631 * @size > 0 : 2632 * Initiate a transfer of this size. 2633 * 2634 * @size == 0 : 2635 * A 'hint' or 'advice' that means that we wish to speculatively 2636 * and asynchronously unregister this memory. In this case, there is no 2637 * guarantee that the unregister will actually happen, for example, 2638 * if the memory is being actively transmitted. Additionally, the memory 2639 * may be re-registered at any future time if a write within the same 2640 * chunk was requested again, even if you attempted to unregister it 2641 * here. 2642 * 2643 * @size < 0 : TODO, not yet supported 2644 * Unregister the memory NOW. This means that the caller does not 2645 * expect there to be any future RDMA transfers and we just want to clean 2646 * things up. This is used in case the upper layer owns the memory and 2647 * cannot wait for qemu_fclose() to occur. 2648 * 2649 * @bytes_sent : User-specified pointer to indicate how many bytes were 2650 * sent. Usually, this will not be more than a few bytes of 2651 * the protocol because most transfers are sent asynchronously. 2652 */ 2653 static size_t qemu_rdma_save_page(QEMUFile *f, void *opaque, 2654 ram_addr_t block_offset, ram_addr_t offset, 2655 size_t size, uint64_t *bytes_sent) 2656 { 2657 QEMUFileRDMA *rfile = opaque; 2658 RDMAContext *rdma = rfile->rdma; 2659 int ret; 2660 2661 CHECK_ERROR_STATE(); 2662 2663 qemu_fflush(f); 2664 2665 if (size > 0) { 2666 /* 2667 * Add this page to the current 'chunk'. If the chunk 2668 * is full, or the page doesn't belong to the current chunk, 2669 * an actual RDMA write will occur and a new chunk will be formed. 2670 */ 2671 ret = qemu_rdma_write(f, rdma, block_offset, offset, size); 2672 if (ret < 0) { 2673 error_report("rdma migration: write error! %d", ret); 2674 goto err; 2675 } 2676 2677 /* 2678 * We always return 1 byte because the RDMA 2679 * protocol is completely asynchronous. We do not yet know 2680 * whether an identified chunk is zero or not because we're 2681 * waiting for other pages to potentially be merged with 2682 * the current chunk. So, we have to call qemu_update_position() 2683 * later on when the actual write occurs.
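 * (The real byte counts are accounted by qemu_rdma_write_one() through
 * acct_update_position(), both for zero chunks and for posted RDMA writes.)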
2684 */ 2685 if (bytes_sent) { 2686 *bytes_sent = 1; 2687 } 2688 } else { 2689 uint64_t index, chunk; 2690 2691 /* TODO: Change QEMUFileOps prototype to be signed: size_t => long 2692 if (size < 0) { 2693 ret = qemu_rdma_drain_cq(f, rdma); 2694 if (ret < 0) { 2695 fprintf(stderr, "rdma: failed to synchronously drain" 2696 " completion queue before unregistration.\n"); 2697 goto err; 2698 } 2699 } 2700 */ 2701 2702 ret = qemu_rdma_search_ram_block(rdma, block_offset, 2703 offset, size, &index, &chunk); 2704 2705 if (ret) { 2706 error_report("ram block search failed"); 2707 goto err; 2708 } 2709 2710 qemu_rdma_signal_unregister(rdma, index, chunk, 0); 2711 2712 /* 2713 * TODO: Synchronous, guaranteed unregistration (should not occur during 2714 * fast-path). Otherwise, unregisters will process on the next call to 2715 * qemu_rdma_drain_cq() 2716 if (size < 0) { 2717 qemu_rdma_unregister_waiting(rdma); 2718 } 2719 */ 2720 } 2721 2722 /* 2723 * Drain the Completion Queue if possible, but do not block, 2724 * just poll. 2725 * 2726 * If nothing to poll, the end of the iteration will do this 2727 * again to make sure we don't overflow the request queue. 2728 */ 2729 while (1) { 2730 uint64_t wr_id, wr_id_in; 2731 int ret = qemu_rdma_poll(rdma, &wr_id_in, NULL); 2732 if (ret < 0) { 2733 error_report("rdma migration: polling error! %d", ret); 2734 goto err; 2735 } 2736 2737 wr_id = wr_id_in & RDMA_WRID_TYPE_MASK; 2738 2739 if (wr_id == RDMA_WRID_NONE) { 2740 break; 2741 } 2742 } 2743 2744 return RAM_SAVE_CONTROL_DELAYED; 2745 err: 2746 rdma->error_state = ret; 2747 return ret; 2748 } 2749 2750 static int qemu_rdma_accept(RDMAContext *rdma) 2751 { 2752 RDMACapabilities cap; 2753 struct rdma_conn_param conn_param = { 2754 .responder_resources = 2, 2755 .private_data = &cap, 2756 .private_data_len = sizeof(cap), 2757 }; 2758 struct rdma_cm_event *cm_event; 2759 struct ibv_context *verbs; 2760 int ret = -EINVAL; 2761 int idx; 2762 2763 ret = rdma_get_cm_event(rdma->channel, &cm_event); 2764 if (ret) { 2765 goto err_rdma_dest_wait; 2766 } 2767 2768 if (cm_event->event != RDMA_CM_EVENT_CONNECT_REQUEST) { 2769 rdma_ack_cm_event(cm_event); 2770 goto err_rdma_dest_wait; 2771 } 2772 2773 memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap)); 2774 2775 network_to_caps(&cap); 2776 2777 if (cap.version < 1 || cap.version > RDMA_CONTROL_VERSION_CURRENT) { 2778 error_report("Unknown source RDMA version: %d, bailing...", 2779 cap.version); 2780 rdma_ack_cm_event(cm_event); 2781 goto err_rdma_dest_wait; 2782 } 2783 2784 /* 2785 * Respond with only the capabilities this version of QEMU knows about. 2786 */ 2787 cap.flags &= known_capabilities; 2788 2789 /* 2790 * Enable the ones that we do know about. 2791 * Add other checks here as new ones are introduced. 
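 * (RDMA_CAPABILITY_PIN_ALL is the only capability negotiated today; anything
 * the source requested that we masked off above is silently dropped and the
 * source falls back to dynamic chunk registration.)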
2792 */ 2793 if (cap.flags & RDMA_CAPABILITY_PIN_ALL) { 2794 rdma->pin_all = true; 2795 } 2796 2797 rdma->cm_id = cm_event->id; 2798 verbs = cm_event->id->verbs; 2799 2800 rdma_ack_cm_event(cm_event); 2801 2802 trace_qemu_rdma_accept_pin_state(rdma->pin_all); 2803 2804 caps_to_network(&cap); 2805 2806 trace_qemu_rdma_accept_pin_verbsc(verbs); 2807 2808 if (!rdma->verbs) { 2809 rdma->verbs = verbs; 2810 } else if (rdma->verbs != verbs) { 2811 error_report("ibv context not matching %p, %p!", rdma->verbs, 2812 verbs); 2813 goto err_rdma_dest_wait; 2814 } 2815 2816 qemu_rdma_dump_id("dest_init", verbs); 2817 2818 ret = qemu_rdma_alloc_pd_cq(rdma); 2819 if (ret) { 2820 error_report("rdma migration: error allocating pd and cq!"); 2821 goto err_rdma_dest_wait; 2822 } 2823 2824 ret = qemu_rdma_alloc_qp(rdma); 2825 if (ret) { 2826 error_report("rdma migration: error allocating qp!"); 2827 goto err_rdma_dest_wait; 2828 } 2829 2830 ret = qemu_rdma_init_ram_blocks(rdma); 2831 if (ret) { 2832 error_report("rdma migration: error initializing ram blocks!"); 2833 goto err_rdma_dest_wait; 2834 } 2835 2836 for (idx = 0; idx < RDMA_WRID_MAX; idx++) { 2837 ret = qemu_rdma_reg_control(rdma, idx); 2838 if (ret) { 2839 error_report("rdma: error registering %d control", idx); 2840 goto err_rdma_dest_wait; 2841 } 2842 } 2843 2844 qemu_set_fd_handler(rdma->channel->fd, NULL, NULL, NULL); 2845 2846 ret = rdma_accept(rdma->cm_id, &conn_param); 2847 if (ret) { 2848 error_report("rdma_accept returns %d", ret); 2849 goto err_rdma_dest_wait; 2850 } 2851 2852 ret = rdma_get_cm_event(rdma->channel, &cm_event); 2853 if (ret) { 2854 error_report("rdma_accept get_cm_event failed %d", ret); 2855 goto err_rdma_dest_wait; 2856 } 2857 2858 if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) { 2859 error_report("rdma_accept not event established"); 2860 rdma_ack_cm_event(cm_event); 2861 goto err_rdma_dest_wait; 2862 } 2863 2864 rdma_ack_cm_event(cm_event); 2865 rdma->connected = true; 2866 2867 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY); 2868 if (ret) { 2869 error_report("rdma migration: error posting second control recv"); 2870 goto err_rdma_dest_wait; 2871 } 2872 2873 qemu_rdma_dump_gid("dest_connect", rdma->cm_id); 2874 2875 return 0; 2876 2877 err_rdma_dest_wait: 2878 rdma->error_state = ret; 2879 qemu_rdma_cleanup(rdma); 2880 return ret; 2881 } 2882 2883 /* 2884 * During each iteration of the migration, we listen for instructions 2885 * by the source VM to perform dynamic page registrations before they 2886 * can perform RDMA operations. 2887 * 2888 * We respond with the 'rkey'. 2889 * 2890 * Keep doing this until the source tells us to stop. 
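 * (The loop below dispatches on the control message type - COMPRESS,
 * RAM_BLOCKS_REQUEST, REGISTER_REQUEST and UNREGISTER_REQUEST - and returns
 * once REGISTER_FINISHED arrives.)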
2891 */ 2892 static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque, 2893 uint64_t flags) 2894 { 2895 RDMAControlHeader reg_resp = { .len = sizeof(RDMARegisterResult), 2896 .type = RDMA_CONTROL_REGISTER_RESULT, 2897 .repeat = 0, 2898 }; 2899 RDMAControlHeader unreg_resp = { .len = 0, 2900 .type = RDMA_CONTROL_UNREGISTER_FINISHED, 2901 .repeat = 0, 2902 }; 2903 RDMAControlHeader blocks = { .type = RDMA_CONTROL_RAM_BLOCKS_RESULT, 2904 .repeat = 1 }; 2905 QEMUFileRDMA *rfile = opaque; 2906 RDMAContext *rdma = rfile->rdma; 2907 RDMALocalBlocks *local = &rdma->local_ram_blocks; 2908 RDMAControlHeader head; 2909 RDMARegister *reg, *registers; 2910 RDMACompress *comp; 2911 RDMARegisterResult *reg_result; 2912 static RDMARegisterResult results[RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE]; 2913 RDMALocalBlock *block; 2914 void *host_addr; 2915 int ret = 0; 2916 int idx = 0; 2917 int count = 0; 2918 int i = 0; 2919 2920 CHECK_ERROR_STATE(); 2921 2922 do { 2923 trace_qemu_rdma_registration_handle_wait(flags); 2924 2925 ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_NONE); 2926 2927 if (ret < 0) { 2928 break; 2929 } 2930 2931 if (head.repeat > RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE) { 2932 error_report("rdma: Too many requests in this message (%d)." 2933 "Bailing.", head.repeat); 2934 ret = -EIO; 2935 break; 2936 } 2937 2938 switch (head.type) { 2939 case RDMA_CONTROL_COMPRESS: 2940 comp = (RDMACompress *) rdma->wr_data[idx].control_curr; 2941 network_to_compress(comp); 2942 2943 trace_qemu_rdma_registration_handle_compress(comp->length, 2944 comp->block_idx, 2945 comp->offset); 2946 block = &(rdma->local_ram_blocks.block[comp->block_idx]); 2947 2948 host_addr = block->local_host_addr + 2949 (comp->offset - block->offset); 2950 2951 ram_handle_compressed(host_addr, comp->value, comp->length); 2952 break; 2953 2954 case RDMA_CONTROL_REGISTER_FINISHED: 2955 trace_qemu_rdma_registration_handle_finished(); 2956 goto out; 2957 2958 case RDMA_CONTROL_RAM_BLOCKS_REQUEST: 2959 trace_qemu_rdma_registration_handle_ram_blocks(); 2960 2961 if (rdma->pin_all) { 2962 ret = qemu_rdma_reg_whole_ram_blocks(rdma); 2963 if (ret) { 2964 error_report("rdma migration: error dest " 2965 "registering ram blocks"); 2966 goto out; 2967 } 2968 } 2969 2970 /* 2971 * Dest uses this to prepare to transmit the RAMBlock descriptions 2972 * to the source VM after connection setup. 2973 * Both sides use the "remote" structure to communicate and update 2974 * their "local" descriptions with what was sent. 
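 * (Each RDMADestBlock carries this side's virtual address, offset and length,
 * plus the whole-block rkey when pin-all is in use, all converted to network
 * byte order before being sent back.)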
2975 */ 2976 for (i = 0; i < local->nb_blocks; i++) { 2977 rdma->dest_blocks[i].remote_host_addr = 2978 (uintptr_t)(local->block[i].local_host_addr); 2979 2980 if (rdma->pin_all) { 2981 rdma->dest_blocks[i].remote_rkey = local->block[i].mr->rkey; 2982 } 2983 2984 rdma->dest_blocks[i].offset = local->block[i].offset; 2985 rdma->dest_blocks[i].length = local->block[i].length; 2986 2987 dest_block_to_network(&rdma->dest_blocks[i]); 2988 } 2989 2990 blocks.len = rdma->local_ram_blocks.nb_blocks 2991 * sizeof(RDMADestBlock); 2992 2993 2994 ret = qemu_rdma_post_send_control(rdma, 2995 (uint8_t *) rdma->dest_blocks, &blocks); 2996 2997 if (ret < 0) { 2998 error_report("rdma migration: error sending remote info"); 2999 goto out; 3000 } 3001 3002 break; 3003 case RDMA_CONTROL_REGISTER_REQUEST: 3004 trace_qemu_rdma_registration_handle_register(head.repeat); 3005 3006 reg_resp.repeat = head.repeat; 3007 registers = (RDMARegister *) rdma->wr_data[idx].control_curr; 3008 3009 for (count = 0; count < head.repeat; count++) { 3010 uint64_t chunk; 3011 uint8_t *chunk_start, *chunk_end; 3012 3013 reg = &registers[count]; 3014 network_to_register(reg); 3015 3016 reg_result = &results[count]; 3017 3018 trace_qemu_rdma_registration_handle_register_loop(count, 3019 reg->current_index, reg->key.current_addr, reg->chunks); 3020 3021 block = &(rdma->local_ram_blocks.block[reg->current_index]); 3022 if (block->is_ram_block) { 3023 host_addr = (block->local_host_addr + 3024 (reg->key.current_addr - block->offset)); 3025 chunk = ram_chunk_index(block->local_host_addr, 3026 (uint8_t *) host_addr); 3027 } else { 3028 chunk = reg->key.chunk; 3029 host_addr = block->local_host_addr + 3030 (reg->key.chunk * (1UL << RDMA_REG_CHUNK_SHIFT)); 3031 } 3032 chunk_start = ram_chunk_start(block, chunk); 3033 chunk_end = ram_chunk_end(block, chunk + reg->chunks); 3034 if (qemu_rdma_register_and_get_keys(rdma, block, 3035 (uintptr_t)host_addr, NULL, &reg_result->rkey, 3036 chunk, chunk_start, chunk_end)) { 3037 error_report("cannot get rkey"); 3038 ret = -EINVAL; 3039 goto out; 3040 } 3041 3042 reg_result->host_addr = (uintptr_t)block->local_host_addr; 3043 3044 trace_qemu_rdma_registration_handle_register_rkey( 3045 reg_result->rkey); 3046 3047 result_to_network(reg_result); 3048 } 3049 3050 ret = qemu_rdma_post_send_control(rdma, 3051 (uint8_t *) results, &reg_resp); 3052 3053 if (ret < 0) { 3054 error_report("Failed to send control buffer"); 3055 goto out; 3056 } 3057 break; 3058 case RDMA_CONTROL_UNREGISTER_REQUEST: 3059 trace_qemu_rdma_registration_handle_unregister(head.repeat); 3060 unreg_resp.repeat = head.repeat; 3061 registers = (RDMARegister *) rdma->wr_data[idx].control_curr; 3062 3063 for (count = 0; count < head.repeat; count++) { 3064 reg = &registers[count]; 3065 network_to_register(reg); 3066 3067 trace_qemu_rdma_registration_handle_unregister_loop(count, 3068 reg->current_index, reg->key.chunk); 3069 3070 block = &(rdma->local_ram_blocks.block[reg->current_index]); 3071 3072 ret = ibv_dereg_mr(block->pmr[reg->key.chunk]); 3073 block->pmr[reg->key.chunk] = NULL; 3074 3075 if (ret != 0) { 3076 perror("rdma unregistration chunk failed"); 3077 ret = -ret; 3078 goto out; 3079 } 3080 3081 rdma->total_registrations--; 3082 3083 trace_qemu_rdma_registration_handle_unregister_success( 3084 reg->key.chunk); 3085 } 3086 3087 ret = qemu_rdma_post_send_control(rdma, NULL, &unreg_resp); 3088 3089 if (ret < 0) { 3090 error_report("Failed to send control buffer"); 3091 goto out; 3092 } 3093 break; 3094 case RDMA_CONTROL_REGISTER_RESULT: 3095
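 /* Registration results only flow from dest to source, so receiving one here means the peer is confused. */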
error_report("Invalid RESULT message at dest."); 3096 ret = -EIO; 3097 goto out; 3098 default: 3099 error_report("Unknown control message %s", control_desc[head.type]); 3100 ret = -EIO; 3101 goto out; 3102 } 3103 } while (1); 3104 out: 3105 if (ret < 0) { 3106 rdma->error_state = ret; 3107 } 3108 return ret; 3109 } 3110 3111 static int qemu_rdma_registration_start(QEMUFile *f, void *opaque, 3112 uint64_t flags) 3113 { 3114 QEMUFileRDMA *rfile = opaque; 3115 RDMAContext *rdma = rfile->rdma; 3116 3117 CHECK_ERROR_STATE(); 3118 3119 trace_qemu_rdma_registration_start(flags); 3120 qemu_put_be64(f, RAM_SAVE_FLAG_HOOK); 3121 qemu_fflush(f); 3122 3123 return 0; 3124 } 3125 3126 /* 3127 * Inform dest that dynamic registrations are done for now. 3128 * First, flush writes, if any. 3129 */ 3130 static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque, 3131 uint64_t flags) 3132 { 3133 Error *local_err = NULL, **errp = &local_err; 3134 QEMUFileRDMA *rfile = opaque; 3135 RDMAContext *rdma = rfile->rdma; 3136 RDMAControlHeader head = { .len = 0, .repeat = 1 }; 3137 int ret = 0; 3138 3139 CHECK_ERROR_STATE(); 3140 3141 qemu_fflush(f); 3142 ret = qemu_rdma_drain_cq(f, rdma); 3143 3144 if (ret < 0) { 3145 goto err; 3146 } 3147 3148 if (flags == RAM_CONTROL_SETUP) { 3149 RDMAControlHeader resp = {.type = RDMA_CONTROL_RAM_BLOCKS_RESULT }; 3150 RDMALocalBlocks *local = &rdma->local_ram_blocks; 3151 int reg_result_idx, i, j, nb_dest_blocks; 3152 3153 head.type = RDMA_CONTROL_RAM_BLOCKS_REQUEST; 3154 trace_qemu_rdma_registration_stop_ram(); 3155 3156 /* 3157 * Make sure that we parallelize the pinning on both sides. 3158 * For very large guests, doing this serially takes a really 3159 * long time, so we have to 'interleave' the pinning locally 3160 * with the control messages by performing the pinning on this 3161 * side before we receive the control response from the other 3162 * side that the pinning has completed. 3163 */ 3164 ret = qemu_rdma_exchange_send(rdma, &head, NULL, &resp, 3165 ®_result_idx, rdma->pin_all ? 3166 qemu_rdma_reg_whole_ram_blocks : NULL); 3167 if (ret < 0) { 3168 ERROR(errp, "receiving remote info!"); 3169 return ret; 3170 } 3171 3172 nb_dest_blocks = resp.len / sizeof(RDMADestBlock); 3173 3174 /* 3175 * The protocol uses two different sets of rkeys (mutually exclusive): 3176 * 1. One key to represent the virtual address of the entire ram block. 3177 * (dynamic chunk registration disabled - pin everything with one rkey.) 3178 * 2. One to represent individual chunks within a ram block. 3179 * (dynamic chunk registration enabled - pin individual chunks.) 3180 * 3181 * Once the capability is successfully negotiated, the destination transmits 3182 * the keys to use (or sends them later) including the virtual addresses 3183 * and then propagates the remote ram block descriptions to his local copy. 3184 */ 3185 3186 if (local->nb_blocks != nb_dest_blocks) { 3187 ERROR(errp, "ram blocks mismatch #1! 
" 3188 "Your QEMU command line parameters are probably " 3189 "not identical on both the source and destination."); 3190 return -EINVAL; 3191 } 3192 3193 qemu_rdma_move_header(rdma, reg_result_idx, &resp); 3194 memcpy(rdma->dest_blocks, 3195 rdma->wr_data[reg_result_idx].control_curr, resp.len); 3196 for (i = 0; i < nb_dest_blocks; i++) { 3197 network_to_dest_block(&rdma->dest_blocks[i]); 3198 3199 /* search local ram blocks */ 3200 for (j = 0; j < local->nb_blocks; j++) { 3201 if (rdma->dest_blocks[i].offset != local->block[j].offset) { 3202 continue; 3203 } 3204 3205 if (rdma->dest_blocks[i].length != local->block[j].length) { 3206 ERROR(errp, "ram blocks mismatch #2! " 3207 "Your QEMU command line parameters are probably " 3208 "not identical on both the source and destination."); 3209 return -EINVAL; 3210 } 3211 local->block[j].remote_host_addr = 3212 rdma->dest_blocks[i].remote_host_addr; 3213 local->block[j].remote_rkey = rdma->dest_blocks[i].remote_rkey; 3214 break; 3215 } 3216 3217 if (j >= local->nb_blocks) { 3218 ERROR(errp, "ram blocks mismatch #3! " 3219 "Your QEMU command line parameters are probably " 3220 "not identical on both the source and destination."); 3221 return -EINVAL; 3222 } 3223 } 3224 } 3225 3226 trace_qemu_rdma_registration_stop(flags); 3227 3228 head.type = RDMA_CONTROL_REGISTER_FINISHED; 3229 ret = qemu_rdma_exchange_send(rdma, &head, NULL, NULL, NULL, NULL); 3230 3231 if (ret < 0) { 3232 goto err; 3233 } 3234 3235 return 0; 3236 err: 3237 rdma->error_state = ret; 3238 return ret; 3239 } 3240 3241 static int qemu_rdma_get_fd(void *opaque) 3242 { 3243 QEMUFileRDMA *rfile = opaque; 3244 RDMAContext *rdma = rfile->rdma; 3245 3246 return rdma->comp_channel->fd; 3247 } 3248 3249 static const QEMUFileOps rdma_read_ops = { 3250 .get_buffer = qemu_rdma_get_buffer, 3251 .get_fd = qemu_rdma_get_fd, 3252 .close = qemu_rdma_close, 3253 .hook_ram_load = qemu_rdma_registration_handle, 3254 }; 3255 3256 static const QEMUFileOps rdma_write_ops = { 3257 .put_buffer = qemu_rdma_put_buffer, 3258 .close = qemu_rdma_close, 3259 .before_ram_iterate = qemu_rdma_registration_start, 3260 .after_ram_iterate = qemu_rdma_registration_stop, 3261 .save_page = qemu_rdma_save_page, 3262 }; 3263 3264 static void *qemu_fopen_rdma(RDMAContext *rdma, const char *mode) 3265 { 3266 QEMUFileRDMA *r = g_malloc0(sizeof(QEMUFileRDMA)); 3267 3268 if (qemu_file_mode_is_not_valid(mode)) { 3269 return NULL; 3270 } 3271 3272 r->rdma = rdma; 3273 3274 if (mode[0] == 'w') { 3275 r->file = qemu_fopen_ops(r, &rdma_write_ops); 3276 } else { 3277 r->file = qemu_fopen_ops(r, &rdma_read_ops); 3278 } 3279 3280 return r->file; 3281 } 3282 3283 static void rdma_accept_incoming_migration(void *opaque) 3284 { 3285 RDMAContext *rdma = opaque; 3286 int ret; 3287 QEMUFile *f; 3288 Error *local_err = NULL, **errp = &local_err; 3289 3290 trace_qemu_dma_accept_incoming_migration(); 3291 ret = qemu_rdma_accept(rdma); 3292 3293 if (ret) { 3294 ERROR(errp, "RDMA Migration initialization failed!"); 3295 return; 3296 } 3297 3298 trace_qemu_dma_accept_incoming_migration_accepted(); 3299 3300 f = qemu_fopen_rdma(rdma, "rb"); 3301 if (f == NULL) { 3302 ERROR(errp, "could not qemu_fopen_rdma!"); 3303 qemu_rdma_cleanup(rdma); 3304 return; 3305 } 3306 3307 rdma->migration_started_on_destination = 1; 3308 process_incoming_migration(f); 3309 } 3310 3311 void rdma_start_incoming_migration(const char *host_port, Error **errp) 3312 { 3313 int ret; 3314 RDMAContext *rdma; 3315 Error *local_err = NULL; 3316 3317 
trace_rdma_start_incoming_migration(); 3318 rdma = qemu_rdma_data_init(host_port, &local_err); 3319 3320 if (rdma == NULL) { 3321 goto err; 3322 } 3323 3324 ret = qemu_rdma_dest_init(rdma, &local_err); 3325 3326 if (ret) { 3327 goto err; 3328 } 3329 3330 trace_rdma_start_incoming_migration_after_dest_init(); 3331 3332 ret = rdma_listen(rdma->listen_id, 5); 3333 3334 if (ret) { 3335 ERROR(errp, "listening on socket!"); 3336 goto err; 3337 } 3338 3339 trace_rdma_start_incoming_migration_after_rdma_listen(); 3340 3341 qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration, 3342 NULL, (void *)(intptr_t)rdma); 3343 return; 3344 err: 3345 error_propagate(errp, local_err); 3346 g_free(rdma); 3347 } 3348 3349 void rdma_start_outgoing_migration(void *opaque, 3350 const char *host_port, Error **errp) 3351 { 3352 MigrationState *s = opaque; 3353 Error *local_err = NULL, **temp = &local_err; 3354 RDMAContext *rdma = qemu_rdma_data_init(host_port, &local_err); 3355 int ret = 0; 3356 3357 if (rdma == NULL) { 3358 ERROR(temp, "Failed to initialize RDMA data structures! %d", ret); 3359 goto err; 3360 } 3361 3362 ret = qemu_rdma_source_init(rdma, &local_err, 3363 s->enabled_capabilities[MIGRATION_CAPABILITY_RDMA_PIN_ALL]); 3364 3365 if (ret) { 3366 goto err; 3367 } 3368 3369 trace_rdma_start_outgoing_migration_after_rdma_source_init(); 3370 ret = qemu_rdma_connect(rdma, &local_err); 3371 3372 if (ret) { 3373 goto err; 3374 } 3375 3376 trace_rdma_start_outgoing_migration_after_rdma_connect(); 3377 3378 s->file = qemu_fopen_rdma(rdma, "wb"); 3379 migrate_fd_connect(s); 3380 return; 3381 err: 3382 error_propagate(errp, local_err); 3383 g_free(rdma); 3384 migrate_fd_error(s); 3385 } 3386
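/*
 * Usage sketch (informational, not part of the build): with QEMU built against
 * librdmacm/libibverbs, the destination is started with '-incoming rdma:host:port'
 * and the source is pointed at it with the monitor command 'migrate -d rdma:host:port';
 * those paths end up in rdma_start_incoming_migration() and
 * rdma_start_outgoing_migration() above.
 */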